[
https://issues.apache.org/jira/browse/SPARK-23183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-23183:
---------------------------------
Labels: bulk-closed (was: )
> Failure caused by TaskContext is missing in the thread spawned by user code
> ---------------------------------------------------------------------------
>
> Key: SPARK-23183
> URL: https://issues.apache.org/jira/browse/SPARK-23183
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.1
> Environment: This issue is only reproducible when user code spawns a
> new thread in which the RDD iterator is consumed. It can be reproduced in
> any Spark version.
> Reporter: Yongqin Xiao
> Priority: Major
> Labels: bulk-closed
>
> This is related to the already resolved issue
> https://issues.apache.org/jira/browse/SPARK-18406, which was fixed by
> "explicitly pass the TID value to the {{unlock}} method".
> That fix resolved the use cases known at the time.
> However, a new use case now fails: a custom RDD followed by an
> aggregation. The stack trace is:
> {noformat}
> 18/01/22 13:19:51 WARN TaskSetManager: Lost task 75.0 in stage 2.0 (TID 124,
> psrhcdh58c2c001.informatica.com, executor 3): java.lang.RuntimeException: DTM
> execution failed with error:
> com.informatica.powercenter.sdk.dtm.DTMException:
> java.lang.NullPointerException
> [com.informatica.powercenter.sdk.dtm.DTMException:
> java.lang.NullPointerException
> at
> com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:554)
> at
> com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.run(DataExchangeRunnable.java:424)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:419)   <-- failing line
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
> at
> com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:486)
> ... 2 more
> ]
> at
> com.informatica.products.infatransform.spark.edtm.DTMIteratorForBulkOutput.finishUp(DTMIteratorForBulkOutput.java:94)
> at
> com.informatica.products.infatransform.spark.edtm.DTMIteratorForBulkOutput.hasNext(DTMIteratorForBulkOutput.java:53)
> at
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:968)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:959)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:899)
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:959)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:705)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> com.informatica.products.infatransform.spark.boot.SparkDTMException:
> com.informatica.powercenter.sdk.dtm.DTMException:
> java.lang.NullPointerException
> [com.informatica.powercenter.sdk.dtm.DTMException:
> java.lang.NullPointerException
> at
> com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:554)
> at
> com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.run(DataExchangeRunnable.java:424)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:419)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
> at
> com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:486)
> ... 2 more
> ]
> at
> com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable.run(DataExchangeRunnable.java:220)
> ... 1 more
> {noformat}
> The code at TungstenAggregationIterator.scala:419 (quoted here in its
> decompiled Java form) is:
> {{TaskMetrics metrics = TaskContext$.MODULE$.get().taskMetrics();}}
> The NullPointerException occurs because {{TaskContext.get()}} returns null
> in a thread Spark did not create: the context is stored in a thread-local
> that is only set on the task's own thread. There is likely other Spark code
> that similarly expects a TaskContext to be set on the current thread.
> So we have a simple request: make {{TaskContext.setTaskContext}} public
> instead of protected, so that user code can propagate the context to
> threads it spawns before consuming an RDD iterator there.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]