This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new f45197e6bb3 HIVE-26815: Backport HIVE-26758 (Allow use scratchdir for staging final job) (#3840)
f45197e6bb3 is described below
commit f45197e6bb3c4d4d9413de2ad12b01c2b923f0fc
Author: yigress <[email protected]>
AuthorDate: Wed Dec 7 13:23:12 2022 -0800
HIVE-26815: Backport HIVE-26758 (Allow use scratchdir for staging final job) (#3840)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 6 ++-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 13 ++++--
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 4 ++
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 7 ++-
.../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 13 ++++--
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 53 ++++++++++------------
6 files changed, 57 insertions(+), 39 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 31ea29bc81c..44acef4532d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4485,7 +4485,11 @@ public class HiveConf extends Configuration {
"This parameter enables a number of optimizations when running on blobstores:\n" +
"(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" +
"This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" +
- "See HIVE-15121 for details.");
+ "See HIVE-15121 for details."),
+
+ HIVE_USE_SCRATCHDIR_FOR_STAGING("hive.use.scratchdir.for.staging", false,
+ "Use ${hive.exec.scratchdir} for query results instead of ${hive.exec.stagingdir}.\n" +
+ "This stages query results in ${hive.exec.scratchdir} before moving to final destination.");
public final String varname;
public final String altName;
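For context, a minimal sketch of how the new flag added above would typically be toggled from client code; it assumes a standard HiveConf on the classpath and only uses the ConfVars constant introduced by this patch. From a Hive session, `SET hive.use.scratchdir.for.staging=true;` should be the equivalent.

    // Hedged sketch: enabling the new staging behaviour programmatically.
    import org.apache.hadoop.hive.conf.HiveConf;

    public class StagingConfExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Stage query results under hive.exec.scratchdir instead of hive.exec.stagingdir.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING, true);
        System.out.println("staging via scratchdir: "
            + conf.getBoolVar(HiveConf.ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING));
      }
    }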
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 56c32bf78cc..713c19a6888 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -845,11 +845,14 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
private void createDpDirCheckSrc(final Path dpStagingPath, final Path dpFinalPath) throws IOException {
- if (!fs.exists(dpStagingPath) && !fs.exists(dpFinalPath)) {
- fs.mkdirs(dpStagingPath);
- // move task will create dp final path
- if (reporter != null) {
- reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+ if (!fs.exists(dpStagingPath)) {
+ FileSystem dpFs = dpFinalPath.getFileSystem(hconf);
+ if (!dpFs.exists(dpFinalPath)) {
+ fs.mkdirs(dpStagingPath);
+ // move task will create dp final path
+ if (reporter != null) {
+ reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+ }
}
}
}
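The point of this FileSinkOperator change is that, once results are staged on the scratch filesystem, dpStagingPath and dpFinalPath may no longer live on the same FileSystem, so each path must be resolved against its own filesystem before the existence check. A standalone sketch of the same pattern, using only stock Hadoop FileSystem APIs and hypothetical paths:

    // Hedged sketch: checking two paths that may live on different filesystems.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CrossFsCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path staging = new Path("hdfs://nn/tmp/hive/scratch/_tmp.staging");  // hypothetical scratch path
        Path finalDir = new Path("s3a://bucket/warehouse/tbl/part=1");       // hypothetical destination path
        FileSystem stagingFs = staging.getFileSystem(conf);  // resolves the scratch filesystem
        FileSystem finalFs = finalDir.getFileSystem(conf);   // resolves the destination filesystem
        if (!stagingFs.exists(staging) && !finalFs.exists(finalDir)) {
          stagingFs.mkdirs(staging);  // the move task later creates the final path
        }
      }
    }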
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e6ab88fc127..315381d85b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -311,6 +311,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
}
}
else {
+ FileSystem targetFs = targetPath.getFileSystem(conf);
+ if (!targetFs.exists(targetPath.getParent())){
+ targetFs.mkdirs(targetPath.getParent());
+ }
moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
}
}
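The MoveTask addition guards against moving results into a directory whose parent does not yet exist, which can happen when interim results sit under the scratch directory while the final target lives elsewhere. Roughly the same guard in isolation, with hypothetical paths and plain Hadoop APIs (on HDFS, rename returns false rather than creating missing parents):

    // Hedged sketch: make sure the target's parent exists before moving files into it.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class EnsureParentBeforeMove {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path source = new Path("/tmp/hive/scratch/-ext-10000/000000_0");  // hypothetical interim file
        Path target = new Path("/user/hive/results/query1/000000_0");     // hypothetical final location
        FileSystem targetFs = target.getFileSystem(conf);
        if (!targetFs.exists(target.getParent())) {
          targetFs.mkdirs(target.getParent());  // without this, the rename below would fail
        }
        boolean moved = targetFs.rename(source, target);
        System.out.println("moved: " + moved);
      }
    }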
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6ca82a67123..2d461ada49c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1805,8 +1805,8 @@ public class Hive {
* See: HIVE-1707 and HIVE-2117 for background
*/
FileSystem oldPartPathFS = oldPartPath.getFileSystem(getConf());
- FileSystem loadPathFS = loadPath.getFileSystem(getConf());
- if (FileUtils.equalsFileSystem(oldPartPathFS,loadPathFS)) {
+ FileSystem tblPathFS = tblDataLocationPath.getFileSystem(getConf());
+ if (FileUtils.equalsFileSystem(oldPartPathFS, tblPathFS)) {
newPartPath = oldPartPath;
}
}
@@ -4314,6 +4314,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
recycleDirToCmPath(path, purge);
}
+ if (!fs.exists(path)) {
+ return;
+ }
FileStatus[] statuses = fs.listStatus(path, pathFilter);
if (statuses == null || statuses.length == 0) {
return;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index f7eb711fb74..d333e6b8c7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1990,9 +1990,16 @@ public final class GenMapRedUtils {
// it must be on the same file system as the current destination
Context baseCtx = parseCtx.getContext();
- // Create the required temporary file in the HDFS location if the destination
- // path of the FileSinkOperator table is a blobstore path.
- Path tmpDir = baseCtx.getTempDirForFinalJobPath(fileSinkDesc.getDestPath());
+ Path tmpDir = null;
+ if (hconf.getBoolVar(ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING)) {
+ tmpDir = baseCtx.getTempDirForInterimJobPath(fileSinkDesc.getDestPath());
+ } else {
+ tmpDir = baseCtx.getTempDirForFinalJobPath(fileSinkDesc.getDestPath());
+ }
+ DynamicPartitionCtx dpCtx = fileSinkDesc.getDynPartCtx();
+ if (dpCtx != null && dpCtx.getSPPath() != null) {
+ tmpDir = new Path(tmpDir, dpCtx.getSPPath());
+ }
// Change all the linked file sink descriptors
if (fileSinkDesc.isLinkedFileSink()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 89852816977..93d70100334 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7259,37 +7259,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
checkImmutableTable(qb, destinationTable, destinationPath, false);
- // check for partition
- List<FieldSchema> parts = destinationTable.getPartitionKeys();
- if (parts != null && parts.size() > 0) { // table is partitioned
- if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
- throw new SemanticException(generateErrorMessage(
- qb.getParseInfo().getDestForClause(dest),
- ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
- }
- dpCtx = qbm.getDPCtx(dest);
- if (dpCtx == null) {
- destinationTable.validatePartColumnNames(partSpec, false);
- dpCtx = new DynamicPartitionCtx(partSpec,
- conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
- conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
- qbm.setDPCtx(dest, dpCtx);
- }
- }
-
// Check for dynamic partitions.
dpCtx = checkDynPart(qb, qbm, destinationTable, partSpec, dest);
- if (dpCtx != null && dpCtx.getSPPath() != null) {
- destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath());
- }
boolean isNonNativeTable = destinationTable.isNonNative();
isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
- if (isNonNativeTable || isMmTable) {
- queryTmpdir = destinationPath;
- } else {
- queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath);
- }
+ queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, false, destinationPath, dpCtx);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE specifying " + queryTmpdir
+ " from " + destinationPath);
@@ -7398,7 +7373,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
.getAuthority(), partPath.toUri().getPath());
isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
- queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath);
+ queryTmpdir = getTmpDir(false, isMmTable, false, destinationPath, null);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying "
+ queryTmpdir + " from " + destinationPath);
@@ -7514,7 +7489,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// no copy is required. we may want to revisit this policy in future
try {
Path qPath = FileUtils.makeQualified(destinationPath, conf);
- queryTmpdir = isMmTable ? qPath : ctx.getTempDirForFinalJobPath(qPath);
+ queryTmpdir = getTmpDir(false, isMmTable, false, qPath, null);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Setting query directory " + queryTmpdir
+ " from " + destinationPath + " (" + isMmTable + ")");
@@ -7781,6 +7756,28 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return output;
}
+ private Path getTmpDir(boolean isNonNativeTable, boolean isMmTable, boolean isDirectInsert,
+ Path destinationPath, DynamicPartitionCtx dpCtx) {
+ /**
+ * We will directly insert to the final destination in the following cases:
+ * 1. Non native table
+ * 2. Micro-managed (insert only table)
+ * 3. Full ACID table and operation type is INSERT
+ */
+ Path destPath = null;
+ if (isNonNativeTable || isMmTable || isDirectInsert) {
+ destPath = destinationPath;
+ } else if (HiveConf.getBoolVar(conf, ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING)) {
+ destPath = ctx.getTempDirForInterimJobPath(destinationPath);
+ } else {
+ destPath = ctx.getTempDirForFinalJobPath(destinationPath);
+ }
+ if (dpCtx != null && dpCtx.getSPPath() != null) {
+ return new Path(destPath, dpCtx.getSPPath());
+ }
+ return destPath;
+ }
+
private boolean useBatchingSerializer(String serdeClassName) {
return SessionState.get().isHiveServerQuery() &&
hasSetBatchSerializer(serdeClassName);
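To summarize the comment block inside the new getTmpDir helper: non-native tables, insert-only (MM) tables, and direct-insert targets write straight to the destination, otherwise staging goes to the scratch directory when the new flag is on and to the staging directory when it is off. A simplified, self-contained restatement of that routing (not the Hive API; scratchDir and stagingDir stand in for ctx.getTempDirForInterimJobPath and ctx.getTempDirForFinalJobPath, and all paths are hypothetical):

    // Hedged sketch: the staging-directory choice from the patch, as a pure function.
    import org.apache.hadoop.fs.Path;

    public class StagingDirChoice {
      static Path chooseTmpDir(boolean directToDest, boolean useScratchForStaging,
          Path destination, Path scratchDir, Path stagingDir, String staticPartitionSubPath) {
        Path base;
        if (directToDest) {                // non-native, insert-only (MM) or direct-insert targets
          base = destination;
        } else if (useScratchForStaging) { // new behaviour: stage under hive.exec.scratchdir
          base = scratchDir;
        } else {                           // default: stage under hive.exec.stagingdir
          base = stagingDir;
        }
        // keep the static-partition sub-path, as the patch does for partitioned inserts
        return staticPartitionSubPath == null ? base : new Path(base, staticPartitionSubPath);
      }

      public static void main(String[] args) {
        Path dest = new Path("/warehouse/db/tbl");
        Path scratch = new Path("/tmp/hive/scratch/queryid");
        Path staging = new Path("/warehouse/db/tbl/.hive-staging_x");
        System.out.println(chooseTmpDir(false, true, dest, scratch, staging, "p=1"));
      }
    }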