[ https://issues.apache.org/jira/browse/IGNITE-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrey Aleksandrov updated IGNITE-11724:
----------------------------------------
    Description:
The following code can hang if the PairFunction logic throws an exception:

{code:java}
import java.util.Arrays;

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.ignite.IgniteSparkSession;
import scala.Tuple2;

public class Example {
    public static void main(String[] args) {
        String configPath = "/home/andrei/BDP/big-data-accelerator/modules/gridgain-spark-loader-examples/config/client.xml";

        IgniteSparkSession igniteSession = IgniteSparkSession.builder()
            .appName("Spark Ignite catalog example")
            .master("local")
            .config("ignite.disableSparkSQLOptimization", true)
            .igniteConfig(configPath)
            .getOrCreate();

        JavaSparkContext sparkCtx = new JavaSparkContext(igniteSession.sparkContext());

        final JavaRDD<Row> records = sparkCtx.parallelize(Arrays.asList(
            new GenericRow()
        ));

        // The mapping function always fails, which triggers the hang.
        JavaPairRDD<Integer, Integer> rdd_records = records.mapToPair(new PairFunction<Row, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Row row) throws Exception {
                throw new IllegalStateException("some error");
            }
        });

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<>(sparkCtx, configPath);

        JavaIgniteRDD<Integer, Integer> igniteRdd = igniteContext.<Integer, Integer>fromCache("Person");

        igniteRdd.savePairs(rdd_records);

        igniteContext.close(true);
    }
}
{code}

It looks like the following internal code (the saveValues method) should also close the IgniteContext in case of an unexpected exception, not only the data streamer:

{code:scala}
            try {
                it.foreach(value ⇒ {
                    val key = affinityKeyFunc(value, node.orNull)

                    streamer.addData(key, value)
                })
            }
            finally {
                streamer.close()
            }
        })
    }
{code}

  was:
The following code can hang if the PairFunction logic throws an exception:

{code:java}
JavaPairRDD<Key, Value> rdd_records = records.mapToPair(new MapFunction());

JavaIgniteContext<Key, Value> igniteContext = new JavaIgniteContext<>(sparkCtx, configUrl);

JavaIgniteRDD<Key, Value> igniteRdd = igniteContext.<Key, Value>fromCache(cacheName);

igniteRdd.savePairs(rdd_records);
{code}

It looks like the following internal code (the saveValues method) should also close the IgniteContext in case of an unexpected exception, not only the data streamer:

{code:scala}
            try {
                it.foreach(value ⇒ {
                    val key = affinityKeyFunc(value, node.orNull)

                    streamer.addData(key, value)
                })
            }
            finally {
                streamer.close()
            }
        })
    }
{code}


> IgniteSpark integration forget to close the IgniteContext and stops the client node in case if error during PairFunction logic
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-11724
>                 URL: https://issues.apache.org/jira/browse/IGNITE-11724
>             Project: Ignite
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 2.8
>            Reporter: Andrey Aleksandrov
>            Assignee: Alexey Zinoviev
>            Priority: Major
>              Labels: await
>             Fix For: 2.8
>
>          Time Spent: 20m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
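
The cleanup requested in the description can be sketched in isolation. The following is a minimal illustration in plain Java, not Ignite code: `TrackedResource`, `saveAll` and `failingIterator` are hypothetical stand-ins for the data streamer, the saveValues loop and the failing PairFunction. It assumes the intended behaviour is "always close the streamer, and additionally close the context only when the loop fails"; the real fix may differ, and would likely rethrow the exception instead of recording it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class CloseOnFailureSketch {
    /** Hypothetical stand-in for a closeable resource (data streamer or context). */
    static final class TrackedResource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        TrackedResource(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Iterator whose next() always throws, mimicking the failing PairFunction. */
    static Iterator<Integer> failingIterator() {
        return new Iterator<Integer>() {
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() { throw new IllegalStateException("some error"); }
        };
    }

    /** Consumes the values and returns the ordered log of resource events. */
    static List<String> saveAll(Iterator<Integer> values) {
        List<String> log = new ArrayList<>();
        TrackedResource context = new TrackedResource("context", log);
        try {
            TrackedResource streamer = new TrackedResource("streamer", log);
            try {
                while (values.hasNext())
                    log.add("add " + values.next());
            } finally {
                // The streamer is closed on success and on failure.
                streamer.close();
            }
        } catch (RuntimeException e) {
            // On an unexpected error, the context is closed as well.
            // The sketch records the failure; real code would likely rethrow.
            context.close();
            log.add("failed: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(saveAll(Arrays.asList(1, 2).iterator()));
        System.out.println(saveAll(failingIterator()));
    }
}
```

On the successful path the context is deliberately left open for the caller to close, as `igniteContext.close(true)` does in the reproducer; on the failing path both resources are released before the error is reported.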