This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef00d147c1 [HUDI-5816] List all partitions as the fallback mechanism
in Hive and Glue Sync (#8388)
6ef00d147c1 is described below
commit 6ef00d147c12e41f71958299a6cb54eae1dce2f6
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun May 7 21:44:22 2023 -0700
[HUDI-5816] List all partitions as the fallback mechanism in Hive and Glue
Sync (#8388)
- Avoid loading the archived timeline during Hive and Glue Sync.
- Add a fallback mechanism to the Hive and Glue catalog sync:
if the last synced commit time is earlier than the start of the
Hudi table's active timeline, the sync lists all partition paths
on storage and reconciles them against the partitions in the
metastore, instead of reading the archived timeline.
- Enhance the tests to cover the new logic.
---
.../java/org/apache/hudi/common/fs/FSUtils.java | 3 +-
.../java/org/apache/hudi/hive/HiveSyncTool.java | 112 +++++++++++++++------
.../org/apache/hudi/hive/TestHiveSyncTool.java | 77 +++++++++++++-
.../apache/hudi/hive/testutils/HiveTestUtil.java | 58 +++++++++--
.../apache/hudi/sync/common/HoodieSyncClient.java | 106 ++++++++++++++++---
5 files changed, 297 insertions(+), 59 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index b84298e80d8..a7a32ae527e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -235,7 +235,8 @@ public class FSUtils {
String fullPartitionPathStr = fullPartitionPath.toString();
if (!fullPartitionPathStr.startsWith(basePath.toString())) {
- throw new IllegalArgumentException("Partition path does not belong to
base-path");
+ throw new IllegalArgumentException("Partition path \"" +
fullPartitionPathStr
+ + "\" does not belong to base-path \"" + basePath + "\"");
}
int partitionStartIndex = fullPartitionPathStr.indexOf(basePath.getName(),
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 95048dd8a71..5d35eeef87e 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -258,13 +258,28 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
}
LOG.info("Last commit time synced was found to be " +
lastCommitTimeSynced.orElse("null"));
- List<String> writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
- LOG.info("Storage partitions scan complete. Found " +
writtenPartitionsSince.size());
- // Sync the partitions if needed
- // find dropped partitions, if any, in the latest commit
- Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
- boolean partitionsChanged = syncPartitions(tableName,
writtenPartitionsSince, droppedPartitions);
+ boolean partitionsChanged;
+ if (!lastCommitTimeSynced.isPresent()
+ ||
syncClient.getActiveTimeline().isBeforeTimelineStarts(lastCommitTimeSynced.get()))
{
+ // If the last commit time synced is before the start of the active
timeline,
+ // the Hive sync falls back to list all partitions on storage, instead of
+ // reading active and archived timelines for written partitions.
+ LOG.info("Sync all partitions given the last commit time synced is empty
or "
+ + "before the start of the active timeline. Listing all partitions
in "
+ + config.getString(META_SYNC_BASE_PATH)
+ + ", file system: " + config.getHadoopFileSystem());
+ partitionsChanged = syncAllPartitions(tableName);
+ } else {
+ List<String> writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
+ LOG.info("Storage partitions scan complete. Found " +
writtenPartitionsSince.size());
+
+ // Sync the partitions if needed
+ // find dropped partitions, if any, in the latest commit
+ Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
+ partitionsChanged = syncPartitions(tableName, writtenPartitionsSince,
droppedPartitions);
+ }
+
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
syncClient.updateLastCommitTimeSynced(tableName);
@@ -366,46 +381,83 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
PartitionFilterGenerator.generatePushDownFilter(writtenPartitions,
partitionFields, config));
}
+ /**
+ * Syncs all partitions on storage to the metastore, by only making
incremental changes.
+ *
+ * @param tableName The table name in the metastore.
+ * @return {@code true} if one or more partition(s) are changed in the
metastore;
+ * {@code false} otherwise.
+ */
+ private boolean syncAllPartitions(String tableName) {
+ try {
+ if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
+ return false;
+ }
+
+ List<Partition> allPartitionsInMetastore =
syncClient.getAllPartitions(tableName);
+ List<String> allPartitionsOnStorage =
syncClient.getAllPartitionPathsOnStorage();
+ return syncPartitions(
+ tableName,
+ syncClient.getPartitionEvents(allPartitionsInMetastore,
allPartitionsOnStorage));
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to sync partitions for table "
+ tableName, e);
+ }
+ }
+
/**
* Syncs the list of storage partitions passed in (checks if the partition
is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*
- * @param writtenPartitionsSince partitions has been added, updated, or
dropped since last synced.
+ * @param tableName The table name in the metastore.
+ * @param writtenPartitionsSince Partitions has been added, updated, or
dropped since last synced.
+ * @param droppedPartitions Partitions that are dropped since last sync.
+ * @return {@code true} if one or more partition(s) are changed in the
metastore;
+ * {@code false} otherwise.
*/
private boolean syncPartitions(String tableName, List<String>
writtenPartitionsSince, Set<String> droppedPartitions) {
- boolean partitionsChanged;
try {
if (writtenPartitionsSince.isEmpty() ||
config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
return false;
}
List<Partition> hivePartitions = getTablePartitions(tableName,
writtenPartitionsSince);
- List<PartitionEvent> partitionEvents =
- syncClient.getPartitionEvents(hivePartitions,
writtenPartitionsSince, droppedPartitions);
-
- List<String> newPartitions = filterPartitions(partitionEvents,
PartitionEventType.ADD);
- if (!newPartitions.isEmpty()) {
- LOG.info("New Partitions " + newPartitions);
- syncClient.addPartitionsToTable(tableName, newPartitions);
- }
+ return syncPartitions(
+ tableName,
+ syncClient.getPartitionEvents(
+ hivePartitions, writtenPartitionsSince, droppedPartitions));
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to sync partitions for table "
+ tableName, e);
+ }
+ }
- List<String> updatePartitions = filterPartitions(partitionEvents,
PartitionEventType.UPDATE);
- if (!updatePartitions.isEmpty()) {
- LOG.info("Changed Partitions " + updatePartitions);
- syncClient.updatePartitionsToTable(tableName, updatePartitions);
- }
+ /**
+ * Syncs added, updated, and dropped partitions to the metastore.
+ *
+ * @param tableName The table name in the metastore.
+ * @param partitionEventList The partition change event list.
+ * @return {@code true} if one or more partition(s) are changed in the
metastore;
+ * {@code false} otherwise.
+ */
+ private boolean syncPartitions(String tableName, List<PartitionEvent>
partitionEventList) {
+ List<String> newPartitions = filterPartitions(partitionEventList,
PartitionEventType.ADD);
+ if (!newPartitions.isEmpty()) {
+ LOG.info("New Partitions " + newPartitions);
+ syncClient.addPartitionsToTable(tableName, newPartitions);
+ }
- List<String> dropPartitions = filterPartitions(partitionEvents,
PartitionEventType.DROP);
- if (!dropPartitions.isEmpty()) {
- LOG.info("Drop Partitions " + dropPartitions);
- syncClient.dropPartitions(tableName, dropPartitions);
- }
+ List<String> updatePartitions = filterPartitions(partitionEventList,
PartitionEventType.UPDATE);
+ if (!updatePartitions.isEmpty()) {
+ LOG.info("Changed Partitions " + updatePartitions);
+ syncClient.updatePartitionsToTable(tableName, updatePartitions);
+ }
- partitionsChanged = !updatePartitions.isEmpty() ||
!newPartitions.isEmpty() || !dropPartitions.isEmpty();
- } catch (Exception e) {
- throw new HoodieHiveSyncException("Failed to sync partitions for table "
+ tableName, e);
+ List<String> dropPartitions = filterPartitions(partitionEventList,
PartitionEventType.DROP);
+ if (!dropPartitions.isEmpty()) {
+ LOG.info("Drop Partitions " + dropPartitions);
+ syncClient.dropPartitions(tableName, dropPartitions);
}
- return partitionsChanged;
+
+ return !updatePartitions.isEmpty() || !newPartitions.isEmpty() ||
!dropPartitions.isEmpty();
}
private List<String> filterPartitions(List<PartitionEvent> events,
PartitionEventType eventType) {
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index d1aaf024403..9a8ada9b65b 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hive;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSyncTableStrategy;
@@ -71,6 +72,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static
org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
@@ -282,7 +285,8 @@ public class TestHiveSyncTool {
// Manually change a hive partition location to check if the sync will
detect
// it and generate a partition update event for it.
ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME
- + "` PARTITION (`datestr`='2050-01-01') SET LOCATION
'/some/new/location'");
+ + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '"
+ + FSUtils.getPartitionPath(basePath, "2050/1/1").toString() + "'");
hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.empty());
@@ -291,14 +295,66 @@ public class TestHiveSyncTool {
assertEquals(PartitionEventType.UPDATE,
partitionEvents.iterator().next().eventType,
"The one partition event must of type UPDATE");
+ // Add a partition that does not belong to the table, i.e., not in the
same base path
+ // This should not happen in production. However, if this happens, when
doing fallback
+ // to list all partitions in the metastore and we find such a partition,
we simply ignore
+ // it without dropping it from the metastore and notify the user with an
error message,
+ // so the user may manually fix it.
+
+ String dummyBasePath = new Path(basePath).getParent().toString() +
"/dummy_basepath";
+ ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME
+ + "` ADD PARTITION (`datestr`='xyz') LOCATION '" + dummyBasePath +
"/xyz'");
+
// Lets do the sync
reSyncHiveTable();
// Sync should update the changed partition to correct path
List<Partition> tablePartitions =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
- assertEquals(7, tablePartitions.size(), "The one partition we wrote should
be added to hive");
+ assertEquals(8, tablePartitions.size(), "The two partitions we wrote
should be added to hive");
assertEquals(instantTime,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be 100");
+
+ // Verify that there is one ADD, UPDATE, and DROP event for each type
+ hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ List<String> allPartitionPathsOnStorage =
hiveClient.getAllPartitionPathsOnStorage()
+ .stream().sorted().collect(Collectors.toList());
+ String dropPartition = allPartitionPathsOnStorage.remove(0);
+ allPartitionPathsOnStorage.add("2050/01/02");
+ partitionEvents = hiveClient.getPartitionEvents(hivePartitions,
allPartitionPathsOnStorage);
+ assertEquals(3, partitionEvents.size(), "There should be only one
partition event");
+ assertEquals(
+ "2050/01/02",
+ partitionEvents.stream().filter(e -> e.eventType ==
PartitionEventType.ADD)
+ .findFirst().get().storagePartition,
+ "There should be only one partition event of type ADD");
+ assertEquals(
+ "2050/01/01",
+ partitionEvents.stream().filter(e -> e.eventType ==
PartitionEventType.UPDATE)
+ .findFirst().get().storagePartition,
+ "There should be only one partition event of type UPDATE");
+ assertEquals(
+ dropPartition,
+ partitionEvents.stream().filter(e -> e.eventType ==
PartitionEventType.DROP)
+ .findFirst().get().storagePartition,
+ "There should be only one partition event of type DROP");
+
+ // Simulate the case where the last sync timestamp is before the start of
the active timeline,
+ // by overwriting the same table with some partitions deleted and new
partitions added
+ HiveTestUtil.createCOWTable("200", 6, useSchemaFromCommitMetadata);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+ tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(Option.of("200"),
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+ assertEquals(7, tablePartitions.size());
+
+ // Trigger the fallback of listing all partitions again. There is no
partition change.
+ HiveTestUtil.commitToTable("300", 1, useSchemaFromCommitMetadata);
+ HiveTestUtil.removeCommitFromActiveTimeline("200", COMMIT_ACTION);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+ tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(Option.of("300"),
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+ assertEquals(7, tablePartitions.size());
}
@ParameterizedTest
@@ -1336,6 +1392,9 @@ public class TestHiveSyncTool {
String commitTime0 = "100";
String commitTime1 = "101";
String commitTime2 = "102";
+ String commitTime3 = "103";
+ String commitTime4 = "104";
+ String commitTime5 = "105";
HiveTestUtil.createMORTable(commitTime0, commitTime1, 2, true, true);
reInitHiveSyncClient();
@@ -1344,10 +1403,22 @@ public class TestHiveSyncTool {
assertTrue(hiveClient.tableExists(tableName));
assertEquals(commitTime1,
hiveClient.getLastCommitTimeSynced(tableName).get());
- HiveTestUtil.addMORPartitions(0, true, true, true,
ZonedDateTime.now().plusDays(2), commitTime1, commitTime2);
+ HiveTestUtil.addMORPartitions(0, true, true, true,
ZonedDateTime.now().plusDays(2), commitTime2, commitTime3);
reSyncHiveTable();
assertEquals(commitTime1,
hiveClient.getLastCommitTimeSynced(tableName).get());
+
+ // Let the last commit time synced to be before the start of the active
timeline,
+ // to trigger the fallback of listing all partitions. There is no
partition change
+ // and the last commit time synced should still be the same.
+ HiveTestUtil.addMORPartitions(0, true, true, true,
ZonedDateTime.now().plusDays(2), commitTime4, commitTime5);
+ HiveTestUtil.removeCommitFromActiveTimeline(commitTime0, COMMIT_ACTION);
+ HiveTestUtil.removeCommitFromActiveTimeline(commitTime1,
DELTA_COMMIT_ACTION);
+ HiveTestUtil.removeCommitFromActiveTimeline(commitTime2, COMMIT_ACTION);
+ HiveTestUtil.removeCommitFromActiveTimeline(commitTime3,
DELTA_COMMIT_ACTION);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+ assertEquals(commitTime1,
hiveClient.getLastCommitTimeSynced(tableName).get());
}
private void reSyncHiveTable() {
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 4e2943be8d5..ad61d6e0d30 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -68,6 +68,8 @@ import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.platform.commons.JUnitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -88,6 +90,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -103,6 +106,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings("SameParameterValue")
public class HiveTestUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveTestUtil.class);
public static final String DB_NAME = "testdb";
public static final String TABLE_NAME = "test1";
@@ -190,22 +194,56 @@ public class HiveTestUtil {
public static void createCOWTable(String instantTime, int
numberOfPartitions, boolean useSchemaFromCommitMetadata,
String basePath, String databaseName,
String tableName) throws IOException, URISyntaxException {
Path path = new Path(basePath);
- FileIOUtils.deleteDirectory(new File(basePath));
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, true);
+ }
HoodieTableMetaClient.withPropertyBuilder()
- .setTableType(HoodieTableType.COPY_ON_WRITE)
- .setTableName(tableName)
- .setPayloadClass(HoodieAvroPayload.class)
- .initTable(configuration, basePath);
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setTableName(tableName)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .initTable(configuration, basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
+ commitToTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata,
+ basePath, databaseName, tableName);
+ }
+
+ public static void commitToTable(
+ String instantTime, int numberOfPartitions, boolean
useSchemaFromCommitMetadata)
+ throws IOException, URISyntaxException {
+ commitToTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata,
+ basePath, DB_NAME, TABLE_NAME);
+ }
+
+ public static void commitToTable(
+ String instantTime, int numberOfPartitions, boolean
useSchemaFromCommitMetadata,
+ String basePath, String databaseName, String tableName) throws
IOException, URISyntaxException {
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions,
true,
- useSchemaFromCommitMetadata, dateTime, instantTime, basePath);
+ useSchemaFromCommitMetadata, dateTime, instantTime, basePath);
createdTablesSet.add(databaseName + "." + tableName);
createCommitFile(commitMetadata, instantTime, basePath);
}
+ public static void removeCommitFromActiveTimeline(String instantTime, String
actionType) {
+ List<Path> pathsToDelete = new ArrayList<>();
+ Path metaFolderPath = new Path(basePath, METAFOLDER_NAME);
+ String actionSuffix = "." + actionType;
+ pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix));
+ pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix +
".requested"));
+ pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix +
".inflight"));
+ pathsToDelete.forEach(path -> {
+ try {
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, false);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error deleting file: ", e);
+ }
+ });
+ }
+
public static void createCOWTable(String instantTime, int
numberOfPartitions, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException {
createCOWTable(instantTime, numberOfPartitions,
useSchemaFromCommitMetadata, basePath, DB_NAME, TABLE_NAME);
@@ -481,7 +519,7 @@ public class HiveTestUtil {
public static void createCommitFile(HoodieCommitMetadata commitMetadata,
String instantTime, String basePath) throws IOException {
byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
@@ -490,7 +528,7 @@ public class HiveTestUtil {
public static void createReplaceCommitFile(HoodieReplaceCommitMetadata
commitMetadata, String instantTime) throws IOException {
byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
+ HoodieTimeline.makeReplaceFileName(instantTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
@@ -505,7 +543,7 @@ public class HiveTestUtil {
private static void createCompactionCommitFile(HoodieCommitMetadata
commitMetadata, String instantTime)
throws IOException {
byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
@@ -515,7 +553,7 @@ public class HiveTestUtil {
private static void createDeltaCommitFile(HoodieCommitMetadata
deltaCommitMetadata, String deltaCommitTime)
throws IOException {
byte[] bytes =
deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
+ HoodieTimeline.makeDeltaFileName(deltaCommitTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 7f73be33070..b6f1d0bdfa2 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -112,16 +113,25 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
}
}
+ /**
+ * Gets all relative partitions paths in the Hudi table on storage.
+ *
+ * @return All relative partitions paths.
+ */
+ public List<String> getAllPartitionPathsOnStorage() {
+ HoodieLocalEngineContext engineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
+ return FSUtils.getAllPartitionPaths(engineContext,
+ config.getString(META_SYNC_BASE_PATH),
+ config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
+ config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+ }
+
public List<String> getWrittenPartitionsSince(Option<String>
lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions
in "
+ config.getString(META_SYNC_BASE_PATH)
+ ",FS :" + config.getHadoopFileSystem());
- HoodieLocalEngineContext engineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
- return FSUtils.getAllPartitionPaths(engineContext,
- config.getString(META_SYNC_BASE_PATH),
- config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
- config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+ return getAllPartitionPathsOnStorage();
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ",
Getting commits since then");
return TimelineUtils.getWrittenPartitions(
@@ -129,27 +139,75 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
}
}
+ /**
+ * Gets the partition events for changed partitions.
+ * <p>
+ * This compares the list of all partitions of a table stored in the
metastore and
+ * on the storage:
+ * (1) Partitions exist in the metastore, but NOT the storage: drops them in
the metastore;
+ * (2) Partitions exist on the storage, but NOT the metastore: adds them to
the metastore;
+ * (3) Partitions exist in both, but the partition path is different: update
them in the metastore.
+ *
+ * @param allPartitionsInMetastore All partitions of a table stored in the
metastore.
+ * @param allPartitionsOnStorage All partitions of a table stored on the
storage.
+ * @return partition events for changed partitions.
+ */
+ public List<PartitionEvent> getPartitionEvents(List<Partition>
allPartitionsInMetastore,
+ List<String>
allPartitionsOnStorage) {
+ Map<String, String> paths =
getPartitionValuesToPathMapping(allPartitionsInMetastore);
+ Set<String> partitionsToDrop = new HashSet<>(paths.keySet());
+
+ List<PartitionEvent> events = new ArrayList<>();
+ for (String storagePartition : allPartitionsOnStorage) {
+ Path storagePartitionPath =
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH),
storagePartition);
+ String fullStoragePartitionPath =
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ // Check if the partition values or if hdfs path is the same
+ List<String> storagePartitionValues =
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ // Remove partitions that exist on storage from the `partitionsToDrop`
set,
+ // so the remaining partitions that exist in the metastore should be
dropped
+ partitionsToDrop.remove(storageValue);
+ if (!paths.containsKey(storageValue)) {
+ events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+ } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+ events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+ }
+ }
+ }
+
+ partitionsToDrop.forEach(storageValue -> {
+ String storagePath = paths.get(storageValue);
+ try {
+ String relativePath = FSUtils.getRelativePartitionPath(
+ metaClient.getBasePathV2(), new CachingPath(storagePath));
+ events.add(PartitionEvent.newPartitionDropEvent(relativePath));
+ } catch (IllegalArgumentException e) {
+ LOG.error("Cannot parse the path stored in the metastore, ignoring it
for "
+ + "generating DROP partition event: \"" + storagePath + "\".", e);
+ }
+ });
+ return events;
+ }
+
/**
* Iterate over the storage partitions and find if there are any new
partitions that need to be added or updated.
* Generate a list of PartitionEvent based on the changes required.
*/
- public List<PartitionEvent> getPartitionEvents(List<Partition>
tablePartitions, List<String> partitionStoragePartitions, Set<String>
droppedPartitions) {
- Map<String, String> paths = new HashMap<>();
- for (Partition tablePartition : tablePartitions) {
- List<String> hivePartitionValues = tablePartition.getValues();
- String fullTablePartitionPath =
- Path.getPathWithoutSchemeAndAuthority(new
Path(tablePartition.getStorageLocation())).toUri().getPath();
- paths.put(String.join(", ", hivePartitionValues),
fullTablePartitionPath);
- }
+ public List<PartitionEvent> getPartitionEvents(List<Partition>
partitionsInMetastore,
+ List<String>
writtenPartitionsOnStorage,
+ Set<String>
droppedPartitionsOnStorage) {
+ Map<String, String> paths =
getPartitionValuesToPathMapping(partitionsInMetastore);
List<PartitionEvent> events = new ArrayList<>();
- for (String storagePartition : partitionStoragePartitions) {
+ for (String storagePartition : writtenPartitionsOnStorage) {
Path storagePartitionPath =
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH),
storagePartition);
String fullStoragePartitionPath =
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues =
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- if (droppedPartitions.contains(storagePartition)) {
+ if (droppedPartitionsOnStorage.contains(storagePartition)) {
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
} else {
if (!storagePartitionValues.isEmpty()) {
@@ -164,4 +222,22 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
}
return events;
}
+
+ /**
+ * Gets the partition values to the absolute path mapping based on the
+ * partition information from the metastore.
+ *
+ * @param partitionsInMetastore Partitions in the metastore.
+ * @return The partition values to the absolute path mapping.
+ */
+ private Map<String, String> getPartitionValuesToPathMapping(List<Partition>
partitionsInMetastore) {
+ Map<String, String> paths = new HashMap<>();
+ for (Partition tablePartition : partitionsInMetastore) {
+ List<String> hivePartitionValues = tablePartition.getValues();
+ String fullTablePartitionPath =
+ Path.getPathWithoutSchemeAndAuthority(new
Path(tablePartition.getStorageLocation())).toUri().getPath();
+ paths.put(String.join(", ", hivePartitionValues),
fullTablePartitionPath);
+ }
+ return paths;
+ }
}