[ https://issues.apache.org/jira/browse/SPARK-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013088#comment-15013088 ]

Virgil Palanciuc commented on SPARK-10109:
------------------------------------------

I haven't tried it lately - I just ran it with parallel execution disabled.

Like I said, the root cause seems to be that I was writing 2 DataFrames 
simultaneously to the same destination (albeit with a partitioning scheme that 
made the destinations non-overlapping). I partition by the columns "dpid" and 
"pid", and if I process 2 PIDs in parallel (on different threads) and they both 
have the same DPID (e.g. I simultaneously write to <blah>/dpid=1/pid=1/<files> 
and to <blah>/dpid=1/pid=2/<files>), I get this problem. This seems to be 
caused by the fact that, on 2 different threads, I do 
"df.write.partitionBy().parquet('<blah>')" - i.e. I write 2 DataFrames 
simultaneously "to the same destination" (due to partitioning, the actual files 
should never overlap, but it still seems to be a problem).
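
Here's a minimal sketch of that pattern, in case it helps to reproduce it (the 
method and variable names - writePartition, dfForPid1, dfForPid2 - are just 
illustrative, not my actual code):

{code}
import org.apache.spark.sql.{DataFrame, SaveMode}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each DataFrame holds the rows for one pid; both are written under the same
// base path, relying on the partition columns to keep the leaf dirs disjoint.
def writePartition(df: DataFrame, outputPath: String): Unit =
  df.write
    .partitionBy("dpid", "pid")
    .mode(SaveMode.Append)
    .parquet(outputPath)

// Problematic version: both writes run concurrently against the same
// destination, so they also share outputPath/_temporary (visible in the
// stack traces below).
def writeInParallel(dfForPid1: DataFrame, dfForPid2: DataFrame, outputPath: String): Unit = {
  val writes = Seq(dfForPid1, dfForPid2).map(df => Future(writePartition(df, outputPath)))
  writes.foreach(Await.result(_, Duration.Inf))
}

// Workaround (what I ran with parallel execution disabled): the same writes,
// one at a time.
def writeSequentially(dfForPid1: DataFrame, dfForPid2: DataFrame, outputPath: String): Unit = {
  writePartition(dfForPid1, outputPath)
  writePartition(dfForPid2, outputPath)
}
{code}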

I'm not sure if this was really a Spark bug or an application problem - if you 
think Spark should've worked in this scenario, let me know and I'll retry it. 
But it kind of feels like it was really an application bug (a bad assumption on 
my part about how writing works).

> NPE when saving Parquet To HDFS
> -------------------------------
>
>                 Key: SPARK-10109
>                 URL: https://issues.apache.org/jira/browse/SPARK-10109
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>         Environment: Sparc-ec2, standalone cluster on amazon
>            Reporter: Virgil Palanciuc
>
> Very simple code, trying to save a DataFrame.
> I get this in the driver:
> {quote}
> 15/08/19 11:21:41 INFO TaskSetManager: Lost task 9.2 in stage 217.0 (TID 4748) on executor 172.xx.xx.xx: java.lang.NullPointerException (null)
> and (not for that task):
> 15/08/19 11:21:46 WARN TaskSetManager: Lost task 5.0 in stage 543.0 (TID 5607, 172.yy.yy.yy): java.lang.NullPointerException
>         at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>         at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>         at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>         at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> I get this in the executor log:
> {quote}
> 15/08/19 11:21:41 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1225)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>       at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>       at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>       at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR InsertIntoHadoopFsRelation: Aborting task.
> java.lang.RuntimeException: Failed to commit task
>       at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.commitTask(commands.scala:546)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:266)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1225)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>       at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>       at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>       at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
>       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR DynamicPartitionWriterContainer: Task attempt attempt_201508191119_0217_m_000009_2 aborted.
> 15/08/19 11:21:41 ERROR Executor: Exception in task 9.2 in stage 217.0 (TID 4748)
> java.lang.NullPointerException
>       at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>       at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>       at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>       at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
>       at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>       at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>       at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>       at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>       at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>       at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>       at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
>       at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {quote}
> This is the code that I'm using:
> {quote}
>     val ppRDD =
>       sparkContext
>         .sequenceFile[Array[Byte], String](inputPath)
>         .values
>         .repartition(numPartitions)
>         .filter(<criteria>)
>         .flatMap(line => parseGGLogLine(<params; including 2 broadcasted variables, one a "Set" and one a "Map">))
>     if (ppRDD.isEmpty())
>       logInfo(s"<message>")
>     else
>       ppRDD.toDF().write
>         .partitionBy("dpid", "pid")
>         .mode(SaveMode.Append)
>         .parquet(outputPath)
> {quote}
> Possibly relevant configuration:
> {quote}
>     "spark.sql.parquet" {
>       cacheMetadata = "true",
>       compression.codec = "snappy"
>     }
>     
>  "spark.serializer" = "org.apache.spark.serializer.KryoSerializer",
> {quote}
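> For reference, these settings map to roughly the following in code (a sketch; 
> the SparkConf/SQLContext setup shown here is an assumption, not my exact 
> initialization):
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
>
> // Illustrative setup only - variable names are not from the real job.
> val conf = new SparkConf()
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> val sparkContext = new SparkContext(conf)
> val sqlContext = new SQLContext(sparkContext)
>
> // Parquet-related SQL options from the block above.
> sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")
> sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
> {code}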
> I didn't modify the speculation setting, so I'm assuming it's disabled.
> Input path is  s3n://<path>
> Output path is hdfs:///<path> (i.e. ephemeral hdfs)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
