Repository: spark
Updated Branches:
refs/heads/master 310a8cd06 -> cce469435
[SPARK-24002][SQL] Task not serializable caused by org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes
## What changes were proposed in this pull request?
```
Py4JJavaError: An error occurred while calling o153.sql.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
	at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:293)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:226)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Exception thrown in Future.get:
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
	...
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	... 23 more
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Task not serializable
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
	... 276 more
Caused by: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:89)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:125)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:116)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:271)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:181)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:61)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:264)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:93)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:81)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:150)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:80)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.nio.BufferUnderflowException
	at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
	at java.nio.ByteBuffer.get(ByteBuffer.java:715)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
```
The Parquet filters are serializable but not thread-safe. SparkPlan.prepare() can be called from different threads (BroadcastExchange invokes it in a thread pool), so the same Parquet filter can be serialized by multiple threads at the same time, which produces the BufferUnderflowException above. This is not easy to reproduce. The fix is to stop serializing these Parquet filters on the driver: this PR moves the Parquet filter generation from the driver to the executors, so the filters are constructed inside the per-file reader function instead of being captured and serialized with the task closure.
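
To make the shape of the change concrete, here is a minimal, self-contained Scala sketch of the pattern only; `Filter`, `buildFilter`, `PartitionedFile`, `readerBefore`, and `readerAfter` are hypothetical stand-ins, not Spark APIs. Anything captured by the returned function is serialized with the task closure on the driver, while values computed inside the function body are built on each executor and never go through the closure serializer.

```
object ClosureCapturePattern {
  // Hypothetical stand-ins, not Spark classes. Assume Filter instances are
  // unsafe to serialize from multiple threads at once, like the Parquet
  // predicates in this bug.
  final case class Filter(expr: String)
  final case class PartitionedFile(path: String)

  // Combine predicate strings into a single conjunctive filter, if any.
  def buildFilter(predicates: Seq[String]): Option[Filter] =
    predicates.map(Filter(_)).reduceOption((a, b) => Filter(s"(${a.expr}) AND (${b.expr})"))

  // Before: the filter is built once on the driver and captured by the
  // closure, so it must be serialized (possibly from several threads when a
  // broadcast exchange prepares plans in a thread pool).
  def readerBefore(predicates: Seq[String]): PartitionedFile => Option[Filter] = {
    val pushed = buildFilter(predicates) // captured => serialized with the task
    (file: PartitionedFile) => pushed
  }

  // After: only the plain predicate strings are captured; the filter is
  // rebuilt inside the function, i.e. per file on the executor.
  def readerAfter(predicates: Seq[String]): PartitionedFile => Option[Filter] =
    (file: PartitionedFile) => buildFilter(predicates)
}
```

In the actual change below, the captured values are the original data source `filters` (plain, serializable Spark filters), and the per-file function builds the Parquet predicate with `ParquetFilters.createFilter`, as the diff shows.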
## How was this patch tested?
Tested with two queries, a 1000-line SQL query and a 3000-line SQL query. Reproducing the failure requires running for at least one hour under a heavy write workload.
Author: gatorsmile <[email protected]>
Closes #21086 from gatorsmile/taskNotSerializable.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cce46943
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cce46943
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cce46943
Branch: refs/heads/master
Commit: cce469435d61bda5893d9aa6cfdf7ea46fa717df
Parents: 310a8cd
Author: gatorsmile <[email protected]>
Authored: Tue Apr 17 21:03:57 2018 -0700
Committer: gatorsmile <[email protected]>
Committed: Tue Apr 17 21:03:57 2018 -0700
----------------------------------------------------------------------
.../datasources/parquet/ParquetFileFormat.scala | 27 ++++++++++----------
1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cce46943/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 476bd02..d8f47ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -321,19 +321,6 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
-    // Try to push down filters when filter push-down is enabled.
-    val pushed =
-      if (sparkSession.sessionState.conf.parquetFilterPushDown) {
-        filters
-          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-          // is used here.
-          .flatMap(ParquetFilters.createFilter(requiredSchema, _))
-          .reduceOption(FilterApi.and)
-      } else {
-        None
-      }
-
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
@@ -351,12 +338,26 @@ class ParquetFileFormat
     val timestampConversion: Boolean =
       sparkSession.sessionState.conf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean =
+      sparkSession.sessionState.conf.parquetFilterPushDown
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
 
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
       val fileSplit =
         new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)