Hi All,

 

I am facing a very strange problem. I have simplified the scenario as much
as possible so that anyone can reproduce it.

 

My env:

 

AWS EMR 4.1.0, Spark 1.5

 

My code runs without any problem in local mode, and it also runs without
problems on an EMR cluster with one master and one task node.

 

But when I run it on a cluster with more than one task node (that is, a
3-node cluster), the tasks on one of the nodes never return.

 

The log is below:

 

15/11/19 21:19:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)
15/11/19 21:19:07 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

 

As you can see, the tasks are submitted alternately to two instances:
ip-10-165-121-188 and ip-10-155-160-147.

Later, only the tasks running on ip-10-165-121-188.ec2 finish. The tasks on
ip-10-155-160-147.ec2 never return, and the job just waits there.
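
In case it helps anyone compare their own logs, here is a small sketch (plain
Java, not part of Spark) that tallies which host each task was sent to, based
on driver log lines like the ones above. The class name TaskHostTally and the
regular expression are my own assumptions about the log format, derived only
from the four lines shown, and it assumes each log entry sits on a single line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading driver logs; not part of Spark.
public class TaskHostTally {
    // Assumed shape of a TaskSetManager "Starting task" line (single line, not wrapped).
    private static final Pattern STARTING = Pattern.compile(
            "Starting task (\\S+) in stage (\\S+) \\(TID (\\d+), ([^,]+),");

    /** Groups task IDs (TID) by the host each task was sent to. */
    public static Map<String, List<Integer>> tidsByHost(List<String> logLines) {
        Map<String, List<Integer>> byHost = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = STARTING.matcher(line);
            if (m.find()) {
                // group(4) is the host name, group(3) is the TID.
                byHost.computeIfAbsent(m.group(4), h -> new ArrayList<>())
                      .add(Integer.parseInt(m.group(3)));
            }
        }
        return byHost;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "15/11/19 21:19:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)",
            "15/11/19 21:19:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)");
        // Print one line per host with the TIDs that were sent to it.
        for (Map.Entry<String, List<Integer>> e : tidsByHost(lines).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Comparing this tally with the TIDs that later show up as finished should make
it obvious which host is holding the unfinished tasks.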

 

The data and code have been tested in local mode and on the cluster with a
single task node, so it should not be an issue with the logic or the data.

 

I have attached my test case here (I believe it is simple enough, and no
business logic is involved):

 

       public void createSiteGridExposure2() {
              JavaSparkContext ctx = this.createSparkContextTest("Test");
              // Emit nothing for every input line, then collect the (empty) result on the driver.
              ctx.textFile(siteEncodeLocation).flatMapToPair(new PairFlatMapFunction<String, String, String>() {
                     @Override
                     public Iterable<Tuple2<String, String>> call(String line) throws Exception {
                            List<Tuple2<String, String>> res = new ArrayList<Tuple2<String, String>>();
                            return res;
                     }
              }).collectAsMap();
              ctx.stop();
       }

 

       protected JavaSparkContext createSparkContextTest(String appName) {
              SparkConf sparkConf = new SparkConf().setAppName(appName);

              JavaSparkContext ctx = new JavaSparkContext(sparkConf);
              Configuration hadoopConf = ctx.hadoopConfiguration();
              // Only set S3 credentials explicitly when an access key has been configured.
              if (awsAccessKeyId != null) {
                     hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
                     hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKeyId);
                     hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey);

                     hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
                     hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
                     hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
              }
              return ctx;
       }

 

 

Does anyone have any idea why this happens? I am a bit lost because the code
works in local mode and on a 2-node cluster (1 master, 1 task node), but when
I move to a cluster with multiple task nodes, I have this issue. There is no
error, no exception, and not even a timeout (I waited more than 1 hour and
nothing timed out).
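
Since nothing times out on its own, one way to make the test fail fast instead
of hanging is to run the blocking call on a separate thread and wait on it
with a timeout. The sketch below uses only java.util.concurrent. TimeoutGuard
and runWithTimeout are hypothetical names of mine, not anything provided by
Spark, and the timeout values are arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of Spark.
public class TimeoutGuard {
    /** Runs a blocking action on a separate thread and gives up after the timeout. */
    public static <T> T runWithTimeout(Callable<T> action, long timeout, TimeUnit unit)
            throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<T> future = pool.submit(action);
        try {
            // Throws TimeoutException if the action has not finished in time.
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Interrupt the worker thread so the caller is not left waiting.
            future.cancel(true);
            throw e;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            // Stand-in for a call that never returns within the allowed time.
            runWithTimeout(() -> { Thread.sleep(5000); return "late"; },
                    200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("action timed out");
        }
    }
}
```

In my test case the Callable would wrap the flatMapToPair(...).collectAsMap()
call. This only unblocks the caller; I have not checked whether the interrupt
also cancels the job on the cluster, so ctx.stop() would still be needed.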

 

Regards,

 

Shuai
