Author: xuefu
Date: Thu Dec 18 18:58:11 2014
New Revision: 1646511
URL: http://svn.apache.org/r1646511
Log:
HIVE-9094: TimeoutException when trying to get executor count from RSC [Spark
Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Modified:
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
---
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
(original)
+++
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Thu Dec 18 18:58:11 2014
@@ -1964,7 +1964,12 @@ public class HiveConf extends Configurat
TEZ_EXEC_INPLACE_PROGRESS(
"hive.tez.exec.inplace.progress",
true,
- "Updates tez job execution progress in-place in the terminal.")
+ "Updates tez job execution progress in-place in the terminal."),
+ SPARK_CLIENT_FUTURE_TIMEOUT(
+ "hive.spark.client.future.timeout",
+ "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "remote spark client JobHandle future timeout value in seconds.")
;
public final String varname;
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
Thu Dec 18 18:58:11 2014
@@ -28,7 +28,7 @@ import java.util.Properties;
import org.apache.commons.compress.utils.CharsetNames;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
@@ -39,22 +39,22 @@ public class HiveSparkClientFactory {
private static final String SPARK_DEFAULT_MASTER = "local";
private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
- public static HiveSparkClient createHiveSparkClient(Configuration
configuration)
+ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
throws IOException, SparkException {
- Map<String, String> conf = initiateSparkConf(configuration);
+ Map<String, String> sparkConf = initiateSparkConf(hiveconf);
// Submit spark job through local spark context while spark master is
local mode, otherwise submit
// spark job through remote spark context.
- String master = conf.get("spark.master");
+ String master = sparkConf.get("spark.master");
if (master.equals("local") || master.startsWith("local[")) {
// With local spark context, all user sessions share the same spark
context.
- return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+ return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf));
} else {
- return new RemoteHiveSparkClient(conf);
+ return new RemoteHiveSparkClient(hiveconf, sparkConf);
}
}
- public static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+ public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
Map<String, String> sparkConf = new HashMap<String, String>();
// set default spark configurations.
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
Thu Dec 18 18:58:11 2014
@@ -71,12 +71,13 @@ public class RemoteHiveSparkClient imple
private transient SparkClient remoteClient;
private transient SparkConf sparkConf;
+ private transient HiveConf hiveConf;
private transient List<String> localJars = new ArrayList<String>();
-
private transient List<String> localFiles = new ArrayList<String>();
- RemoteHiveSparkClient(Map<String, String> conf) throws IOException,
SparkException {
+ RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws
IOException, SparkException {
+ this.hiveConf = hiveConf;
sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
remoteClient = SparkClientFactory.createClient(conf);
}
@@ -88,8 +89,9 @@ public class RemoteHiveSparkClient imple
@Override
public int getExecutorCount() throws Exception {
+ long timeout =
hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
TimeUnit.SECONDS);
Future<Integer> handler = remoteClient.getExecutorCount();
- return handler.get(5, TimeUnit.SECONDS).intValue();
+ return handler.get(timeout, TimeUnit.SECONDS).intValue();
}
@Override
@@ -108,9 +110,11 @@ public class RemoteHiveSparkClient imple
byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
+ long timeout =
hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
TimeUnit.SECONDS);
+
JobHandle<Serializable> jobHandle = remoteClient.submit(
new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));
- return new SparkJobRef(jobHandle.getClientJobId(), new
RemoteSparkJobStatus(remoteClient, jobHandle));
+ return new SparkJobRef(jobHandle.getClientJobId(), new
RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
}
private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Thu Dec 18 18:58:11 2014
@@ -56,10 +56,12 @@ public class RemoteSparkJobStatus implem
private final long startTime;
private final SparkClient sparkClient;
private final JobHandle<Serializable> jobHandle;
+ private final transient long sparkClientTimeoutInSeconds;
- public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable>
jobHandle) {
+ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable>
jobHandle, long timeoutInSeconds) {
this.sparkClient = sparkClient;
this.jobHandle = jobHandle;
+ this.sparkClientTimeoutInSeconds = timeoutInSeconds;
startTime = System.nanoTime();
}
@@ -146,7 +148,7 @@ public class RemoteSparkJobStatus implem
JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
try {
- return getJobInfo.get();
+ return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
} catch (Throwable t) {
LOG.warn("Error getting job info", t);
return null;
@@ -156,7 +158,7 @@ public class RemoteSparkJobStatus implem
private SparkStageInfo getSparkStageInfo(int stageId) {
JobHandle<SparkStageInfo> getStageInfo = sparkClient.submit(new
GetStageInfoJob(stageId));
try {
- return getStageInfo.get();
+ return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
} catch (Throwable t) {
LOG.warn("Error getting stage info", t);
return null;