This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new f45197e6bb3 HIVE-26815: Backport HIVE-26758 (Allow use scratchdir for staging final job) (#3840)
f45197e6bb3 is described below

commit f45197e6bb3c4d4d9413de2ad12b01c2b923f0fc
Author: yigress <104102129+yigr...@users.noreply.github.com>
AuthorDate: Wed Dec 7 13:23:12 2022 -0800

    HIVE-26815: Backport HIVE-26758 (Allow use scratchdir for staging final job) (#3840)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  6 ++-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      | 13 ++++--
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |  4 ++
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  7 ++-
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   | 13 ++++--
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 53 ++++++++++------------
 6 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 31ea29bc81c..44acef4532d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4485,7 +4485,11 @@ public class HiveConf extends Configuration {
             "This parameter enables a number of optimizations when running on 
blobstores:\n" +
             "(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force 
the last Hive job to write to the blobstore.\n" +
             "This is a performance optimization that forces the final 
FileSinkOperator to write to the blobstore.\n" +
-            "See HIVE-15121 for details.");
+            "See HIVE-15121 for details."),
+
+    HIVE_USE_SCRATCHDIR_FOR_STAGING("hive.use.scratchdir.for.staging", false,
+        "Use ${hive.exec.scratchdir} for query results instead of 
${hive.exec.stagingdir}.\n" +
+        "This stages query results in ${hive.exec.scratchdir} before moving to 
final destination.");
 
     public final String varname;
     public final String altName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 56c32bf78cc..713c19a6888 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -845,11 +845,14 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
   }
 
   private void createDpDirCheckSrc(final Path dpStagingPath, final Path 
dpFinalPath) throws IOException {
-    if (!fs.exists(dpStagingPath) && !fs.exists(dpFinalPath)) {
-      fs.mkdirs(dpStagingPath);
-      // move task will create dp final path
-      if (reporter != null) {
-        reporter.incrCounter(counterGroup, 
Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+    if (!fs.exists(dpStagingPath)) {
+      FileSystem dpFs = dpFinalPath.getFileSystem(hconf);
+      if (!dpFs.exists(dpFinalPath)) {
+        fs.mkdirs(dpStagingPath);
+        // move task will create dp final path
+        if (reporter != null) {
+          reporter.incrCounter(counterGroup, 
Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+        }
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e6ab88fc127..315381d85b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -311,6 +311,10 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
             }
           }
           else {
+            FileSystem targetFs = targetPath.getFileSystem(conf);
+            if (!targetFs.exists(targetPath.getParent())){
+              targetFs.mkdirs(targetPath.getParent());
+            }
             moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
           }
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6ca82a67123..2d461ada49c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1805,8 +1805,8 @@ public class Hive {
            * See: HIVE-1707 and HIVE-2117 for background
            */
           FileSystem oldPartPathFS = oldPartPath.getFileSystem(getConf());
-          FileSystem loadPathFS = loadPath.getFileSystem(getConf());
-          if (FileUtils.equalsFileSystem(oldPartPathFS,loadPathFS)) {
+          FileSystem tblPathFS = tblDataLocationPath.getFileSystem(getConf());
+          if (FileUtils.equalsFileSystem(oldPartPathFS, tblPathFS)) {
             newPartPath = oldPartPath;
           }
         }
@@ -4314,6 +4314,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
       recycleDirToCmPath(path, purge);
     }
+    if (!fs.exists(path)) {
+      return;
+    }
     FileStatus[] statuses = fs.listStatus(path, pathFilter);
     if (statuses == null || statuses.length == 0) {
       return;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index f7eb711fb74..d333e6b8c7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1990,9 +1990,16 @@ public final class GenMapRedUtils {
         // it must be on the same file system as the current destination
         Context baseCtx = parseCtx.getContext();
 
-        // Create the required temporary file in the HDFS location if the 
destination
-        // path of the FileSinkOperator table is a blobstore path.
-        Path tmpDir = 
baseCtx.getTempDirForFinalJobPath(fileSinkDesc.getDestPath());
+        Path tmpDir = null;
+        if (hconf.getBoolVar(ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING)) {
+          tmpDir = 
baseCtx.getTempDirForInterimJobPath(fileSinkDesc.getDestPath());
+        } else {
+          tmpDir = 
baseCtx.getTempDirForFinalJobPath(fileSinkDesc.getDestPath());
+        }
+        DynamicPartitionCtx dpCtx = fileSinkDesc.getDynPartCtx();
+        if (dpCtx != null && dpCtx.getSPPath() != null) {
+          tmpDir = new Path(tmpDir, dpCtx.getSPPath());
+        }
 
         // Change all the linked file sink descriptors
         if (fileSinkDesc.isLinkedFileSink()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 89852816977..93d70100334 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7259,37 +7259,12 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       checkImmutableTable(qb, destinationTable, destinationPath, false);
 
-      // check for partition
-      List<FieldSchema> parts = destinationTable.getPartitionKeys();
-      if (parts != null && parts.size() > 0) { // table is partitioned
-        if (partSpec == null || partSpec.size() == 0) { // user did NOT 
specify partition
-          throw new SemanticException(generateErrorMessage(
-              qb.getParseInfo().getDestForClause(dest),
-              ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
-        }
-        dpCtx = qbm.getDPCtx(dest);
-        if (dpCtx == null) {
-          destinationTable.validatePartColumnNames(partSpec, false);
-          dpCtx = new DynamicPartitionCtx(partSpec,
-              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
-              
conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
-          qbm.setDPCtx(dest, dpCtx);
-        }
-      }
-
       // Check for dynamic partitions.
       dpCtx = checkDynPart(qb, qbm, destinationTable, partSpec, dest);
-      if (dpCtx != null && dpCtx.getSPPath() != null) {
-        destinationPath = new Path(destinationTable.getPath(), 
dpCtx.getSPPath());
-      }
 
       boolean isNonNativeTable = destinationTable.isNonNative();
       isMmTable = 
AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
-      if (isNonNativeTable || isMmTable) {
-        queryTmpdir = destinationPath;
-      } else {
-        queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath);
-      }
+      queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, false, 
destinationPath, dpCtx);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE 
specifying " + queryTmpdir
             + " from " + destinationPath);
@@ -7398,7 +7373,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
           .getAuthority(), partPath.toUri().getPath());
 
       isMmTable = 
AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
-      queryTmpdir = isMmTable ? destinationPath : 
ctx.getTempDirForFinalJobPath(destinationPath);
+      queryTmpdir = getTmpDir(false, isMmTable, false, destinationPath, null);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION 
specifying "
             + queryTmpdir + " from " + destinationPath);
@@ -7514,7 +7489,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         // no copy is required. we may want to revisit this policy in future
         try {
           Path qPath = FileUtils.makeQualified(destinationPath, conf);
-          queryTmpdir = isMmTable ? qPath : 
ctx.getTempDirForFinalJobPath(qPath);
+          queryTmpdir = getTmpDir(false, isMmTable, false, qPath, null);
           if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
             Utilities.FILE_OP_LOGGER.trace("Setting query directory " + 
queryTmpdir
                 + " from " + destinationPath + " (" + isMmTable + ")");
@@ -7781,6 +7756,28 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     return output;
   }
 
+  private Path getTmpDir(boolean isNonNativeTable, boolean isMmTable, boolean 
isDirectInsert,
+      Path destinationPath, DynamicPartitionCtx dpCtx) {
+    /**
+     * We will directly insert to the final destination in the following cases:
+     * 1. Non native table
+     * 2. Micro-managed (insert only table)
+     * 3. Full ACID table and operation type is INSERT
+     */
+    Path destPath = null;
+    if (isNonNativeTable || isMmTable || isDirectInsert) {
+      destPath = destinationPath;
+    } else if (HiveConf.getBoolVar(conf, 
ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING)) {
+      destPath = ctx.getTempDirForInterimJobPath(destinationPath);
+    } else {
+      destPath = ctx.getTempDirForFinalJobPath(destinationPath);
+    }
+    if (dpCtx != null && dpCtx.getSPPath() != null) {
+      return new Path(destPath, dpCtx.getSPPath());
+    }
+    return destPath;
+  }
+
   private boolean useBatchingSerializer(String serdeClassName) {
     return SessionState.get().isHiveServerQuery() &&
       hasSetBatchSerializer(serdeClassName);

Reply via email to