This is an automated email from the ASF dual-hosted git repository.

peeyush pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf8f123d5 [NO ISSUE][TX]: Statement level atomicity for 
inserts/upserts/deletes
0bf8f123d5 is described below

commit 0bf8f123d500fb7bd1bbf91e1c4b29cd064fa258
Author: Peeyush Gupta <[email protected]>
AuthorDate: Mon Jul 10 10:57:20 2023 -0700

    [NO ISSUE][TX]: Statement level atomicity for inserts/upserts/deletes
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: yes
    
    Details:
    With this change we introduce statement-level atomicity for
    inserts/upserts/deletes. Statement-level atomicity is only
    enabled for datasets created without any type specification.
    
    Change-Id: I3b4aefeba07be921d128255393aec1b703198a40
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17598
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Murtadha Al Hubail <[email protected]>
---
 .../asterix/app/cc/CcApplicationContext.java       |  21 +-
 .../org/apache/asterix/app/cc/GlobalTxManager.java | 218 +++++++++++++++++++++
 .../app/message/AtomicJobCommitMessage.java        |  68 +++++++
 .../app/message/AtomicJobCompletionMessage.java    |  47 +++++
 .../message/AtomicJobRollbackCompleteMessage.java  |  48 +++++
 .../app/message/AtomicJobRollbackMessage.java      |  83 ++++++++
 .../asterix/app/message/EnableMergeMessage.java    |  51 +++++
 .../asterix/app/nc/IndexCheckpointManager.java     |  16 ++
 .../asterix/app/translator/QueryTranslator.java    | 136 +++++++++++--
 .../asterix/hyracks/bootstrap/CCApplication.java   |  32 ++-
 .../hyracks/bootstrap/GlobalRecoveryManager.java   |   5 +
 .../atomic-statements-1.01.ddl.sqlpp               |  35 +---
 .../atomic-statements-1.02.update.sqlpp            |  44 ++---
 .../atomic-statements-1.03.query.sqlpp             |  35 +---
 .../atomic-statements-1.04.update.sqlpp            |  44 ++---
 .../atomic-statements-1.05.query.sqlpp             |  35 +---
 .../atomic-statements-1.06.ddl.sqlpp               |  33 +---
 .../atomic-statements-2.01.ddl.sqlpp               |  35 +---
 .../atomic-statements-2.02.query.sqlpp             |  37 +---
 .../atomic-statements-2.03.update.sqlpp            |  41 ++--
 .../atomic-statements-2.04.query.sqlpp             |  35 +---
 .../atomic-statements-2.05.ddl.sqlpp               |  33 +---
 .../atomic-statements-1/atomic-statements-1.03.adm |   1 +
 .../atomic-statements-1/atomic-statements-1.05.adm |   1 +
 .../atomic-statements-2/atomic-statements-2.02.adm |   3 +
 .../atomic-statements-2/atomic-statements-2.04.adm |   1 +
 .../test/resources/runtimets/testsuite_sqlpp.xml   |  14 ++
 .../common/api/IDatasetLifecycleManager.java       |   3 +
 .../apache/asterix/common/api/INodeJobTracker.java |   3 +
 .../asterix/common/cluster/IGlobalTxManager.java   |  58 ++++++
 .../common/context/DatasetLifecycleManager.java    |   5 +
 .../context/PrimaryIndexOperationTracker.java      |  42 ++--
 .../common/dataflow/ICcApplicationContext.java     |   6 +
 .../LSMInsertDeleteOperatorNodePushable.java       |  32 ++-
 .../common/messaging/AtomicJobPreparedMessage.java |  57 ++++++
 .../common/storage/IIndexCheckpointManager.java    |   2 +
 .../transactions/IGlobalTransactionContext.java    |  55 ++++++
 .../asterix/common/utils/StorageConstants.java     |   3 +
 .../runtime/job/listener/NodeJobTracker.java       |   8 +
 .../LSMPrimaryInsertOperatorNodePushable.java      |  32 ++-
 .../LSMPrimaryUpsertOperatorNodePushable.java      |  33 +++-
 .../management/runtime/NoOpCommitRuntime.java      |   5 +
 .../transaction/AtomicNoWALTransactionContext.java |   2 +-
 .../service/transaction/AtomicTransactionLog.java  |  72 +++++++
 .../transaction/GlobalTransactionContext.java      | 155 +++++++++++++++
 .../service/transaction/GlobalTxInfo.java          |  50 +++--
 hyracks-fullstack/hyracks/hyracks-api/pom.xml      |   4 +
 .../java/org/apache/hyracks/api/job/JobId.java     |   8 +
 .../hyracks/hyracks-storage-am-lsm-common/pom.xml  |   4 +
 .../storage/am/lsm/common/api/ILSMComponentId.java |   7 +
 .../am/lsm/common/impls/AbstractLSMIndex.java      |   1 -
 .../am/lsm/common/impls/LSMComponentId.java        |  10 +-
 52 files changed, 1404 insertions(+), 405 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 0b8024bd96..bb7be73e31 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -35,6 +35,7 @@ import org.apache.asterix.common.api.IReceptionistFactory;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CloudProperties;
@@ -74,6 +75,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.client.result.ResultSet;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.util.NetworkUtil;
@@ -119,6 +121,8 @@ public class CcApplicationContext implements 
ICcApplicationContext {
     private final IAdapterFactoryService adapterFactoryService;
     private final ReentrantReadWriteLock compilationLock = new 
ReentrantReadWriteLock(true);
     private final IDataPartitioningProvider dataPartitioningProvider;
+    private final IGlobalTxManager globalTxManager;
+    private final IOManager ioManager;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, 
HyracksConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, 
IGlobalRecoveryManager globalRecoveryManager,
@@ -126,7 +130,8 @@ public class CcApplicationContext implements 
ICcApplicationContext {
             IStorageComponentProvider storageComponentProvider, 
IMetadataLockManager mdLockManager,
             IMetadataLockUtil mdLockUtil, IReceptionistFactory 
receptionistFactory,
             IConfigValidatorFactory configValidatorFactory, Object 
extensionManager,
-            IAdapterFactoryService adapterFactoryService) throws 
AlgebricksException, IOException {
+            IAdapterFactoryService adapterFactoryService, IGlobalTxManager 
globalTxManager, IOManager ioManager,
+            CloudProperties cloudProperties) throws AlgebricksException, 
IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.activeLifeCycleListener = activeLifeCycleListener;
@@ -142,7 +147,7 @@ public class CcApplicationContext implements 
ICcApplicationContext {
         activeProperties = new ActiveProperties(propertiesAccessor);
         extensionProperties = new ExtensionProperties(propertiesAccessor);
         replicationProperties = new ReplicationProperties(propertiesAccessor);
-        cloudProperties = new CloudProperties(propertiesAccessor);
+        this.cloudProperties = cloudProperties;
         this.ftStrategy = ftStrategy;
         this.buildProperties = new BuildProperties(propertiesAccessor);
         this.messagingProperties = new MessagingProperties(propertiesAccessor);
@@ -163,6 +168,8 @@ public class CcApplicationContext implements 
ICcApplicationContext {
         configValidator = configValidatorFactory.create();
         this.adapterFactoryService = adapterFactoryService;
         dataPartitioningProvider = DataPartitioningProvider.create(this);
+        this.globalTxManager = globalTxManager;
+        this.ioManager = ioManager;
     }
 
     @Override
@@ -381,4 +388,14 @@ public class CcApplicationContext implements 
ICcApplicationContext {
     public CloudProperties getCloudProperties() {
         return cloudProperties;
     }
+
+    /**
+     * @return the global transaction manager that was handed to this context's constructor
+     */
+    @Override
+    public IGlobalTxManager getGlobalTxManager() {
+        return globalTxManager;
+    }
+
+    /**
+     * @return the IO manager that was handed to this context's constructor
+     */
+    @Override
+    public IOManager getIoManager() {
+        return ioManager;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
new file mode 100644
index 0000000000..beb55d84b2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.cc;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.message.AtomicJobCommitMessage;
+import org.apache.asterix.app.message.AtomicJobRollbackMessage;
+import org.apache.asterix.app.message.EnableMergeMessage;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
+import 
org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext;
+import 
org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Coordinates atomic (statement-level) jobs on the cluster controller.
+ * <p>
+ * One {@link IGlobalTransactionContext} is tracked per {@link JobId}. Node controllers report progress through
+ * the {@code handle*Message} callbacks; once the expected number of acknowledgements has arrived, the context is
+ * moved to its next status and every thread waiting on the context's monitor is woken up.
+ * <p>
+ * All waits in this class are guarded by a status check that is evaluated while holding the context's monitor,
+ * so a notification that is issued before the waiter actually blocks cannot be lost.
+ */
+public class GlobalTxManager implements IGlobalTxManager {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final Map<JobId, IGlobalTransactionContext> txnContextRepository = new ConcurrentHashMap<>();
+    private final ICCServiceContext serviceContext;
+    private final IOManager ioManager;
+    // The spelling of this identifier is kept unchanged because the constant is public.
+    public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty";
+
+    public GlobalTxManager(ICCServiceContext serviceContext, IOManager ioManager) {
+        this.serviceContext = serviceContext;
+        this.ioManager = ioManager;
+    }
+
+    /**
+     * Creates the context for {@code jobId} and registers it, replacing any context already stored for that id.
+     */
+    @Override
+    public IGlobalTransactionContext beginTransaction(JobId jobId, int numParticipatingNodes,
+            int numParticipatingPartitions, List<Integer> participatingDatasetIds) throws ACIDException {
+        GlobalTransactionContext context = new GlobalTransactionContext(jobId, participatingDatasetIds,
+                numParticipatingNodes, numParticipatingPartitions);
+        txnContextRepository.put(jobId, context);
+        return context;
+    }
+
+    /**
+     * Blocks while the transaction of {@code jobId} is still ACTIVE or PREPARED, then unregisters its context.
+     *
+     * @throws ACIDException if no context is registered for {@code jobId} or the waiting thread is interrupted
+     */
+    @Override
+    public void commitTransaction(JobId jobId) throws ACIDException {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        synchronized (context) {
+            // The status is (re)checked under the monitor: a notifyAll() issued between the check and the wait
+            // can therefore not be missed, and a spurious wake-up simply re-enters the wait.
+            while (isInFlight(context.getTxnStatus())) {
+                waitOnMonitor(context);
+            }
+        }
+        txnContextRepository.remove(jobId);
+    }
+
+    /**
+     * @throws ACIDException if no context is registered for {@code jobId}
+     */
+    @Override
+    public IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+        IGlobalTransactionContext context = txnContextRepository.get(jobId);
+        if (context == null) {
+            throw new ACIDException("Transaction for jobId " + jobId + " does not exist");
+        }
+        return context;
+    }
+
+    /**
+     * Records the component ids reported by {@code nodeId}. When the number of acknowledgements equals the
+     * number of partitions, the context is marked PREPARED, persisted, its acknowledgement counter is reset and
+     * commit messages are sent to every node that reported resources. {@code datasetId} is currently not used.
+     */
+    @Override
+    public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+            Map<String, ILSMComponentId> componentIdMap) {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        Map<String, ILSMComponentId> knownComponents = context.getNodeResourceMap().get(nodeId);
+        if (knownComponents != null) {
+            knownComponents.putAll(componentIdMap);
+        } else {
+            context.getNodeResourceMap().put(nodeId, componentIdMap);
+        }
+        if (context.incrementAndGetAcksReceived() == context.getNumPartitions()) {
+            context.setTxnStatus(TransactionStatus.PREPARED);
+            context.persist(ioManager);
+            context.resetAcksReceived();
+            sendJobCommitMessages(context);
+        }
+    }
+
+    /** Sends one commit message to every node that has an entry in the context's node resource map. */
+    private void sendJobCommitMessages(IGlobalTransactionContext context) {
+        for (String nodeId : context.getNodeResourceMap().keySet()) {
+            AtomicJobCommitMessage message = new AtomicJobCommitMessage(context.getJobId(), context.getDatasetIds());
+            try {
+                getMessageBroker().sendRealTimeApplicationMessageToNC(message, nodeId);
+            } catch (Exception e) {
+                throw new ACIDException(e);
+            }
+        }
+    }
+
+    /**
+     * Counts a commit acknowledgement. When the count equals the number of nodes, the persisted context is
+     * deleted, the status becomes COMMITTED, merge-enable messages are sent and waiting threads are woken up.
+     */
+    @Override
+    public void handleJobCompletionMessage(JobId jobId, String nodeId) {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
+            context.delete(ioManager);
+            context.setTxnStatus(TransactionStatus.COMMITTED);
+            try {
+                sendEnableMergeMessages(context);
+            } finally {
+                // The status is already COMMITTED at this point; wake the committer even if sending the
+                // merge-enable messages failed, otherwise it would stay blocked forever.
+                wakeWaiters(context);
+            }
+        }
+    }
+
+    /**
+     * Counts a rollback acknowledgement. When the count equals the number of nodes, the status becomes ROLLBACK,
+     * the persisted context is deleted and waiting threads are woken up.
+     */
+    @Override
+    public void handleJobRollbackCompletionMessage(JobId jobId, String nodeId) {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
+            context.setTxnStatus(TransactionStatus.ROLLBACK);
+            context.delete(ioManager);
+            wakeWaiters(context);
+        }
+    }
+
+    /** Sends one merge-enable message per (node, dataset) pair of the context. */
+    private void sendEnableMergeMessages(IGlobalTransactionContext context) {
+        for (String nodeId : context.getNodeResourceMap().keySet()) {
+            for (Integer datasetId : context.getDatasetIds()) {
+                EnableMergeMessage message = new EnableMergeMessage(context.getJobId(), datasetId);
+                try {
+                    getMessageBroker().sendRealTimeApplicationMessageToNC(message, nodeId);
+                } catch (Exception e) {
+                    throw new ACIDException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Rebuilds a context from every file listed under {@link StorageConstants#CC_TX_LOG_DIR}, registers it and
+     * rolls it back, one context at a time.
+     */
+    @Override
+    public void rollback() throws Exception {
+        Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.CC_TX_LOG_DIR));
+        for (FileReference txnLogFileRef : txnLogFileRefs) {
+            IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager);
+            txnContextRepository.put(context.getJobId(), context);
+            sendJobRollbackMessages(context);
+        }
+    }
+
+    /**
+     * Sends a rollback message to every node that has an entry in the context's node resource map, blocks until
+     * the context's status changes from the value it had before the messages were sent, and finally unregisters
+     * the context.
+     */
+    private void sendJobRollbackMessages(IGlobalTransactionContext context) throws Exception {
+        JobId jobId = context.getJobId();
+        // Snapshot taken before any message leaves, so that an acknowledgement that is processed before this
+        // thread starts waiting is still observed as a status change.
+        TransactionStatus statusBeforeRollback = context.getTxnStatus();
+        for (String nodeId : context.getNodeResourceMap().keySet()) {
+            AtomicJobRollbackMessage rollbackMessage = new AtomicJobRollbackMessage(jobId, context.getDatasetIds(),
+                    context.getNodeResourceMap().get(nodeId));
+            getMessageBroker().sendRealTimeApplicationMessageToNC(rollbackMessage, nodeId);
+        }
+        synchronized (context) {
+            while (context.getTxnStatus() == statusBeforeRollback) {
+                waitOnMonitor(context);
+            }
+        }
+        txnContextRepository.remove(jobId);
+    }
+
+    /**
+     * Rolls the transaction back on the nodes if it already reached PREPARED, then unregisters its context.
+     */
+    @Override
+    public void abortTransaction(JobId jobId) throws Exception {
+        IGlobalTransactionContext context = getTransactionContext(jobId);
+        if (context.getTxnStatus() == TransactionStatus.PREPARED) {
+            sendJobRollbackMessages(context);
+        }
+        txnContextRepository.remove(jobId);
+    }
+
+    /**
+     * Begins a global transaction for the job when its specification carries a {@link GlobalTxInfo} under
+     * {@link #GlOBAL_TX_PROPERTY_NAME}; other jobs are ignored.
+     */
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+        GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME);
+        if (globalTxInfo != null) {
+            beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(),
+                    globalTxInfo.getDatasetIds());
+        }
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId) throws HyracksException {
+        // nothing to do on job start
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+        // nothing to do on job finish
+    }
+
+    /** @return the service context's message broker, viewed as a CC message broker */
+    private ICCMessageBroker getMessageBroker() {
+        return (ICCMessageBroker) serviceContext.getMessageBroker();
+    }
+
+    /** @return {@code true} while the transaction has neither committed nor rolled back */
+    private static boolean isInFlight(TransactionStatus status) {
+        return status == TransactionStatus.ACTIVE || status == TransactionStatus.PREPARED;
+    }
+
+    /**
+     * Waits once on the context's monitor; the caller must hold that monitor. An interrupt restores the thread's
+     * interrupt flag and is reported as an {@link ACIDException}.
+     */
+    private static void waitOnMonitor(IGlobalTransactionContext context) {
+        try {
+            context.wait();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ACIDException(e);
+        }
+    }
+
+    /** Wakes every thread waiting on the context's monitor. */
+    private static void wakeWaiters(IGlobalTransactionContext context) {
+        synchronized (context) {
+            context.notifyAll();
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
new file mode 100644
index 0000000000..55ae22557f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Commit request for an atomic statement/job, sent by the CC to every participating NC.
+ * <p>
+ * The receiving NC commits the operation tracker of each primary index of the listed datasets and then
+ * acknowledges to the CC with an {@link AtomicJobCompletionMessage}.
+ */
+public class AtomicJobCommitMessage implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final JobId jobId;
+    private final List<Integer> datasetIds;
+
+    /**
+     * @param jobId      the atomic job to commit
+     * @param datasetIds ids of the datasets whose primary indexes are committed
+     */
+    public AtomicJobCommitMessage(JobId jobId, List<Integer> datasetIds) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        IDatasetLifecycleManager lifecycleManager = appCtx.getDatasetLifecycleManager();
+        for (Integer datasetId : datasetIds) {
+            commitPrimaryIndexes(lifecycleManager, datasetId);
+        }
+        acknowledge(appCtx);
+    }
+
+    /** Commits the operation tracker of every primary index registered for {@code datasetId}. */
+    private static void commitPrimaryIndexes(IDatasetLifecycleManager lifecycleManager, int datasetId)
+            throws HyracksDataException, InterruptedException {
+        for (IndexInfo candidate : lifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
+            if (!candidate.getIndex().isPrimaryIndex()) {
+                continue;
+            }
+            PrimaryIndexOperationTracker tracker =
+                    (PrimaryIndexOperationTracker) candidate.getIndex().getOperationTracker();
+            tracker.commit();
+        }
+    }
+
+    /** Reports the local commit to the CC that owns the job. */
+    private void acknowledge(INcApplicationContext appCtx) throws HyracksDataException {
+        String localNodeId = appCtx.getServiceContext().getNodeId();
+        AtomicJobCompletionMessage completion = new AtomicJobCompletionMessage(jobId, localNodeId);
+        NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            broker.sendRealTimeMessageToCC(jobId.getCcId(), completion);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java
new file mode 100644
index 0000000000..869e5d2522
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCompletionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Acknowledgement sent by an NC to the CC after the NC has committed its part of an atomic statement/job.
+ */
+public class AtomicJobCompletionMessage implements ICcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final JobId jobId;
+    private final String nodeId;
+
+    /**
+     * @param jobId  the atomic job that was committed locally
+     * @param nodeId id of the node sending the acknowledgement
+     */
+    public AtomicJobCompletionMessage(JobId jobId, String nodeId) {
+        this.nodeId = nodeId;
+        this.jobId = jobId;
+    }
+
+    /** Hands the acknowledgement over to the CC's global transaction manager. */
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        appCtx.getGlobalTxManager().handleJobCompletionMessage(jobId, nodeId);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java
new file mode 100644
index 0000000000..90e1fbdf2f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackCompleteMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Acknowledgement sent by an NC to the CC after the NC has rolled back its part of an atomic statement/job.
+ */
+public class AtomicJobRollbackCompleteMessage implements ICcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final JobId jobId;
+    private final String nodeId;
+
+    /**
+     * @param jobId  the atomic job that was rolled back locally
+     * @param nodeId id of the node sending the acknowledgement
+     */
+    public AtomicJobRollbackCompleteMessage(JobId jobId, String nodeId) {
+        this.nodeId = nodeId;
+        this.jobId = jobId;
+    }
+
+    /** Hands the acknowledgement over to the CC's global transaction manager. */
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        appCtx.getGlobalTxManager().handleJobRollbackCompletionMessage(jobId, nodeId);
+    }
+
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
new file mode 100644
index 0000000000..aeb6be2e8c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from CC to all NCs to rollback an atomic statement/job.
+ * <p>
+ * For every (resource, component id) entry it carries, the receiving NC inspects the resource's latest index
+ * checkpoint and deletes it when that checkpoint's last component id equals the entry's max id. The NC then
+ * acknowledges with an {@link AtomicJobRollbackCompleteMessage}.
+ */
+public class AtomicJobRollbackMessage implements INcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final JobId jobId;
+    private final List<Integer> datasetIds;
+    private final Map<String, ILSMComponentId> componentIdMap;
+
+    /**
+     * @param jobId          the atomic job to roll back
+     * @param datasetIds     ids of the datasets touched by the job (carried along, not read by {@code handle})
+     * @param componentIdMap component id recorded per resource; keys are resolved with
+     *                       {@link ResourceReference#of(String)}
+     */
+    public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds,
+            Map<String, ILSMComponentId> componentIdMap) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+        this.componentIdMap = componentIdMap;
+    }
+
+    /**
+     * Rolls back the checkpoints named in the message and acknowledges to the primary CC.
+     *
+     * @throws HyracksDataException if a checkpoint cannot be inspected or deleted, or the acknowledgement
+     *                              cannot be sent
+     */
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                datasetLifecycleManager.getIndexCheckpointManagerProvider();
+        // A plain loop (instead of Map.forEach with a lambda) lets HyracksDataException propagate as declared
+        // rather than being wrapped in a raw RuntimeException.
+        for (Map.Entry<String, ILSMComponentId> entry : componentIdMap.entrySet()) {
+            rollbackLatestCheckpoint(indexCheckpointManagerProvider, entry.getKey(), entry.getValue());
+        }
+        AtomicJobRollbackCompleteMessage message =
+                new AtomicJobRollbackCompleteMessage(jobId, appCtx.getServiceContext().getNodeId());
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            // NOTE(review): the commit path replies via sendRealTimeMessageToCC(jobId.getCcId(), ...) while this
+            // path replies to the primary CC - confirm the difference is intended.
+            mb.sendRealTimeMessageToPrimaryCC(message);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Deletes the latest checkpoint of {@code resourceKey} when at least one checkpoint exists and the latest
+     * one's last component id equals {@code componentId.getMaxId()}; otherwise does nothing.
+     */
+    private static void rollbackLatestCheckpoint(IIndexCheckpointManagerProvider provider, String resourceKey,
+            ILSMComponentId componentId) throws HyracksDataException {
+        IIndexCheckpointManager checkpointManager = provider.get(ResourceReference.of(resourceKey));
+        if (checkpointManager.getCheckpointCount() <= 0) {
+            return;
+        }
+        IndexCheckpoint checkpoint = checkpointManager.getLatest();
+        if (checkpoint.getLastComponentId() == componentId.getMaxId()) {
+            checkpointManager.deleteLatest(componentId.getMaxId(), 1);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java
new file mode 100644
index 0000000000..900da82183
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/EnableMergeMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * NC-addressed message that notifies the merge policy of every primary index of a dataset by invoking
+ * {@code diskComponentAdded(index, false)} on it.
+ */
+public class EnableMergeMessage implements INcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    // id of the job this message was created for; not read by handle()
+    private final JobId jobId;
+    // id of the dataset whose primary indexes are visited
+    private final int datasetId;
+
+    /**
+     * @param jobId
+     *            id of the job this message was created for
+     * @param datasetId
+     *            id of the dataset whose primary indexes should have their merge policy notified
+     */
+    public EnableMergeMessage(JobId jobId, int datasetId) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+    }
+
+    /**
+     * Iterates over all indexes registered for {@code datasetId} in the dataset lifecycle manager and, for each
+     * primary index, calls its merge policy's {@code diskComponentAdded} with {@code false} as second argument.
+     *
+     * @param appCtx
+     *            the NC application context providing the dataset lifecycle manager
+     */
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
+            if (indexInfo.getIndex().isPrimaryIndex()) {
+                indexInfo.getIndex().getMergePolicy().diskComponentAdded(indexInfo.getIndex(), false);
+            }
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 372cf69c7c..9383f063ff 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -227,6 +227,22 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         return IndexCheckpoint.fromJson(new 
String(ioManager.readAllBytes(checkpointPath)));
     }
 
+    /**
+     * Best-effort removal of the most recent checkpoint files of this index: every checkpoint file whose id is
+     * greater than {@code latestId - historyToDelete} is deleted. Any failure is logged as a warning and not
+     * propagated to the caller.
+     *
+     * @param latestId
+     *            id of the latest checkpoint
+     * @param historyToDelete
+     *            how many ids, counting back from {@code latestId}, fall in the range to delete
+     */
+    @Override
+    public void deleteLatest(long latestId, int historyToDelete) {
+        // files with an id above this bound are the ones being discarded
+        final long newestIdToKeep = latestId - historyToDelete;
+        try {
+            for (FileReference candidate : ioManager.list(indexPath, CHECKPOINT_FILE_FILTER)) {
+                if (getCheckpointIdFromFileName(candidate) <= newestIdToKeep) {
+                    continue;
+                }
+                ioManager.delete(candidate);
+            }
+        } catch (Exception e) {
+            LOGGER.warn(() -> "Couldn't delete history checkpoints at " + indexPath, e);
+        }
+    }
+
     private void deleteHistory(long latestId, int historyToKeep) {
         try {
             final Collection<FileReference> checkpointFiles = 
ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 28a6d2da68..dc165a07fa 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -55,6 +55,7 @@ import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.app.active.ActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.active.FeedEventsListener;
+import org.apache.asterix.app.cc.GlobalTxManager;
 import org.apache.asterix.app.external.ExternalLibraryJobUtils;
 import org.apache.asterix.app.result.ExecutionError;
 import org.apache.asterix.app.result.ResultHandle;
@@ -69,6 +70,7 @@ import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.api.IResponsePrinter;
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
@@ -209,6 +211,7 @@ import 
org.apache.asterix.runtime.fulltext.IFullTextFilterDescriptor;
 import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor;
 import org.apache.asterix.runtime.operators.DatasetStreamStats;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import 
org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.ClientRequest;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement;
@@ -291,6 +294,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
     protected final IResponsePrinter responsePrinter;
     protected final WarningCollector warningCollector;
     protected final ReentrantReadWriteLock compilationLock;
+    protected final IGlobalTxManager globalTxManager;
 
     public QueryTranslator(ICcApplicationContext appCtx, List<Statement> 
statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, ExecutorService 
executorService,
@@ -313,6 +317,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         if 
(appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL))
 {
             this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
         }
+        this.globalTxManager = appCtx.getGlobalTxManager();
     }
 
     public SessionOutput getSessionOutput() {
@@ -3481,6 +3486,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.insertDeleteUpsertBegin(lockManager, 
metadataProvider.getLocks(), dataverseName, datasetName);
+        JobId jobId = null;
+        boolean atomic = false;
         try {
             metadataProvider.setWriteTransaction(true);
             Dataset dataset = metadataProvider.findDataset(dataverseName, 
copyStmt.getDatasetName());
@@ -3503,9 +3510,32 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null && !isCompileOnly()) {
-                runJob(hcc, spec);
+                atomic = dataset.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = 
appCtx.getNodeJobTracker().getJobParticipatingNodes(spec).size();
+                    int numParticipatingPartitions = 
appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(dataset.getDatasetId());
+                    spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, 
new GlobalTxInfo(participatingDatasetIds,
+                            numParticipatingNodes, 
numParticipatingPartitions));
+                }
+                jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
+                    hcc.waitForCompletion(jobId);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                if (atomic) {
+                    globalTxManager.commitTransaction(jobId);
+                }
             }
         } catch (Exception e) {
+            if (atomic && jobId != null) {
+                globalTxManager.abortTransaction(jobId);
+            }
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
@@ -3555,18 +3585,45 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw e;
             }
         };
-
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats,
-                    requestParameters, false);
+                    requestParameters, false, stmt);
         } else {
             locker.lock();
+            JobId jobId = null;
+            boolean atomic = false;
             try {
                 final JobSpecification jobSpec = compiler.compile();
                 if (jobSpec == null) {
                     return jobSpec;
                 }
-                runJob(hcc, jobSpec);
+                Dataset ds = metadataProvider.findDataset(((InsertStatement) 
stmt).getDataverseName(),
+                        ((InsertStatement) stmt).getDatasetName());
+                atomic = ds.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = 
appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+                    int numParticipatingPartitions = 
appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(ds.getDatasetId());
+                    
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+                            participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
+                }
+                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
+                    hcc.waitForCompletion(jobId);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                if (atomic) {
+                    globalTxManager.commitTransaction(jobId);
+                }
+            } catch (Exception e) {
+                if (atomic && jobId != null) {
+                    globalTxManager.abortTransaction(jobId);
+                }
+                throw e;
             } finally {
                 locker.unlock();
             }
@@ -3586,6 +3643,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.insertDeleteUpsertBegin(lockManager, 
metadataProvider.getLocks(), dataverseName, datasetName);
+        boolean atomic = false;
+        JobId jobId = null;
         try {
             metadataProvider.setWriteTransaction(true);
             CompiledDeleteStatement clfrqs = new 
CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
@@ -3597,12 +3656,34 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-
             if (jobSpec != null && !isCompileOnly()) {
-                runJob(hcc, jobSpec);
+                Dataset ds = metadataProvider.findDataset(dataverseName, 
datasetName);
+                atomic = ds.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = 
appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+                    int numParticipatingPartitions = 
appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(ds.getDatasetId());
+                    
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+                            participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
+                }
+                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
+                    hcc.waitForCompletion(jobId);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                if (atomic) {
+                    globalTxManager.commitTransaction(jobId);
+                }
             }
             return jobSpec;
         } catch (Exception e) {
+            if (atomic && jobId != null) {
+                globalTxManager.abortTransaction(jobId);
+            }
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
@@ -4557,19 +4638,19 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             }
         };
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats,
-                requestParameters, true);
+                requestParameters, true, null);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IResultSet 
resultSet, IStatementCompiler compiler,
             MetadataProvider metadataProvider, IMetadataLocker locker, 
ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, IRequestParameters 
requestParameters, boolean cancellable)
-            throws Exception {
+            ResultMetadata outMetadata, Stats stats, IRequestParameters 
requestParameters, boolean cancellable,
+            Statement atomicStmt) throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
                 executorService.submit(() -> asyncCreateAndRunJob(hcc, 
compiler, locker, resultDelivery,
-                        requestParameters, cancellable, resultSetId, printed, 
metadataProvider));
+                        requestParameters, cancellable, resultSetId, printed, 
metadataProvider, atomicStmt));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -4583,7 +4664,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     responsePrinter.addResultPrinter(new 
ResultsPrinter(appCtx, resultReader,
                             metadataProvider.findOutputRecordType(), stats, 
sessionOutput));
                     responsePrinter.printResults();
-                }, requestParameters, cancellable, appCtx, metadataProvider);
+                }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, 
resultDelivery, id -> {
@@ -4595,7 +4676,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                         
outMetadata.getResultSets().add(org.apache.commons.lang3.tuple.Triple.of(id, 
resultSetId,
                                 metadataProvider.findOutputRecordType()));
                     }
-                }, requestParameters, cancellable, appCtx, metadataProvider);
+                }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt);
                 break;
             default:
                 break;
@@ -4618,7 +4699,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, 
IStatementCompiler compiler, IMetadataLocker locker,
             ResultDelivery resultDelivery, IRequestParameters 
requestParameters, boolean cancellable,
-            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider 
metadataProvider) {
+            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider 
metadataProvider, Statement atomicStmt) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
             createAndRunJob(hcc, jobFlags, jobId, compiler, locker, 
resultDelivery, id -> {
@@ -4630,7 +4711,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     printed.setTrue();
                     printed.notify();
                 }
-            }, requestParameters, cancellable, appCtx, metadataProvider);
+            }, requestParameters, cancellable, appCtx, metadataProvider, 
atomicStmt);
         } catch (Exception e) {
             if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
@@ -4670,10 +4751,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         return p.second;
     }
 
-    private static void createAndRunJob(IHyracksClientConnection hcc, 
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+    private void createAndRunJob(IHyracksClientConnection hcc, 
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, 
ResultDelivery resultDelivery, IResultPrinter printer,
             IRequestParameters requestParameters, boolean cancellable, 
ICcApplicationContext appCtx,
-            MetadataProvider metadataProvider) throws Exception {
+            MetadataProvider metadataProvider, Statement atomicStatement) 
throws Exception {
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4681,6 +4762,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             clientRequest.markCancellable();
         }
         locker.lock();
+        JobId jobId = null;
+        boolean atomic = false;
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
@@ -4691,7 +4774,20 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
-            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+            if (atomicStatement != null) {
+                Dataset ds = metadataProvider.findDataset(((InsertStatement) 
atomicStatement).getDataverseName(),
+                        ((InsertStatement) atomicStatement).getDatasetName());
+                atomic = ds.isAtomic();
+                if (atomic) {
+                    int numParticipatingNodes = 
appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
+                    int numParticipatingPartitions = 
appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+                    List<Integer> participatingDatasetIds = new ArrayList<>();
+                    participatingDatasetIds.add(ds.getDatasetId());
+                    
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
+                            participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
+                }
+            }
+            jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (jId != null) {
                 jId.setValue(jobId);
@@ -4704,7 +4800,13 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 ensureNotCancelled(clientRequest);
                 printer.print(jobId);
             }
+            if (atomic) {
+                globalTxManager.commitTransaction(jobId);
+            }
         } catch (Exception e) {
+            if (atomic && jobId != null) {
+                globalTxManager.abortTransaction(jobId);
+            }
             if (org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e) 
instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, 
clientRequest.getId());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 2a66cfdece..0e740f73e0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -23,6 +23,7 @@ import static 
org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
 import static 
org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
+import static 
org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
 
 import java.io.File;
 import java.io.IOException;
@@ -54,16 +55,20 @@ import org.apache.asterix.api.http.server.VersionApiServlet;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.cc.CcApplicationContext;
+import org.apache.asterix.app.cc.GlobalTxManager;
 import org.apache.asterix.app.config.ConfigValidator;
 import org.apache.asterix.app.io.PersistedResourceRegistry;
 import org.apache.asterix.app.replication.NcLifecycleCoordinator;
 import org.apache.asterix.app.result.JobResultCallback;
+import org.apache.asterix.cloud.CloudManagerProvider;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.api.IReceptionistFactory;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -77,6 +82,7 @@ import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.external.adapter.factory.AdapterFactoryService;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.messaging.CCMessageBroker;
@@ -99,12 +105,15 @@ import 
org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.IGatekeeper;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.control.cc.BaseCCApplication;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.nc.io.DefaultDeviceResolver;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.http.api.IServlet;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.HttpServerConfig;
@@ -158,8 +167,19 @@ public class CCApplication extends BaseCCApplication {
         componentProvider = new StorageComponentProvider();
         ccExtensionManager = new CCExtensionManager(new 
ArrayList<>(getExtensions()));
         IGlobalRecoveryManager globalRecoveryManager = 
createGlobalRecoveryManager();
+
+        List<IODeviceHandle> devices = new ArrayList<>();
+        devices.add(new IODeviceHandle(new 
File(StorageConstants.CC_STORAGE_ROOT_DIR), "."));
+        IOManager ioManager = new IOManager(devices, new 
DefaultDeviceResolver(), 1, 10);
+        CloudProperties cloudProperties = null;
+        if (ccServiceCtx.getAppConfig().getBoolean(CLOUD_DEPLOYMENT)) {
+            cloudProperties = new 
CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
+            ioManager = (IOManager) 
CloudManagerProvider.createIOManager(cloudProperties, ioManager);;
+        }
+        IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, 
lifecycleCoordinator, Receptionist::new,
-                ConfigValidator::new, ccExtensionManager, new 
AdapterFactoryService());
+                ConfigValidator::new, ccExtensionManager, new 
AdapterFactoryService(), globalTxManager, ioManager,
+                cloudProperties);
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
             System.setProperty("java.rmi.server.hostname", 
ccConfig.getClusterPublicAddress());
@@ -180,6 +200,7 @@ public class CCApplication extends BaseCCApplication {
         final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker();
         ccServiceCtx.addJobLifecycleListener(nodeJobTracker);
         ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
+        ccServiceCtx.addJobLifecycleListener(globalTxManager);
 
         jobCapacityController = new 
JobCapacityController(controllerService.getResourceManager());
     }
@@ -207,18 +228,23 @@ public class CCApplication extends BaseCCApplication {
     protected ICcApplicationContext createApplicationContext(ILibraryManager 
libraryManager,
             IGlobalRecoveryManager globalRecoveryManager, 
INcLifecycleCoordinator lifecycleCoordinator,
             IReceptionistFactory receptionistFactory, IConfigValidatorFactory 
configValidatorFactory,
-            CCExtensionManager ccExtensionManager, IAdapterFactoryService 
adapterFactoryService)
+            CCExtensionManager ccExtensionManager, IAdapterFactoryService 
adapterFactoryService,
+            IGlobalTxManager globalTxManager, IOManager ioManager, 
CloudProperties cloudProperties)
             throws AlgebricksException, IOException {
         return new CcApplicationContext(ccServiceCtx, hcc, () -> 
MetadataManager.INSTANCE, globalRecoveryManager,
                 lifecycleCoordinator, new ActiveNotificationHandler(), 
componentProvider, new MetadataLockManager(),
                 createMetadataLockUtil(), receptionistFactory, 
configValidatorFactory, ccExtensionManager,
-                adapterFactoryService);
+                adapterFactoryService, globalTxManager, ioManager, 
cloudProperties);
     }
 
     protected IGlobalRecoveryManager createGlobalRecoveryManager() throws 
Exception {
         return ccExtensionManager.getGlobalRecoveryManager(ccServiceCtx, 
getHcc(), componentProvider);
     }
 
+    /**
+     * Creates the global transaction manager used by this CC application: a {@link GlobalTxManager} built from
+     * the CC service context and the given IO manager.
+     * NOTE(review): presumably protected so that extensions can supply their own implementation — confirm.
+     *
+     * @param ioManager
+     *            the IO manager handed to the transaction manager
+     * @return the newly created global transaction manager
+     */
+    protected IGlobalTxManager createGlobalTxManager(IOManager ioManager) 
throws Exception {
+        return new GlobalTxManager(ccServiceCtx, ioManager);
+    }
+
     protected INcLifecycleCoordinator createNcLifeCycleCoordinator(boolean 
replicationEnabled) {
         return new NcLifecycleCoordinator(ccServiceCtx, replicationEnabled);
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 2452b8f421..3f76a3144d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -117,6 +117,7 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
             performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
         }
         mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
+        rollbackIncompleteAtomicTransactions(appCtx);
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         recoveryCompleted = true;
         recovering = false;
@@ -127,6 +128,10 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
         appCtx.getClusterStateManager().refreshState();
     }
 
+    /**
+     * Delegates to the global transaction manager's {@code rollback()}; invoked during global recovery after
+     * {@code doRecovery} and before the metadata transaction is committed.
+     *
+     * @param appCtx
+     *            the CC application context providing the global transaction manager
+     */
+    protected void rollbackIncompleteAtomicTransactions(ICcApplicationContext 
appCtx) throws Exception {
+        appCtx.getGlobalTxManager().rollback();
+    }
+
     protected void performGlobalStorageCleanup(MetadataTransactionContext 
mdTxnCtx, int storageGlobalCleanupTimeoutSecs)
             throws Exception {
         List<Dataverse> dataverses = 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp
index c3835ebf2b..e41bd77b54 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.01.ddl.sqlpp
@@ -16,36 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
+drop dataverse test if exists;
+create dataverse test;
+use test;
 
-    IdCompareResult compareTo(ILSMComponentId id);
+create dataset reviews primary key (id: int);
 
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+CREATE PRIMARY INDEX review_idx_primary ON reviews;
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+CREATE INDEX review_idx_review ON reviews(review: string?) TYPE BTREE;
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp
similarity index 55%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp
index c3835ebf2b..135826970f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.02.update.sqlpp
@@ -16,36 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
-
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+ use test;
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+insert into reviews ([
+    {"id": 1, "year": null, "quarter": null, "review": "good"},
+    {"id": 2, "year": null, "quarter": null, "review": "good"},
+    {"id": 3, "year": 2018, "quarter": null, "review": "good"},
+    {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "bad"},
+    {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+    {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+    {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+    {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp
index c3835ebf2b..8709326bc9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.03.query.sqlpp
@@ -16,36 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
+use test;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
-
-    /**
-     * @return the min Id
-     */
-    long getMinId();
-
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp
similarity index 55%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp
index c3835ebf2b..4ab1bccec7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.04.update.sqlpp
@@ -16,36 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
-
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+use test;
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+insert into reviews ([
+    {"id": 1, "year": null, "quarter": null, "review": "good"},
+    {"id": 2, "year": null, "quarter": null, "review": "good"},
+    {"id": 3, "year": 2018, "quarter": null, "review": "good"},
+    {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+    {"id": 6, "year": 2018, "quarter": 1, "review": "bad"},
+    {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+    {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+    {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+    {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp
index c3835ebf2b..8709326bc9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.05.query.sqlpp
@@ -16,36 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
+use test;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
-
-    /**
-     * @return the min Id
-     */
-    long getMinId();
-
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp
index c3835ebf2b..3cc900bbb5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-1/atomic-statements-1.06.ddl.sqlpp
@@ -16,36 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
+use test;
 
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+drop dataset reviews;
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+drop dataverse test;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp
index c3835ebf2b..e41bd77b54 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.01.ddl.sqlpp
@@ -16,36 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
+drop dataverse test if exists;
+create dataverse test;
+use test;
 
-    IdCompareResult compareTo(ILSMComponentId id);
+create dataset reviews primary key (id: int);
 
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+CREATE PRIMARY INDEX review_idx_primary ON reviews;
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+CREATE INDEX review_idx_review ON reviews(review: string?) TYPE BTREE;
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp
index c3835ebf2b..ba1e16a1c9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.02.query.sqlpp
@@ -16,36 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
-
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+use test;
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+insert into reviews ([
+    {"id": 1, "year": null, "quarter": null, "review": "good"},
+    {"id": 2, "year": null, "quarter": null, "review": "good"},
+    {"id": 3, "year": 2018, "quarter": null, "review": "good"}
+]) returning review;
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp
index c3835ebf2b..ea608d6997 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.03.update.sqlpp
@@ -16,36 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
+use test;
 
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+insert into reviews ([
+    {"id": 4, "year": 2018, "quarter": null, "review": "bad"},
+    {"id": 5, "year": 2018, "quarter": 1, "review": "good"},
+    {"id": 6, "year": 2018, "quarter": 1, "review": "bad"},
+    {"id": 7, "year": 2018, "quarter": 2, "review": "good"},
+    {"id": 8, "year": 2018, "quarter": 2, "review": "bad"},
+    {"id": 9, "year": 2019, "quarter": null, "review": "good"},
+    {"id": 10, "year": 2019, "quarter": null, "review": "bad"}
+]);
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+delete from reviews where year=2019;
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp
index c3835ebf2b..8709326bc9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.04.query.sqlpp
@@ -16,36 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
+use test;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
-
-    /**
-     * @return the min Id
-     */
-    long getMinId();
-
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+select value count(*)
+from reviews;
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp
similarity index 56%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp
index c3835ebf2b..3cc900bbb5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-2/atomic-statements-2.05.ddl.sqlpp
@@ -16,36 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
-
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
-
-    IdCompareResult compareTo(ILSMComponentId id);
+use test;
 
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+drop dataset reviews;
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
-}
+drop dataverse test;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm
new file mode 100644
index 0000000000..c227083464
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.03.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm
new file mode 100644
index 0000000000..9a037142aa
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-1/atomic-statements-1.05.adm
@@ -0,0 +1 @@
+10
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm
new file mode 100644
index 0000000000..3ca0114e27
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.02.adm
@@ -0,0 +1,3 @@
+"good"
+"good"
+"good"
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm
new file mode 100644
index 0000000000..301160a930
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-2/atomic-statements-2.04.adm
@@ -0,0 +1 @@
+8
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index a257c2d5bd..0afee45728 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16328,4 +16328,18 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="atomic-statements">
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-1">
+        <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>
+        <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="atomic-statements">
+      <compilation-unit name="atomic-statements-2">
+        <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047d86..ac66f331e1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -185,4 +186,6 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
      * @param partitionId
      */
     void closePartition(int partitionId);
+
+    IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
index 09ab5505aa..27a3d491fb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -45,4 +45,7 @@ public interface INodeJobTracker extends IJobLifecycleListener, IClusterLifecycl
      * @return The participating nodes in the job execution
      */
     Set<String> getJobParticipatingNodes(JobSpecification spec);
+
+    int getNumParticipatingPartitions(JobSpecification spec);
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
new file mode 100644
index 0000000000..498d174d3f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public interface IGlobalTxManager extends IJobLifecycleListener {
+
+    enum TransactionStatus {
+        ACTIVE,
+        PREPARED,
+        COMMITTED,
+        ABORTED,
+        ROLLBACK
+    }
+
+    IGlobalTransactionContext beginTransaction(JobId jobId, int numParticipatingNodes, int numParticipatingPartitions,
+            List<Integer> participatingDatasetIds) throws ACIDException;
+
+    void commitTransaction(JobId jobId) throws ACIDException;
+
+    void abortTransaction(JobId jobId) throws Exception;
+
+    IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException;
+
+    void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+            Map<String, ILSMComponentId> componentIdMap);
+
+    void handleJobCompletionMessage(JobId jobId, String nodeId);
+
+    void handleJobRollbackCompletionMessage(JobId jobId, String nodeId);
+
+    void rollback() throws Exception;
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f9c410bb44..7a109cc674 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -654,4 +654,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         return !(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
                 || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit());
     }
+
+    @Override
+    public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return indexCheckpointManagerProvider;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index d9456d2b03..a1980b61a1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -266,12 +266,14 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement
         }
     }
 
-    public void commit() throws HyracksDataException {
+    public void finishAllFlush() throws HyracksDataException {
         LogRecord logRecord = new LogRecord();
         triggerScheduleFlush(logRecord);
         List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
         LSMIndexUtil.waitFor(flushes);
+    }
 
+    public synchronized void commit() throws HyracksDataException {
         Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
         for (ILSMIndex lsmIndex : indexes) {
             lsmIndex.commit();
@@ -285,13 +287,14 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement
             indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
         }
         lastFlushOperation.clear();
-
-        for (ILSMIndex lsmIndex : indexes) {
-            lsmIndex.getMergePolicy().diskComponentAdded(lsmIndex, false);
-        }
     }
 
     public void abort() throws HyracksDataException {
+        clear();
+    }
+
+    public void clear() throws HyracksDataException {
+        deleteMemoryComponent(false);
         Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
         for (ILSMIndex lsmIndex : indexes) {
             lsmIndex.abort();
@@ -362,7 +365,16 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement
         return "Dataset (" + datasetID + "), Partition (" + partition + ")";
     }
 
-    public void deleteMemoryComponent() throws HyracksDataException {
+    public void deleteMemoryComponent(ILSMIndex lsmIndex, ILSMComponentId nextComponentId) throws HyracksDataException {
+        Map<String, Object> flushMap = new HashMap<>();
+        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
+        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
+        ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        accessor.getOpContext().setParameters(flushMap);
+        accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+    }
+
+    public void deleteMemoryComponent(boolean onlyPrimaryIndex) throws HyracksDataException {
         Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
         ILSMIndex primaryLsmIndex = null;
         for (ILSMIndex lsmIndex : indexes) {
@@ -379,15 +391,21 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement
         Objects.requireNonNull(primaryLsmIndex, "no primary index found in " + indexes);
         idGenerator.refresh();
         ILSMComponentId nextComponentId = idGenerator.getId();
-        Map<String, Object> flushMap = new HashMap<>();
-        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
-        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
-        ILSMIndexAccessor accessor = primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        accessor.getOpContext().setParameters(flushMap);
-        accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
+        if (onlyPrimaryIndex) {
+            deleteMemoryComponent(primaryLsmIndex, nextComponentId);
+        } else {
+            for (ILSMIndex lsmIndex : indexes) {
+                deleteMemoryComponent(lsmIndex, nextComponentId);
+            }
+        }
     }
 
     private boolean canSafelyFlush() {
         return numActiveOperations.get() == 0;
     }
+
+    public Map<String, FlushOperation> getLastFlushOperation() {
+        return lastFlushOperation;
+    }
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 281b069c74..922c352950 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.external.IAdapterFactoryService;
@@ -38,6 +39,7 @@ import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 
 /**
@@ -165,4 +167,8 @@ public interface ICcApplicationContext extends IApplicationContext {
      * @return the data partitioing provider
      */
     IDataPartitioningProvider getDataPartitioningProvider();
+
+    IGlobalTxManager getGlobalTxManager();
+
+    IOManager getIoManager();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 421fb7b104..64748ce923 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -19,9 +19,12 @@
 package org.apache.asterix.common.dataflow;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -32,6 +35,8 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -45,10 +50,12 @@ import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.LocalResource;
@@ -101,6 +108,9 @@ public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDel
                 IIndexDataflowHelper indexHelper = indexHelpers[i];
                 indexHelper.open();
                 indexes[i] = indexHelper.getIndexInstance();
+                if (((ILSMIndex) indexes[i]).isAtomic() && isPrimary()) {
+                    ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+                }
                 LocalResource resource = indexHelper.getResource();
                 modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
                 IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
@@ -238,11 +248,31 @@ public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDel
 
     private void commitAtomicInsertDelete() throws HyracksDataException {
         if (isPrimary) {
+            final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+            int datasetID = -1;
+            boolean atomic = false;
             for (IIndex index : indexes) {
                 if (((ILSMIndex) index).isAtomic()) {
                     PrimaryIndexOperationTracker opTracker =
                             ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
-                    opTracker.commit();
+                    opTracker.finishAllFlush();
+                    for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+                        componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+                    }
+                    datasetID = opTracker.getDatasetInfo().getDatasetID();
+                    atomic = true;
+                }
+            }
+
+            if (atomic) {
+                AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+                        ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+                try {
+                    ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+                            .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+                                    JavaSerializationUtils.serialize(message), null);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
             }
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
new file mode 100644
index 0000000000..8adbf49da6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Message sent from an NC to CC for every partition handled by it after all
+ * the components generated by an atomic statement/job are flushed to disk.
+ */
+public class AtomicJobPreparedMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final JobId jobId;
+    private final String nodeId;
+    private final int datasetId;
+    private final Map<String, ILSMComponentId> componentIdMap;
+
+    public AtomicJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
+            Map<String, ILSMComponentId> componentIdMap) {
+        this.nodeId = nodeId;
+        this.datasetId = datasetId;
+        this.componentIdMap = componentIdMap;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, datasetId, componentIdMap);
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index beb8e07e0d..c43f4f0274 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -102,6 +102,8 @@ public interface IIndexCheckpointManager {
      */
     void delete();
 
+    void deleteLatest(long latestId, int historyToDelete);
+
     /**
      * Gets the index last valid component sequence.
      *
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java
new file mode 100644
index 0000000000..5a175ac211
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IGlobalTransactionContext.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.cluster.IGlobalTxManager;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public interface IGlobalTransactionContext {
+
+    JobId getJobId();
+
+    int incrementAndGetAcksReceived();
+
+    int getAcksReceived();
+
+    int getNumNodes();
+
+    int getNumPartitions();
+
+    void resetAcksReceived();
+
+    void setTxnStatus(IGlobalTxManager.TransactionStatus status);
+
+    IGlobalTxManager.TransactionStatus getTxnStatus();
+
+    List<Integer> getDatasetIds();
+
+    Map<String, Map<String, ILSMComponentId>> getNodeResourceMap();
+
+    void persist(IOManager ioManager);
+
+    void delete(IOManager ioManager);
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 1321c96306..7b6ed0d72f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.utils;
 
+import java.nio.file.Paths;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -30,6 +31,8 @@ import org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFacto
  */
 public class StorageConstants {
 
+    public static final String CC_STORAGE_ROOT_DIR = "/tmp/";
+    public static final String CC_TX_LOG_DIR = Paths.get("cc", "txnlogs").toString();
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
     public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
     public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index a2ebd9a7b8..52fba60d8a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -33,6 +33,7 @@ import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
@@ -81,4 +82,11 @@ public class NodeJobTracker implements INodeJobTracker {
                 .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
                 .collect(Collectors.toSet());
     }
+
+    @Override
+    public int getNumParticipatingPartitions(JobSpecification spec) {
+        return spec.getUserConstraints().stream().filter(ce -> ce.getLValue() instanceof PartitionCountExpression)
+                .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
+                .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get();
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 6e767ed2bf..ec016bcf9d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -19,11 +19,14 @@
 package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
@@ -35,6 +38,8 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -54,10 +59,12 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -216,6 +223,9 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                 indexHelper.open();
                 indexes[i] = indexHelper.getIndexInstance();
                 IIndex index = indexes[i];
+                if (((ILSMIndex) indexes[i]).isAtomic()) {
+                    ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+                }
                 IIndexDataflowHelper keyIndexHelper = keyIndexHelpers[i];
                 IIndex indexForUniquessCheck;
                 if (keyIndexHelper != null) {
@@ -341,11 +351,31 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     }
 
     private void commitAtomicInsert() throws HyracksDataException {
+        final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+        int datasetID = -1;
+        boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
                 PrimaryIndexOperationTracker opTracker =
                         ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
-                opTracker.commit();
+                opTracker.finishAllFlush();
+                for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+                    componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+                }
+                datasetID = opTracker.getDatasetInfo().getDatasetID();
+                atomic = true;
+            }
+        }
+
+        if (atomic) {
+            AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+                    ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+            try {
+                ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+                        .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+                                JavaSerializationUtils.serialize(message), null);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
         }
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 03130ae24a..de95d60593 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -24,11 +24,14 @@ import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.base.AInt8;
@@ -48,6 +51,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -69,10 +74,12 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -297,6 +304,9 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                 IIndexDataflowHelper indexHelper = indexHelpers[i];
                 indexHelper.open();
                 indexes[i] = indexHelper.getIndexInstance();
+                if (((ILSMIndex) indexes[i]).isAtomic()) {
+                    ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
+                }
                 if (ctx.getSharedObject() != null && i == 0) {
                     PrimaryIndexLogMarkerCallback callback =
                             new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
@@ -557,12 +567,33 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
         // No op since nextFrame flushes by default
     }
 
+    // TODO: Refactor and remove duplicated code
     private void commitAtomicUpsert() throws HyracksDataException {
+        final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
+        int datasetID = -1;
+        boolean atomic = false;
         for (IIndex index : indexes) {
             if (((ILSMIndex) index).isAtomic()) {
                 PrimaryIndexOperationTracker opTracker =
                         ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
-                opTracker.commit();
+                opTracker.finishAllFlush();
+                for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
+                    componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+                }
+                datasetID = opTracker.getDatasetInfo().getDatasetID();
+                atomic = true;
+            }
+        }
+
+        if (atomic) {
+            AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
+                    ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+            try {
+                ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
+                        .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
+                                JavaSerializationUtils.serialize(message), null);
+            } catch (Exception e) {
+                throw new ACIDException(e);
             }
         }
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
index 2312f15920..39c604333b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
@@ -77,4 +77,9 @@ public class NoOpCommitRuntime extends CommitRuntime {
             message.getBuffer().flip();
         }
     }
+
+    @Override
+    public void fail() {
+        failed = true;
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index cfa39c61c0..3576ba1388 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -59,7 +59,7 @@ public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
         for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
             PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
             try {
-                primaryIndexOpTracker.deleteMemoryComponent();
+                primaryIndexOpTracker.deleteMemoryComponent(true);
             } catch (HyracksDataException e) {
                 throw new ACIDException(e);
             }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java
new file mode 100644
index 0000000000..dc068bdebf
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionLog.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtomicTransactionLog {
+
+    private JobId jobId;
+    private List<Integer> datasetIds;
+    private Set<String> nodeIds;
+    private Map<String, Map<String, ILSMComponentId>> nodeResourceMap;
+    private int numPartitions;
+
+    @JsonCreator
+    public AtomicTransactionLog(@JsonProperty("jobId") JobId jobId,
+            @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeIds") Set<String> nodeIds,
+            @JsonProperty("nodeResourceMap") Map<String, Map<String, ILSMComponentId>> nodeResourceMap,
+            @JsonProperty("numPartitions") int numPartitions) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+        this.nodeIds = nodeIds;
+        this.nodeResourceMap = nodeResourceMap;
+        this.numPartitions = numPartitions;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public Set<String> getNodeIds() {
+        return nodeIds;
+    }
+
+    public Map<String, Map<String, ILSMComponentId>> getNodeResourceMap() {
+        return nodeResourceMap;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java
new file mode 100644
index 0000000000..9d1dd1d60b
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTransactionContext.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.cluster.IGlobalTxManager.TransactionStatus;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.IGlobalTransactionContext;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Bookkeeping for a single global transaction, identified by its {@link JobId}: the
+ * datasets involved, the per-node resource map, the current {@link TransactionStatus}
+ * and a counter of acknowledgements received. The context can be written as JSON
+ * ({@link AtomicTransactionLog}) to a log file under
+ * {@link StorageConstants#CC_TX_LOG_DIR} and restored from such a file.
+ */
+@ThreadSafe
+public class GlobalTransactionContext implements IGlobalTransactionContext {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    protected final JobId jobId;
+    // final so that every thread counts on the same instance; reset with set(0), never by replacing it
+    private final AtomicInteger acksReceived;
+    private final int numNodes;
+    // volatile: written and read through unsynchronized accessors
+    private volatile TransactionStatus status;
+    private final List<Integer> datasetIds;
+    private final int numPartitions;
+    private final Map<String, Map<String, ILSMComponentId>> nodeResourceMap;
+
+    /**
+     * Creates the context of a new transaction in the {@code ACTIVE} state with no
+     * acknowledgements and an empty node resource map.
+     */
+    public GlobalTransactionContext(JobId jobId, List<Integer> datasetIds, int numNodes, int numPartitions) {
+        this.jobId = jobId;
+        this.datasetIds = datasetIds;
+        this.numNodes = numNodes;
+        this.numPartitions = numPartitions;
+        this.acksReceived = new AtomicInteger(0);
+        this.nodeResourceMap = new HashMap<>();
+        this.status = TransactionStatus.ACTIVE;
+    }
+
+    /**
+     * Restores a context from a transaction log file in the format written by
+     * {@link #persist(IOManager)}. The number of nodes is taken from the restored node
+     * resource map. The status is not part of the log, so it is left unset ({@code null})
+     * here and has to be assigned through {@link #setTxnStatus(TransactionStatus)}.
+     *
+     * @throws ACIDException if the file cannot be read or parsed
+     */
+    public GlobalTransactionContext(FileReference txnLogFileRef, IOManager ioManager) {
+        try {
+            // decode with the same explicit charset used by persist(), not the platform default
+            String json = new String(ioManager.readAllBytes(txnLogFileRef), StandardCharsets.UTF_8);
+            AtomicTransactionLog txnLog = OBJECT_MAPPER.readValue(json, AtomicTransactionLog.class);
+            this.jobId = txnLog.getJobId();
+            this.datasetIds = txnLog.getDatasetIds();
+            this.nodeResourceMap = txnLog.getNodeResourceMap();
+            this.numNodes = nodeResourceMap.size();
+            this.numPartitions = txnLog.getNumPartitions();
+            this.acksReceived = new AtomicInteger(0);
+        } catch (JsonProcessingException | HyracksDataException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    @Override
+    public void setTxnStatus(TransactionStatus status) {
+        this.status = status;
+    }
+
+    @Override
+    public TransactionStatus getTxnStatus() {
+        return status;
+    }
+
+    @Override
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public int incrementAndGetAcksReceived() {
+        return acksReceived.incrementAndGet();
+    }
+
+    @Override
+    public int getAcksReceived() {
+        return acksReceived.get();
+    }
+
+    @Override
+    public void resetAcksReceived() {
+        // reset in place; swapping in a new AtomicInteger could lose concurrent increments
+        acksReceived.set(0);
+    }
+
+    public int getNumNodes() {
+        return numNodes;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public Map<String, Map<String, ILSMComponentId>> getNodeResourceMap() {
+        return nodeResourceMap;
+    }
+
+    /**
+     * Writes this context as pretty-printed, UTF-8 encoded JSON to its log file through
+     * {@code IOManager.overwrite}.
+     *
+     * @throws ACIDException if serialization or the write fails
+     */
+    @Override
+    public void persist(IOManager ioManager) {
+        try {
+            FileReference fref = resolveLogFile(ioManager);
+            AtomicTransactionLog txnLog = new AtomicTransactionLog(jobId, datasetIds, nodeResourceMap.keySet(),
+                    nodeResourceMap, numPartitions);
+            String json = OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog);
+            ioManager.overwrite(fref, json.getBytes(StandardCharsets.UTF_8));
+        } catch (HyracksDataException | JsonProcessingException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    /**
+     * Deletes the log file of this context.
+     *
+     * @throws ACIDException if the delete fails
+     */
+    @Override
+    public void delete(IOManager ioManager) {
+        try {
+            ioManager.delete(resolveLogFile(ioManager));
+        } catch (HyracksDataException e) {
+            // same unchecked wrapper as persist() and the restoring constructor
+            throw new ACIDException(e);
+        }
+    }
+
+    /** Returns a short, human-readable dump of the job id and the transaction status. */
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append('\n').append(jobId).append('\n');
+        sb.append("TransactionState: ").append(status).append('\n');
+        return sb.toString();
+    }
+
+    /** Resolves {@code <CC_TX_LOG_DIR>/<jobId>.log}, the file this context is persisted to. */
+    private FileReference resolveLogFile(IOManager ioManager) throws HyracksDataException {
+        String relativePath = Paths.get(StorageConstants.CC_TX_LOG_DIR, String.format("%s.log", jobId)).toString();
+        return ioManager.resolve(relativePath);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java
similarity index 53%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
copy to 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java
index c3835ebf2b..b99da87eff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/GlobalTxInfo.java
@@ -16,36 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.lsm.common.api;
+package org.apache.asterix.transaction.management.service.transaction;
 
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * It is generated by {@link ILSMComponentIdGenerator}
- *
- */
-public interface ILSMComponentId {
-    public enum IdCompareResult {
-        UNKNOWN,
-        LESS_THAN,
-        GREATER_THAN,
-        INTERSECT,
-        INCLUDE
-    }
+import java.io.Serializable;
+import java.util.List;
 
-    /**
-     * @return whether the id is missing
-     */
-    boolean missing();
+/**
+ * Serializable, immutable holder for a list of dataset ids, a node count and a partition
+ * count. NOTE(review): presumably describes one global transaction, since the fields mirror
+ * the arguments of {@link GlobalTransactionContext}'s constructor - confirm against callers.
+ */
+public class GlobalTxInfo implements Serializable {
 
-    IdCompareResult compareTo(ILSMComponentId id);
+    private static final long serialVersionUID = 1L;
+
+    private final int numNodes;
+    private final List<Integer> datasetIds;
+    private final int numPartitions;
 
-    /**
-     * @return the min Id
-     */
-    long getMinId();
+    public GlobalTxInfo(List<Integer> datasetIds, int numNodes, int numPartitions) {
+        this.datasetIds = datasetIds;
+        this.numNodes = numNodes;
+        this.numPartitions = numPartitions;
+    }
 
-    /**
-     * @return the max Id
-     */
-    long getMaxId();
+    public int getNumNodes() {
+        return numNodes;
+    }
+
+    public List<Integer> getDatasetIds() {
+        return datasetIds;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 366b5855d6..136807cd13 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -98,5 +98,9 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index de6b5fff12..10db9f1298 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -31,6 +31,11 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IWritable;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
 public final class JobId implements IWritable, Serializable, Comparable {
 
     private static final Pattern jobIdPattern = 
Pattern.compile("^JID:(\\d+)\\.(\\d+)$");
@@ -50,6 +55,7 @@ public final class JobId implements IWritable, Serializable, 
Comparable {
     private JobId() {
     }
 
+    @JsonCreator
     public JobId(long id) {
         this.id = id;
     }
@@ -58,6 +64,7 @@ public final class JobId implements IWritable, Serializable, 
Comparable {
         return id;
     }
 
+    @JsonIgnore
     public CcId getCcId() {
         if (ccId == null) {
             ccId = CcId.valueOf((int) (id >>> 
CcIdPartitionedLongFactory.ID_BITS));
@@ -65,6 +72,7 @@ public final class JobId implements IWritable, Serializable, 
Comparable {
         return ccId;
     }
 
+    @JsonIgnore
     public long getIdOnly() {
         return id & CcIdPartitionedLongFactory.MAX_ID;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
index ba932e1f84..009a5f8538 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
@@ -98,5 +98,9 @@
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
index c3835ebf2b..356c8c925d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -18,11 +18,18 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.api;
 
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
 /**
  * Stores the id of the disk component, which is a interval (minId, maxId).
  * It is generated by {@link ILSMComponentIdGenerator}
  *
  */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.WRAPPER_OBJECT, property = "type")
+@JsonSubTypes({ @JsonSubTypes.Type(value = LSMComponentId.class, name = 
"lsmComponentId"), })
 public interface ILSMComponentId {
     public enum IdCompareResult {
         UNKNOWN,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index b10dd5c08f..33fd38d2d3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -300,7 +300,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
 
     @Override
     public void abort() throws HyracksDataException {
-        resetMemoryComponents();
         for (ILSMDiskComponent c : temporaryDiskComponents) {
             c.deactivateAndDestroy();
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
index cf6c4a24f7..8260dde5e1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -19,9 +19,14 @@
 
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.io.Serializable;
+
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 
-public class LSMComponentId implements ILSMComponentId {
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LSMComponentId implements ILSMComponentId, Serializable {
 
     public static final long NOT_FOUND = -1;
     public static final long MIN_VALID_COMPONENT_ID = 0;
@@ -37,7 +42,8 @@ public class LSMComponentId implements ILSMComponentId {
 
     private long maxId;
 
-    public LSMComponentId(long minId, long maxId) {
+    @JsonCreator
+    public LSMComponentId(@JsonProperty("minId") long minId, 
@JsonProperty("maxId") long maxId) {
         assert minId <= maxId;
         this.minId = minId;
         this.maxId = maxId;


Reply via email to