This is an automated email from the ASF dual-hosted git repository.
peeyush pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 84f4a88075 [NO ISSUE][TX] Concurrently wirte checkpoints for atomic
statements
84f4a88075 is described below
commit 84f4a8807575d3c834e51a7ec7f2378a783af2c3
Author: Peeyush Gupta <[email protected]>
AuthorDate: Tue Aug 15 18:13:54 2023 -0700
[NO ISSUE][TX] Concurrently wirte checkpoints for atomic statements
- user model changes: no
- storage format changes: no
- interface changes: no
Change-Id: I3846bfa534ebe4077f55f3a9acccd3dc3d8d0cda
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17725
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
---
.../app/message/AtomicJobCommitMessage.java | 22 +++++++++++++++++++++-
1 file changed, 21 insertions(+), 1 deletion(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
index fac023cffa..e653b7a899 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -18,7 +18,11 @@
*/
package org.apache.asterix.app.message;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -46,13 +50,29 @@ public class AtomicJobCommitMessage implements
INcAddressedMessage {
@Override
public void handle(INcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
IDatasetLifecycleManager datasetLifecycleManager =
appCtx.getDatasetLifecycleManager();
+ ForkJoinPool commonPool = ForkJoinPool.commonPool();
+ List<Future> futures = new ArrayList<>();
for (Integer datasetId : datasetIds) {
for (IndexInfo indexInfo :
datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
if (indexInfo.getIndex().isPrimaryIndex()) {
- ((PrimaryIndexOperationTracker)
indexInfo.getIndex().getOperationTracker()).commit();
+ futures.add(commonPool.submit(() -> {
+ try {
+ ((PrimaryIndexOperationTracker)
indexInfo.getIndex().getOperationTracker()).commit();
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }));
}
}
}
+ for (Future f : futures) {
+ try {
+ f.get();
+ } catch (ExecutionException e) {
+ futures.forEach(future -> future.cancel(true));
+ throw HyracksDataException.create(e);
+ }
+ }
AtomicJobCompletionMessage message =
new AtomicJobCompletionMessage(jobId,
appCtx.getServiceContext().getNodeId());
NCMessageBroker mb = (NCMessageBroker)
appCtx.getServiceContext().getMessageBroker();