Author: xuefu
Date: Thu Dec 18 18:58:11 2014
New Revision: 1646511

URL: http://svn.apache.org/r1646511
Log:
HIVE-9094: TimeoutException when trying to get executor count from RSC [Spark 
Branch] (Chengxiang via Xuefu)

Modified:
    
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: 
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- 
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
(original)
+++ 
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
Thu Dec 18 18:58:11 2014
@@ -1964,7 +1964,12 @@ public class HiveConf extends Configurat
     TEZ_EXEC_INPLACE_PROGRESS(
         "hive.tez.exec.inplace.progress",
         true,
-        "Updates tez job execution progress in-place in the terminal.")
+        "Updates tez job execution progress in-place in the terminal."),
+    SPARK_CLIENT_FUTURE_TIMEOUT(
+        "hive.spark.client.future.timeout",
+        "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "remote spark client JobHandle future timeout value in seconds.")
     ;
 
     public final String varname;

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
 Thu Dec 18 18:58:11 2014
@@ -28,7 +28,7 @@ import java.util.Properties;
 import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 
@@ -39,22 +39,22 @@ public class HiveSparkClientFactory {
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
 
-  public static HiveSparkClient createHiveSparkClient(Configuration 
configuration)
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
     throws IOException, SparkException {
 
-    Map<String, String> conf = initiateSparkConf(configuration);
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf);
     // Submit spark job through local spark context while spark master is 
local mode, otherwise submit
     // spark job through remote spark context.
-    String master = conf.get("spark.master");
+    String master = sparkConf.get("spark.master");
     if (master.equals("local") || master.startsWith("local[")) {
       // With local spark context, all user sessions share the same spark 
context.
-      return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+      return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf));
     } else {
-      return new RemoteHiveSparkClient(conf);
+      return new RemoteHiveSparkClient(hiveconf, sparkConf);
     }
   }
 
-  public static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
     Map<String, String> sparkConf = new HashMap<String, String>();
 
     // set default spark configurations.

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
 Thu Dec 18 18:58:11 2014
@@ -71,12 +71,13 @@ public class RemoteHiveSparkClient imple
 
   private transient SparkClient remoteClient;
   private transient SparkConf sparkConf;
+  private transient HiveConf hiveConf;
 
   private transient List<String> localJars = new ArrayList<String>();
-
   private transient List<String> localFiles = new ArrayList<String>();
 
-  RemoteHiveSparkClient(Map<String, String> conf) throws IOException, 
SparkException {
+  RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws 
IOException, SparkException {
+    this.hiveConf = hiveConf;
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
     remoteClient = SparkClientFactory.createClient(conf);
   }
@@ -88,8 +89,9 @@ public class RemoteHiveSparkClient imple
 
   @Override
   public int getExecutorCount() throws Exception {
+    long timeout = 
hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, 
TimeUnit.SECONDS);
     Future<Integer> handler = remoteClient.getExecutorCount();
-    return handler.get(5, TimeUnit.SECONDS).intValue();
+    return handler.get(timeout, TimeUnit.SECONDS).intValue();
   }
 
   @Override
@@ -108,9 +110,11 @@ public class RemoteHiveSparkClient imple
     byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
     byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
 
+    long timeout = 
hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, 
TimeUnit.SECONDS);
+
     JobHandle<Serializable> jobHandle = remoteClient.submit(
         new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));
-    return new SparkJobRef(jobHandle.getClientJobId(), new 
RemoteSparkJobStatus(remoteClient, jobHandle));
+    return new SparkJobRef(jobHandle.getClientJobId(), new 
RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
   }
 
   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
 Thu Dec 18 18:58:11 2014
@@ -56,10 +56,12 @@ public class RemoteSparkJobStatus implem
   private final long startTime;
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
+  private final transient long sparkClientTimeoutInSeconds;
 
-  public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> 
jobHandle) {
+  public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> 
jobHandle, long timeoutInSeconds) {
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
+    this.sparkClientTimeoutInSeconds = timeoutInSeconds;
     startTime = System.nanoTime();
   }
 
@@ -146,7 +148,7 @@ public class RemoteSparkJobStatus implem
     JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
         new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
     try {
-      return getJobInfo.get();
+      return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Throwable t) {
       LOG.warn("Error getting job info", t);
       return null;
@@ -156,7 +158,7 @@ public class RemoteSparkJobStatus implem
   private SparkStageInfo getSparkStageInfo(int stageId) {
     JobHandle<SparkStageInfo> getStageInfo = sparkClient.submit(new 
GetStageInfoJob(stageId));
     try {
-      return getStageInfo.get();
+      return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Throwable t) {
       LOG.warn("Error getting stage info", t);
       return null;


Reply via email to