Repository: hive Updated Branches: refs/heads/master 88da0e8b7 -> e19b861cf
HIVE-18916: SparkClientImpl doesn't error out if spark-submit fails (Sahil Takiar, reviewed by Aihua Xu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e19b861c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e19b861c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e19b861c Branch: refs/heads/master Commit: e19b861cfbcb15166f9255f8b375ff5d8056b417 Parents: 88da0e8 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Mon Jul 2 11:30:14 2018 -0700 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Mon Jul 2 11:30:14 2018 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 4 +- .../apache/hadoop/hive/ql/QOutProcessor.java | 2 + .../hadoop/hive/ql/exec/spark/SparkTask.java | 15 ++-- .../ql/exec/spark/session/SparkSessionImpl.java | 10 ++- .../spark/status/RemoteSparkJobMonitor.java | 1 - .../exec/spark/TestSparkInvalidFileFormat.java | 81 ++++++++++++++++++++ .../spark_submit_negative_executor_cores.q | 5 ++ .../spark_submit_negative_executor_memory.q | 5 ++ .../spark_submit_negative_executor_cores.q.out | 5 ++ .../spark_submit_negative_executor_memory.q.out | 5 ++ .../hive/spark/client/AbstractSparkClient.java | 25 +++--- .../spark/client/SparkSubmitSparkClient.java | 22 +++--- .../apache/hive/spark/client/rpc/RpcServer.java | 21 +++-- 13 files changed, 159 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index d415b7d..385b71e 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1684,7 +1684,9 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\ spark.only.query.negative.files=spark_job_max_tasks.q,\ spark_stage_max_tasks.q,\ - spark_task_failure.q + spark_task_failure.q,\ + spark_submit_negative_executor_cores.q,\ + spark_submit_negative_executor_memory.q spark.perf.disabled.query.files=query14.q,\ query64.q http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java index 359f027..1e4cddd 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java @@ -273,6 +273,8 @@ public class QOutProcessor { ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9_]+"), "attempt_#ID#")); ppm.add(new PatternReplacementPair(Pattern.compile("vertex_[0-9_]+"), "vertex_#ID#")); ppm.add(new PatternReplacementPair(Pattern.compile("task_[0-9_]+"), "task_#ID#")); + ppm.add(new PatternReplacementPair(Pattern.compile("for Spark session.*?:"), + "#SPARK_SESSION_ID#:")); partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]); } /* This list may be modified by specific cli drivers to mask strings that change on every test */ http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 02613f2..ad5049a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -180,7 +180,7 @@ public class SparkTask extends Task<SparkWork> { killJob(); } else if (rc == 4) { LOG.info("The Spark job or one stage of it has too many tasks" + - ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID ); + ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID); killJob(); } @@ -189,12 +189,7 @@ public class SparkTask extends Task<SparkWork> { } sparkJobStatus.cleanup(); } catch (Exception e) { - String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'"; - - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - LOG.error(msg, e); + LOG.error("Failed to execute Spark task \"" + getId() + "\"", e); setException(e); if (e instanceof HiveException) { HiveException he = (HiveException) e; @@ -609,7 +604,7 @@ public class SparkTask extends Task<SparkWork> { private boolean isTaskFailure(Throwable error) { Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times"); while (error != null) { - if (taskFailedPattern.matcher(error.getMessage()).find()) { + if (error.getMessage() != null && taskFailedPattern.matcher(error.getMessage()).find()) { return true; } error = error.getCause(); @@ -621,8 +616,8 @@ public class SparkTask extends Task<SparkWork> { while (error != null) { if (error instanceof OutOfMemoryError) { return true; - } else if (error.getMessage().contains("Container killed by YARN for exceeding memory " + - "limits")) { + } else if (error.getMessage() != null && error.getMessage().contains("Container killed by " + + "YARN for exceeding memory limits")) { return true; } error = error.getCause(); http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index c8cb1ac..6e37d93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -214,13 +214,19 @@ public class SparkSessionImpl implements SparkSession { return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST, sessionId, matchedString.toString()); } else { - return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(e).getMessage()); + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, + getRootCause(oe)); } } e = e.getCause(); } - return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(oe).getMessage()); + return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, getRootCause(oe)); + } + + private String getRootCause(Throwable e) { + Throwable rootCause = Throwables.getRootCause(e); + return rootCause.getClass().getName() + ": " + rootCause.getMessage(); } private boolean matches(String input, String regex, StringBuilder matchedString) { http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 004b50b..560fb58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -76,7 +76,6 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { if ((timeCount > monitorTimeoutInterval)) { HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT, Long.toString(timeCount)); - console.printError(he.getMessage()); sparkJobStatus.setMonitorError(he); running = false; done = true; http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java new file mode 100644 index 0000000..bcc0924 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +public class TestSparkInvalidFileFormat { + + @Test + public void readTextFileAsParquet() throws IOException { + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + SQLStdHiveAuthorizerFactory.class.getName()); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark"); + conf.set("spark.master", "local"); + + FileSystem fs = FileSystem.getLocal(conf); + Path tmpDir = new Path("TestSparkInvalidFileFormat-tmp"); + + File testFile = new File(conf.get("test.data.files"), "kv1.txt"); + + SessionState.start(conf); + + IDriver driver = null; + + try { + driver = DriverFactory.newDriver(conf); + Assert.assertEquals(0, + driver.run("CREATE TABLE test_table (key STRING, value STRING)").getResponseCode()); + Assert.assertEquals(0, driver.run( + "LOAD DATA LOCAL INPATH '" + testFile + "' INTO TABLE test_table").getResponseCode()); + Assert.assertEquals(0, + driver.run("ALTER TABLE test_table SET FILEFORMAT parquet").getResponseCode()); + Throwable exception = driver.run( + "SELECT * FROM test_table ORDER BY key LIMIT 10").getException(); + Assert.assertTrue(exception instanceof HiveException); + Assert.assertTrue(exception.getMessage().contains("Spark job failed due to task failures")); + Assert.assertTrue(exception.getMessage().contains("kv1.txt is not a Parquet file. expected " + + "magic number at tail [80, 65, 82, 49] but found [95, 57, 55, 10]")); + } finally { + if (driver != null) { + Assert.assertEquals(0, driver.run("DROP TABLE IF EXISTS test_table").getResponseCode()); + driver.destroy(); + } + if (fs.exists(tmpDir)) { + fs.delete(tmpDir, true); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q new file mode 100644 index 0000000..5a92390 --- /dev/null +++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q @@ -0,0 +1,5 @@ +--! qt:dataset:src + +set spark.executor.cores=-1; + +select * from src order by key limit 10; http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q new file mode 100644 index 0000000..55bc4b8 --- /dev/null +++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q @@ -0,0 +1,5 @@ +--! qt:dataset:src + +set spark.executor.memory=-1; + +select * from src order by key limit 10; http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out new file mode 100644 index 0000000..47ac8b2 --- /dev/null +++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out @@ -0,0 +1,5 @@ +PREHOOK: query: select * from src order by key limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client #SPARK_SESSION_ID#: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor cores must be a positive number" http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out new file mode 100644 index 0000000..4aa1337 --- /dev/null +++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out @@ -0,0 +1,5 @@ +PREHOOK: query: select * from src order by key limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client #SPARK_SESSION_ID#: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor Memory cores must be a positive number" http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java index ed9222c..b2b5201 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java @@ -24,7 +24,6 @@ import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.Resources; @@ -41,8 +40,6 @@ import java.io.Serializable; import java.io.Writer; import java.net.URI; import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -125,17 +122,19 @@ abstract class AbstractSparkClient implements SparkClient { } else { errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2."; } - LOG.error(errorMsg, e); - driverFuture.cancel(true); - try { - driverFuture.get(); - } catch (InterruptedException ie) { - // Give up. - LOG.warn("Interrupted before driver thread was finished.", ie); - } catch (ExecutionException ee) { - LOG.error("Driver thread failed", ee); + if (driverFuture.isDone()) { + try { + driverFuture.get(); + } catch (InterruptedException ie) { + // Give up. + LOG.warn("Interrupted before driver thread was finished.", ie); + } catch (ExecutionException ee) { + LOG.error("Driver thread failed", ee); + } + } else { + driverFuture.cancel(true); } - throw Throwables.propagate(e); + throw new RuntimeException(errorMsg, e); } LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress()); http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java index 31e89b8..7a6e77b 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java @@ -32,6 +32,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import org.apache.commons.lang3.StringUtils; + import org.apache.hadoop.hive.common.log.LogRedirector; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; @@ -200,19 +202,19 @@ class SparkSubmitSparkClient extends AbstractSparkClient { try { int exitCode = child.waitFor(); if (exitCode != 0) { - StringBuilder errStr = new StringBuilder(); - synchronized(childErrorLog) { - for (Object aChildErrorLog : childErrorLog) { - errStr.append(aChildErrorLog); - errStr.append('\n'); + List<String> errorMessages = new ArrayList<>(); + synchronized (childErrorLog) { + for (String line : childErrorLog) { + if (StringUtils.containsIgnoreCase(line, "Error")) { + errorMessages.add("\"" + line + "\""); + } } } - LOG.warn("Child process exited with code {}", exitCode); - rpcServer.cancelClient(clientId, - "Child process (spark-submit) exited before connecting back with error log " + errStr.toString()); - } else { - LOG.info("Child process (spark-submit) exited successfully."); + String errStr = errorMessages.isEmpty() ? "?" : Joiner.on(',').join(errorMessages); + + rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " + + "with exit code " + exitCode + " and error " + errStr)); } } catch (InterruptedException ie) { LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process."); http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java index babcb54..0c67ffd 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java @@ -202,11 +202,12 @@ public class RpcServer implements Closeable { } /** - * Tells the RPC server to cancel the connection from an existing pending client + * Tells the RPC server to cancel the connection from an existing pending client. + * * @param clientId The identifier for the client - * @param msg The error message about why the connection should be canceled + * @param failure The error about why the connection should be canceled */ - public void cancelClient(final String clientId, final String msg) { + public void cancelClient(final String clientId, final Throwable failure) { final ClientInfo cinfo = pendingClients.remove(clientId); if (cinfo == null) { // Nothing to be done here. @@ -214,12 +215,22 @@ public class RpcServer implements Closeable { } cinfo.timeoutFuture.cancel(true); if (!cinfo.promise.isDone()) { - cinfo.promise.setFailure(new RuntimeException( - String.format("Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId))); + cinfo.promise.setFailure(failure); } } /** + * Tells the RPC server to cancel the connection from an existing pending client. + * + * @param clientId The identifier for the client + * @param msg The error message about why the connection should be canceled + */ + public void cancelClient(final String clientId, final String msg) { + cancelClient(clientId, new RuntimeException(String.format( + "Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId))); + } + + /** * Creates a secret for identifying a client connection. */ public String createSecret() {