nsivabalan commented on a change in pull request #5138:
URL: https://github.com/apache/hudi/pull/5138#discussion_r838921139
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -404,6 +423,79 @@ public void
testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerg
assertThrows(HoodieException.class, () ->
metaClient.getArchivedTimeline().reload());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testArchivalWithMultiWriters(boolean enableMetadata) throws
Exception {
+ HoodieWriteConfig writeConfig =
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2,
+ HoodieTableType.COPY_ON_WRITE, false, 10, 209715200,
+ HoodieFailedWritesCleaningPolicy.LAZY,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
+
+ final ExecutorService executors = Executors.newFixedThreadPool(2);
+ List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ IntStream.range(0, 2).forEach(index -> {
+ completableFutureList.add(CompletableFuture.supplyAsync(() -> {
+ HoodieTable table = HoodieSparkTable.create(writeConfig, context,
metaClient);
+ try {
+ countDownLatch.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ metaClient.reloadActiveTimeline();
+ while
(!metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp().endsWith("29")
+ ||
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()
> 4) {
+ try {
+ //System.out.println("Archiving " + index + ", total completed
instants " +
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());
+ //System.out.println("Last active instant " +
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().toString());
+ HoodieTimelineArchiver archiver = new
HoodieTimelineArchiver(writeConfig, table);
+ archiver.archiveIfRequired(context, true);
+ // if not for below sleep, both archiving threads acquires lock in
quick succession and does not give space for main thread
+ // to complete the write operation when metadata table is enabled.
+ if (enableMetadata) {
+ Thread.sleep(2);
Review comment:
This is invoked in a loop by two different threads, and archiveIfRequired
acquires a lock, as you may be aware. What happens is that the two threads
keep acquiring the lock in quick succession and leave little room for the
writeClient to acquire the lock and perform more writes (it needs the lock
to write to the metadata table).
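
A minimal sketch of the pattern described above, not the test's actual code:
two "archiver" threads repeatedly take a shared lock and pause briefly outside
it, so that a third "writer" thread gets a chance to take the same lock. The
class ContentionSketch, its run method, and the use of ReentrantLock as a
stand-in for the table lock are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; none of these names come from Hudi.
class ContentionSketch {

  // Runs two archiver threads and one writer thread against one shared lock.
  // Returns {number of archive runs, number of writes}.
  static int[] run(int iterations, long pauseMillis) {
    ReentrantLock lock = new ReentrantLock(); // stand-in for the table lock
    AtomicInteger archives = new AtomicInteger();
    AtomicInteger writes = new AtomicInteger();

    Runnable archiver = () -> {
      for (int i = 0; i < iterations; i++) {
        lock.lock();
        try {
          archives.incrementAndGet(); // "archive" while holding the lock
        } finally {
          lock.unlock();
        }
        // Pause outside the lock so other threads can acquire it.
        pause(pauseMillis);
      }
    };

    Runnable writer = () -> {
      for (int i = 0; i < iterations; i++) {
        lock.lock();
        try {
          writes.incrementAndGet(); // "write" while holding the lock
        } finally {
          lock.unlock();
        }
      }
    };

    Thread[] threads = {new Thread(archiver), new Thread(archiver), new Thread(writer)};
    for (Thread t : threads) {
      t.start();
    }
    for (Thread t : threads) {
      join(t);
    }
    return new int[] {archives.get(), writes.get()};
  }

  private static void pause(long millis) {
    if (millis <= 0) {
      return;
    }
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void join(Thread t) {
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

The sketch only shows where the pause sits relative to the lock; it does not
measure how long the writer waits with and without the pause.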
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -468,14 +469,16 @@ protected void preWrite(String instantTime,
WriteOperationType writeOperationTyp
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
+ * @param acquireLockForArchival true if lock has to be acquired for
archival. false otherwise.
*/
- protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata,
String instantTime, Option<Map<String, String>> extraMetadata) {
+ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata,
String instantTime, Option<Map<String, String>> extraMetadata,
+ boolean acquireLockForArchival) {
Review comment:
There are two code paths from the WriteClient's standpoint. In one flow, the
caller invokes archival while already holding a lock, whereas in the other
flow the caller does not hold a lock. So, to avoid double locking, we have to
do this. We already have a tracking ticket to clean this up with a nicer
abstraction.
https://issues.apache.org/jira/browse/HUDI-2635
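
A minimal sketch of the two flows, under stated assumptions: a binary
Semaphore models a non-reentrant lock, and ArchiverSketch with its
beginTransaction, endTransaction, commitHoldingLock and commitWithoutLock
methods is a hypothetical stand-in rather than Hudi's actual classes. The
acquireLock flag mirrors the parameter added in this PR.

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the archiver; not Hudi's actual implementation.
class ArchiverSketch {

  // One permit models a lock that cannot be taken twice by the same caller.
  private final Semaphore tableLock = new Semaphore(1);
  private int archiveRuns = 0;

  void beginTransaction() {
    // Non-blocking attempt; fail fast if the lock is already held.
    if (!tableLock.tryAcquire()) {
      throw new IllegalStateException("lock already held");
    }
  }

  void endTransaction() {
    tableLock.release();
  }

  // Takes the lock only when the caller says it does not already hold it.
  boolean archiveIfRequired(boolean acquireLock) {
    if (acquireLock) {
      beginTransaction();
    }
    try {
      archiveRuns++; // placeholder for the real archival work
      return true;
    } finally {
      if (acquireLock) {
        endTransaction();
      }
    }
  }

  int archiveRuns() {
    return archiveRuns;
  }

  // Flow 1: the caller already holds the lock, so archival must not take it.
  boolean commitHoldingLock() {
    beginTransaction();
    try {
      return archiveIfRequired(false);
    } finally {
      endTransaction();
    }
  }

  // Flow 2: the caller holds no lock, so archival takes it itself.
  boolean commitWithoutLock() {
    return archiveIfRequired(true);
  }
}
```

In this sketch, calling archiveIfRequired(true) while the lock is already held
fails with an IllegalStateException, which is the double-locking case the flag
is meant to avoid.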
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -468,14 +469,16 @@ protected void preWrite(String instantTime,
WriteOperationType writeOperationTyp
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
+ * @param acquireLockForArchival true if lock has to be acquired for
archival. false otherwise.
*/
- protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata,
String instantTime, Option<Map<String, String>> extraMetadata) {
+ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata,
String instantTime, Option<Map<String, String>> extraMetadata,
+ boolean acquireLockForArchival) {
Review comment:
So, this is not about whether OCC is enabled.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
##########
@@ -167,6 +178,9 @@ public boolean archiveIfRequired(HoodieEngineContext
context) throws IOException
return success;
} finally {
close();
+ if (acquireLock) {
+ txnManager.close();
Review comment:
My bad. This has to end the transaction rather than close txnManager.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
##########
@@ -143,11 +147,18 @@ private void close() {
}
}
+ public boolean archiveIfRequired(HoodieEngineContext context) throws
IOException {
+ return archiveIfRequired(context, false);
+ }
+
/**
* Check if commits need to be archived. If yes, archive commits.
*/
- public boolean archiveIfRequired(HoodieEngineContext context) throws
IOException {
+ public boolean archiveIfRequired(HoodieEngineContext context, boolean
acquireLock) throws IOException {
try {
+ if (acquireLock) {
+ txnManager.beginTransaction();
Review comment:
Again, this is to avoid double locking, not to distinguish OCC from single
writer.