[
https://issues.apache.org/jira/browse/SPARK-18402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657769#comment-15657769
]
Steve Loughran commented on SPARK-18402:
----------------------------------------
I've seen this before, somewhere. It's usually a transient problem in which not
all the data came back, or so much did that something got confused. To make
matters worse, this wasn't the write itself: it was in a cleanup phase
afterwards, in which there's a scan for a fake parent directory marker,
deleting it if it's there.
The good news: HADOOP-13164 (coming in Hadoop 2.8) fixes this problem. Not only
does it do a faster cleanup, it ignores any errors received, on the basis that
the cleanup isn't important enough to break your job.
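To illustrate the post-HADOOP-13164 behaviour, here's a minimal sketch of the "best effort" pattern, not the actual S3A code; MarkerStore is a hypothetical stand-in for the S3 client, and the keys are made up:

```java
import java.util.Arrays;
import java.util.List;

public class BestEffortCleanup {
    // Hypothetical stand-in for the client that deletes fake directory markers.
    interface MarkerStore {
        void delete(String key) throws Exception;
    }

    // Attempt each delete and swallow failures: a leftover zero-byte marker
    // is harmless to the job's output, so a failed delete must not fail the
    // task commit. Returns how many markers were actually deleted.
    static int cleanup(MarkerStore store, List<String> markers) {
        int deleted = 0;
        for (String key : markers) {
            try {
                store.delete(key);
                deleted++;
            } catch (Exception ignored) {
                // best effort: log-and-continue territory, never rethrow
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        // Simulate a transient failure on one marker delete.
        MarkerStore flaky = key -> {
            if (key.contains("bad")) {
                throw new RuntimeException("transient 200-with-garbage response");
            }
        };
        System.out.println(cleanup(flaky, Arrays.asList("a/", "bad/", "b/")));
    }
}
```

The pre-2.8 code effectively let that exception propagate out of the stream's close(), which is why a transient listing error could fail an otherwise-complete write.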
Even so, it's an interesting issue: I'd like to know more about it and see how
to make it go away properly, before you go talk to AWS. If you are seeing it
repeatedly, I'd very much suspect something client-side rather than in AWS
itself, which points the blame into the org.apache.hadoop codebase before
going near the com.amazon lines.
# I've just created HADOOP-13811 with a stack trace of mine that showed this;
I'll tag this JIRA as a duplicate and close it. Interestingly, that stack trace
came up during a run of the S3 streaming integration test I'd done in
SPARK-7481; there I hypothesised it was some race condition in shutdown:
threads were being interrupted, but the XML parser wasn't handling/reporting
them properly. I've seen the odd XML parser error in HADOOP-13560, but that's
what happens when you do a 6+GB PUT over a long-haul link.
# Get involved on that JIRA and I'll see if we can do some more diagnostics.
In particular, if we could bump you up to some 2.8 JARs, we'd have both the
better code and the metrics and monitoring to see WTF is going on.
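For what it's worth, the innermost SAXParseException below ("XML document structures must start and end within the same entity") is what a SAX parser reports when the response body is cut off mid-document, consistent with a truncated listing response. A minimal standalone reproduction, using a made-up ListBucketResult-style payload rather than a real S3 response:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.SAXParseException;
import org.xml.sax.helpers.DefaultHandler;

public class TruncatedXmlDemo {
    // Parse a document and report "ok" or the SAX error message.
    static String parseResult(String xml) {
        try {
            SAXParserFactory.newInstance().newSAXParser().parse(
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)),
                new DefaultHandler());
            return "ok";
        } catch (SAXParseException e) {
            return e.getMessage();
        } catch (Exception e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        String full = "<ListBucketResult><Name>foo</Name></ListBucketResult>";
        // Simulate a response cut off mid-body: drop the tail of the document.
        String truncated = full.substring(0, 20);
        System.out.println(parseResult(full));       // parses cleanly
        System.out.println(parseResult(truncated));  // fatal error, e.g. "...must start and end within the same entity."
    }
}
```

That the HTTP response code was 200 while the body failed to parse is what makes this look like truncation or corruption somewhere between the service and the parser, rather than a server-side error.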
> spark: SAXParseException while writing from json to parquet on s3
> -----------------------------------------------------------------
>
> Key: SPARK-18402
> URL: https://issues.apache.org/jira/browse/SPARK-18402
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Spark Submit
> Affects Versions: 1.6.2, 2.0.1
> Environment: spark 2.0.1 hadoop 2.7.1
> hadoop aws 2.7.1
> ubuntu 14.04.5 on aws
> mesos 1.0.1
> Java 1.7.0_111, openjdk
> Reporter: Luke Miner
>
> I'm trying to read in some json, infer a schema, and write it out again as
> parquet to s3 (s3a). For some reason, about a third of the way through the
> writing portion of the run, spark always errors out with the error included
> below.
> I can't find any obvious reasons for the issue:
> - it isn't out of memory, and I have tried increasing the overhead memory
> - there are no long GC pauses
> - there don't seem to be any additional error messages in the logs of the
> individual executors
> - this does not appear to be a problem with badly formed json or corrupted
> files; I have unzipped and read in each file individually with no error
> The script runs fine on another set of data that I have, which is of a very
> similar structure, but several orders of magnitude smaller.
> I am using the FileOutputCommitter. The algorithm version doesn't seem to
> matter.
> Here's a simplified version of the script:
> {code}
> object Foo {
>   def parseJson(json: String): Option[Map[String, Any]] = {
>     if (json == null)
>       Some(Map())
>     else
>       parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, Any]])
>   }
> }
>
> // read in as text and parse json using json4s
> val jsonRDD: RDD[String] = sc.textFile(inputPath)
>   .map(row => Foo.parseJson(row))
>
> // infer a schema that will encapsulate the most rows in a sample of size sampleRowNum
> val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, sampleRowNum)
>
> // get each document's compatibility with the schema
> val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
>   .map(js => (js, Infer.getSchemaCompatibility(schema, Infer.inferSchema(js)).toBoolean))
>   .repartition(partitions)
>
> val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
>   .filter { case (js: String, compatible: Boolean) => compatible }
>   .map { case (js: String, _: Boolean) => js }
>
> // create a dataframe from documents with compatible schema
> val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD)
>
> dataFrame.write.parquet("s3a://foo/foo")
> {code}
> It completes the earlier schema inferring steps successfully. The error
> itself occurs on the last line, but I suppose that could encompass at least
> the immediately preceding statement, if not earlier:
> {code}
> org.apache.spark.SparkException: Task failed while writing rows
>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Failed to commit task
>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
>   at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
>   ... 8 more
>   Suppressed: java.lang.NullPointerException
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
>     at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:282)
>     at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:258)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1354)
>     ... 9 more
> Caused by: com.amazonaws.AmazonClientException: Unable to unmarshall response (Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler). Response Code: 200, Response Text: OK
>   at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:738)
>   at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:399)
>   at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>   at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>   at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480)
>   at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:962)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1147)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1136)
>   at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142)
>   at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>   at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>   at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:400)
>   at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
>   at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:267)
>   ... 13 more
> Caused by: com.amazonaws.AmazonClientException: Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler
>   at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:150)
>   at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:279)
>   at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:75)
>   at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:72)
>   at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
>   at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
>   at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:712)
>   ... 29 more
> Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; XML document structures must start and end within the same entity.
>   at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
>   at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
>   at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
>   at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
>   at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
>   at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
>   at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.endEntity(Unknown Source)
>   at org.apache.xerces.impl.XMLDocumentScannerImpl.endEntity(Unknown Source)
>   at org.apache.xerces.impl.XMLEntityManager.endEntity(Unknown Source)
>   at org.apache.xerces.impl.XMLEntityScanner.load(Unknown Source)
>   at org.apache.xerces.impl.XMLEntityScanner.skipChar(Unknown Source)
>   at org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown Source)
>   at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
>   at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>   at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>   at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
>   at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
>   at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:141)
>   ... 35 more
> {code}
> Here's my conf:
> {code}
> spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G -XX:+HeapDumpOnOutOfMemoryError
> spark.executor.memory 16G
> spark.executor.uri https://foo/spark-2.0.1-bin-hadoop2.7.tgz
> spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.buffer.dir /raid0/spark
> spark.hadoop.fs.s3n.buffer.dir /raid0/spark
> spark.hadoop.fs.s3a.connection.timeout 500000
> spark.hadoop.fs.s3n.multipart.uploads.enabled true
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
> spark.hadoop.parquet.block.size 2147483648
> spark.hadoop.parquet.enable.summary-metadata false
> spark.jars.packages com.databricks:spark-avro_2.11:3.0.1,com.amazonaws:aws-java-sdk-pom:1.10.34
> spark.local.dir /raid0/spark
> spark.mesos.executor.memoryOverhead 4000
> spark.mesos.coarse false
> spark.mesos.constraints priority:1
> spark.network.timeout 600
> spark.rpc.message.maxSize 500
> spark.speculation false
> spark.sql.parquet.mergeSchema false
> spark.sql.planner.externalSort true
> spark.submit.deployMode client
> spark.task.cpus 1
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]