manojpec commented on a change in pull request #3827:
URL: https://github.com/apache/hudi/pull/3827#discussion_r735022504
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -654,16 +669,16 @@ public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOExcepti
    * {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
    * of clean.
    */
-  public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
+  public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
Review comment:
Now that we are adding a new parameter, can we add doc/comments for these parameters?
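A sketch of what the javadoc could look like (the parameter descriptions below are my guesses from the surrounding context and the PR discussion, not authoritative):

```java
/**
 * Executes a clean action for the given instant.
 *
 * @param cleanInstantTime instant time of the clean action to execute
 * @param scheduleInline   whether to schedule the clean inline before executing it; callers that
 *                         schedule the clean themselves via scheduleTableService should pass false
 * @param skipLocking      whether to skip acquiring the transaction lock, e.g. when the caller
 *                         already holds the lock (as during upgrade)
 */
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
```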
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -895,6 +940,92 @@ public void testUpgradeDowngrade() throws IOException {
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
}
+  /**
+   * When table needs to be upgraded and when multi writer is enabled, hudi rollsback partial commits. Upgrade itself is happening
+   * within a lock and hence rollback should not lock again.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, InterruptedException {
+    init(HoodieTableType.COPY_ON_WRITE, false);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Perform a commit. This should bootstrap the metadata table with latest version.
+    List<HoodieRecord> records;
+    JavaRDD<WriteStatus> writeStatuses;
+    String commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
+        .withProperties(properties)
+        .build();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
+      client.commit(commitTimestamp, writeStatuses);
+    }
+
+    // Metadata table should have been bootstrapped
+    assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
+    FileStatus oldStatus = fs.getFileStatus(new Path(metadataTableBasePath));
+
+    // trigger partial commit
+    metaClient.reloadActiveTimeline();
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
+    }
+
+    // set hoodie.table.version to 2 in hoodie.properties file
+    changeTableVersion(HoodieTableVersion.TWO);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withRollbackUsingMarkers(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
+        .withProperties(properties)
+        .build();
+
+    // With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back.
+    metaClient.reloadActiveTimeline();
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    Thread.sleep(1000);
Review comment:
Any reason why we need this sleep? If there are any async tasks involved, sleep is not a foolproof mechanism to wait for the background action to complete. This will make the test non-deterministic.
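As an alternative to the fixed sleep, a bounded polling wait on the observable condition would keep the test deterministic. A minimal sketch (the helper name, timeouts, and the commented-out condition are illustrative, not from this PR):

```java
import java.util.function.BooleanSupplier;

public class TestAwait {
  /**
   * Polls the given condition until it returns true or the timeout elapses.
   * Returns true iff the condition became true within the timeout.
   */
  public static boolean awaitCondition(BooleanSupplier condition, long timeoutMs, long pollIntervalMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(pollIntervalMs);
    }
    // one final check after the deadline, so a condition that just turned true is not missed
    return condition.getAsBoolean();
  }

  public static void main(String[] args) throws InterruptedException {
    // e.g. instead of Thread.sleep(1000), wait for the effect the test actually depends on:
    // awaitCondition(() -> !fs.exists(new Path(metadataTableBasePath)), 30_000, 100);
    boolean ok = awaitCondition(() -> true, 1_000, 10);
    System.out.println(ok ? "condition met" : "timed out");
  }
}
```

This bounds the wait (failing fast with a clear timeout) rather than hoping a fixed 1s is always enough.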
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -593,7 +598,8 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
       Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false);
       if (rollbackPlanOption.isPresent()) {
         // execute rollback
-        HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true);
+        HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
Review comment:
Nit (nice to have): this line exceeds the 120-character width limit.
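For instance, the call could be wrapped like this (the trailing argument after `true` is cut off in the diff context above, so the `skipLocking` name here is my assumption based on the new parameter introduced in this PR):

```java
// assuming the elided trailing argument is the new skipLocking flag
HoodieRollbackMetadata rollbackMetadata =
    table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking);
```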
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -895,6 +940,92 @@ public void testUpgradeDowngrade() throws IOException {
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
}
+  /**
+   * When table needs to be upgraded and when multi writer is enabled, hudi rollsback partial commits. Upgrade itself is happening
+   * within a lock and hence rollback should not lock again.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, InterruptedException {
+    init(HoodieTableType.COPY_ON_WRITE, false);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Perform a commit. This should bootstrap the metadata table with latest version.
+    List<HoodieRecord> records;
+    JavaRDD<WriteStatus> writeStatuses;
+    String commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
+        .withProperties(properties)
+        .build();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
+      client.commit(commitTimestamp, writeStatuses);
+    }
+
+    // Metadata table should have been bootstrapped
+    assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
+    FileStatus oldStatus = fs.getFileStatus(new Path(metadataTableBasePath));
+
+    // trigger partial commit
+    metaClient.reloadActiveTimeline();
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
+    }
+
+    // set hoodie.table.version to 2 in hoodie.properties file
+    changeTableVersion(HoodieTableVersion.TWO);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withRollbackUsingMarkers(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
+        .withProperties(properties)
+        .build();
+
+    // With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back.
+    metaClient.reloadActiveTimeline();
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    Thread.sleep(1000);
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
+      assertNoWriteErrors(writeStatuses.collect());
+    }
+    assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
+
+    // With next commit the table should be re-bootstrapped (currently in the constructor. To be changed)
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
+      assertNoWriteErrors(writeStatuses.collect());
+    }
+
Review comment:
Can we add a metadata table check here? Since the table should be re-bootstrapped at this point, something like:
`assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]