This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new da2919a [HUDI-1383] Fixing sorting of partition vals for hive sync
computation (#2402)
da2919a is described below
commit da2919a75f564be6c3d731a2c503959e416ebe71
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jan 6 07:49:44 2021 -0500
[HUDI-1383] Fixing sorting of partition vals for hive sync computation
(#2402)
---
.../org/apache/hudi/hive/HoodieHiveClient.java | 2 -
.../org/apache/hudi/hive/TestHiveSyncTool.java | 66 ++++++++++++++++++----
.../apache/hudi/hive/testutils/HiveTestUtil.java | 20 +++++++
3 files changed, 75 insertions(+), 13 deletions(-)
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 5c0c128..b621167 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -207,7 +207,6 @@ public class HoodieHiveClient extends
AbstractSyncHoodieClient {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
- Collections.sort(hivePartitionValues);
String fullTablePartitionPath =
Path.getPathWithoutSchemeAndAuthority(new
Path(tablePartition.getSd().getLocation())).toUri().getPath();
paths.put(String.join(", ", hivePartitionValues),
fullTablePartitionPath);
@@ -219,7 +218,6 @@ public class HoodieHiveClient extends
AbstractSyncHoodieClient {
String fullStoragePartitionPath =
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues =
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- Collections.sort(storagePartitionValues);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 1d8cbd8..8a1ea4f 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -21,10 +21,10 @@ package org.apache.hudi.hive;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import
org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
+import
org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.parquet.schema.MessageType;
@@ -56,7 +56,7 @@ public class TestHiveSyncTool {
}
private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
- return Arrays.asList(new Object[][] { { true, true }, { true, false }, {
false, true }, { false, false } });
+ return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false,
true}, {false, false}});
}
@BeforeEach
@@ -347,7 +347,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() +
HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
- "Hive Schema should match the table schema + partition
field");
+ "Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata
columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -377,7 +377,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() +
HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
- "Hive Schema should match the evolved table schema +
partition field");
+ "Hive Schema should match the evolved table schema + partition
field");
} else {
// The data generated and schema in the data file do not have metadata
columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -418,7 +418,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() +
HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
- "Hive Schema should match the table schema + partition
field");
+ "Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata
columns, so we need a separate check.
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
@@ -489,6 +489,50 @@ public class TestHiveSyncTool {
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the
TBLPROPERTIES");
+
+ // HoodieHiveClient had a bug where partition vals were sorted
+ // and stored as keys in a map. The following tests this particular case.
+ // Now let's create partition "2010/01/02" followed by "2010/02/01".
+ String commitTime2 = "101";
+ HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
+
+ hiveClient = new HoodieHiveClient(hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ List<String> writtenPartitionsSince =
hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+ assertEquals(1, writtenPartitionsSince.size(), "We should have one
partition written after 100 commit");
+ List<Partition> hivePartitions =
hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+ List<PartitionEvent> partitionEvents =
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+ assertEquals(1, partitionEvents.size(), "There should be only one
paritition event");
+ assertEquals(PartitionEventType.ADD,
partitionEvents.iterator().next().eventType, "The one partition event must of
type ADD");
+
+ tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(),
HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ // Sync should add the one partition
+ assertEquals(6,
hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+ "Table partitions should match the number of partitions we wrote");
+ assertEquals(commitTime2,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+ "The last commit that was sycned should be 101");
+
+ // create partition "2010/02/01" and ensure sync works
+ String commitTime3 = "102";
+ HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
+ HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." +
hiveSyncConfig.tableName);
+
+ hiveClient = new HoodieHiveClient(hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+ tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(),
HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+ "Table " + hiveSyncConfig.tableName + " should exist after sync
completes");
+ assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
+ hiveClient.getDataSchema().getColumns().size() + 3,
+ "Hive Schema should match the table schema + partition fields");
+ assertEquals(7,
hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+ "Table partitions should match the number of partitions we wrote");
+ assertEquals(commitTime3,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+ "The last commit that was sycned should be updated in the
TBLPROPERTIES");
+ assertEquals(1,
hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
}
@ParameterizedTest
@@ -507,17 +551,17 @@ public class TestHiveSyncTool {
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
- "Table " + hiveSyncConfig.tableName + " should not exist
initially");
+ "Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
- "Table " + hiveSyncConfig.tableName + " should exist after sync
completes");
+ "Table " + hiveSyncConfig.tableName + " should exist after sync
completes");
assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
- hiveClient.getDataSchema().getColumns().size(),
- "Hive Schema should match the table schema,ignoring the partition
fields");
+ hiveClient.getDataSchema().getColumns().size(),
+ "Hive Schema should match the table schema,ignoring the partition
fields");
assertEquals(0,
hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
- "Table should not have partitions because of the
NonPartitionedExtractor");
+ "Table should not have partitions because of the
NonPartitionedExtractor");
}
@ParameterizedTest
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d0d1b66..0909053 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -210,6 +210,14 @@ public class HiveTestUtil {
createCommitFile(commitMetadata, instantTime);
}
+ public static void addCOWPartition(String partitionPath, boolean
isParquetSchemaSimple,
+ boolean useSchemaFromCommitMetadata, String instantTime) throws
IOException, URISyntaxException {
+ HoodieCommitMetadata commitMetadata =
+ createPartition(partitionPath, isParquetSchemaSimple,
useSchemaFromCommitMetadata, instantTime);
+ createdTablesSet.add(hiveSyncConfig.databaseName + "." +
hiveSyncConfig.tableName);
+ createCommitFile(commitMetadata, instantTime);
+ }
+
public static void addMORPartitions(int numberOfPartitions, boolean
isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String
instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
@@ -266,6 +274,18 @@ public class HiveTestUtil {
return commitMetadata;
}
+ private static HoodieCommitMetadata createPartition(String partitionPath,
boolean isParquetSchemaSimple,
+ boolean useSchemaFromCommitMetadata, String instantTime) throws
IOException, URISyntaxException {
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
+ fileSystem.makeQualified(partPath);
+ fileSystem.mkdirs(partPath);
+ List<HoodieWriteStat> writeStats = createTestData(partPath,
isParquetSchemaSimple, instantTime);
+ writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+ addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple,
useSchemaFromCommitMetadata);
+ return commitMetadata;
+ }
+
private static List<HoodieWriteStat> createTestData(Path partPath, boolean
isParquetSchemaSimple, String instantTime)
throws IOException, URISyntaxException {
List<HoodieWriteStat> writeStats = new ArrayList<>();