Hi All,
I am facing a very strange case. I have already simplified the scenario as much as possible so that anyone can reproduce it.

My env: AWS EMR 4.1.0, Spark 1.5.

My code runs without any problem in local mode, and it also has no problem on an EMR cluster with one master and one task node. But when I run it on a cluster with multiple task nodes (more than one task node, i.e. a 3-node cluster), the tasks on one of the nodes never return. The log is as below:

15/11/19 21:19:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

As you can see, the tasks are submitted alternately to two instances: ip-10-165-121-188 and ip-10-155-160-147. Only the tasks that run on ip-10-165-121-188.ec2 ever finish; the tasks on ip-10-155-160-147.ec2 just wait there and never return.

The data and code have been tested in local mode and on a single-task-node cluster, so this should not be an issue with the logic or the data. I have attached my test case here (I believe it is simple enough, and no business logic is involved):

public void createSiteGridExposure2() {
    JavaSparkContext ctx = this.createSparkContextTest("Test");
    ctx.textFile(siteEncodeLocation).flatMapToPair(new PairFlatMapFunction<String, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(String line) throws Exception {
            List<Tuple2<String, String>> res = new ArrayList<Tuple2<String, String>>();
            return res;
        }
    }).collectAsMap();
    ctx.stop();
}

protected JavaSparkContext createSparkContextTest(String appName) {
    SparkConf sparkConf = new SparkConf().setAppName(appName);
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Configuration hadoopConf = ctx.hadoopConfiguration();
    if (awsAccessKeyId != null) {
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKeyId);
        hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey);
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
        hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
    }
    return ctx;
}

Does anyone have any idea why this happens? I am a bit lost, because the code works in local mode and on a 2-node (1 master, 1 task) cluster, but as soon as I move to a cluster with multiple task nodes, I hit this issue. No error, no exception, not even a timeout (I waited more than an hour and nothing timed out either).

Regards,
Shuai
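
P.S. In case it helps anyone reproduce this, below is a fully self-contained version of the same test as a standalone driver. The class name, S3 path, and credential values are placeholders, not the real ones:

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

public class SparkHangTest {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf().setAppName("Test"));

        // Same S3 credential wiring as in createSparkContextTest above;
        // the key values here are placeholders.
        Configuration hadoopConf = ctx.hadoopConfiguration();
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID");
        hadoopConf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY");

        // Same no-op flatMapToPair as in the test case: every input line
        // maps to an empty list, so collectAsMap() should come back with
        // an empty map almost immediately. On a 3-node cluster it hangs.
        ctx.textFile("s3n://my-bucket/site-encode/") // placeholder path
           .flatMapToPair(new PairFlatMapFunction<String, String, String>() {
               @Override
               public Iterable<Tuple2<String, String>> call(String line) {
                   return new ArrayList<Tuple2<String, String>>();
               }
           })
           .collectAsMap();

        ctx.stop();
    }
}

I package it into a jar and submit it with something like this (again, the jar and class names are placeholders):

spark-submit --deploy-mode cluster --class SparkHangTest spark-hang-test.jar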