Repository: hive Updated Branches: refs/heads/master 77c145043 -> aae62d871
HIVE-18766: Race condition during shutdown of RemoteDriver, error messages aren't always sent (Aihua Xu, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aae62d87 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aae62d87 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aae62d87 Branch: refs/heads/master Commit: aae62d871bd3bf61281b03e2ef183b214e610cd5 Parents: 77c1450 Author: Aihua Xu <[email protected]> Authored: Tue Jun 5 13:16:54 2018 -0700 Committer: Aihua Xu <[email protected]> Committed: Wed Jun 6 14:14:39 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +- .../apache/hive/spark/client/RemoteDriver.java | 44 +++++++++++++------- .../hive/spark/client/rpc/RpcConfiguration.java | 8 ++++ 3 files changed, 37 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cd425aa..dd42fd1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4206,7 +4206,7 @@ public class HiveConf extends Configuration { "in shuffle. This should result in less shuffled data."), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), - "Timeout for requests from Hive client to remote Spark driver."), + "Timeout for requests between Hive client and remote Spark driver."), SPARK_JOB_MONITOR_TIMEOUT("hive.spark.job.monitor.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), "Timeout for job monitor to get Spark job state."), http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index caa850c..8130860 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -35,6 +35,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; @@ -92,6 +94,8 @@ public class RemoteDriver { public static final String REMOTE_DRIVER_PORT_CONF = "--remote-port"; public static final String REMOTE_DRIVER_CONF = "--remote-driver-conf"; + private final long futureTimeout; // Rpc call timeout in milliseconds + private RemoteDriver(String[] args) throws Exception { this.activeJobs = Maps.newConcurrentMap(); this.jcLock = new Object(); @@ -135,7 +139,9 @@ public class RemoteDriver { String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET); Preconditions.checkArgument(secret != null, "No secret provided."); - int threadCount = new RpcConfiguration(mapConf).getRpcThreadCount(); + RpcConfiguration rpcConf = new RpcConfiguration(mapConf); + futureTimeout = rpcConf.getFutureTimeoutMs(); + int threadCount = rpcConf.getRpcThreadCount(); this.egroup = new NioEventLoopGroup( threadCount, new ThreadFactoryBuilder() @@ -232,13 +238,19 @@ public class RemoteDriver { for (JobWrapper<?> job : activeJobs.values()) { cancelJob(job); } + if (error != null) { - protocol.sendError(error); + try { + protocol.sendError(error).get(futureTimeout, TimeUnit.MILLISECONDS); + } catch(InterruptedException|ExecutionException|TimeoutException e) { + LOG.warn("Failed to send out the error during RemoteDriver shutdown", e); + } } if (jc != null) { jc.stop(); } clientRpc.close(); + egroup.shutdownGracefully(); synchronized (shutdownLock) { shutdownLock.notifyAll(); @@ -265,34 +277,35 @@ public class RemoteDriver { private class DriverProtocol extends BaseProtocol { - void sendError(Throwable error) { + Future<Void> sendError(Throwable error) { LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error)); - clientRpc.call(new Error(Throwables.getStackTraceAsString(error))); + return clientRpc.call(new Error(Throwables.getStackTraceAsString(error))); } - void sendErrorMessage(String cause) { + Future<Void> sendErrorMessage(String cause) { LOG.debug("Send error to Client: {}", cause); - clientRpc.call(new Error(cause)); + return clientRpc.call(new Error(cause)); } - <T extends Serializable> void jobFinished(String jobId, T result, + <T extends Serializable> + Future<Void> jobFinished(String jobId, T result, Throwable error, SparkCounters counters) { LOG.debug("Send job({}) result to Client.", jobId); - clientRpc.call(new JobResult(jobId, result, error, counters)); + return clientRpc.call(new JobResult<T>(jobId, result, error, counters)); } - void jobStarted(String jobId) { - clientRpc.call(new JobStarted(jobId)); + Future<Void> jobStarted(String jobId) { + return clientRpc.call(new JobStarted(jobId)); } - void jobSubmitted(String jobId, int sparkJobId) { + Future<Void> jobSubmitted(String jobId, int sparkJobId) { LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId); - clientRpc.call(new JobSubmitted(jobId, sparkJobId)); + return clientRpc.call(new JobSubmitted(jobId, sparkJobId)); } - void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) { + Future<Void> sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) { LOG.debug("Send task({}/{}/{}/{}) metric to Client.", jobId, sparkJobId, stageId, taskId); - clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics)); + return clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics)); } private void handle(ChannelHandlerContext ctx, CancelJob msg) { @@ -550,8 +563,7 @@ public class RemoteDriver { // If the main thread throws an exception for some reason, propagate the exception to the // client and initiate a safe shutdown if (rd.running) { - rd.protocol.sendError(e); - rd.shutdown(null); + rd.shutdown(e); } throw e; } http://git-wip-us.apache.org/repos/asf/hive/blob/aae62d87/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index 090c628..bd3a7a7 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -46,6 +46,7 @@ public final class RpcConfiguration { private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class); public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of( + HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, @@ -55,6 +56,7 @@ public final class RpcConfiguration { HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname ); public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of( + HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname ); @@ -71,6 +73,12 @@ public final class RpcConfiguration { this.config = Collections.unmodifiableMap(config); } + public long getFutureTimeoutMs() { + String value = config.get(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname); + return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar( + HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + } + long getConnectTimeoutMs() { String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname); return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
