This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3b064f45fb9 HIVE-27951: hcatalog dynamic partitioning fails with a 'partition already exists' error when parent partition paths exist (#4979) (yigress reviewed by Laszlo Bodor)
3b064f45fb9 is described below
commit 3b064f45fb909b68e3ceb3ac259b8b1b37c8c0de
Author: yigress <[email protected]>
AuthorDate: Fri Jan 12 13:04:11 2024 -0800
HIVE-27951: hcatalog dynamic partitioning fails with a 'partition already exists' error when parent partition paths exist (#4979) (yigress reviewed by Laszlo Bodor)
---
.../mapreduce/FileOutputCommitterContainer.java | 112 ++++++++++++---------
.../mapreduce/TestHCatDynamicPartitioned.java | 19 ++--
.../TestHCatExternalDynamicPartitioned.java | 4 +-
3 files changed, 80 insertions(+), 55 deletions(-)
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index de9ad252ff2..e585b5b08c5 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -488,7 +488,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
}
/**
- * Move all of the files from the temp directory to the final location
+ * Move task output from the temp directory to the final location
* @param srcf the file to move
* @param srcDir the source directory
* @param destDir the target directory
@@ -497,7 +497,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
*/
private void moveTaskOutputs(final Configuration conf, Path srcf, Path srcDir,
    Path destDir, boolean immutable) throws IOException {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("moveTaskOutputs "
+ srcf + " from: " + srcDir + " to: " + destDir + " immutable: " + immutable);
}
@@ -516,8 +516,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
LinkedList<Pair<Path, Path>> moves = new LinkedList<>();
if (customDynamicLocationUsed) {
- if (immutable && destFs.exists(destDir) &&
-     !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, destDir)) {
+ if (immutable && destFs.exists(destDir)
+     && !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, destDir)) {
throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, "Data already exists in " + destDir
+ ", duplicate publish not possible.");
@@ -536,19 +536,18 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
FileStatus srcStatus = srcQ.remove();
Path srcF = srcStatus.getPath();
final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, destDir, immutable);
- if (immutable && destFs.exists(finalOutputPath) &&
-     !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
- "Data already exists in " + finalOutputPath
- + ", duplicate publish not possible.");
- }
- if (srcStatus.isDirectory()) {
+ if (immutable && destFs.exists(finalOutputPath)
+     && !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
+ if (partitionsDiscoveredByPath.containsKey(srcF.toString())) {
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+ "Data already exists in " + finalOutputPath + ", duplicate publish not possible.");
+ }
+ // parent directory may exist for multi-partitions, check lower level partitions
+ Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
+ } else if (srcStatus.isDirectory()) {
if (canRename && dynamicPartitioningUsed) {
// If it is partition, move the partition directory instead of each file.
- // If custom dynamic location provided, need to rename to final output path
- final Path parentDir = finalOutputPath.getParent();
- Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath;
- moves.add(Pair.of(srcF, dstPath));
+ moves.add(Pair.of(srcF, finalOutputPath));
} else {
Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
}
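
The reshaped loop above amounts to a breadth-first walk over the task output: a non-empty destination directory aborts the publish only when it corresponds to a partition this job actually discovered (partitionsDiscoveredByPath); otherwise it is treated as a pre-existing parent of multi-level partitions and its children are enqueued for inspection one level down. A minimal standalone sketch of that decision, with simplified stand-ins for the HCatalog internals (getFinalPath, HIDDEN_FILES_PATH_FILTER, partitionsDiscoveredByPath):

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.Set;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    class PartitionWalkSketch {
      // Stand-in for HIDDEN_FILES_PATH_FILTER: skip _temporary, .staging and the like.
      static final PathFilter VISIBLE =
          p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");

      // Walk the task output; fail only on partitions this job itself discovered,
      // descend through pre-existing parent directories otherwise.
      static void walk(FileSystem srcFs, FileSystem destFs, Path srcDir, Path destDir,
          Set<String> discoveredPartitionPaths) throws IOException {
        Deque<FileStatus> srcQ = new ArrayDeque<>();
        Collections.addAll(srcQ, srcFs.listStatus(srcDir, VISIBLE));
        while (!srcQ.isEmpty()) {
          FileStatus srcStatus = srcQ.remove();
          Path srcF = srcStatus.getPath();
          // Rebase srcF under destDir (simplified stand-in for getFinalPath).
          Path dest = new Path(destDir, srcDir.toUri().relativize(srcF.toUri()).getPath());
          if (destFs.exists(dest) && destFs.listStatus(dest, VISIBLE).length > 0) {
            if (discoveredPartitionPaths.contains(srcF.toString())) {
              // This job produced this exact partition: a genuine duplicate publish.
              throw new IOException("Data already exists in " + dest);
            }
            // Only a shared parent of multi-level partitions: check one level down.
            Collections.addAll(srcQ, srcFs.listStatus(srcF, VISIBLE));
          } else if (srcStatus.isDirectory()) {
            // New leaf directory: the real committer records it for a bulk move.
            System.out.println("would move " + srcF + " -> " + dest);
          }
        }
      }
    }
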
@@ -558,50 +557,69 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
}
}
- if (moves.isEmpty()) {
+ bulkMoveFiles(conf, srcFs, destFs, moves);
+ }
+
+ /**
+ * Bulk move files from source to destination.
+ * @param srcFs the source filesystem where the source files are
+ * @param destFs the destination filesystem where the destination files are
+ * @param pairs list of pairs of <source_path, destination_path>, move source_path to destination_path
+ * @throws java.io.IOException
+ */
+ private void bulkMoveFiles(final Configuration conf, final FileSystem srcFs, final FileSystem destFs,
+ final List<Pair<Path, Path>> pairs) throws IOException {
+ if (pairs.isEmpty()) {
return;
}
-
+ final boolean canRename = srcFs.getUri().equals(destFs.getUri());
final List<Future<Pair<Path, Path>>> futures = new LinkedList<>();
- final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
-     Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
-     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null;
+ final int moveThreadsCount = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25);
- for (final Pair<Path, Path> pair: moves){
+ if (moveThreadsCount <= 0) {
+ for (final Pair<Path, Path> pair: pairs) {
+ Path srcP = pair.getLeft();
+ Path dstP = pair.getRight();
+ if (!moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED,
+ "Unable to move from " + srcP + " to " + dstP);
+ }
+ }
+ return;
+ }
+
+ final ExecutorService pool = Executors.newFixedThreadPool(moveThreadsCount,
+     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build());
+
+ for (final Pair<Path, Path> pair: pairs) {
Path srcP = pair.getLeft();
Path dstP = pair.getRight();
- final String msg = "Unable to move source " + srcP + " to destination " + dstP;
- if (null==pool) {
- moveFile(srcFs, srcP, destFs, dstP, conf, canRename);
- } else {
- futures.add(pool.submit(new Callable<Pair<Path, Path>>() {
- @Override
- public Pair<Path, Path> call() throws IOException {
- if (moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
- return pair;
- } else {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
- }
+ futures.add(pool.submit(new Callable<Pair<Path, Path>>() {
+ @Override
+ public Pair<Path, Path> call() throws IOException {
+ if (moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) {
+ return pair;
+ } else {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED,
+ "Unable to move from " + srcP + " to " + dstP);
}
- }));
- }
- }
- if (null != pool) {
- pool.shutdown();
- for (Future<Pair<Path, Path>> future : futures) {
- try {
- Pair<Path, Path> pair = future.get();
- LOG.debug("Moved src: {}, to dest: {}", pair.getLeft().toString(),
pair.getRight().toString());
- } catch (Exception e) {
- LOG.error("Failed to move {}", e.getMessage());
- pool.shutdownNow();
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, e.getMessage());
}
+ }));
+ }
+ pool.shutdown();
+ for (Future<Pair<Path, Path>> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ pool.shutdownNow();
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, e.getMessage());
}
}
}
- private boolean moveFile(FileSystem srcFs, Path srcf, FileSystem destFs, Path destf, Configuration conf, boolean canRename) throws IOException {
+ private boolean moveFile(final FileSystem srcFs, final Path srcf, final FileSystem destFs, final Path destf,
+ final Configuration conf, final boolean canRename) throws IOException {
+ LOG.debug("Moving src: {}, to dest: {}", srcf, destf);
boolean moved;
if (canRename) {
destFs.mkdirs(destf.getParent());
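
The extracted bulkMoveFiles above follows a familiar fan-out/fail-fast shape: a non-positive HIVE_MOVE_FILES_THREAD_COUNT falls back to serial moves, otherwise every move is submitted to a fixed daemon-thread pool, and the first failed future triggers shutdownNow to cancel the rest. A generic sketch of the same pattern, with a hypothetical Move interface standing in for moveFile:

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class BulkMoveSketch {
      // Hypothetical stand-in for a single moveFile(srcFs, srcP, destFs, dstP, ...) call.
      interface Move {
        void run() throws IOException;
      }

      static void bulkMove(List<Move> moves, int threadCount) throws IOException {
        if (moves.isEmpty()) {
          return;
        }
        if (threadCount <= 0) {
          // Serial fallback, mirroring the moveThreadsCount <= 0 branch in the patch.
          for (Move m : moves) {
            m.run();
          }
          return;
        }
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        List<Future<?>> futures = new LinkedList<>();
        for (Move m : moves) {
          futures.add(pool.submit(() -> {
            m.run();
            return null; // Callable, so the IOException propagates into the Future
          }));
        }
        pool.shutdown(); // accept no new work; submitted moves keep running
        for (Future<?> f : futures) {
          try {
            f.get(); // surfaces the first failure as an ExecutionException
          } catch (Exception e) {
            pool.shutdownNow(); // fail fast: cancel the remaining moves
            throw new IOException("Bulk move failed: " + e.getMessage(), e);
          }
        }
      }
    }

One caveat worth noting: shutdownNow only interrupts the worker threads, and a rename already in flight on the filesystem may still complete, so a failed bulk move should be treated as possibly partially applied.
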
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index a97162de993..9ee887b933b 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -52,13 +52,13 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
private static List<HCatFieldSchema> dataColumns;
private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
protected static final int NUM_RECORDS = 20;
- protected static final int NUM_PARTITIONS = 5;
+ protected static final int NUM_TOP_PARTITIONS = 5;
public TestHCatDynamicPartitioned(String formatName, String serdeClass, String inputFormatClass,
    String outputFormatClass) throws Exception {
super(formatName, serdeClass, inputFormatClass, outputFormatClass);
tableName = "testHCatDynamicPartitionedTable_" + formatName;
- generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
generateDataColumns();
}
@@ -67,6 +67,8 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, "")));
+
}
protected static void generateWriteRecords(int max, int mod, int offset) {
@@ -78,6 +80,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
objList.add(i);
objList.add("strvalue" + i);
objList.add(String.valueOf((i % mod) + offset));
+ objList.add(String.valueOf((i / (max/2)) + offset));
writeRecords.add(new DefaultHCatRecord(objList));
}
}
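
To see why the updated test exercises the pre-existing-parent case: with NUM_RECORDS = 20 and NUM_TOP_PARTITIONS = 5, p1 cycles through five values while the added expression yields p2 = 0 for the first ten records and p2 = 1 for the rest. The two runMRCreate calls in runHCatDynamicPartitionedTable further below therefore each write five leaf partitions, the second batch landing under p1 parent directories the first batch already created. A quick standalone check of that arithmetic:

    public class PartitionSpreadCheck {
      public static void main(String[] args) {
        int max = 20, mod = 5, offset = 0; // NUM_RECORDS, NUM_TOP_PARTITIONS, offset
        for (int i = 0; i < max; i++) {
          String p1 = String.valueOf((i % mod) + offset);       // cycles 0..4
          String p2 = String.valueOf((i / (max / 2)) + offset);  // 0 for i < 10, then 1
          System.out.println("record " + i + " -> p1=" + p1 + ", p2=" + p2);
        }
        // 5 p1 values x 2 p2 values = 10 partitions, matching the updated
        // assertEquals(NUM_TOP_PARTITIONS*2, res.size()) assertion.
      }
    }
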
@@ -86,6 +89,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
protected List<FieldSchema> getPartitionKeys() {
List<FieldSchema> fields = new ArrayList<FieldSchema>();
fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, ""));
return fields;
}
@@ -117,8 +121,11 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
String customDynamicPathPattern) throws Exception {
- generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
- runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask, customDynamicPathPattern);
+ generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+ runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2,
+     true, asSingleMapTask, customDynamicPathPattern);
+ runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2,
+     true, asSingleMapTask, customDynamicPathPattern);
runMRRead(NUM_RECORDS);
@@ -140,7 +147,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
//Test for duplicate publish
IOException exc = null;
try {
- generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false,
    true, customDynamicPathPattern);
@@ -167,7 +174,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
driver.run(query);
res = new ArrayList<String>();
driver.getResults(res);
- assertEquals(NUM_PARTITIONS, res.size());
+ assertEquals(NUM_TOP_PARTITIONS*2, res.size());
query = "select * from " + tableName;
driver.run(query);
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
index 18fcfdbdd2a..f142f3d488f 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
@@ -28,7 +28,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition
throws Exception {
super(formatName, serdeClass, inputFormatClass, outputFormatClass);
tableName = "testHCatExternalDynamicPartitionedTable_" + formatName;
- generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
generateDataColumns();
}
@@ -43,7 +43,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition
*/
@Test
public void testHCatExternalDynamicCustomLocation() throws Exception {
- runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}");
+ runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}/${p2}");
}
}
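
For completeness, the widened custom path pattern means every (p1, p2) pair resolves to its own leaf directory, e.g. mapred/externalDynamicOutput/3/1, so the external-table test hits the same pre-existing-parent path as the managed one. A toy illustration of the ${...} substitution semantics (the resolve helper here is hypothetical, not HCatalog API):

    import java.util.Map;

    public class PathPatternSketch {
      // Hypothetical helper, only to illustrate how ${key} tokens map
      // partition values to directories; not the HCatalog implementation.
      static String resolve(String pattern, Map<String, String> partitionValues) {
        String resolved = pattern;
        for (Map.Entry<String, String> e : partitionValues.entrySet()) {
          resolved = resolved.replace("${" + e.getKey() + "}", e.getValue());
        }
        return resolved;
      }

      public static void main(String[] args) {
        String pattern = "mapred/externalDynamicOutput/${p1}/${p2}";
        // Expected output: mapred/externalDynamicOutput/3/1
        System.out.println(resolve(pattern, Map.of("p1", "3", "p2", "1")));
      }
    }
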