[ https://issues.apache.org/jira/browse/SPARK-23183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335153#comment-16335153 ]
Sean Owen commented on SPARK-23183:
-----------------------------------

I don't think a custom RDD that spawns threads is guaranteed to work; things like ThreadLocals aren't set as expected.

> Failure caused by TaskContext is missing in the thread spawned by Custom RDD
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-23183
>                 URL: https://issues.apache.org/jira/browse/SPARK-23183
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.1
>         Environment: This issue is only reproducible when user code spawns a new thread where the RDD iterator is used. It can be reproduced in any Spark version.
>            Reporter: Yongqin Xiao
>            Priority: Critical
>              Labels: easyfix
>
> This is related to the already resolved issue https://issues.apache.org/jira/browse/SPARK-18406, which was fixed by "explicitly pass the TID value to the {{unlock}} method". That fix resolved the use cases known at the time. However, a new use case now fails: a custom RDD followed by an aggregator.
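Sean's point about ThreadLocals can be reproduced without Spark at all: internally Spark keeps the current TaskContext in a ThreadLocal, and a thread spawned by user code does not inherit a plain ThreadLocal, so a lookup there returns null. A minimal sketch under that assumption; `TaskContextDemo` and its `String` payload are stand-ins, not Spark classes:

```java
// Stand-in for Spark's TaskContext holder. Spark stores the current
// TaskContext in a ThreadLocal, which a child thread does NOT inherit,
// so TaskContext.get() in the spawned thread yields null (hence the NPE).
public class TaskContextDemo {
    private static final ThreadLocal<String> taskContext = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        taskContext.set("TaskContext for TID 124"); // set on the task's own thread

        final String[] seenByChild = new String[1];
        Thread child = new Thread(() -> seenByChild[0] = taskContext.get());
        child.start();
        child.join();

        System.out.println("task thread sees:    " + taskContext.get());
        System.out.println("spawned thread sees: " + seenByChild[0]); // null
    }
}
```

An `InheritableThreadLocal` would propagate to the child, but Spark's holder is a plain ThreadLocal, which is why the spawned thread comes up empty.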
> The stack trace is:
> {noformat}
> 18/01/22 13:19:51 WARN TaskSetManager: Lost task 75.0 in stage 2.0 (TID 124, psrhcdh58c2c001.informatica.com, executor 3): java.lang.RuntimeException: DTM execution failed with error: com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
> [com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
>   at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:554)
>   at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.run(DataExchangeRunnable.java:424)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:419)  <---
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
>   at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:486)
>   ... 2 more
> ]
>   at com.informatica.products.infatransform.spark.edtm.DTMIteratorForBulkOutput.finishUp(DTMIteratorForBulkOutput.java:94)
>   at com.informatica.products.infatransform.spark.edtm.DTMIteratorForBulkOutput.hasNext(DTMIteratorForBulkOutput.java:53)
>   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:968)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:959)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:899)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:959)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:705)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.informatica.products.infatransform.spark.boot.SparkDTMException: com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
> [com.informatica.powercenter.sdk.dtm.DTMException: java.lang.NullPointerException
>   at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:554)
>   at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.run(DataExchangeRunnable.java:424)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:419)
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
>   at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable$DataChannelRunnable.pushInputDataToDTM(DataExchangeRunnable.java:486)
>   ... 2 more
> ]
>   at com.informatica.products.infatransform.spark.edtm.DataExchangeRunnable.run(DataExchangeRunnable.java:220)
>   ... 1 more
> {noformat}
> The code at TungstenAggregationIterator.scala:419 is:
> {{TaskMetrics metrics = TaskContext$.MODULE$.get().taskMetrics();}}
> It's possible that there is other Spark code like this, where the TaskContext is expected to be set on the current thread.
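The request below amounts to letting user code capture the context on the task thread and re-install it on the spawned thread before the iterator is consumed — the handoff a public `TaskContext.setTaskContext` would enable. A no-Spark sketch of that pattern, under the assumption that the context lives in a ThreadLocal; `ContextHandoff` and the `String` payload are stand-ins (with Spark this would be `TaskContext.get()` on the parent and `TaskContext.setTaskContext(ctx)` on the child):

```java
// Sketch of the proposed workaround: capture the ThreadLocal-held context
// on the parent (task) thread, then re-install it on the child thread
// before anything there calls get(). All names are illustrative stand-ins.
public class ContextHandoff {
    private static final ThreadLocal<String> current = new ThreadLocal<>();

    public static String runChildWithContext() throws InterruptedException {
        current.set("TaskContext for TID 124");
        final String captured = current.get();       // capture on task thread

        final String[] seenByChild = new String[1];
        Thread child = new Thread(() -> {
            current.set(captured);                   // re-install on child
            seenByChild[0] = current.get();          // now non-null
        });
        child.start();
        child.join();
        return seenByChild[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("spawned thread sees: " + runChildWithContext());
    }
}
```

The re-install step is exactly what is impossible from user code while the setter stays `protected[spark]`.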
> So we have a simple request for Spark: make TaskContext.setTaskContext public instead of protected.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)