[ https://issues.apache.org/jira/browse/IGNITE-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrey Aleksandrov updated IGNITE-11724:
----------------------------------------
    Description: 
The following code can hang if the PairFunction logic throws an exception:
{code:java}
import java.util.Arrays;

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.ignite.IgniteSparkSession;

import scala.Tuple2;

public class Example {
    public static void main(String[] args) {
        String configPath = "/home/andrei/BDP/big-data-accelerator/modules/gridgain-spark-loader-examples/config/client.xml";

        IgniteSparkSession igniteSession = IgniteSparkSession.builder()
                .appName("Spark Ignite catalog example")
                .master("local")
                .config("ignite.disableSparkSQLOptimization", true)
                .igniteConfig(configPath)
                .getOrCreate();

        JavaSparkContext sparkCtx = new JavaSparkContext(igniteSession.sparkContext());

        final JavaRDD<Row> records = sparkCtx.parallelize(Arrays.asList(new GenericRow()));

        // This PairFunction always throws, simulating an error in user mapping logic.
        JavaPairRDD<Integer, Integer> rdd_records = records.mapToPair(new PairFunction<Row, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Row row) throws Exception {
                throw new IllegalStateException("some error");
            }
        });

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<>(sparkCtx, configPath);
        JavaIgniteRDD<Integer, Integer> igniteRdd = igniteContext.<Integer, Integer>fromCache("Person");

        // savePairs fails because of the PairFunction above, so close(true) is
        // never reached and the client node keeps running.
        igniteRdd.savePairs(rdd_records);
        igniteContext.close(true);
    }
}
{code}

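Until the integration handles this itself, a possible user-side workaround (a sketch only, reusing the igniteRdd, rdd_records and igniteContext variables from the reproducer above) is to close the context in a finally block:

{code:java}
// Workaround sketch: make sure the client node is stopped even when savePairs
// fails with a user-code error, so the job fails instead of hanging.
try {
    igniteRdd.savePairs(rdd_records);
}
finally {
    igniteContext.close(true); // always stop the client node
}
{code}
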
It looks like the internal code below (the saveValues method) should also close the IgniteContext on an unexpected exception, not only the data streamer. Closing only the streamer leaves the client node running, which is why the job above hangs instead of failing:

{code}
try {
    it.foreach(value ⇒ {
        val key = affinityKeyFunc(value, node.orNull)
        streamer.addData(key, value)
    })
}
finally {
    streamer.close()
}
})
}
{code}
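
For illustration, a minimal sketch of the suggested change (hypothetical, not a tested patch; the name ic is assumed here for the enclosing IgniteContext, the other names come from the snippet above):

{code}
try {
    it.foreach(value ⇒ {
        val key = affinityKeyFunc(value, node.orNull)
        streamer.addData(key, value)
    })
}
catch {
    // Assumption: ic is the enclosing IgniteContext. On an unexpected error,
    // stop the client node as well, so the Spark job fails fast instead of hanging.
    case e: Throwable ⇒
        ic.close(true)
        throw e
}
finally {
    streamer.close()
}
{code}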

  was:
The following code can hang if the PairFunction logic throws an exception:

{code:java}

JavaPairRDD<Key, Value> rdd_records = records.mapToPair(new MapFunction());

JavaIgniteContext<Key, Value> igniteContext = new JavaIgniteContext<>(sparkCtx, configUrl);

JavaIgniteRDD<Key, Value> igniteRdd = igniteContext.<Key, Value>fromCache(cacheName);

igniteRdd.savePairs(rdd_records);
{code}

It looks like the internal code below (the saveValues method) should also close the IgniteContext on an unexpected exception, not only the data streamer:

{code}
try {
    it.foreach(value ⇒ {
        val key = affinityKeyFunc(value, node.orNull)
        streamer.addData(key, value)
    })
}
finally {
    streamer.close()
}
})
}

{code}


> IgniteSpark integration forgets to close the IgniteContext and stop the
> client node when an error occurs in the PairFunction logic
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-11724
>                 URL: https://issues.apache.org/jira/browse/IGNITE-11724
>             Project: Ignite
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 2.8
>            Reporter: Andrey Aleksandrov
>            Assignee: Alexey Zinoviev
>            Priority: Major
>              Labels: await
>             Fix For: 2.8
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
