This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b064f45fb9 HIVE-27951: hcatalog dynamic partitioning fails with partition already exist error when exist parent partitions path (#4979) (yigress reviewed by Laszlo Bodor)
3b064f45fb9 is described below

commit 3b064f45fb909b68e3ceb3ac259b8b1b37c8c0de
Author: yigress <[email protected]>
AuthorDate: Fri Jan 12 13:04:11 2024 -0800

    HIVE-27951: hcatalog dynamic partitioning fails with partition already exist error when exist parent partitions path (#4979) (yigress reviewed by Laszlo Bodor)
---
 .../mapreduce/FileOutputCommitterContainer.java    | 112 ++++++++++++---------
 .../mapreduce/TestHCatDynamicPartitioned.java      |  19 ++--
 .../TestHCatExternalDynamicPartitioned.java        |   4 +-
 3 files changed, 80 insertions(+), 55 deletions(-)
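
The behavioral change is in FileOutputCommitterContainer.moveTaskOutputs(): when an immutable destination path already exists and is non-empty, the committer now raises ERROR_DUPLICATE_PARTITION only if the source path corresponds to a discovered partition; otherwise the existing directory is treated as a parent of deeper dynamic partitions and the source's children are queued for inspection. Below is a minimal standalone sketch of that decision, not the committed code: it uses java.nio.file instead of the Hadoop FileSystem API, and the class and method names are hypothetical.

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Deque;
    import java.util.Set;

    // Hypothetical, simplified analogue of the new check in moveTaskOutputs().
    public class ParentPartitionCheckSketch {

      static void checkExistingDest(Path src, Path dest, boolean immutable,
          Set<String> partitionsDiscoveredByPath, Deque<Path> srcQueue) throws IOException {
        if (immutable && Files.exists(dest) && !isDirEmpty(dest)) {
          if (partitionsDiscoveredByPath.contains(src.toString())) {
            // the destination is itself a published partition -> duplicate publish
            throw new IOException("Data already exists in " + dest + ", duplicate publish not possible.");
          }
          // the destination is only a parent of multi-level partitions:
          // descend into the source children and check the lower-level partitions instead
          try (DirectoryStream<Path> children = Files.newDirectoryStream(src)) {
            children.forEach(srcQueue::add);
          }
          return;
        }
        // otherwise fall through to the normal move/queue handling
      }

      private static boolean isDirEmpty(Path dir) throws IOException {
        try (DirectoryStream<Path> s = Files.newDirectoryStream(dir)) {
          return !s.iterator().hasNext();
        }
      }
    }

With this change the duplicate-publish check is applied per discovered partition rather than to whichever existing parent directory is encountered first, which is what previously caused the spurious "partition already exists" failures.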

diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index de9ad252ff2..e585b5b08c5 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -488,7 +488,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
   }
 
   /**
-   * Move all of the files from the temp directory to the final location
+   * Move task output from the temp directory to the final location
    * @param srcf the file to move
    * @param srcDir the source directory
    * @param destDir the target directory
@@ -497,7 +497,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
    */
   private void moveTaskOutputs(final Configuration conf, Path srcf, Path srcDir,
       Path destDir, boolean immutable) throws IOException {
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("moveTaskOutputs "
           + srcf + " from: " + srcDir + " to: " + destDir + " immutable: " + immutable);
     }
@@ -516,8 +516,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
 
     LinkedList<Pair<Path, Path>> moves = new LinkedList<>();
     if (customDynamicLocationUsed) {
-      if (immutable && destFs.exists(destDir) &&
-          !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, destDir)) {
+      if (immutable && destFs.exists(destDir)
+          && !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, destDir)) {
         throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
             "Data already exists in " + destDir
                 + ", duplicate publish not possible.");
@@ -536,19 +536,18 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
         FileStatus srcStatus = srcQ.remove();
         Path srcF = srcStatus.getPath();
         final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, destDir, immutable);
-        if (immutable && destFs.exists(finalOutputPath) &&
-            !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
-          throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
-              "Data already exists in " + finalOutputPath
-                  + ", duplicate publish not possible.");
-        }
-        if (srcStatus.isDirectory()) {
+        if (immutable && destFs.exists(finalOutputPath)
+            && !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
+          if (partitionsDiscoveredByPath.containsKey(srcF.toString())) {
+            throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+                "Data already exists in " + finalOutputPath + ", duplicate publish not possible.");
+          }
+          // parent directory may exist for multi-partitions, check lower level partitions
+          Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
+        } else if (srcStatus.isDirectory()) {
           if (canRename && dynamicPartitioningUsed) {
             // If it is partition, move the partition directory instead of each file.
-            // If custom dynamic location provided, need to rename to final output path
-            final Path parentDir = finalOutputPath.getParent();
-            Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath;
-            moves.add(Pair.of(srcF, dstPath));
+            moves.add(Pair.of(srcF, finalOutputPath));
           } else {
             Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
           }
@@ -558,50 +557,69 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
       }
     }
 
-    if (moves.isEmpty()) {
+    bulkMoveFiles(conf, srcFs, destFs, moves);
+  }
+
+  /**
+   * Bulk move files from source to destination.
+   * @param srcFs the source filesystem where the source files are
+   * @param destFs the destination filesystem where the destination files are
+   * @param pairs list of pairs of <source_path, destination_path>, move source_path to destination_path
+   * @throws java.io.IOException
+   */
+  private void bulkMoveFiles(final Configuration conf, final FileSystem srcFs, final FileSystem destFs,
+      final List<Pair<Path, Path>> pairs) throws IOException {
+    if (pairs.isEmpty()) {
       return;
     }
-
+    final boolean canRename = srcFs.getUri().equals(destFs.getUri());
     final List<Future<Pair<Path, Path>>> futures = new LinkedList<>();
-    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
-        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null;
+    final int moveThreadsCount = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25);
 
-    for (final Pair<Path, Path> pair: moves){
+    if (moveThreadsCount <= 0) {
+      for (final Pair<Path, Path> pair: pairs) {
+        Path srcP = pair.getLeft();
+        Path dstP = pair.getRight();
+        if (!moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
+          throw new HCatException(ErrorType.ERROR_MOVE_FAILED,
+              "Unable to move from " + srcP + " to " + dstP);
+        }
+      }
+      return;
+    }
+
+    final ExecutorService pool = Executors.newFixedThreadPool(moveThreadsCount,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build());
+
+    for (final Pair<Path, Path> pair: pairs) {
       Path srcP = pair.getLeft();
       Path dstP = pair.getRight();
-      final String msg = "Unable to move source " + srcP + " to destination " + dstP;
-      if (null==pool) {
-        moveFile(srcFs, srcP, destFs, dstP, conf, canRename);
-      } else {
-        futures.add(pool.submit(new Callable<Pair<Path, Path>>() {
-          @Override
-          public Pair<Path, Path> call() throws IOException {
-            if (moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
-              return pair;
-            } else {
-              throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
-            }
+      futures.add(pool.submit(new Callable<Pair<Path, Path>>() {
+        @Override
+        public Pair<Path, Path> call() throws IOException {
+          if (moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
+            return pair;
+          } else {
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED,
+                "Unable to move from " + srcP + " to " + dstP);
           }
-        }));
-      }
-    }
-    if (null != pool) {
-      pool.shutdown();
-      for (Future<Pair<Path, Path>> future : futures) {
-        try {
-          Pair<Path, Path> pair = future.get();
-          LOG.debug("Moved src: {}, to dest: {}", pair.getLeft().toString(), pair.getRight().toString());
-        } catch (Exception e) {
-          LOG.error("Failed to move {}", e.getMessage());
-          pool.shutdownNow();
-          throw new HCatException(ErrorType.ERROR_MOVE_FAILED, e.getMessage());
         }
+      }));
+    }
+    pool.shutdown();
+    for (Future<Pair<Path, Path>> future : futures) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        pool.shutdownNow();
+        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, e.getMessage());
       }
     }
   }
 
-  private boolean moveFile(FileSystem srcFs, Path srcf, FileSystem destFs, Path destf, Configuration conf, boolean canRename) throws IOException {
+  private boolean moveFile(final FileSystem srcFs, final Path srcf, final FileSystem destFs, final Path destf,
+      final Configuration conf, final boolean canRename) throws IOException {
+    LOG.debug("Moving src: {}, to dest: {}", srcf, destf);
     boolean moved;
     if (canRename) {
       destFs.mkdirs(destf.getParent());
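
The move submission itself is now factored into bulkMoveFiles(): moves run on a fixed thread pool sized by ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, with a sequential fallback when the configured count is zero or negative, and the first failed move shuts the pool down and aborts the publish. Below is a simplified standalone sketch of that pattern, not the committed code: the move tasks are generic stand-ins for the real moveFile() calls and the names are hypothetical.

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical, simplified analogue of the bulkMoveFiles() pattern.
    public class BulkMoveSketch {

      interface MoveTask {
        // stands in for moveFile(srcFs, srcP, destFs, dstP, conf, canRename)
        void run() throws IOException;
      }

      static void bulkMove(List<MoveTask> moves, int moveThreadsCount) throws IOException {
        if (moves.isEmpty()) {
          return;
        }
        if (moveThreadsCount <= 0) {
          // sequential fallback when no move threads are configured
          for (MoveTask move : moves) {
            move.run();
          }
          return;
        }
        final ExecutorService pool = Executors.newFixedThreadPool(moveThreadsCount);
        final List<Future<?>> futures = new LinkedList<>();
        for (MoveTask move : moves) {
          futures.add(pool.submit(() -> { move.run(); return null; }));
        }
        pool.shutdown();
        for (Future<?> future : futures) {
          try {
            future.get();                 // surface the first failure
          } catch (Exception e) {
            pool.shutdownNow();           // cancel the remaining moves
            throw new IOException("Bulk move failed: " + e.getMessage(), e);
          }
        }
      }
    }

As in the committed code, failures surface when the futures are drained, so one failed move cancels the remaining moves instead of leaving a partially published output tree.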
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index a97162de993..9ee887b933b 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -52,13 +52,13 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
   private static List<HCatFieldSchema> dataColumns;
   private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
   protected static final int NUM_RECORDS = 20;
-  protected static final int NUM_PARTITIONS = 5;
+  protected static final int NUM_TOP_PARTITIONS = 5;
 
   public TestHCatDynamicPartitioned(String formatName, String serdeClass, String inputFormatClass,
       String outputFormatClass) throws Exception {
     super(formatName, serdeClass, inputFormatClass, outputFormatClass);
     tableName = "testHCatDynamicPartitionedTable_" + formatName;
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
     generateDataColumns();
   }
 
@@ -67,6 +67,8 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
     dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
     dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
     dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, "")));
+
   }
 
   protected static void generateWriteRecords(int max, int mod, int offset) {
@@ -78,6 +80,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
       objList.add(i);
       objList.add("strvalue" + i);
       objList.add(String.valueOf((i % mod) + offset));
+      objList.add(String.valueOf((i / (max/2)) + offset));
       writeRecords.add(new DefaultHCatRecord(objList));
     }
   }
@@ -86,6 +89,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
   protected List<FieldSchema> getPartitionKeys() {
     List<FieldSchema> fields = new ArrayList<FieldSchema>();
     fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""));
+    fields.add(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, ""));
     return fields;
   }
 
@@ -117,8 +121,11 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
 
   protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
       String customDynamicPathPattern) throws Exception {
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-    runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask, customDynamicPathPattern);
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+    runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2,
+        true, asSingleMapTask, customDynamicPathPattern);
+    runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2,
+        true, asSingleMapTask, customDynamicPathPattern);
 
     runMRRead(NUM_RECORDS);
 
@@ -140,7 +147,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
     //Test for duplicate publish
     IOException exc = null;
     try {
-      generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+      generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
       Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false,
           true, customDynamicPathPattern);
 
@@ -167,7 +174,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
     driver.run(query);
     res = new ArrayList<String>();
     driver.getResults(res);
-    assertEquals(NUM_PARTITIONS, res.size());
+    assertEquals(NUM_TOP_PARTITIONS*2, res.size());
 
     query = "select * from " + tableName;
     driver.run(query);
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
index 18fcfdbdd2a..f142f3d488f 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
@@ -28,7 +28,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition
       throws Exception {
     super(formatName, serdeClass, inputFormatClass, outputFormatClass);
     tableName = "testHCatExternalDynamicPartitionedTable_" + formatName;
-    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
     generateDataColumns();
   }
 
@@ -43,7 +43,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition
    */
   @Test
   public void testHCatExternalDynamicCustomLocation() throws Exception {
-    runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}");
+    runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}/${p2}");
   }
 
 }
