Repository: hive
Updated Branches:
  refs/heads/master beccce398 -> f6726f84e

HIVE-20695: HoS Query fails with hive.exec.parallel=true (Yongzhi Chen, reviewed by Peter Vary)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f6726f84
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f6726f84
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f6726f84

Branch: refs/heads/master
Commit: f6726f84ea17cd825aa832c9c9a134c598ec8755
Parents: beccce3
Author: Yongzhi Chen <[email protected]>
Authored: Fri Oct 12 23:17:13 2018 -0400
Committer: Yongzhi Chen <[email protected]>
Committed: Fri Oct 12 23:39:49 2018 -0400

----------------------------------------------------------------------
 .../hive/ql/exec/spark/RemoteHiveSparkClient.java    |  4 +++-
 .../hadoop/hive/ql/exec/spark/SparkUtilities.java    |  1 +
 .../hive/ql/exec/spark/session/SparkSessionImpl.java | 15 ++++++++-------
 3 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f6726f84/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index d31a202..49b7614 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -229,7 +229,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
   }
 
-  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
+  private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
     // add hive-exec jar
     addJars((new JobConf(this.getClass())).getJar());
 
@@ -264,6 +264,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     addResources(addedArchives);
   }
 
+  //This method is not thread safe
   private void addResources(String addedFiles) throws IOException {
     for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
       try {
@@ -281,6 +282,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     }
   }
 
+  //This method is not thread safe
   private void addJars(String addedJars) throws IOException {
     for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/f6726f84/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index fdc5361..d384ed6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -78,6 +78,7 @@ public class SparkUtilities {
 
   /**
    * Uploads a local file to HDFS
+   * This method is not thread safe
    *
    * @param source
    * @param conf

http://git-wip-us.apache.org/repos/asf/hive/blob/f6726f84/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 6a8b42e..bb50129 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -98,7 +98,6 @@ public class SparkSessionImpl implements SparkSession {
   private final String sessionId;
   private volatile HiveSparkClient hiveSparkClient;
   private volatile Path scratchDir;
-  private final Object dirLock = new Object();
 
   /**
    * The timestamp of the last completed Spark job.
@@ -317,6 +316,7 @@ public class SparkSessionImpl implements SparkSession {
     return result;
   }
 
+  //This method is not thread safe
   private void cleanScratchDir() throws IOException {
     if (scratchDir != null) {
       FileSystem fs = scratchDir.getFileSystem(conf);
@@ -324,15 +324,16 @@ public class SparkSessionImpl implements SparkSession {
       scratchDir = null;
     }
   }
-
+  /**
+   * Create scratch directory for spark session if it does not exist.
+   * This method is not thread safe.
+   * @return Path to Spark session scratch directory.
+   * @throws IOException
+   */
   @Override
   public Path getHDFSSessionDir() throws IOException {
     if (scratchDir == null) {
-      synchronized (dirLock) {
-        if (scratchDir == null) {
-          scratchDir = createScratchDir();
-        }
-      }
+      scratchDir = createScratchDir();
     }
     return scratchDir;
   }
