nsivabalan commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713842735
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
##########
@@ -418,4 +445,177 @@ public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaCli
}
return Pair.of(partitionPathStatMap, globalStat);
}
+
+ /**
+ * Validate the metadata table's contents to ensure they match what is on the file system.
+ */
+ public void validateMetadata(HoodieTestTable testTable, List<String> inflightCommits, HoodieWriteConfig writeConfig,
+ String metadataTableBasePath, boolean doFullValidation) throws IOException {
+ HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+ assertNotNull(tableMetadata, "MetadataReader should have been initialized");
+ if (!writeConfig.isMetadataTableEnabled() || !writeConfig.getMetadataConfig().validateFileListingMetadata()) {
+ return;
+ }
+
+ assertEquals(inflightCommits, testTable.inflightCommits());
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+ // Partitions should match
+ List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
+ List<String> fsPartitions = new ArrayList<>();
+ fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString()));
+ List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+
+ Collections.sort(fsPartitions);
+ Collections.sort(metadataPartitions);
+
+ assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
+ assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
+
+ // Files within each partition should match
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
+ TableFileSystemView tableView = table.getHoodieView();
+ List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
+ Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
+ assertEquals(fsPartitions.size(), partitionToFilesMap.size());
+
+ fsPartitions.forEach(partition -> {
+ try {
+ validateFilesPerPartition(testTable, tableMetadata, tableView, partitionToFilesMap, partition);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
Review comment:
let's remove e.printStackTrace();
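For reference, a minimal sketch of how the catch block could look after dropping the call; rethrowing as an unchecked exception is an assumption here (the lambda passed to forEach cannot throw the checked IOException), not necessarily the fix the PR will adopt:

```java
fsPartitions.forEach(partition -> {
  try {
    validateFilesPerPartition(testTable, tableMetadata, tableView, partitionToFilesMap, partition);
  } catch (IOException e) {
    // Surface the failure to the test instead of printing and swallowing it;
    // forEach lambdas cannot propagate checked exceptions directly.
    throw new RuntimeException("Failed to validate files in partition " + partition, e);
  }
});
```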
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -612,214 +296,120 @@ public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Excepti
@EnumSource(HoodieTableType.class)
public void testManualRollbacks(HoodieTableType tableType) throws Exception {
init(tableType);
- HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ doWriteOperationsAndBootstrapMetadata(testTable);
// Setting to archive more aggressively on the Metadata Table than the Dataset
final int maxDeltaCommitsBeforeCompaction = 4;
final int minArchiveCommitsMetadata = 2;
final int minArchiveCommitsDataset = 4;
- HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
+ writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
.retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
.build();
-
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
- // Initialize table with metadata
- String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
- client.startCommitWithTime(newCommitTime);
- List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
-
- // Perform multiple commits
- for (int i = 1; i < 10; ++i) {
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- if (i == 1) {
- records = dataGen.generateInserts(newCommitTime, 5);
- } else {
- records = dataGen.generateUpdates(newCommitTime, 2);
- }
- client.startCommitWithTime(newCommitTime);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
+ for (int i = 3; i < 10; i++) {
+ if (i == 3) {
+ testTable.doWriteOperation("00" + i, UPSERT, singletonList("p3"),
asList("p1", "p2", "p3"), 2);
+ syncTableMetadata(writeConfig);
+ } else {
+ testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1",
"p2", "p3"), 2);
}
+ }
+ syncAndValidate(testTable, true);
- // We can only rollback those commits whose deltacommit have not been archived yet.
- int numRollbacks = 0;
- boolean exceptionRaised = false;
-
- List<HoodieInstant> allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants()
- .collect(Collectors.toList());
- for (HoodieInstant instantToRollback : allInstants) {
- try {
- client.rollback(instantToRollback.getTimestamp());
- client.syncTableMetadata();
- ++numRollbacks;
- } catch (HoodieMetadataException e) {
- exceptionRaised = true;
- break;
- }
- }
+ // We can only rollback those commits whose deltacommit have not been archived yet.
+ int numRollbacks = 0;
+ boolean exceptionRaised = false;
- assertTrue(exceptionRaised, "Rollback of archived instants should fail");
- // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original
- // instants present before rollback started.
- assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2,
- "Rollbacks of non archived instants should work");
+ List<HoodieInstant> allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants().collect(Collectors.toList());
+ for (HoodieInstant instantToRollback : allInstants) {
+ try {
+ testTable.doRollback(instantToRollback.getTimestamp(), String.valueOf(Time.now()));
+ syncTableMetadata(writeConfig);
+ ++numRollbacks;
+ } catch (HoodieMetadataException e) {
+ exceptionRaised = true;
+ break;
+ }
}
+
+ assertTrue(exceptionRaised, "Rollback of archived instants should fail");
+ // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original
+ // instants present before rollback started.
+ assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2,
+ "Rollbacks of non archived instants should work");
}
/**
* Test sync of table operations.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
- @Disabled
public void testSync(HoodieTableType tableType) throws Exception {
init(tableType);
- HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
- String newCommitTime;
- List<HoodieRecord> records;
- List<WriteStatus> writeStatuses;
-
// Initial commits without metadata table enabled
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- records = dataGen.generateInserts(newCommitTime, 5);
- client.startCommitWithTime(newCommitTime);
- writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
-
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- records = dataGen.generateInserts(newCommitTime, 5);
- client.startCommitWithTime(newCommitTime);
- writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- }
-
+ writeConfig = getWriteConfigBuilder(true, false, false).build();
+ testTable.doWriteOperation("001", BULK_INSERT, asList("p1", "p2"),
asList("p1", "p2"), 1);
+ testTable.doWriteOperation("002", BULK_INSERT, asList("p1", "p2"), 1);
// Enable metadata table so it is initialized by listing from file system
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
- // inserts
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateInserts(newCommitTime, 5);
- writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
-
- validateMetadata(client);
- assertTrue(metadata(client).isInSync());
- }
-
+ testTable.doWriteOperation("003", INSERT, asList("p1", "p2"), 1);
+ syncAndValidate(testTable, emptyList(), true, true, true);
// Various table operations without metadata table enabled
- String restoreToInstant;
- String inflightActionTimestamp;
- String beforeInflightActionTimestamp;
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
- // updates
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUniqueUpdates(newCommitTime, 5);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- assertTrue(metadata(client).isInSync());
-
- // updates and inserts
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUpdates(newCommitTime, 10);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- assertTrue(metadata(client).isInSync());
-
- // Compaction
- if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
- client.compact(newCommitTime);
- assertTrue(metadata(client).isInSync());
- }
-
- // Savepoint
- restoreToInstant = newCommitTime;
- if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
- client.savepoint("hoodie", "metadata test");
- assertTrue(metadata(client).isInSync());
- }
-
- // Record a timestamp for creating an inflight instance for sync testing
- inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
- beforeInflightActionTimestamp = newCommitTime;
-
- // Deletes
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- records = dataGen.generateDeletes(newCommitTime, 5);
- JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
- client.startCommitWithTime(newCommitTime);
- client.delete(deleteKeys, newCommitTime);
- assertTrue(metadata(client).isInSync());
-
- // Clean
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.clean(newCommitTime);
- assertTrue(metadata(client).isInSync());
-
- // updates
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUniqueUpdates(newCommitTime, 10);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- assertTrue(metadata(client).isInSync());
+ testTable.doWriteOperation("004", UPSERT, asList("p1", "p2"), 1);
+ testTable.doWriteOperation("005", UPSERT, singletonList("p3"),
asList("p1", "p2", "p3"), 3);
+ syncAndValidate(testTable);
- // insert overwrite to test replacecommit
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
- records = dataGen.generateInserts(newCommitTime, 5);
- HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
- writeStatuses = replaceResult.getWriteStatuses().collect();
- assertNoWriteErrors(writeStatuses);
- assertTrue(metadata(client).isInSync());
+ // trigger compaction
+ if (MERGE_ON_READ.equals(tableType)) {
+ testTable = testTable.doCompaction("006", asList("p1", "p2"));
+ syncAndValidate(testTable);
}
- // If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the
- // in-memory merge should consider all the completed operations.
- Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
- fs.create(inflightCleanPath).close();
-
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
- // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
- client.syncTableMetadata();
+ // trigger an upsert
+ testTable.doWriteOperation("007", UPSERT, asList("p1", "p2", "p3"), 2);
+ syncAndValidate(testTable, emptyList(), true, false, true);
- // Table should sync only before the inflightActionTimestamp
- HoodieBackedTableMetadataWriter writer =
- (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
- assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp);
+ // savepoint
+ if (COPY_ON_WRITE.equals(tableType)) {
+ testTable.doSavepoint("007");
+ syncAndValidate(testTable);
+ }
- // Reader should sync to all the completed instants
- HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
- client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
- assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime);
+ // trigger delete
+ testTable.doWriteOperation("008", DELETE, emptyList(), asList("p1", "p2",
"p3"), 2);
+ syncAndValidate(testTable, emptyList(), true, true, false);
- // Remove the inflight instance holding back table sync
- fs.delete(inflightCleanPath, false);
- client.syncTableMetadata();
+ // trigger clean
+ testTable.doCleanBasedOnCommits("009", asList("001", "002"));
+ syncAndValidate(testTable, emptyList(), true, false, false);
- writer =
- (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
- assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
+ // trigger another upsert
+ testTable.doWriteOperation("010", UPSERT, asList("p1", "p2", "p3"), 2);
+ syncAndValidate(testTable, emptyList(), true, false, false);
- // Reader should sync to all the completed instants
- metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
- client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
- assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
- }
+ // trigger clustering
+ testTable.doCluster("011", new HashMap<>());
+ syncAndValidate(testTable, emptyList(), true, true, false);
- // Enable metadata table and ensure it is synced
+ // If there is an inflight operation, the Metadata Table is not updated beyond that operation but the
+ // in-memory merge should consider all the completed operations.
+ HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("007", UPSERT, emptyList(),
+ asList("p1", "p2", "p3"), 2, false, true);
+ // trigger upsert
+ testTable.doWriteOperation("013", UPSERT, emptyList(), asList("p1", "p2",
"p3"), 2);
+ // testTable validation will fetch only files pertaining to completed commits. So, validateMetadata() will skip files for 006
Review comment:
fix the comment w.r.t. the actual commit time: is it 006 or 007?
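A minimal sketch of one way to resolve this, assuming the inflight operation keeps the timestamp 007 as written above, so the comment would reference 007 rather than 006:

```java
// testTable validation will fetch only files pertaining to completed commits,
// so validateMetadata() will skip files for the inflight commit 007.
HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("007", UPSERT, emptyList(),
    asList("p1", "p2", "p3"), 2, false, true);
```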
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -612,214 +296,120 @@ public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Excepti
+ HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("007", UPSERT, emptyList(),
Review comment:
I guess we should avoid 007 at line 271; make that 008 and adjust the subsequent commit times as well.
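A minimal sketch of the suggested renumbering; the instant times 012 and 013 are assumptions (the completed commits above already use 001 through 011), the point being that the inflight operation should not reuse 007:

```java
// Hypothetical renumbering: give the inflight operation a fresh instant time
// instead of reusing 007, keeping later commit times strictly increasing.
HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("012", UPSERT, emptyList(),
    asList("p1", "p2", "p3"), 2, false, true);
testTable.doWriteOperation("013", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2);
```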
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]