This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3ba23a022f9 Revert "HIVE-27951: hcatalog dynamic partitioning fails with partition already exist error when exist parent partitions path (#4937)"
3ba23a022f9 is described below
commit 3ba23a022f9e57b9a03afb48c8c2032698006f96
Author: Laszlo Bodor <[email protected]>
AuthorDate: Fri Jan 5 03:48:51 2024 +0100
Revert "HIVE-27951: hcatalog dynamic partitioning fails with partition
already exist error when exist parent partitions path (#4937)"
This reverts commit 96d46dc36cfd3a68c73f8c77e1f97c1c78507b24.
---
.../mapreduce/FileOutputCommitterContainer.java | 37 ++++++++--------------
.../mapreduce/TestHCatDynamicPartitioned.java | 17 ++++------
.../TestHCatExternalDynamicPartitioned.java | 4 +--
3 files changed, 21 insertions(+), 37 deletions(-)
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index 2ad306165d1..de9ad252ff2 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -488,7 +488,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
}
/**
- * Move task output from the temp directory to the final location
+ * Move all of the files from the temp directory to the final location
* @param srcf the file to move
* @param srcDir the source directory
* @param destDir the target directory
@@ -538,17 +538,17 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, destDir, immutable);
if (immutable && destFs.exists(finalOutputPath) && !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
- if (partitionsDiscoveredByPath.containsKey(srcF.toString())) {
- throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
- "Data already exists in " + finalOutputPath
- + ", duplicate publish not possible.");
- }
- // parent directory may exist for multi-partitions, check lower level partitions
- Collections.addAll(srcQ, srcFs.listStatus(srcF,HIDDEN_FILES_PATH_FILTER));
- } else if (srcStatus.isDirectory()) {
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+ "Data already exists in " + finalOutputPath
+ + ", duplicate publish not possible.");
+ }
+ if (srcStatus.isDirectory()) {
if (canRename && dynamicPartitioningUsed) {
// If it is partition, move the partition directory instead of each file.
- moves.add(Pair.of(srcF, finalOutputPath));
+ // If custom dynamic location provided, need to rename to final output path
+ final Path parentDir = finalOutputPath.getParent();
+ Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath;
+ moves.add(Pair.of(srcF, dstPath));
} else {
Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
}
@@ -558,27 +558,16 @@ class FileOutputCommitterContainer extends OutputCommitterContainer {
}
}
- bulkMoveFiles(conf, srcFs, destFs, moves);
- }
-
- /**
- * Bulk move files from source to destination.
- * @param srcFs the source filesystem where the source files are
- * @param destFs the destionation filesystem where the destionation files are
- * @param pairs list of pairs of <source_path, destination_path>, move source_path to destination_path
- * @throws java.io.IOException
- */
- private void bulkMoveFiles(final Configuration conf, final FileSystem srcFs, final FileSystem destFs, List<Pair<Path, Path>> pairs) throws IOException{
- if (pairs.isEmpty()) {
+ if (moves.isEmpty()) {
return;
}
- final boolean canRename = srcFs.getUri().equals(destFs.getUri());
+
final List<Future<Pair<Path, Path>>> futures = new LinkedList<>();
final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
    Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null;
- for (final Pair<Path, Path> pair: pairs){
+ for (final Pair<Path, Path> pair: moves){
Path srcP = pair.getLeft();
Path dstP = pair.getRight();
final String msg = "Unable to move source " + srcP + " to destination " + dstP;
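
For readability, the control flow restored by this first hunk can be summarized as below. This is only a condensed sketch assembled from the context and "+" lines above (srcStatus, srcQ, moves, canRename and the other names are assumed to keep their meanings from the surrounding method); it is not the complete method:

    // Condensed sketch of the restored behaviour.
    if (immutable && destFs.exists(finalOutputPath)
        && !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) {
      // Any non-empty destination is again treated as a duplicate publish;
      // the reverted "check lower level partitions" branch is gone.
      throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
          "Data already exists in " + finalOutputPath + ", duplicate publish not possible.");
    }
    if (srcStatus.isDirectory()) {
      if (canRename && dynamicPartitioningUsed) {
        // Move the whole partition directory; with a custom dynamic location the
        // rename targets the final output path itself rather than its parent.
        Path dstPath = !customDynamicLocationUsed ? finalOutputPath.getParent() : finalOutputPath;
        moves.add(Pair.of(srcF, dstPath));
      } else {
        Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER));
      }
    }
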
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index 5ee3a6348d1..a97162de993 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -52,13 +52,13 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
private static List<HCatFieldSchema> dataColumns;
private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
protected static final int NUM_RECORDS = 20;
- protected static final int NUM_TOP_PARTITIONS = 5;
+ protected static final int NUM_PARTITIONS = 5;
public TestHCatDynamicPartitioned(String formatName, String serdeClass,
String inputFormatClass,
String outputFormatClass) throws Exception {
super(formatName, serdeClass, inputFormatClass, outputFormatClass);
tableName = "testHCatDynamicPartitionedTable_" + formatName;
- generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
generateDataColumns();
}
@@ -67,8 +67,6 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")));
- dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, "")));
-
}
protected static void generateWriteRecords(int max, int mod, int offset) {
@@ -80,7 +78,6 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
objList.add(i);
objList.add("strvalue" + i);
objList.add(String.valueOf((i % mod) + offset));
- objList.add(String.valueOf((i / (max/2)) + offset));
writeRecords.add(new DefaultHCatRecord(objList));
}
}
@@ -89,7 +86,6 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
protected List<FieldSchema> getPartitionKeys() {
List<FieldSchema> fields = new ArrayList<FieldSchema>();
fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""));
- fields.add(new FieldSchema("p2", serdeConstants.STRING_TYPE_NAME, ""));
return fields;
}
@@ -121,9 +117,8 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
String customDynamicPathPattern) throws Exception {
- generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
- runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2, true, asSingleMapTask, customDynamicPathPattern);
- runMRCreate(null, dataColumns, writeRecords.subList(NUM_RECORDS/2,NUM_RECORDS), NUM_RECORDS/2, true, asSingleMapTask, customDynamicPathPattern);
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask, customDynamicPathPattern);
runMRRead(NUM_RECORDS);
@@ -145,7 +140,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
//Test for duplicate publish
IOException exc = null;
try {
- generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false, true, customDynamicPathPattern);
@@ -172,7 +167,7 @@ public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
driver.run(query);
res = new ArrayList<String>();
driver.getResults(res);
- assertEquals(NUM_TOP_PARTITIONS*2, res.size());
+ assertEquals(NUM_PARTITIONS, res.size());
query = "select * from " + tableName;
driver.run(query);
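
The reverted test expectation follows from the constants above: generateWriteRecords writes NUM_RECORDS = 20 rows with a single partition column p1 = String.valueOf((i % NUM_PARTITIONS) + 0), so exactly five distinct p1 values are produced and "show partitions" should return NUM_PARTITIONS rows rather than NUM_TOP_PARTITIONS * 2. A tiny standalone check of that arithmetic (the class name is illustrative only):

    import java.util.stream.IntStream;

    public class PartitionCountCheck {
      public static void main(String[] args) {
        long distinctP1 = IntStream.range(0, 20)          // NUM_RECORDS
            .mapToObj(i -> String.valueOf(i % 5))         // p1 = (i % NUM_PARTITIONS) + offset
            .distinct()
            .count();
        System.out.println(distinctP1);                   // prints 5 == NUM_PARTITIONS
      }
    }
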
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
index 9698f178a8e..18fcfdbdd2a 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
@@ -28,7 +28,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition
throws Exception {
super(formatName, serdeClass, inputFormatClass, outputFormatClass);
tableName = "testHCatExternalDynamicPartitionedTable_" + formatName;
- generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0);
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
generateDataColumns();
}
@@ -43,7 +43,7 @@ public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartition
*/
@Test
public void testHCatExternalDynamicCustomLocation() throws Exception {
- runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}/{p2}");
+ runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}");
}
}
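
The external-table test now exercises a custom dynamic path pattern with only the single remaining partition key. The snippet below is a hypothetical illustration of how a "${p1}" placeholder maps a partition value to an output subdirectory; it is not HCatalog's actual resolution code, just a sketch of the idea:

    import java.util.Map;

    public class CustomPathPatternDemo {
      // Replace each ${key} placeholder with the corresponding partition value.
      static String resolve(String pattern, Map<String, String> partitionSpec) {
        String path = pattern;
        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
          path = path.replace("${" + e.getKey() + "}", e.getValue());
        }
        return path;
      }

      public static void main(String[] args) {
        // e.g. p1 = "3"  ->  mapred/externalDynamicOutput/3
        System.out.println(resolve("mapred/externalDynamicOutput/${p1}", Map.of("p1", "3")));
      }
    }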