[ 
https://issues.apache.org/jira/browse/SPARK-18402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657769#comment-15657769
 ] 

Steve Loughran commented on SPARK-18402:
----------------------------------------

I've seen this before, somewhere. It's usually a transient problem in which not 
all the data came back, or so much came back that something got confused. To 
make matters worse, this wasn't in the write itself: it was in a cleanup phase 
afterwards, in which there's a scan for a fake parent directory marker, 
deleting it if it's there.

The good news: HADOOP-13164 (coming in Hadoop 2.8) fixes this problem. Not only 
does it do a faster cleanup, it ignores any errors received, on the basis that 
the cleanup isn't important enough to break your job.
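A minimal sketch of that "ignore cleanup errors" behaviour (hypothetical helper names, not the actual HADOOP-13164 patch): the fake-directory delete becomes best-effort, so a transient failure during the post-write scan is logged and swallowed instead of failing the task commit.

```java
import java.util.function.Supplier;

public class BestEffortCleanup {
    // Best-effort delete of a fake parent directory marker: any failure is
    // logged and swallowed, because a missed marker cleanup must not fail
    // the job. (Hypothetical sketch, not the real S3AFileSystem code.)
    public static boolean deleteQuietly(Supplier<Boolean> deleteOp) {
        try {
            return deleteOp.get();
        } catch (RuntimeException e) {
            System.err.println("Ignoring cleanup failure: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulate a transient S3 listing failure during the cleanup scan.
        boolean ok = deleteQuietly(() -> {
            throw new RuntimeException("Failed to parse XML document");
        });
        System.out.println("cleanup ok? " + ok + "; job continues");
    }
}
```

The point is where the try/catch sits: around the cleanup only, never around the data write itself.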

Even so, it's an interesting issue: I'd like to know more about it and see how 
to make it go away properly, before you go talk to AWS. If you are seeing it 
repeatedly, I'd very much suspect something client-side rather than AWS itself, 
which points the blame into the org.apache.hadoop codebase before going near 
the com.amazon lines.

# I've just created HADOOP-13811 with my stack trace showing this; I'll tag 
this JIRA as a duplicate and close it. Interestingly, that stack trace came up 
during a run of the s3 streaming integration test I'd done in SPARK-7481; there 
I hypothesised it was some race condition in shutdown: threads were being 
interrupted, but the XML parser wasn't handling/reporting the interrupts 
properly. I've seen the odd XML parser error in HADOOP-13560, but that's what 
happens when you do a 6+GB PUT over a long-haul link.
# Get involved on that JIRA and I'll see if we can do some more diagnostics. In 
particular, if we could bump you up to some 2.8 JARs, we'd have both the better 
code and the metrics and monitoring to see WTF is going on. 
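Until then, a hedged client-side workaround (a hypothetical retry helper, not part of Spark, Hadoop, or the AWS SDK) is to retry an operation that dies with a transient "Unable to unmarshall response" failure, on the assumption that a clean re-run of the request will see a complete response:

```java
import java.util.function.Supplier;

public class TransientRetry {
    // Retry an operation a few times on RuntimeException, with a simple
    // linear backoff between attempts. Sketch only: a real retry policy
    // should distinguish transient errors from permanent ones.
    public static <T> T withRetries(int attempts, Supplier<T> op)
            throws InterruptedException {
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e;
                Thread.sleep(100L * (i + 1)); // back off before the next try
            }
        }
        throw last;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate an op that fails twice with a transient error, then succeeds.
        int[] calls = {0};
        String result = withRetries(3, () -> {
            if (calls[0]++ < 2) {
                throw new RuntimeException("Unable to unmarshall response");
            }
            return "ok";
        });
        System.out.println(result); // prints "ok"
    }
}
```

Only safe for idempotent operations, which a whole-directory parquet write generally is not; for the write itself the real fix is the 2.8 JARs.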


> spark: SAXParseException while writing from json to parquet on s3
> -----------------------------------------------------------------
>
>                 Key: SPARK-18402
>                 URL: https://issues.apache.org/jira/browse/SPARK-18402
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Submit
>    Affects Versions: 1.6.2, 2.0.1
>         Environment: spark 2.0.1 hadoop 2.7.1
> hadoop aws 2.7.1
> ubuntu 14.04.5 on aws
> mesos 1.0.1
> Java 1.7.0_111, openjdk
>            Reporter: Luke Miner
>
> I'm trying to read in some json, infer a schema, and write it out again as 
> parquet to s3 (s3a). For some reason, about a third of the way through the 
> writing portion of the run, spark always errors out with the error included 
> below. 
> I can't find any obvious reasons for the issue:
> - it isn't out of memory and I have tried increasing the overhead memory
> - there are no long GC pauses.
> - There don't seem to be any additional error messages in the logs of the 
> individual executors.
> - This does not appear to be a problem with badly formed json or corrupted 
> files. I have unzipped and read in each file individually with no error.
> The script runs fine on another set of data that I have, which is of a very 
> similar structure, but several orders of magnitude smaller.
> I am using the FileOutputCommitter. The algorithm version doesn't seem to 
> matter.
> Here's a simplified version of the script:
> {code}
> object Foo {
>   def parseJson(json: String): Option[Map[String, Any]] = {
>     if (json == null)
>       Some(Map())
>     else
>       parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, Any]])
>   }
> }
>
> // read in as text; the json lines are parsed with json4s via Foo.parseJson
> val jsonRDD: RDD[String] = sc.textFile(inputPath)
> // infer a schema that will encapsulate the most rows in a sample of size sampleRowNum
> val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, sampleRowNum)
> // get documents' compatibility with schema
> val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
>   .map(js => (js, Infer.getSchemaCompatibility(schema, Infer.inferSchema(js)).toBoolean))
>   .repartition(partitions)
> val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
>   .filter { case (js: String, compatible: Boolean) => compatible }
>   .map { case (js: String, _: Boolean) => js }
> // create a dataframe from documents with compatible schema
> val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD)
> dataFrame.write.parquet("s3a://foo/foo")
> {code}
> It completes the earlier schema inferring steps successfully. The error 
> itself occurs on the last line, but I suppose that could encompass at least 
> the immediately preceding statement, if not earlier:
> {code}
>     org.apache.spark.SparkException: Task failed while writing rows
>         at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
>         at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:86)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>     Caused by: java.lang.RuntimeException: Failed to commit task
>         at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
>         at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
>         at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
>         at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
>         at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
>         at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
>         ... 8 more
>         Suppressed: java.lang.NullPointerException
>             at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
>             at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
>             at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>             at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
>             at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:282)
>             at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:258)
>             at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1354)
>             ... 9 more
>     Caused by: com.amazonaws.AmazonClientException: Unable to unmarshall 
> response (Failed to parse XML document with handler class 
> com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler).
>  Response Code: 200, Response Text: OK
>         at 
> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:738)
>         at 
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:399)
>         at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>         at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>         at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480)
>         at 
> com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:962)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1147)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1136)
>         at 
> org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142)
>         at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>         at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>         at 
> org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:400)
>         at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
>         at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>         at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
>         at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:267)
>         ... 13 more
>     Caused by: com.amazonaws.AmazonClientException: Failed to parse XML 
> document with handler class 
> com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler
>         at 
> com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:150)
>         at 
> com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:279)
>         at 
> com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:75)
>         at 
> com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:72)
>         at 
> com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
>         at 
> com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
>         at 
> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:712)
>         ... 29 more
>     Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; 
> XML document structures must start and end within the same entity.
>         at 
> org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown 
> Source)
>         at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown 
> Source)
>         at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
>         at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
>         at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
>         at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
>         at 
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.endEntity(Unknown 
> Source)
>         at org.apache.xerces.impl.XMLDocumentScannerImpl.endEntity(Unknown 
> Source)
>         at org.apache.xerces.impl.XMLEntityManager.endEntity(Unknown Source)
>         at org.apache.xerces.impl.XMLEntityScanner.load(Unknown Source)
>         at org.apache.xerces.impl.XMLEntityScanner.skipChar(Unknown Source)
>         at 
> org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown
>  Source)
>         at 
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown 
> Source)
>         at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>         at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>         at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
>         at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
>         at 
> com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:141)
>         ... 35 more
> {code}
> Here's my conf:
> {code}
> spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G 
> -XX:+HeapDumpOnOutOfMemoryError
> spark.executor.memory   16G
> spark.executor.uri  https://foo/spark-2.0.1-bin-hadoop2.7.tgz
> spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.buffer.dir  /raid0/spark
> spark.hadoop.fs.s3n.buffer.dir  /raid0/spark
> spark.hadoop.fs.s3a.connection.timeout 500000
> spark.hadoop.fs.s3n.multipart.uploads.enabled   true
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
> spark.hadoop.parquet.block.size 2147483648
> spark.hadoop.parquet.enable.summary-metadata    false
> spark.jars.packages com.databricks:spark-avro_2.11:3.0.1,com.amazonaws:aws-java-sdk-pom:1.10.34
> spark.local.dir /raid0/spark
> spark.mesos.executor.memoryOverhead 4000
> spark.mesos.coarse  false
> spark.mesos.constraints  priority:1
> spark.network.timeout   600
> spark.rpc.message.maxSize    500
> spark.speculation   false
> spark.sql.parquet.mergeSchema   false
> spark.sql.planner.externalSort  true
> spark.submit.deployMode client
> spark.task.cpus 1
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
