This is an automated email from the ASF dual-hosted git repository.

peeyush pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 84f4a88075 [NO ISSUE][TX] Concurrently write checkpoints for atomic statements
84f4a88075 is described below

commit 84f4a8807575d3c834e51a7ec7f2378a783af2c3
Author: Peeyush Gupta <[email protected]>
AuthorDate: Tue Aug 15 18:13:54 2023 -0700

    [NO ISSUE][TX] Concurrently write checkpoints for atomic statements
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Change-Id: I3846bfa534ebe4077f55f3a9acccd3dc3d8d0cda
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17725
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../app/message/AtomicJobCommitMessage.java        | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
index fac023cffa..e653b7a899 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.app.message;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -46,13 +50,29 @@ public class AtomicJobCommitMessage implements INcAddressedMessage {
     @Override
     public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
         IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
+        ForkJoinPool commonPool = ForkJoinPool.commonPool();
+        List<Future> futures = new ArrayList<>();
         for (Integer datasetId : datasetIds) {
             for (IndexInfo indexInfo : 
datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
                 if (indexInfo.getIndex().isPrimaryIndex()) {
-                    ((PrimaryIndexOperationTracker) 
indexInfo.getIndex().getOperationTracker()).commit();
+                    futures.add(commonPool.submit(() -> {
+                        try {
+                            ((PrimaryIndexOperationTracker) 
indexInfo.getIndex().getOperationTracker()).commit();
+                        } catch (HyracksDataException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }));
                 }
             }
         }
+        for (Future f : futures) {
+            try {
+                f.get();
+            } catch (ExecutionException e) {
+                futures.forEach(future -> future.cancel(true));
+                throw HyracksDataException.create(e);
+            }
+        }
         AtomicJobCompletionMessage message =
                 new AtomicJobCompletionMessage(jobId, 
appCtx.getServiceContext().getNodeId());
         NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();

Reply via email to