[
https://issues.apache.org/jira/browse/SPARK-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048821#comment-15048821
]
Paul Zaczkieiwcz commented on SPARK-10109:
------------------------------------------
Still seeing this in 1.5.1. My use case is the same. I've got several
independent spark jobs downloading data and appending to the same partitioned
parquet directory in HDFS. None of the partitions overlap but the _temporary
folder has become a bottleneck, only allowing one job to append at a time.
> NPE when saving Parquet To HDFS
> -------------------------------
>
> Key: SPARK-10109
> URL: https://issues.apache.org/jira/browse/SPARK-10109
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.1
> Environment: Sparc-ec2, standalone cluster on amazon
> Reporter: Virgil Palanciuc
>
> Very simple code, trying to save a dataframe
> I get this in the driver
> {quote}
> 15/08/19 11:21:41 INFO TaskSetManager: Lost task 9.2 in stage 217.0 (TID
> 4748) on executor 172.xx.xx.xx: java.lang.NullPointerException (null)
> and (not for that task):
> 15/08/19 11:21:46 WARN TaskSetManager: Lost task 5.0 in stage 543.0 (TID
> 5607, 172.yy.yy.yy): java.lang.NullPointerException
> at
> parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
> at
> parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
> at
> parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
> at
> org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> at
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> at
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {quote}
> I get this in the executor log:
> {quote}
> 15/08/19 11:21:41 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on
> /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet
> File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not
> have any open files.
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
> at org.apache.hadoop.ipc.Client.call(Client.java:1225)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR InsertIntoHadoopFsRelation: Aborting task.
> java.lang.RuntimeException: Failed to commit task
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.commitTask(commands.scala:546)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:266)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on
> /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet
> File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not
> have any open files.
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
> at org.apache.hadoop.ipc.Client.call(Client.java:1225)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR DynamicPartitionWriterContainer: Task attempt
> attempt_201508191119_0217_m_000009_2 aborted.
> 15/08/19 11:21:41 ERROR Executor: Exception in task 9.2 in stage 217.0 (TID
> 4748)
> java.lang.NullPointerException
> at
> parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
> at
> parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
> at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
> at
> org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> at
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> at
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
> at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {quote}
> this is the code that I'm using:
> {quote}
> val ppRDD=
> sparkContext
> .sequenceFile[Array[Byte], String](inputPath)
> .values
> .repartition(numPartitions)
> .filter( <criteria>)
> .flatMap(line => parseGGLogLine(<params; including 2
> broadcasted variables, one a "Set" and one a "Map">)
> if(ppRDD.isEmpty())
> logInfo(s"<message>")
> else
> ppRDD.toDF().write
> .partitionBy("dpid","pid")
> .mode( SaveMode.Append )
> .parquet(outputPath)
> {quote}
> possibly relevant configuration:
> {quote}
> "spark.sql.parquet" {
> cacheMetadata = "true",
> compression.codec = "snappy"
> }
>
> "spark.serializer" = "org.apache.spark.serializer.KryoSerializer",
> {quote}
> I didn't modify the speculation setting, so I'm assuming it's disabled.
> Input path is s3n://<path>
> Output path is hdfs:///<path> (i.e. ephemeral hdfs)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]