This is an automated email from the ASF dual-hosted git repository.

peeyush pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b9ce5f159d [NO ISSUE][MTD][TX] Implement atomic metadata transactions without WAL
b9ce5f159d is described below

commit b9ce5f159d83cda175aa789b42e197b081deb1f0
Author: Peeyush Gupta <[email protected]>
AuthorDate: Wed Jul 26 13:13:41 2023 -0700

    [NO ISSUE][MTD][TX] Implement atomic metadata transactions without WAL
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - With this change, metadata transactions without WAL are made atomic on
    cloud deployments.
    - Refactor the global transaction manager: drop the datasetId parameter
    from handleJobPreparedMessage and AtomicJobPreparedMessage.
    
    Change-Id: I282e0e4ca8a9bff68fa88613b9d34b14bc2b764c
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17657
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Peeyush Gupta <[email protected]>
---
 .../org/apache/asterix/app/cc/GlobalTxManager.java |   3 +-
 .../org/apache/asterix/app/nc/RecoveryManager.java |   9 ++
 .../apache/asterix/test/txn/LogManagerTest.java    |   2 +-
 .../asterix/common/cluster/IGlobalTxManager.java   |   3 +-
 .../LSMInsertDeleteOperatorNodePushable.java       |   4 +-
 .../common/messaging/AtomicJobPreparedMessage.java |   7 +-
 .../common/transactions/ITransactionManager.java   |   5 +
 .../asterix/common/utils/StorageConstants.java     |   1 +
 .../org/apache/asterix/metadata/MetadataNode.java  |  10 ++
 .../metadata/bootstrap/MetadataBootstrap.java      |   2 +-
 .../LSMPrimaryInsertOperatorNodePushable.java      |   4 +-
 .../LSMPrimaryUpsertOperatorNodePushable.java      |   4 +-
 .../transaction/AtomicNoWALTransactionContext.java | 108 ++++++++++++++++++++-
 .../transaction/MetadataAtomicTransactionLog.java  |  64 ++++++++++++
 .../transaction/TransactionContextFactory.java     |   5 +-
 .../service/transaction/TransactionManager.java    |  31 +++++-
 16 files changed, 237 insertions(+), 25 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index d69376aabf..f6456dc942 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -104,8 +104,7 @@ public class GlobalTxManager implements IGlobalTxManager {
     }
 
     @Override
-    public void handleJobPreparedMessage(JobId jobId, String nodeId, int 
datasetId,
-            Map<String, ILSMComponentId> componentIdMap) {
+    public void handleJobPreparedMessage(JobId jobId, String nodeId, 
Map<String, ILSMComponentId> componentIdMap) {
         IGlobalTransactionContext context = txnContextRepository.get(jobId);
         if (context == null) {
             LOGGER.warn("JobPreparedMessage received for jobId " + jobId
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f6eb123df3..54942509e3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -61,6 +61,7 @@ import 
org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -157,6 +158,9 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws 
IOException, ACIDException {
         state = SystemState.RECOVERING;
+        if (appCtx.isCloudDeployment()) {
+            doMetadataRecovery();
+        }
         LOGGER.info("starting recovery for partitions {}", partitions);
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -171,6 +175,11 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         replayPartitionsLogs(partitions, logMgr.getLogReader(true), 
lowWaterMarkLSN, true);
     }
 
+    public synchronized void doMetadataRecovery() {
+        LOGGER.info("starting recovery for metadata partition {}", 
StorageConstants.METADATA_PARTITION);
+        
appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
+    }
+
     public synchronized void replayPartitionsLogs(Set<Integer> partitions, 
ILogReader logReader, long lowWaterMarkLSN,
             boolean closeOnFlushRedo) throws IOException, ACIDException {
         try {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index fb08fe35a8..cea3832a57 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -175,7 +175,7 @@ public class LogManagerTest {
         LogRecord logRecord = new LogRecord();
         final long txnId = 1;
         logRecord.setTxnCtx(TransactionContextFactory.create(new TxnId(txnId),
-                new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)));
+                new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL), ncAppCtx));
         logRecord.setLogSource(LogSource.LOCAL);
         logRecord.setLogType(LogType.WAIT);
         logRecord.setTxnId(txnId);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
index 498d174d3f..956ae9ecf7 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
@@ -46,8 +46,7 @@ public interface IGlobalTxManager extends 
IJobLifecycleListener {
 
     IGlobalTransactionContext getTransactionContext(JobId jobId) throws 
ACIDException;
 
-    void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
-            Map<String, ILSMComponentId> componentIdMap);
+    void handleJobPreparedMessage(JobId jobId, String nodeId, Map<String, 
ILSMComponentId> componentIdMap);
 
     void handleJobCompletionMessage(JobId jobId, String nodeId);
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 64748ce923..1a23ca857c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -249,7 +249,6 @@ public class LSMInsertDeleteOperatorNodePushable extends 
LSMIndexInsertUpdateDel
     private void commitAtomicInsertDelete() throws HyracksDataException {
         if (isPrimary) {
             final Map<String, ILSMComponentId> componentIdMap = new 
HashMap<>();
-            int datasetID = -1;
             boolean atomic = false;
             for (IIndex index : indexes) {
                 if (((ILSMIndex) index).isAtomic()) {
@@ -259,14 +258,13 @@ public class LSMInsertDeleteOperatorNodePushable extends 
LSMIndexInsertUpdateDel
                     for (Map.Entry<String, FlushOperation> entry : 
opTracker.getLastFlushOperation().entrySet()) {
                         componentIdMap.put(entry.getKey(), 
entry.getValue().getFlushingComponent().getId());
                     }
-                    datasetID = opTracker.getDatasetInfo().getDatasetID();
                     atomic = true;
                 }
             }
 
             if (atomic) {
                 AtomicJobPreparedMessage message = new 
AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
-                        
ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, 
componentIdMap);
+                        
ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
                 try {
                     ((NodeControllerService) 
ctx.getJobletContext().getServiceContext().getControllerService())
                             
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
index 8adbf49da6..b4832ff4fc 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
@@ -38,20 +38,17 @@ public class AtomicJobPreparedMessage implements 
ICcAddressedMessage {
     private static final long serialVersionUID = 1L;
     private final JobId jobId;
     private final String nodeId;
-    private final int datasetId;
     private final Map<String, ILSMComponentId> componentIdMap;
 
-    public AtomicJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
-            Map<String, ILSMComponentId> componentIdMap) {
+    public AtomicJobPreparedMessage(JobId jobId, String nodeId, Map<String, 
ILSMComponentId> componentIdMap) {
         this.nodeId = nodeId;
-        this.datasetId = datasetId;
         this.componentIdMap = componentIdMap;
         this.jobId = jobId;
     }
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
-        appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, 
datasetId, componentIdMap);
+        appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, 
componentIdMap);
     }
 
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 30c693d219..66b5359944 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -112,4 +112,9 @@ public interface ITransactionManager {
      */
     void ensureMaxTxnId(long txnId);
 
+    /**
+     * Rolls back incomplete metadata transactions without WAL during recovery.
+     */
+    void rollbackMetadataTransactionsWithoutWAL();
+
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 1321c96306..ed9c48e68b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -30,6 +30,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFacto
  */
 public class StorageConstants {
 
+    public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
     public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
     public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 522fd32556..b40a4eb07e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -128,6 +128,7 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -1742,6 +1743,12 @@ public class MetadataNode implements IMetadataNode {
         return sb.toString();
     }
 
+    private void setAtomicOpContext(IIndexAccessor accessor) {
+        Map<String, Object> indexAccessorOpContextParameters = new HashMap<>();
+        
indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+        ((ILSMIndexAccessor) 
accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+    }
+
     private <T> void searchIndex(TxnId txnId, IMetadataIndex index, 
ITupleReference searchKey,
             IValueExtractor<T> valueExtractor, List<T> results) throws 
AlgebricksException, HyracksDataException {
         IBinaryComparatorFactory[] comparatorFactories = 
index.getKeyBinaryComparatorFactory();
@@ -1753,6 +1760,9 @@ public class MetadataNode implements IMetadataNode {
         IIndex indexInstance = datasetLifecycleManager.get(resourceName);
         datasetLifecycleManager.open(resourceName);
         IIndexAccessor indexAccessor = 
indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        if (atomicNoWAL) {
+            setAtomicOpContext(indexAccessor);
+        }
         try {
             IBinaryComparator[] searchCmps = null;
             MultiComparator searchCmp = null;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c1eba7c0cd..be935d0fce 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -433,7 +433,7 @@ public class MetadataBootstrap {
                             
StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, 
bloomFilterKeyFields,
                             bloomFilterFalsePositiveRate, true, null, 
NoOpCompressorDecompressorFactory.INSTANCE, true,
                             
TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), 
NullIntrospector.INSTANCE,
-                            false, false);
+                            false, appContext.isCloudDeployment());
             DatasetLocalResourceFactory dsLocalResourceFactory =
                     new DatasetLocalResourceFactory(datasetId, 
lsmBtreeFactory);
             // TODO(amoudi) Creating the index should be done through the same 
code path as
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index ec016bcf9d..2c89b614b1 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -352,7 +352,6 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
 
     private void commitAtomicInsert() throws HyracksDataException {
         final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
-        int datasetID = -1;
         boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
@@ -362,14 +361,13 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 for (Map.Entry<String, FlushOperation> entry : 
opTracker.getLastFlushOperation().entrySet()) {
                     componentIdMap.put(entry.getKey(), 
entry.getValue().getFlushingComponent().getId());
                 }
-                datasetID = opTracker.getDatasetInfo().getDatasetID();
                 atomic = true;
             }
         }
 
         if (atomic) {
             AtomicJobPreparedMessage message = new 
AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
-                    ctx.getJobletContext().getServiceContext().getNodeId(), 
datasetID, componentIdMap);
+                    ctx.getJobletContext().getServiceContext().getNodeId(), 
componentIdMap);
             try {
                 ((NodeControllerService) 
ctx.getJobletContext().getServiceContext().getControllerService())
                         
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index de95d60593..6b9d3ebaf2 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -570,7 +570,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     // TODO: Refactor and remove duplicated code
     private void commitAtomicUpsert() throws HyracksDataException {
         final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
-        int datasetID = -1;
         boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
@@ -580,14 +579,13 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 for (Map.Entry<String, FlushOperation> entry : 
opTracker.getLastFlushOperation().entrySet()) {
                     componentIdMap.put(entry.getKey(), 
entry.getValue().getFlushingComponent().getId());
                 }
-                datasetID = opTracker.getDatasetInfo().getDatasetID();
                 atomic = true;
             }
         }
 
         if (atomic) {
             AtomicJobPreparedMessage message = new 
AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
-                    ctx.getJobletContext().getServiceContext().getNodeId(), 
datasetID, componentIdMap);
+                    ctx.getJobletContext().getServiceContext().getNodeId(), 
componentIdMap);
             try {
                 ((NodeControllerService) 
ctx.getJobletContext().getServiceContext().getControllerService())
                         
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index 3576ba1388..aa698beecc 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -18,25 +18,46 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 @ThreadSafe
 public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
 
-    public AtomicNoWALTransactionContext(TxnId txnId) {
+    private final INcApplicationContext appCtx;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public AtomicNoWALTransactionContext(TxnId txnId, INcApplicationContext 
appCtx) {
         super(txnId);
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -59,7 +80,7 @@ public class AtomicNoWALTransactionContext extends 
AtomicTransactionContext {
         for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
             PrimaryIndexOperationTracker primaryIndexOpTracker = 
(PrimaryIndexOperationTracker) opTrackerRef;
             try {
-                primaryIndexOpTracker.deleteMemoryComponent(true);
+                primaryIndexOpTracker.abort();
             } catch (HyracksDataException e) {
                 throw new ACIDException(e);
             }
@@ -68,19 +89,102 @@ public class AtomicNoWALTransactionContext extends 
AtomicTransactionContext {
 
     private void ensureDurable() {
         List<FlushOperation> flushes = new ArrayList<>();
+        List<Integer> datasetIds = new ArrayList<>();
+        Map<String, ILSMComponentId> resourceMap = new HashMap<>();
         LogRecord dummyLogRecord = new LogRecord();
         try {
             for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
                 PrimaryIndexOperationTracker primaryIndexOpTracker = 
(PrimaryIndexOperationTracker) opTrackerRef;
                 primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
                 flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+                
datasetIds.add(primaryIndexOpTracker.getDatasetInfo().getDatasetID());
+                for (Map.Entry<String, FlushOperation> entry : 
primaryIndexOpTracker.getLastFlushOperation()
+                        .entrySet()) {
+                    resourceMap.put(entry.getKey(), 
entry.getValue().getFlushingComponent().getId());
+                }
             }
             LSMIndexUtil.waitFor(flushes);
+            persistLogFile(datasetIds, resourceMap);
+        } catch (Exception e) {
+            deleteUncommittedRecords();
+            throw new ACIDException(e);
+        }
+        try {
+            commit();
+        } catch (Exception e) {
+            rollback(resourceMap);
+            throw new ACIDException(e);
+        } finally {
+            deleteLogFile();
+        }
+        enableMerge();
+    }
+
+    private void persistLogFile(List<Integer> datasetIds, Map<String, 
ILSMComponentId> resourceMap)
+            throws HyracksDataException, JsonProcessingException {
+        IIOManager ioManager = appCtx.getIoManager();
+        FileReference fref = 
ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+                StorageConstants.PARTITION_DIR_PREFIX + 
StorageConstants.METADATA_PARTITION,
+                String.format("%s.log", txnId)).toString());
+        MetadataAtomicTransactionLog txnLog = new 
MetadataAtomicTransactionLog(txnId, datasetIds,
+                appCtx.getServiceContext().getNodeId(), resourceMap);
+        ioManager.overwrite(fref, 
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+    }
+
+    public void deleteLogFile() {
+        IIOManager ioManager = appCtx.getIoManager();
+        try {
+            FileReference fref = 
ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+                    StorageConstants.PARTITION_DIR_PREFIX + 
StorageConstants.METADATA_PARTITION,
+                    String.format("%s.log", txnId)).toString());
+            ioManager.delete(fref);
         } catch (HyracksDataException e) {
             throw new ACIDException(e);
         }
     }
 
+    private void commit() throws HyracksDataException {
+        for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+            PrimaryIndexOperationTracker primaryIndexOpTracker = 
(PrimaryIndexOperationTracker) opTrackerRef;
+            primaryIndexOpTracker.commit();
+        }
+    }
+
+    private void enableMerge() {
+        for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+            PrimaryIndexOperationTracker primaryIndexOpTracker = 
(PrimaryIndexOperationTracker) opTrackerRef;
+            for (IndexInfo indexInfo : 
primaryIndexOpTracker.getDatasetInfo().getIndexes().values()) {
+                if (indexInfo.getIndex().isPrimaryIndex()) {
+                    try {
+                        
indexInfo.getIndex().getMergePolicy().diskComponentAdded(indexInfo.getIndex(), 
false);
+                    } catch (HyracksDataException e) {
+                        throw new ACIDException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    public void rollback(Map<String, ILSMComponentId> resourceMap) {
+        deleteUncommittedRecords();
+        IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                datasetLifecycleManager.getIndexCheckpointManagerProvider();
+        resourceMap.forEach((k, v) -> {
+            try {
+                IIndexCheckpointManager checkpointManager = 
indexCheckpointManagerProvider.get(ResourceReference.of(k));
+                if (checkpointManager.getCheckpointCount() > 0) {
+                    IndexCheckpoint checkpoint = checkpointManager.getLatest();
+                    if (checkpoint.getLastComponentId() == v.getMaxId()) {
+                        checkpointManager.deleteLatest(v.getMaxId(), 1);
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new ACIDException(e);
+            }
+        });
+    }
+
     @Override
     public boolean hasWAL() {
         return false;
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
new file mode 100644
index 0000000000..7b3af3f76a
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MetadataAtomicTransactionLog {
+
+    private TxnId txnId;
+    private List<Integer> datasetIds;
+    private String nodeId;
+    private Map<String, ILSMComponentId> resourceMap;
+
+    @JsonCreator
+    public MetadataAtomicTransactionLog(@JsonProperty("txnId") TxnId txnId,
+            @JsonProperty("datasetIds") List<Integer> datasetIds, 
@JsonProperty("nodeId") String nodeId,
+            @JsonProperty("resourceMap") Map<String, ILSMComponentId> 
resourceMap) {
+        this.txnId = txnId;
+        this.datasetIds = datasetIds;
+        this.nodeId = nodeId;
+        this.resourceMap = resourceMap;
+    }
+
+    public TxnId getTxnId() {
+        return txnId;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public Map<String, ILSMComponentId> getResourceMap() {
+        return resourceMap;
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 1076086346..e1e2ca8156 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -20,6 +20,7 @@ package 
org.apache.asterix.transaction.management.service.transaction;
 
 import static 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
@@ -29,13 +30,13 @@ public class TransactionContextFactory {
     private TransactionContextFactory() {
     }
 
-    public static ITransactionContext create(TxnId txnId, TransactionOptions 
options) {
+    public static ITransactionContext create(TxnId txnId, TransactionOptions 
options, INcApplicationContext appCtx) {
         final AtomicityLevel atomicityLevel = options.getAtomicityLevel();
         switch (atomicityLevel) {
             case ATOMIC:
                 return new AtomicTransactionContext(txnId);
             case ATOMIC_NO_WAL:
-                return new AtomicNoWALTransactionContext(txnId);
+                return new AtomicNoWALTransactionContext(txnId, appCtx);
             case ENTITY_LEVEL:
                 return new EntityLevelTransactionContext(txnId);
             default:
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 8bbfbfdb85..ea7fcb89c8 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -22,6 +22,7 @@ import static 
org.apache.asterix.transaction.management.service.transaction.Tran
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,14 +35,19 @@ import 
org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 @ThreadSafe
 public class TransactionManager implements ITransactionManager, 
ILifeCycleComponent {
 
@@ -61,7 +67,7 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
         if (txnCtx != null) {
             throw new ACIDException("Transaction with the same (" + txnId + ") 
already exists");
         }
-        txnCtx = TransactionContextFactory.create(txnId, options);
+        txnCtx = TransactionContextFactory.create(txnId, options, 
txnSubsystem.getApplicationContext());
         txnCtxRepository.put(txnId, txnCtx);
         ensureMaxTxnId(txnId.getId());
         return txnCtx;
@@ -187,4 +193,27 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
             LOGGER.log(Level.WARN, "exception while dumping state", e);
         }
     }
+
+    /**
+     * Rolls back the metadata transactions described by the no-WAL transaction log files.
+     * <p>
+     * Lists the files under
+     * {@code METADATA_TXN_NOWAL_DIR_NAME/PARTITION_DIR_PREFIX + METADATA_PARTITION} (resolved through the
+     * application's IO manager), parses each one as a {@link MetadataAtomicTransactionLog}, calls
+     * {@code rollback} with the logged resource map on a fresh {@link AtomicNoWALTransactionContext} for the
+     * logged transaction id, and then calls {@code deleteLogFile()} on that context.
+     *
+     * @throws ACIDException
+     *             wrapping any exception raised while listing, reading, parsing, or rolling back a log; log
+     *             files after the failing one are not processed
+     */
+    @Override
+    public void rollbackMetadataTransactionsWithoutWAL() {
+        IIOManager ioManager = txnSubsystem.getApplicationContext().getIoManager();
+        try {
+            String txnLogDir = Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+                    StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION).toString();
+            Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(txnLogDir));
+            ObjectMapper objectMapper = new ObjectMapper();
+            for (FileReference txnLogFileRef : txnLogFileRefs) {
+                // Give Jackson the raw bytes instead of new String(bytes): the String constructor decodes with
+                // the JVM's platform-default charset, whereas Jackson detects the JSON encoding on its own.
+                MetadataAtomicTransactionLog atomicTransactionLog = objectMapper
+                        .readValue(ioManager.readAllBytes(txnLogFileRef), MetadataAtomicTransactionLog.class);
+                AtomicNoWALTransactionContext context = new AtomicNoWALTransactionContext(
+                        atomicTransactionLog.getTxnId(), txnSubsystem.getApplicationContext());
+                context.rollback(atomicTransactionLog.getResourceMap());
+                // Remove the log only after rollback returned, so a failed rollback leaves it in place.
+                context.deleteLogFile();
+            }
+        } catch (Exception e) {
+            throw new ACIDException(e);
+        }
+    }
 }


Reply via email to