How to investigate skew, and RangePartitioner with DataFrames
I have two questions. First, I have a failure when I write Parquet from Spark 1.6.1 on Amazon EMR to S3. This is a full batch, which is over 200GB of source data. The partitioning is based on a geographic identifier we use, and also the date on which we received the data. However, because of geographic density, we could well be producing tiles that are too dense. I'm trying to work out how to determine the size of the file Spark is trying to write out.

Second, we used to use RDDs and RangePartitioner for task partitioning. However, I don't see this available for DataFrames. How does one achieve this now?

Peter Halliday
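A minimal diagnostic sketch for the first question, assuming the write is partitioned by hypothetical tile and date columns on a DataFrame df: counting rows per output partition key before writing shows which keys will produce oversized files.

    import org.apache.spark.sql.functions._

    // Rows per (tile, date) output partition: the largest counts mark the
    // dense tiles that will become the largest Parquet files.
    df.groupBy(col("tile"), col("date"))
      .count()
      .orderBy(desc("count"))
      .show(20)

For the second question, DataFrames in 1.6 do not expose RangePartitioner directly, but one workaround (a sketch; df, sqlContext, the string geoId column, and the partition count of 200 are all assumptions here) is to drop to the underlying RDD, range-partition it, and rebuild the DataFrame:

    import org.apache.spark.RangePartitioner

    // Key each row by the geographic identifier, range-partition on that
    // key, then reassemble a DataFrame with the original schema.
    val keyed = df.rdd.map(row => (row.getAs[String]("geoId"), row))
    val ranged = keyed.partitionBy(new RangePartitioner(200, keyed)).values
    val rangedDf = sqlContext.createDataFrame(ranged, df.schema)

Note that sorting a DataFrame also range-partitions it internally, so df.sort("geoId") can serve the same purpose when a total ordering is acceptable.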
Re: Error writing parquet to S3
Has anyone else seen this before? The last time I saw this there was an OOM first, but that doesn't seem to be the case here. Of course, I'm not sure how large the file that produced this was, either.

Peter

> On Jun 9, 2016, at 9:00 PM, Peter Halliday <pjh...@cornell.edu> wrote:
>
> I'm not 100% sure why I'm getting this. I don't see any errors before this at all. I'm not sure how to diagnose this.
>
> Peter Halliday
>
> [2016-06-10 01:46:05,282] WARN org.apache.spark.scheduler.TaskSetManager [task-result-getter-2] - Lost task 3737.0 in stage 2.0 (TID 10585, ip-172-16-96-32.ec2.internal): org.apache.spark.SparkException: Task failed while writing rows.
>     at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
>     at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:146)
>     at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:138)
>     at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:195)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:153)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
>     at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:405)
>     ... 8 more
Error writing parquet to S3
I'm not 100% sure why I'm getting this. I don't see any errors before this at all. I'm not sure how to diagnose this.

Peter Halliday

[2016-06-10 01:46:05,282] WARN org.apache.spark.scheduler.TaskSetManager [task-result-getter-2] - Lost task 3737.0 in stage 2.0 (TID 10585, ip-172-16-96-32.ec2.internal): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
    at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:146)
    at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:138)
    at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:195)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:153)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:405)
    ... 8 more
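Since the failure lands on a single task, it may help to see how much data each task partition carries before the write. A hedged sketch, assuming df is the DataFrame being written; a partition far larger than the rest points at the skew suspected in this thread:

    // Count rows per partition and print the heaviest ones.
    val counts = df.rdd
      .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size)))
      .collect()
      .sortBy { case (_, n) => -n }

    counts.take(20).foreach { case (idx, n) =>
      println(s"partition $idx: $n rows")
    }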
UnsupportedOperationException when converting from RDDs to Datasets on 1.6.1
I have some code that was producing OOMs during shuffle and was RDD-based. So, at the direction of a member of Databricks, I started converting to Datasets. However, when we did, we got an error that seems to object to something within one of our case classes.

Peter Halliday

[2016-06-08 19:12:22,083] ERROR org.apache.spark.deploy.yarn.ApplicationMaster [Driver] - User class threw exception: java.lang.UnsupportedOperationException: No Encoder found for Set[com.wix.accord.Violation]
- field (class: "scala.collection.immutable.Set", name: "violations")
- root class: "com.here.probe.ingestion.converters.ProbeValidation"
java.lang.UnsupportedOperationException: No Encoder found for Set[com.wix.accord.Violation]
- field (class: "scala.collection.immutable.Set", name: "violations")
- root class: "com.here.probe.ingestion.converters.ProbeValidation"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:594)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:494)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:490)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:490)
    at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:402)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
    at com.here.probe.ingestion.IngestProbe.processLines(IngestProbe.scala:116)
    at com.here.probe.ingestion.IngestProbe.processFiles(IngestProbe.scala:86)
    at com.here.probe.ingestion.IngestProbe$.main(IngestProbe.scala:53)
    at com.here.probe.ingestion.IngestProbe.main(IngestProbe.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
[2016-06-08 19:12:22,086] INFO org.apache.spark.deploy.yarn.ApplicationMaster [Driver] - Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.UnsupportedOperationException: No Encoder found for Set[com.wix.accord.Violation] - field (class: "scala.collection.immutable.Set", name: "violations") - root class: "com.here.probe.ingestion.converters.ProbeValidation")
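The 1.6 product encoder has no rule for a Set field of an arbitrary class, which is exactly what the reflection error above reports. A hedged sketch of two workarounds, using a simplified stand-in for the ProbeValidation class named in the trace:

    import org.apache.spark.sql.{Encoder, Encoders}

    // Simplified stand-in for com.here.probe.ingestion.converters.ProbeValidation.
    case class ProbeValidation(id: String, violations: Set[String])

    // Option 1: bypass the product encoder with Kryo. The Dataset then stores
    // the object as a single binary column, but map/filter/groupBy still work.
    implicit val probeValidationEnc: Encoder[ProbeValidation] =
      Encoders.kryo[ProbeValidation]

    // Option 2: reshape the class so every field has a built-in encoder, e.g.
    // carry the violations as a Seq of rendered strings instead of
    // Set[com.wix.accord.Violation].
    case class ProbeValidationFlat(id: String, violations: Seq[String])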
Re: EMR Spark log4j and metrics
I wonder if anyone can confirm: is Spark on YARN the problem here, or is it how AWS has put it together? I'm wondering if Spark on YARN has problems with configuration files for the workers and the driver.

Peter Halliday

On Thu, Apr 14, 2016 at 1:09 PM, Peter Halliday <pjh...@cornell.edu> wrote:
> An update to this is that I can see the log4j.properties and metrics.properties files correctly on the master. When I submit a Spark Step that runs Spark in cluster deploy mode, I see the cluster files being zipped up and pushed via HDFS to the driver and workers. However, I don't see evidence that the configuration files are read or used after they are pushed.
>
> On Wed, Apr 13, 2016 at 11:22 AM, Peter Halliday <pjh...@cornell.edu> wrote:
>> I have an existing cluster that I stand up via Docker images and CloudFormation templates on AWS. We are moving to an EMR and AWS Data Pipeline process, and are having problems with metrics and log4j. We've sent a JSON configuration for spark-log4j and spark-metrics. The log4j file seems to be basically working for the master; however, it isn't working for the driver and executors, and I'm not sure why. Also, the metrics aren't working anywhere. We use CloudWatch to log the metrics, and there seems to be no CloudWatch sink for Spark on EMR, so we created one that we added to a jar that's sent via --jars to spark-submit.
>>
>> Peter Halliday
Re: EMR Spark log4j and metrics
An update to this is that I can see the log4j.properties and metrics.properties files correctly on the master. When I submit a Spark Step that runs Spark in cluster deploy mode, I see the cluster files being zipped up and pushed via HDFS to the driver and workers. However, I don't see evidence that the configuration files are read or used after they are pushed.

On Wed, Apr 13, 2016 at 11:22 AM, Peter Halliday <pjh...@cornell.edu> wrote:
> I have an existing cluster that I stand up via Docker images and CloudFormation templates on AWS. We are moving to an EMR and AWS Data Pipeline process, and are having problems with metrics and log4j. We've sent a JSON configuration for spark-log4j and spark-metrics. The log4j file seems to be basically working for the master; however, it isn't working for the driver and executors, and I'm not sure why. Also, the metrics aren't working anywhere. We use CloudWatch to log the metrics, and there seems to be no CloudWatch sink for Spark on EMR, so we created one that we added to a jar that's sent via --jars to spark-submit.
>
> Peter Halliday
EMR Spark log4j and metrics
I have an existing cluster that I stand up via Docker images and CloudFormation templates on AWS. We are moving to an EMR and AWS Data Pipeline process, and are having problems with metrics and log4j. We've sent a JSON configuration for spark-log4j and spark-metrics. The log4j file seems to be basically working for the master; however, it isn't working for the driver and executors, and I'm not sure why. Also, the metrics aren't working anywhere. We use CloudWatch to log the metrics, and there seems to be no CloudWatch sink for Spark on EMR, so we created one that we added to a jar that's sent via --jars to spark-submit.

Peter Halliday
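One hedged way to rule out the EMR classification mechanism is to ship the files with spark-submit itself and point the JVMs at them explicitly. The property file names are the ones discussed above; the jar names, the main class, and the rest of the command are assumptions about this job:

    spark-submit \
      --deploy-mode cluster \
      --files log4j.properties,metrics.properties \
      --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
      --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
      --conf "spark.metrics.conf=metrics.properties" \
      --jars cloudwatch-sink.jar \
      --class com.example.Main your-app.jar

Files passed via --files land in each container's working directory on YARN, which is on the classpath, so log4j can resolve the bare file name.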
FileAlreadyExistsException and Streaming context
I'm getting a FileAlreadyExistsException. I've tried setting the save to SaveMode.Overwrite, and setting spark.hadoop.validateOutputSpecs to false. However, I wonder if these settings are being ignored because I'm using Spark Streaming. We aren't using checkpointing, though. Here's the stack trace: http://pastebin.com/AqBFXkga

Peter Halliday
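A minimal sketch of where the two settings above live, with placeholder data and a hypothetical output path. Note that the Spark configuration docs say spark.hadoop.validateOutputSpecs is ignored for jobs generated through a StreamingContext, and that both knobs operate on the output directory as a whole, not on individual task files:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SaveMode}

    val conf = new SparkConf()
      .setAppName("overwrite-sketch")
      .set("spark.hadoop.validateOutputSpecs", "false") // directory-level check only

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.range(0, 10) // placeholder data

    // Overwrite replaces the output directory; it does not let a retried
    // task overwrite a file that a previous attempt already wrote.
    df.write.mode(SaveMode.Overwrite).parquet("s3://bucket/path") // hypothetical path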
Re: Get rid of FileAlreadyExistsError
I haven't tried spark.hadoop.validateOutputSpecs. However, it seems that it has to do with the existence of the output directory itself, not the files. Maybe I'm wrong?

Peter

> On Mar 1, 2016, at 11:53 AM, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:
>
> Have you tried spark.hadoop.validateOutputSpecs?
>
> On 01-Mar-2016 9:43 pm, "Peter Halliday" <pjh...@cornell.edu> wrote:
>> http://pastebin.com/vbbFzyzb
>>
>> The problem seems to be twofold. First, the ParquetFileWriter in Hadoop allows for an overwrite flag that Spark doesn't allow to be set. The second is that the DirectParquetOutputCommitter has an abortTask that's empty. I see SPARK-8413 open on this too, but no plans on changing this. I'm surprised not to see this fixed yet.
>>
>> Peter Halliday
>>
>>> On Mar 1, 2016, at 10:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>> Do you mind pastebin'ning the stack trace with the error so that we know which part of the code is under discussion?
>>>
>>> Thanks
>>>
>>> On Tue, Mar 1, 2016 at 7:48 AM, Peter Halliday <pjh...@cornell.edu> wrote:
>>>> I have a Spark application where a task seems to fail, but it actually did write out some of the files that were assigned to it. Spark then assigns the task to another executor, which gets a FileAlreadyExistsException. The Hadoop code seems to allow for files to be overwritten, but I see the 1.5.1 version of this code doesn't allow for this to be passed in. Is that correct?
>>>>
>>>> Peter Halliday
Re: Get rid of FileAlreadyExistsError
http://pastebin.com/vbbFzyzb

The problem seems to be twofold. First, the ParquetFileWriter in Hadoop allows for an overwrite flag that Spark doesn't allow to be set. The second is that the DirectParquetOutputCommitter has an abortTask that's empty. I see SPARK-8413 open on this too, but no plans on changing this. I'm surprised not to see this fixed yet.

Peter Halliday

> On Mar 1, 2016, at 10:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Do you mind pastebin'ning the stack trace with the error so that we know which part of the code is under discussion?
>
> Thanks
>
> On Tue, Mar 1, 2016 at 7:48 AM, Peter Halliday <pjh...@cornell.edu> wrote:
>> I have a Spark application where a task seems to fail, but it actually did write out some of the files that were assigned to it. Spark then assigns the task to another executor, which gets a FileAlreadyExistsException. The Hadoop code seems to allow for files to be overwritten, but I see the 1.5.1 version of this code doesn't allow for this to be passed in. Is that correct?
>>
>> Peter Halliday
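If the empty abortTask in DirectParquetOutputCommitter is what leaves half-written files behind for the retried task to trip over, one possible workaround on 1.5/1.6 (a sketch, not a verified fix for this job) is to switch back to the default Parquet committer, which cleans up failed task attempts at the cost of slower rename-based commits against S3:

    // Revert to the default committer so a failed attempt's files are
    // removed before the task is retried.
    sqlContext.setConf(
      "spark.sql.parquet.output.committer.class",
      "org.apache.parquet.hadoop.ParquetOutputCommitter")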
Get rid of FileAlreadyExistsError
I have a Spark application where a task seems to fail, but it actually did write out some of the files that were assigned to it. Spark then assigns the task to another executor, which gets a FileAlreadyExistsException. The Hadoop code seems to allow for files to be overwritten, but I see the 1.5.1 version of this code doesn't allow for this to be passed in. Is that correct?

Peter Halliday