[
https://issues.apache.org/jira/browse/SPARK-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15251862#comment-15251862
]
Sun Rui commented on SPARK-14803:
---------------------------------
cc [~cloud_fan]
I will submit a PR for this, but I am not sure whether it is the correct fix. Please help
review it.
> A bug in EliminateSerialization rule in Catalyst Optimizer
> -----------------------------------------------------------
>
> Key: SPARK-14803
> URL: https://issues.apache.org/jira/browse/SPARK-14803
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Reporter: Sun Rui
>
> When I rebased my PR https://github.com/apache/spark/pull/12493 to master, I
> found a bug in EliminateSerialization rule in Catalyst Optimizer, which was
> introduced in the PR https://github.com/apache/spark/pull/12260.
> The related code is:
> {code}
> object EliminateSerialization extends Rule[LogicalPlan] {
> def apply(plan: LogicalPlan): LogicalPlan = plan transform {
> case d @ DeserializeToObject(_, _, s: SerializeFromObject)
> if (d.outputObjectType == s.inputObjectType) =>
> // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
> val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId)
> Project(objAttr :: Nil, s.child)
> {code}
> In my PR, when there are multiple successive calls to dapply(), the
> SerializeFromObject and DeserializeToObject logical operators are
> eliminated and replaced with a Project operator. However, the object involved
> is a Row, and UnsafeRowWriter has no write() overload that accepts a Row.
> Detailed error message:
> {panel}
> 1. Error: dapply() on a DataFrame
> ----------------------------------------------
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
> stage 1156.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 1156.0 (TID 9648, localhost): java.util.concurrent.ExecutionException:
> java.lang.Exception: failed to compile:
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
> 31, Column 29: No applicable constructor/method found for actual parameters
> "int, org.apache.spark.sql.Row"; candidates are: "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> byte[])", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> org.apache.spark.unsafe.types.UTF8String)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> org.apache.spark.sql.types.Decimal, int, int)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> double)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> byte[], int, int)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> org.apache.spark.unsafe.types.CalendarInterval)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> byte)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> boolean)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> short)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> int)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> long)", "public void
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int,
> float)"
> /* 001 */
> /* 002 */ public java.lang.Object generate(Object[] references) {
> /* 003 */ return new SpecificUnsafeProjection(references);
> /* 004 */ }
> /* 005 */
> /* 006 */ class SpecificUnsafeProjection extends
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
> /* 007 */
> /* 008 */ private Object[] references;
> /* 009 */ private UnsafeRow result;
> /* 010 */ private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
> /* 011 */ private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
> /* 012 */
> /* 013 */
> /* 014 */ public SpecificUnsafeProjection(Object[] references) {
> /* 015 */ this.references = references;
> /* 016 */ result = new UnsafeRow(1);
> /* 017 */ this.holder = new
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
> /* 018 */ this.rowWriter = new
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
> /* 019 */ }
> /* 020 */
> /* 021 */ // Scala.Function1 need this
> /* 022 */ public java.lang.Object apply(java.lang.Object row) {
> /* 023 */ return apply((InternalRow) row);
> /* 024 */ }
> /* 025 */
> /* 026 */ public UnsafeRow apply(InternalRow i) {
> /* 027 */ holder.reset();
> /* 028 */
> /* 029 */ /* input[0, org.apache.spark.sql.Row] */
> /* 030 */ org.apache.spark.sql.Row value =
> (org.apache.spark.sql.Row)i.get(0, null);
> /* 031 */ rowWriter.write(0, value);
> /* 032 */ result.setTotalSize(holder.totalSize());
> /* 033 */ return result;
> /* 034 */ }
> /* 035 */ }
> /* 036 */
> at
> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
> at
> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
> at
> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> at
> org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
> at
> org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
> at
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
> at
> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
> at
> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
> at
> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> at
> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:636)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:395)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:352)
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151)
> at
> org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:67)
> at
> org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:66)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {panel}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]