This is an automated email from the ASF dual-hosted git repository.

peeyush pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a13a66c375 [NO ISSUE][TX] Partition level atomicity implementation
a13a66c375 is described below

commit a13a66c37534fd1f0332fd1099c0041a7002580d
Author: Peeyush Gupta <[email protected]>
AuthorDate: Fri May 26 12:40:29 2023 -0700

    [NO ISSUE][TX] Partition level atomicity implementation
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: yes
    
    Details:
    With this change, inserts/upserts/deletes on datasets
    created without any type will have partition level
    atomicity. To acheieve this we have made the following
    changes:
    
    - Only one insert/upsert/delete at a time on the dataset.
    - Queries do not read from memory components and directly
      go to valid disk components.
    - No locks taken by either writes or reads
    - No logs written.
    - The disk components generated by the operation are marked
      as valid and checkpoint is updated on operation completion
      and not on flush.
    
    Change-Id: I8ed0fac37e026c909c986e6d844c3fae3833911e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17559
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../asterix/app/io/PersistedResourceRegistry.java  |  3 +
 .../dataflow/TestPrimaryIndexOperationTracker.java |  5 +-
 .../TestPrimaryIndexOperationTrackerFactory.java   |  3 +-
 .../common/context/DatasetLifecycleManager.java    | 13 +++-
 .../context/PrimaryIndexOperationTracker.java      | 77 +++++++++++++++++--
 .../LSMInsertDeleteOperatorNodePushable.java       | 32 ++++++++
 .../AtomicLSMIOOperationCallback.java              | 54 ++++++++++++++
 .../AtomicLSMIndexIOOperationCallbackFactory.java  | 62 ++++++++++++++++
 .../ioopcallbacks/LSMIOOperationCallback.java      |  8 +-
 .../metadata/bootstrap/MetadataBootstrap.java      | 21 +++---
 .../ArrayBTreeResourceFactoryProvider.java         |  3 +-
 .../declared/BTreeResourceFactoryProvider.java     |  5 +-
 .../apache/asterix/metadata/entities/Dataset.java  | 26 ++++++-
 .../metadata/entities/InternalDatasetDetails.java  |  4 +
 .../DatasetTupleTranslator.java                    |  3 +-
 .../asterix/metadata/utils/MetadataLockUtil.java   |  2 +-
 .../LSMPrimaryInsertOperatorNodePushable.java      | 44 +++++++++--
 .../LSMPrimaryUpsertOperatorNodePushable.java      | 58 ++++++++++++---
 .../LSMSecondaryUpsertOperatorNodePushable.java    | 22 ++++--
 ...ryUpsertWithNestedPlanOperatorNodePushable.java | 14 +++-
 .../management/runtime/NoOpCommitRuntime.java      | 80 ++++++++++++++++++++
 .../runtime/NoOpCommitRuntimeFactory.java          | 46 ++++++++++++
 .../apache/hyracks/api/util/HyracksConstants.java  |  2 +
 .../am/lsm/btree/LSMBTreeOperatorTestHelper.java   |  2 +-
 ...ndexInsertUpdateDeleteOperatorNodePushable.java |  3 +
 .../dataflow/LSMColumnBTreeLocalResource.java      | 13 ++--
 .../LSMColumnBTreeLocalResourceFactory.java        |  6 +-
 .../lsm/btree/column/impls/lsm/LSMColumnBTree.java |  6 +-
 .../lsm/btree/column/utils/LSMColumnBTreeUtil.java |  4 +-
 .../lsm/btree/dataflow/LSMBTreeLocalResource.java  | 15 ++--
 .../dataflow/LSMBTreeLocalResourceFactory.java     |  8 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java       |  4 +-
 .../storage/am/lsm/btree/utils/LSMBTreeUtil.java   | 19 ++++-
 .../storage/am/lsm/common/api/ILSMIndex.java       | 23 ++++++
 ...ndexInsertUpdateDeleteOperatorNodePushable.java | 14 ++++
 .../am/lsm/common/impls/AbstractLSMIndex.java      | 86 +++++++++++++++++++++-
 .../storage/am/lsm/common/impls/LSMHarness.java    | 10 ++-
 .../storage/am/lsm/btree/impl/TestLsmBtree.java    |  2 +-
 .../lsm/btree/impl/TestLsmBtreeLocalResource.java  |  4 +-
 .../impl/TestLsmBtreeLocalResourceFactory.java     |  2 +-
 40 files changed, 715 insertions(+), 93 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
index 077b657e33..e754c77175 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
@@ -31,6 +31,7 @@ import 
org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.context.DatasetInfoProvider;
 import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import 
org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
 import org.apache.asterix.common.library.LibraryDescriptor;
@@ -190,6 +191,8 @@ public class PersistedResourceRegistry implements 
IPersistedResourceRegistry {
         // ILSMOperationTrackerFactory
         registeredClasses.put("NoOpIOOperationCallbackFactory", 
NoOpIOOperationCallbackFactory.class);
         registeredClasses.put("LSMBTreeIOOperationCallbackFactory", 
LSMIndexIOOperationCallbackFactory.class);
+        registeredClasses.put("AtomicLSMBTreeIOOperationCallbackFactory",
+                AtomicLSMIndexIOOperationCallbackFactory.class);
         registeredClasses.put("LSMIndexPageWriteCallbackFactory", 
LSMIndexPageWriteCallbackFactory.class);
         registeredClasses.put("NoOpPageWriteCallbackFactory", 
NoOpPageWriteCallbackFactory.class);
 
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index 250c25f7dc..8f482ee4a9 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,8 +35,8 @@ public class TestPrimaryIndexOperationTracker extends 
PrimaryIndexOperationTrack
     private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
 
     public TestPrimaryIndexOperationTracker(int datasetID, int partition, 
ILogManager logManager, DatasetInfo dsInfo,
-            ILSMComponentIdGenerator idGenerator) {
-        super(datasetID, partition, logManager, dsInfo, idGenerator);
+            ILSMComponentIdGenerator idGenerator, 
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+        super(datasetID, partition, logManager, dsInfo, idGenerator, 
indexCheckpointManagerProvider);
     }
 
     public void addCallback(ITestOpCallback<Void> callback) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 38fdf5689d..8cf708bc6f 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -60,7 +60,8 @@ public class TestPrimaryIndexOperationTrackerFactory extends 
PrimaryIndexOperati
                 Field opTrackersField = 
DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
                 opTracker = new TestPrimaryIndexOperationTracker(datasetId, 
partition,
                         appCtx.getTransactionSubsystem().getLogManager(), 
dsr.getDatasetInfo(),
-                        dslcManager.getComponentIdGenerator(datasetId, 
partition, resource.getPath()));
+                        dslcManager.getComponentIdGenerator(datasetId, 
partition, resource.getPath()),
+                        appCtx.getIndexCheckpointManagerProvider());
                 replaceMapEntry(opTrackersField, dsr, partition, opTracker);
             }
             return opTracker;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc4d0..f9c410bb44 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -339,7 +339,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         ILSMComponentIdGenerator idGenerator =
                 new 
LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), 
lastValidId);
         PrimaryIndexOperationTracker opTracker = new 
PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
-                logManager, dataset.getDatasetInfo(), idGenerator);
+                logManager, dataset.getDatasetInfo(), idGenerator, 
indexCheckpointManagerProvider);
         dataset.setPrimaryIndexOperationTracker(partition, opTracker);
         dataset.setIdGenerator(partition, idGenerator);
     }
@@ -424,7 +424,12 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             return;
         }
         // ensure all in-flight flushes gets scheduled
-        logManager.log(waitLog);
+        final boolean requiresWaitLog =
+                dsInfo.getIndexes().values().stream().noneMatch(indexInfo -> 
indexInfo.getIndex().isAtomic());
+        if (requiresWaitLog) {
+            logManager.log(waitLog);
+        }
+
         for (PrimaryIndexOperationTracker primaryOpTracker : 
dsr.getOpTrackers()) {
             if (!partitions.test(primaryOpTracker.getPartition())) {
                 continue;
@@ -439,7 +444,9 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             primaryOpTracker.flushIfNeeded();
         }
         // ensure requested flushes were scheduled
-        logManager.log(waitLog);
+        if (requiresWaitLog) {
+            logManager.log(waitLog);
+        }
         if (!asyncFlush) {
             List<FlushOperation> flushes = new ArrayList<>();
             for (PrimaryIndexOperationTracker primaryOpTracker : 
dsr.getOpTrackers()) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 2704e6429b..d9456d2b03 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -28,14 +28,18 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -50,6 +54,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import 
org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -70,14 +75,17 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
     private boolean flushLogCreated = false;
     private final Map<String, FlushOperation> scheduledFlushes = new 
HashMap<>();
     private long lastFlushTime = System.nanoTime();
+    private final Map<String, FlushOperation> lastFlushOperation = new 
HashMap<>();
+    private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
     public PrimaryIndexOperationTracker(int datasetID, int partition, 
ILogManager logManager, DatasetInfo dsInfo,
-            ILSMComponentIdGenerator idGenerator) {
+            ILSMComponentIdGenerator idGenerator, 
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         super(datasetID, dsInfo);
         this.partition = partition;
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
     }
 
     @Override
@@ -101,7 +109,7 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
     }
 
     public synchronized void flushIfNeeded() throws HyracksDataException {
-        if (canSafelyFlush()) {
+        if (canSafelyFlush() && !isFlushLogCreated()) {
             flushIfRequested();
         }
     }
@@ -163,9 +171,16 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
                 synchronized (opTracker) {
                     ILSMMemoryComponent memComponent = 
lsmIndex.getCurrentMemoryComponent();
                     if (memComponent.getWriterCount() > 0) {
-                        throw new IllegalStateException(
-                                "Can't request a flush on a component with 
writers inside: Index:" + lsmIndex
-                                        + " Component:" + memComponent);
+                        if (lsmIndex.isAtomic()) {
+                            LOGGER.debug(
+                                    "Can't request a flush on a component with 
writers inside: Index: {} Component: {}",
+                                    lsmIndex, memComponent);
+                            return;
+                        } else {
+                            throw new IllegalStateException(
+                                    "Can't request a flush on a component with 
writers inside: Index:" + lsmIndex
+                                            + " Component:" + memComponent);
+                        }
                     }
                     if (memComponent.getState() == 
ComponentState.READABLE_WRITABLE && memComponent.isModified()) {
                         memComponent.setUnwritable();
@@ -180,7 +195,7 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
                         + " and partition " + partition + " and is modified 
but its component id is null");
             }
             LogRecord logRecord = new LogRecord();
-            if (dsInfo.isDurable()) {
+            if (dsInfo.isDurable() && !primaryLsmIndex.isAtomic()) {
                 /*
                  * Generate a FLUSH log.
                  * Flush will be triggered when the log is written to disk by 
LogFlusher.
@@ -194,7 +209,9 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
                 }
                 flushLogCreated = true;
             } else {
-                //trigger flush for temporary indexes without generating a 
FLUSH log.
+                // trigger flush for temporary indexes and indexes on datasets 
with atomic statements enabled without
+                // generating a FLUSH log.
+                flushLogCreated = true;
                 triggerScheduleFlush(logRecord);
             }
         }
@@ -221,6 +238,16 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
             flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, 
nextComponentId);
+            for (ILSMIndex lsmIndex : 
dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+                if (lsmIndex.isPrimaryIndex()) {
+                    if (lsmIndex.isCurrentMutableComponentEmpty()) {
+                        LOGGER.debug("Primary index on dataset {} and 
partition {} is empty... skipping flush",
+                                dsInfo.getDatasetID(), partition);
+                        return;
+                    }
+                    break;
+                }
+            }
             synchronized (scheduledFlushes) {
                 for (ILSMIndex lsmIndex : 
dsInfo.getDatasetPartitionOpenIndexes(partition)) {
                     ILSMIndexAccessor accessor = 
lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -228,6 +255,9 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
                     ILSMIOOperation flush = accessor.scheduleFlush();
                     lastFlushTime = System.nanoTime();
                     scheduledFlushes.put(flush.getTarget().getRelativePath(), 
(FlushOperation) flush);
+                    if (lsmIndex.isAtomic()) {
+                        lastFlushOperation.put(lsmIndex.getIndexIdentifier(), 
(FlushOperation) flush);
+                    }
                     flush.addCompleteListener(this);
                 }
             }
@@ -236,6 +266,39 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
         }
     }
 
+    public void commit() throws HyracksDataException {
+        LogRecord logRecord = new LogRecord();
+        triggerScheduleFlush(logRecord);
+        List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
+        LSMIndexUtil.waitFor(flushes);
+
+        Set<ILSMIndex> indexes = 
dsInfo.getDatasetPartitionOpenIndexes(partition);
+        for (ILSMIndex lsmIndex : indexes) {
+            lsmIndex.commit();
+        }
+        for (FlushOperation flush : lastFlushOperation.values()) {
+            FileReference target = flush.getTarget();
+            Map<String, Object> map = flush.getParameters();
+            final LSMComponentId id = (LSMComponentId) 
map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+            final ResourceReference ref = 
ResourceReference.of(target.getAbsolutePath());
+            final long componentSequence = 
IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
+            indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 
0L, id.getMaxId());
+        }
+        lastFlushOperation.clear();
+
+        for (ILSMIndex lsmIndex : indexes) {
+            lsmIndex.getMergePolicy().diskComponentAdded(lsmIndex, false);
+        }
+    }
+
+    public void abort() throws HyracksDataException {
+        Set<ILSMIndex> indexes = 
dsInfo.getDatasetPartitionOpenIndexes(partition);
+        for (ILSMIndex lsmIndex : indexes) {
+            lsmIndex.abort();
+        }
+        lastFlushOperation.clear();
+    }
+
     @Override
     public void completed(ILSMIOOperation operation) {
         synchronized (scheduledFlushes) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index f9934f22d2..421fb7b104 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.dataflow;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -48,6 +49,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.LocalResource;
 
@@ -212,6 +214,11 @@ public class LSMInsertDeleteOperatorNodePushable extends 
LSMIndexInsertUpdateDel
                 failure = ExceptionUtils.suppress(failure, th);
             }
         }
+        if (failure == null && !failed) {
+            commitAtomicInsertDelete();
+        } else {
+            abortAtomicInsertDelete();
+        }
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
@@ -219,6 +226,7 @@ public class LSMInsertDeleteOperatorNodePushable extends 
LSMIndexInsertUpdateDel
 
     @Override
     public void fail() throws HyracksDataException {
+        this.failed = true;
         if (writerOpen) {
             writer.fail();
         }
@@ -227,4 +235,28 @@ public class LSMInsertDeleteOperatorNodePushable extends 
LSMIndexInsertUpdateDel
     public boolean isPrimary() {
         return isPrimary;
     }
+
+    private void commitAtomicInsertDelete() throws HyracksDataException {
+        if (isPrimary) {
+            for (IIndex index : indexes) {
+                if (((ILSMIndex) index).isAtomic()) {
+                    PrimaryIndexOperationTracker opTracker =
+                            ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
+                    opTracker.commit();
+                }
+            }
+        }
+    }
+
+    private void abortAtomicInsertDelete() throws HyracksDataException {
+        if (isPrimary) {
+            for (IIndex index : indexes) {
+                if (((ILSMIndex) index).isAtomic()) {
+                    PrimaryIndexOperationTracker opTracker =
+                            ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
+                    opTracker.abort();
+                }
+            }
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
new file mode 100644
index 0000000000..111c6e7205
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class AtomicLSMIOOperationCallback extends LSMIOOperationCallback {
+
+    public AtomicLSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex 
lsmIndex, ILSMComponentId componentId,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+        super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider);
+    }
+
+    @Override
+    public void afterFinalize(ILSMIOOperation operation) throws 
HyracksDataException {
+        if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
+            return;
+        }
+        if (operation.getIOOpertionType() != LSMIOOperationType.LOAD
+                && operation.getAccessor().getOpContext().getOperation() == 
IndexOperation.DELETE_COMPONENTS) {
+            deleteComponentsFromCheckpoint(operation);
+        } else if (operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+            addComponentToCheckpoint(operation);
+        } else if (isMerge(operation)) {
+            IoUtil.delete(getOperationMaskFilePath(operation));
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
new file mode 100644
index 0000000000..3829c547cd
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.api.IDatasetInfoProvider;
+import org.apache.asterix.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class AtomicLSMIndexIOOperationCallbackFactory extends 
LSMIndexIOOperationCallbackFactory {
+
+    private static final long serialVersionUID = -2617830712731546932L;
+
+    public 
AtomicLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory 
idGeneratorFactory,
+            IDatasetInfoProvider datasetInfoProvider) {
+        super(idGeneratorFactory, datasetInfoProvider);
+    }
+
+    protected IIndexCheckpointManagerProvider 
getIndexCheckpointManagerProvider() {
+        return ((INcApplicationContext) 
ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+    }
+
+    @Override
+    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws 
HyracksDataException {
+        return new 
AtomicLSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
+                getComponentIdGenerator().getId(), 
getIndexCheckpointManagerProvider());
+    }
+
+    @SuppressWarnings("squid:S1172") // unused parameter
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry 
registry, JsonNode json)
+            throws HyracksDataException {
+        final ILSMComponentIdGeneratorFactory idGeneratorFactory =
+                (ILSMComponentIdGeneratorFactory) 
registry.deserialize(json.get("idGeneratorFactory"));
+        final IDatasetInfoProvider datasetInfoProvider =
+                (IDatasetInfoProvider) 
registry.deserialize(json.get("datasetInfoProvider"));
+        return new 
AtomicLSMIndexIOOperationCallbackFactory(idGeneratorFactory, 
datasetInfoProvider);
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 999636da13..9ab4848c5c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -138,7 +138,7 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
         }
     }
 
-    private void addComponentToCheckpoint(ILSMIOOperation operation) throws 
HyracksDataException {
+    protected void addComponentToCheckpoint(ILSMIOOperation operation) throws 
HyracksDataException {
         // will always update the checkpoint file even if no new component was 
created
         FileReference target = operation.getTarget();
         Map<String, Object> map = operation.getParameters();
@@ -150,7 +150,7 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
         indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 
lsn, id.getMaxId());
     }
 
-    private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) 
throws HyracksDataException {
+    protected void deleteComponentsFromCheckpoint(ILSMIOOperation operation) 
throws HyracksDataException {
         // component was deleted... if a flush, do nothing.. if a merge, must 
update the checkpoint file
         if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
             // Get component id of the last disk component
@@ -298,12 +298,12 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
         return 
indexCheckpointManagerProvider.get(resourceReference).getValidComponentSequence();
     }
 
-    private boolean isMerge(ILSMIOOperation operation) {
+    protected boolean isMerge(ILSMIOOperation operation) {
         return operation.getIOOpertionType() == LSMIOOperationType.MERGE
                 && operation.getAccessor().getOpContext().getOperation() != 
IndexOperation.DELETE_COMPONENTS;
     }
 
-    private static FileReference getOperationMaskFilePath(ILSMIOOperation 
operation) {
+    protected static FileReference getOperationMaskFilePath(ILSMIOOperation 
operation) {
         FileReference target = operation.getTarget();
         String componentSequence = 
getComponentSequence(target.getFile().getAbsolutePath());
         FileReference idxRelPath = target.getParent();
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 8880461685..d7637f4bd2 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -423,16 +423,17 @@ public class MetadataBootstrap {
         if (createMetadataDataset) {
             final double bloomFilterFalsePositiveRate =
                     
appContext.getStorageProperties().getBloomFilterFalsePositiveRate();
-            LSMBTreeLocalResourceFactory lsmBtreeFactory = new 
LSMBTreeLocalResourceFactory(
-                    storageComponentProvider.getStorageManager(), typeTraits, 
cmpFactories, null, null, null,
-                    opTrackerFactory, ioOpCallbackFactory, 
pageWriteCallbackFactory,
-                    storageComponentProvider.getMetadataPageManagerFactory(),
-                    new AsterixVirtualBufferCacheProvider(datasetId),
-                    storageComponentProvider.getIoOperationSchedulerProvider(),
-                    appContext.getMetadataMergePolicyFactory(), 
StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES,
-                    true, bloomFilterKeyFields, bloomFilterFalsePositiveRate, 
true, null,
-                    NoOpCompressorDecompressorFactory.INSTANCE, true,
-                    
TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), 
NullIntrospector.INSTANCE, false);
+            LSMBTreeLocalResourceFactory lsmBtreeFactory =
+                    new 
LSMBTreeLocalResourceFactory(storageComponentProvider.getStorageManager(), 
typeTraits,
+                            cmpFactories, null, null, null, opTrackerFactory, 
ioOpCallbackFactory,
+                            pageWriteCallbackFactory, 
storageComponentProvider.getMetadataPageManagerFactory(),
+                            new AsterixVirtualBufferCacheProvider(datasetId),
+                            
storageComponentProvider.getIoOperationSchedulerProvider(),
+                            appContext.getMetadataMergePolicyFactory(),
+                            
StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, 
bloomFilterKeyFields,
+                            bloomFilterFalsePositiveRate, true, null, 
NoOpCompressorDecompressorFactory.INSTANCE, true,
+                            
TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), 
NullIntrospector.INSTANCE,
+                            false, false);
             DatasetLocalResourceFactory dsLocalResourceFactory =
                     new DatasetLocalResourceFactory(datasetId, 
lsmBtreeFactory);
             // TODO(amoudi) Creating the index should be done through the same 
code path as
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
index 9a2821eece..a61db6f480 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
@@ -100,7 +100,8 @@ public class ArrayBTreeResourceFactoryProvider implements 
IResourceFactoryProvid
                         pageWriteCallbackFactory, metadataPageManagerFactory, 
vbcProvider, ioSchedulerProvider,
                         mergePolicyFactory, mergePolicyProperties, true, null, 
bloomFilterFalsePositiveRate,
                         index.isPrimaryIndex(), btreeFields, 
compDecompFactory, false,
-                        typeTraitProvider.getTypeTrait(BuiltinType.ANULL), 
NullIntrospector.INSTANCE, false);
+                        typeTraitProvider.getTypeTrait(BuiltinType.ANULL), 
NullIntrospector.INSTANCE, false,
+                        dataset.isAtomic());
             default:
                 throw new 
CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
                         dataset.getDatasetType().toString());
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 88e96e73ab..ab4b58552c 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -113,7 +113,7 @@ public class BTreeResourceFactoryProvider implements 
IResourceFactoryProvider {
                             mergePolicyFactory, mergePolicyProperties, true, 
bloomFilterFields,
                             bloomFilterFalsePositiveRate, 
index.isPrimaryIndex(), btreeFields, compDecompFactory,
                             hasBloomFilter, 
typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
-                            NullIntrospector.INSTANCE, 
isSecondaryNoIncrementalMaintenance);
+                            NullIntrospector.INSTANCE, 
isSecondaryNoIncrementalMaintenance, dataset.isAtomic());
                 } else {
                     //Column
                     List<Integer> keySourceIndicator =
@@ -127,7 +127,8 @@ public class BTreeResourceFactoryProvider implements 
IResourceFactoryProvider {
                             pageWriteCallbackFactory, 
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
                             mergePolicyFactory, mergePolicyProperties, 
bloomFilterFields, bloomFilterFalsePositiveRate,
                             btreeFields, compDecompFactory, 
typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
-                            NullIntrospector.INSTANCE, 
isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+                            NullIntrospector.INSTANCE, 
isSecondaryNoIncrementalMaintenance, columnManagerFactory,
+                            dataset.isAtomic());
                 }
             default:
                 throw new 
CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index b97880e350..e8baeb141b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -37,6 +37,7 @@ import 
org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import 
org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
 import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
@@ -81,6 +82,7 @@ import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearc
 import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
+import 
org.apache.asterix.transaction.management.runtime.NoOpCommitRuntimeFactory;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -473,7 +475,12 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
      */
     @SuppressWarnings("squid:S1172")
     public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index 
index) throws AlgebricksException {
-        return new 
LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(), 
getDatasetInfoProvider());
+        if (isAtomic()) {
+            return new 
AtomicLSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(),
+                    getDatasetInfoProvider());
+        } else {
+            return new 
LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(), 
getDatasetInfoProvider());
+        }
     }
 
     public ILSMPageWriteCallbackFactory getPageWriteCallbackFactory() throws 
AlgebricksException {
@@ -499,6 +506,11 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
         return new DatasetInfoProvider(getDatasetId());
     }
 
+    public boolean isAtomic() {
+        return datasetType == DatasetType.INTERNAL
+                && ((InternalDatasetDetails) 
getDatasetDetails()).isDatasetWithoutTypeSpecification();
+    }
+
     /**
      * Get search callback factory for this dataset with the passed index and 
operation
      *
@@ -514,6 +526,10 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
     public ISearchOperationCallbackFactory 
getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
             Index index, IndexOperation op, int[] primaryKeyFields, int[] 
primaryKeyFieldsInSecondaryIndex,
             boolean proceedIndexOnlyPlan) throws AlgebricksException {
+        if (isAtomic()) {
+            return NoOpOperationCallbackFactory.INSTANCE;
+        }
+
         if (index.isPrimaryIndex()) {
             /**
              * Due to the read-committed isolation level,
@@ -566,6 +582,10 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
     public IModificationOperationCallbackFactory 
getModificationCallbackFactory(
             IStorageComponentProvider componentProvider, Index index, 
IndexOperation op, int[] primaryKeyFields)
             throws AlgebricksException {
+        if (isAtomic()) {
+            return NoOpOperationCallbackFactory.INSTANCE;
+        }
+
         if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT || op == IndexOperation.INSERT
                     ? new UpsertOperationCallbackFactory(getDatasetId(),
@@ -636,6 +656,10 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
         IBinaryHashFunctionFactory[] pkHashFunFactories = 
getPrimaryHashFunctionFactories(metadataProvider);
         ITuplePartitionerFactory partitionerFactory = new 
FieldHashPartitionerFactory(primaryKeyFieldPermutation,
                 pkHashFunFactories, datasetPartitions.length);
+        if (isAtomic()) {
+            return new NoOpCommitRuntimeFactory(datasetId, 
primaryKeyFieldPermutation,
+                    metadataProvider.isWriteTransaction(), datasetPartitions, 
isSink, partitionerFactory);
+        }
         return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation, 
metadataProvider.isWriteTransaction(),
                 datasetPartitions, isSink, partitionerFactory);
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
index fd19937a2c..5abad5d06a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
@@ -145,6 +145,10 @@ public class InternalDatasetDetails implements 
IDatasetDetails {
         return filterSourceIndicator;
     }
 
+    public boolean isDatasetWithoutTypeSpecification() {
+        return isDatasetWithoutTypeSpecification;
+    }
+
     @Override
     public DatasetType getDatasetType() {
         return DatasetType.INTERNAL;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index ac798aa735..39500539ae 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -219,9 +219,10 @@ public class DatasetTupleTranslator extends 
AbstractTupleTranslator<Dataset> {
                     }
                 }
 
+                boolean isDatasetWithoutTypeSpec = primaryKeyTypes != null && 
!primaryKeyTypes.isEmpty();
                 datasetDetails = new InternalDatasetDetails(fileStructure, 
partitioningStrategy, partitioningKey,
                         partitioningKey, keyFieldSourceIndicator, 
primaryKeyTypes, autogenerated, filterSourceIndicator,
-                        filterField);
+                        filterField, isDatasetWithoutTypeSpec);
                 break;
             }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index 1de387fbab..8ee8f11b6a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -248,7 +248,7 @@ public class MetadataLockUtil implements IMetadataLockUtil {
     public void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList 
locks, DataverseName dataverseName,
             String datasetName) throws AlgebricksException {
         lockMgr.acquireDataverseReadLock(locks, dataverseName);
-        lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
+        lockMgr.acquireDatasetExclusiveModificationLock(locks, dataverseName, 
datasetName);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 3762e82991..6e767ed2bf 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -21,6 +21,7 @@ package org.apache.asterix.runtime.operators;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -53,6 +54,7 @@ import 
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -61,6 +63,7 @@ import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -76,7 +79,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     private MultiComparator keySearchCmp;
     private RangePredicate searchPred;
     private final IIndexCursor[] cursors;
-    private final LockThenSearchOperationCallback[] searchCallbacks;
+    private final ISearchOperationCallback[] searchCallbacks;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
     private final IFrameTupleProcessor[] processors;
     private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes;
@@ -101,7 +104,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 modCallbackFactory, null, tuplePartitionerFactory, 
partitionsMap);
         this.sourceLoc = sourceLoc;
         this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
-        this.searchCallbacks = new 
LockThenSearchOperationCallback[partitions.length];
+        this.searchCallbacks = new ISearchOperationCallback[partitions.length];
         this.cursors = new IIndexCursor[partitions.length];
         this.lsmAccessorForUniqunessChecks = new 
LSMTreeIndexAccessor[partitions.length];
         this.lsmAccessorForKeyIndexes = new 
LSMTreeIndexAccessor[partitions.length];
@@ -138,7 +141,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
             IIndexCursor cursor = cursors[i];
             IIndexAccessor indexAccessor = indexAccessors[i];
             LSMTreeIndexAccessor lsmAccessorForKeyIndex = 
lsmAccessorForKeyIndexes[i];
-            LockThenSearchOperationCallback searchCallback = 
searchCallbacks[i];
+            ISearchOperationCallback searchCallback = searchCallbacks[i];
             processors[i] = new IFrameTupleProcessor() {
                 @Override
                 public void process(FrameTupleAccessor accessor, 
ITupleReference tuple, int index)
@@ -155,7 +158,9 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                     try {
                         if (cursor.hasNext()) {
                             // duplicate, skip
-                            searchCallback.release();
+                            if (searchCallback instanceof 
LockThenSearchOperationCallback) {
+                                ((LockThenSearchOperationCallback) 
searchCallback).release();
+                            }
                             duplicate = true;
                         }
                     } finally {
@@ -226,7 +231,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 }
                 modCallbacks[i] =
                         
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
 ctx, this);
-                searchCallbacks[i] = (LockThenSearchOperationCallback) 
searchCallbackFactory
+                searchCallbacks[i] = searchCallbackFactory
                         
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
                 IIndexAccessParameters iap = new 
IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
                 indexAccessors[i] = index.createAccessor(iap);
@@ -240,6 +245,8 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                         new 
IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
                 lsmAccessorForUniqunessChecks[i] =
                         (LSMTreeIndexAccessor) 
indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
+                setAtomicOpContextIfAtomic(indexForUniquessCheck, 
lsmAccessorForUniqunessChecks[i]);
+
                 cursors[i] = 
lsmAccessorForUniqunessChecks[i].createSearchCursor(false);
                 LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                         appCtx.getTransactionSubsystem().getLogManager());
@@ -311,6 +318,12 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         failure = CleanupUtils.close(writer, failure);
         failure = CleanupUtils.close(indexHelpers, failure);
         failure = CleanupUtils.close(keyIndexHelpers, failure);
+        if (failure == null && !failed) {
+            commitAtomicInsert();
+        } else {
+            abortAtomicInsert();
+        }
+
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
@@ -318,6 +331,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
 
@@ -325,4 +339,24 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     public void flush() throws HyracksDataException {
         // No op since nextFrame flushes by default
     }
+
+    private void commitAtomicInsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
+                opTracker.commit();
+            }
+        }
+    }
+
+    private void abortAtomicInsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
+                opTracker.abort();
+            }
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3a9a020caf..03130ae24a 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -26,6 +26,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -68,6 +69,7 @@ import 
org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import 
org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -76,6 +78,7 @@ import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.projection.ITupleProjector;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -119,7 +122,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
-    protected final LockThenSearchOperationCallback[] searchCallbacks;
+    protected final ISearchOperationCallback[] searchCallbacks;
     protected final IFrameOperationCallback[] frameOpCallbacks;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
@@ -142,7 +145,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 modCallbackFactory, null, tuplePartitionerFactory, 
partitionsMap);
         this.hasSecondaries = hasSecondaries;
         this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
-        this.searchCallbacks = new 
LockThenSearchOperationCallback[partitions.length];
+        this.searchCallbacks = new ISearchOperationCallback[partitions.length];
         this.cursors = new IIndexCursor[partitions.length];
         this.processors = new IFrameTupleProcessor[partitions.length];
         this.key = new PermutingFrameTupleReference();
@@ -180,7 +183,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         for (int i = 0; i < partitions.length; i++) {
             ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) 
indexAccessors[i];
             IIndexCursor cursor = cursors[i];
-            LockThenSearchOperationCallback searchCallback = 
searchCallbacks[i];
+            ISearchOperationCallback searchCallback = searchCallbacks[i];
             IModificationOperationCallback modCallback = modCallbacks[i];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[i];
             processors[i] = new IFrameTupleProcessor() {
@@ -189,8 +192,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                         throws HyracksDataException {
                     try {
                         tb.reset();
-                        AbstractIndexModificationOperationCallback 
abstractModCallback =
-                                (AbstractIndexModificationOperationCallback) 
modCallback;
+                        IModificationOperationCallback abstractModCallback = 
modCallback;
                         boolean recordWasInserted = false;
                         boolean recordWasDeleted = false;
                         boolean isDelete = isDeleteOperation(tuple, 
numOfPrimaryKeys);
@@ -223,11 +225,17 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                         if (isDelete && prevTuple != null) {
                             // Only delete if it is a delete and not upsert
                             // And previous tuple with the same key was found
-                            abstractModCallback.setOp(Operation.DELETE);
+                            if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) 
abstractModCallback)
+                                        .setOp(Operation.DELETE);
+                            }
                             lsmAccessor.forceDelete(tuple);
                             recordWasDeleted = true;
                         } else if (!isDelete) {
-                            abstractModCallback.setOp(Operation.UPSERT);
+                            if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) 
abstractModCallback)
+                                        .setOp(Operation.UPSERT);
+                            }
                             lsmAccessor.forceUpsert(tuple);
                             recordWasInserted = true;
                         }
@@ -296,11 +304,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 }
                 modCallbacks[i] =
                         
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
 ctx, this);
-                searchCallbacks[i] = (LockThenSearchOperationCallback) 
searchCallbackFactory
+                searchCallbacks[i] = searchCallbackFactory
                         
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
                 IIndexAccessParameters iap = new 
IndexAccessParameters(modCallbacks[i], searchCallbacks[i]);
                 iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR, 
tupleProjector);
                 indexAccessors[i] = indexes[i].createAccessor(iap);
+                setAtomicOpContextIfAtomic(indexes[i], indexAccessors[i]);
                 cursors[i] = ((LSMTreeIndexAccessor) 
indexAccessors[i]).createSearchCursor(false);
                 LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) indexes[i],
                         appCtx.getTransactionSubsystem().getLogManager());
@@ -353,7 +362,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     }
 
     protected void writeOutput(int tupleIndex, boolean recordWasInserted, 
boolean recordWasDeleted,
-            LockThenSearchOperationCallback searchCallback) throws IOException 
{
+            ISearchOperationCallback searchCallback) throws IOException {
         if (recordWasInserted || recordWasDeleted) {
             frameTuple.reset(accessor, tupleIndex);
             for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -363,7 +372,9 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
             FrameUtils.appendToWriter(writer, appender, 
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
         } else {
             try {
-                searchCallback.release();
+                if (searchCallback instanceof LockThenSearchOperationCallback) 
{
+                    ((LockThenSearchOperationCallback) 
searchCallback).release();
+                }
             } catch (ACIDException e) {
                 throw HyracksDataException.create(e);
             }
@@ -506,6 +517,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         failure = CleanupUtils.destroy(failure, cursors);
         failure = CleanupUtils.close(writer, failure);
         failure = CleanupUtils.close(indexHelpers, failure);
+        if (failure == null && !failed) {
+            commitAtomicUpsert();
+        } else {
+            abortAtomicUpsert();
+        }
+
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
@@ -531,6 +548,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
 
@@ -538,4 +556,24 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     public void flush() throws HyracksDataException {
         // No op since nextFrame flushes by default
     }
+
+    private void commitAtomicUpsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
+                opTracker.commit();
+            }
+        }
+    }
+
+    private void abortAtomicUpsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            if (((ILSMIndex) index).isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
+                opTracker.abort();
+            }
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 6d5e88b0cb..3dc6d379c9 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -41,6 +41,7 @@ import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
 /**
  * This operator node is used for secondary indexes with upsert operations.
@@ -101,8 +102,7 @@ public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdate
                 int storagePartition = tuplePartitioner.partition(accessor, i);
                 int storageIdx = 
storagePartitionId2Index.get(storagePartition);
                 ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) 
indexAccessors[storageIdx];
-                AbstractIndexModificationOperationCallback abstractModCallback 
=
-                        (AbstractIndexModificationOperationCallback) 
modCallbacks[storageIdx];
+                IModificationOperationCallback abstractModCallback = 
modCallbacks[storageIdx];
                 frameTuple.reset(accessor, i);
                 int operation = 
operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
                         frameTuple.getFieldStart(operationFieldIndex), 
frameTuple.getFieldLength(operationFieldIndex));
@@ -111,23 +111,33 @@ public class LSMSecondaryUpsertOperatorNodePushable 
extends LSMIndexInsertUpdate
 
                 if (operation == UPSERT_NEW) {
                     if (tupleFilterIsNull || tupleFilter.accept(frameTuple)) {
-                        abstractModCallback.setOp(Operation.INSERT);
+                        if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                            ((AbstractIndexModificationOperationCallback) 
abstractModCallback).setOp(Operation.INSERT);
+                        }
                         lsmAccessor.forceInsert(tuple);
                     }
                 } else if (operation == UPSERT_EXISTING) {
                     if (!TupleUtils.equalTuples(tuple, prevTuple, 
numberOfFields)) {
                         if (prevTupleFilterIsNull || 
prevTupleFilter.accept(frameTuple)) {
-                            abstractModCallback.setOp(Operation.DELETE);
+                            if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) 
abstractModCallback)
+                                        .setOp(Operation.DELETE);
+                            }
                             lsmAccessor.forceDelete(prevTuple);
                         }
                         if (tupleFilterIsNull || 
tupleFilter.accept(frameTuple)) {
-                            abstractModCallback.setOp(Operation.INSERT);
+                            if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                                ((AbstractIndexModificationOperationCallback) 
abstractModCallback)
+                                        .setOp(Operation.INSERT);
+                            }
                             lsmAccessor.forceInsert(tuple);
                         }
                     }
                 } else if (operation == DELETE_EXISTING) {
                     if (prevTupleFilterIsNull || 
prevTupleFilter.accept(frameTuple)) {
-                        abstractModCallback.setOp(Operation.DELETE);
+                        if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                            ((AbstractIndexModificationOperationCallback) 
abstractModCallback).setOp(Operation.DELETE);
+                        }
                         lsmAccessor.forceDelete(prevTuple);
                     }
                 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 17858a3ebe..3eb9437f83 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -41,6 +41,7 @@ import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
 public class LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends 
LSMSecondaryUpsertOperatorNodePushable {
     private final NestedTupleSourceRuntime[] startOfNewKeyPipelines;
@@ -190,11 +191,13 @@ public class 
LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSec
                 int storagePartition = 
tuplePartitioner.partition(tuple.getFrameTupleAccessor(), 
tuple.getTupleIndex());
                 int storageIdx = 
storagePartitionId2Index.get(storagePartition);
                 ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor) 
indexAccessors[storageIdx];
-                AbstractIndexModificationOperationCallback abstractModCallback 
=
-                        (AbstractIndexModificationOperationCallback) 
modCallbacks[storageIdx];
+                IModificationOperationCallback abstractModCallback = 
modCallbacks[storageIdx];
                 // Finally, pass the tuple to our accessor. There are only two 
operations: insert or delete.
                 if (this.isInsert) {
-                    
abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+                    if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                        ((AbstractIndexModificationOperationCallback) 
abstractModCallback)
+                                
.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+                    }
                     try {
                         workingLSMAccessor.forceInsert(endTupleReference);
                     } catch (HyracksDataException e) {
@@ -203,7 +206,10 @@ public class 
LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSec
                         }
                     }
                 } else {
-                    
abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+                    if (abstractModCallback instanceof 
AbstractIndexModificationOperationCallback) {
+                        ((AbstractIndexModificationOperationCallback) 
abstractModCallback)
+                                
.setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+                    }
                     workingLSMAccessor.forceDelete(endTupleReference);
                 }
             }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
new file mode 100644
index 0000000000..2312f15920
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+
+public class NoOpCommitRuntime extends CommitRuntime {
+
+    public NoOpCommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int 
datasetId, int[] primaryKeyFields,
+            boolean isWriteTransaction, int resourcePartition, boolean isSink,
+            ITuplePartitionerFactory partitionerFactory, int[] 
datasetPartitions) {
+        super(ctx, txnId, datasetId, primaryKeyFields, isWriteTransaction, 
resourcePartition, isSink,
+                partitionerFactory, datasetPartitions);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        try {
+            transactionContext = 
transactionManager.getTransactionContext(txnId);
+            transactionContext.setWriteTxn(isWriteTransaction);
+            if (isSink) {
+                return;
+            }
+            initAccessAppend(ctx);
+            super.open();
+        } catch (ACIDException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            tRef.reset(tAccess, t);
+            try {
+                if (!isSink) {
+                    appendTupleToFrame(t);
+                }
+            } catch (ACIDException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
+        if (message != null
+                && MessagingFrameTupleAppender.getMessageType(message) == 
MessagingFrameTupleAppender.MARKER_MESSAGE) {
+            message.reset();
+            
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
new file mode 100644
index 0000000000..bfe610b278
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import org.apache.asterix.common.api.IJobEventListenerFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+
+public class NoOpCommitRuntimeFactory extends CommitRuntimeFactory {
+
+    private static final long serialVersionUID = 2L;
+
+    public NoOpCommitRuntimeFactory(int datasetId, int[] primaryKeyFields, 
boolean isWriteTransaction,
+            int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory 
partitionerFactory) {
+        super(datasetId, primaryKeyFields, isWriteTransaction, 
datasetPartitions, isSink, partitionerFactory);
+    }
+
+    @Override
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+        return new IPushRuntime[] { new NoOpCommitRuntime(ctx, 
((IJobEventListenerFactory) fact).getTxnId(datasetId),
+                datasetId, primaryKeyFields, isWriteTransaction,
+                
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink, 
partitionerFactory,
+                datasetPartitions) };
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index 3c829e65ee..b076cf2c0d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -30,6 +30,8 @@ public class HyracksConstants {
 
     public static final String TUPLE_PROJECTOR = "TUPLE_PROJECTOR";
 
+    public static final String ATOMIC_OP_CONTEXT = "ATOMIC_OP_CONTEXT";
+
     private HyracksConstants() {
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index c3bc5cacb6..e279eaa844 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -49,6 +49,6 @@ public class LSMBTreeOperatorTestHelper extends 
LSMTreeOperatorTestHelper {
                 getVirtualBufferCacheProvider(), 
SynchronousSchedulerProvider.INSTANCE, MERGE_POLICY_FACTORY,
                 MERGE_POLICY_PROPERTIES, DURABLE, bloomFilterKeyFields,
                 
LSMTreeOperatorTestHelper.DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true, 
btreefields,
-                NoOpCompressorDecompressorFactory.INSTANCE, 
bloomFilterKeyFields != null, null, null, false);
+                NoOpCompressorDecompressorFactory.INSTANCE, 
bloomFilterKeyFields != null, null, null, false, false);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 422aef36fc..10425f4d86 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -69,6 +69,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable 
extends AbstractUnaryIn
     protected final int[] partitions;
     protected final Int2IntMap storagePartitionId2Index;
     protected boolean writerOpen;
+    protected boolean failed;
 
     public IndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext 
ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] 
fieldPermutation, RecordDescriptor inputRecDesc,
@@ -92,6 +93,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable 
extends AbstractUnaryIn
         this.op = op;
         this.tuple.setFieldPermutation(fieldPermutation);
         this.tuplePartitioner = tuplePartitionerFactory.createPartitioner(ctx);
+        this.failed = false;
     }
 
     @Override
@@ -203,6 +205,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable 
extends AbstractUnaryIn
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         if (writerOpen) {
             writer.fail();
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index 8c47bba1ff..a0b592a5e1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -61,22 +61,22 @@ public class LSMColumnBTreeLocalResource extends 
LSMBTreeLocalResource {
             ILSMIOOperationSchedulerProvider ioSchedulerProvider,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
ITypeTraits nullTypeTraits,
             INullIntrospector nullIntrospector, boolean 
isSecondaryNoIncrementalMaintenance,
-            IColumnManagerFactory columnManagerFactory) {
+            IColumnManagerFactory columnManagerFactory, boolean atomic) {
         super(typeTraits, cmpFactories, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate, true, path, storageManager,
                 mergePolicyFactory, mergePolicyProperties, null, null, 
btreeFields, null, opTrackerProvider,
                 ioOpCallbackFactory, pageWriteCallbackFactory, 
metadataPageManagerFactory, vbcProvider,
                 ioSchedulerProvider, true, compressorDecompressorFactory, 
true, nullTypeTraits, nullIntrospector,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, atomic);
         this.columnManagerFactory = columnManagerFactory;
     }
 
     private LSMColumnBTreeLocalResource(IPersistedResourceRegistry registry, 
JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] 
btreeFields,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
boolean hasBloomFilter,
-            boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory 
columnManagerFactory)
+            boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory 
columnManagerFactory, boolean atomic)
             throws HyracksDataException {
         super(registry, json, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate, isPrimary, btreeFields,
-                compressorDecompressorFactory, hasBloomFilter, 
isSecondaryNoIncrementalMaintenance);
+                compressorDecompressorFactory, hasBloomFilter, 
isSecondaryNoIncrementalMaintenance, atomic);
         this.columnManagerFactory = columnManagerFactory;
     }
 
@@ -93,7 +93,7 @@ public class LSMColumnBTreeLocalResource extends 
LSMBTreeLocalResource {
                 opTrackerProvider.getOperationTracker(serviceCtx, this), 
ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, 
metadataPageManagerFactory, false,
                 serviceCtx.getTracer(), compressorDecompressorFactory, 
nullTypeTraits, nullIntrospector,
-                columnManagerFactory);
+                columnManagerFactory, atomic);
     }
 
     public static IJsonSerializable fromJson(IPersistedResourceRegistry 
registry, JsonNode json)
@@ -109,11 +109,12 @@ public class LSMColumnBTreeLocalResource extends 
LSMBTreeLocalResource {
         JsonNode columnManagerFactoryNode = json.get("columnManagerFactory");
         boolean isSecondaryNoIncrementalMaintenance =
                 getOrDefaultBoolean(json, 
"isSecondaryNoIncrementalMaintenance", false);
+        boolean atomic = getOrDefaultBoolean(json, "atomic", false);
         IColumnManagerFactory columnManagerFactory =
                 (IColumnManagerFactory) 
registry.deserialize(columnManagerFactoryNode);
         return new LSMColumnBTreeLocalResource(registry, json, 
bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 isPrimary, btreeFields, compDecompFactory, hasBloomFilter, 
isSecondaryNoIncrementalMaintenance,
-                columnManagerFactory);
+                columnManagerFactory, atomic);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
index eccb7c2ceb..2cd045a5d9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
@@ -51,12 +51,12 @@ public class LSMColumnBTreeLocalResourceFactory extends 
LSMBTreeLocalResourceFac
             Map<String, String> mergePolicyProperties, int[] 
bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
             int[] btreeFields, ICompressorDecompressorFactory 
compressorDecompressorFactory, ITypeTraits nullTypeTraits,
             INullIntrospector nullIntrospector, boolean 
isSecondaryNoIncrementalMaintenance,
-            IColumnManagerFactory columnManagerFactory) {
+            IColumnManagerFactory columnManagerFactory, boolean atomic) {
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, 
filterCmpFactories, filterFields,
                 opTrackerFactory, ioOpCallbackFactory, 
pageWriteCallbackFactory, metadataPageManagerFactory,
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, 
mergePolicyProperties, true, bloomFilterKeyFields,
                 bloomFilterFalsePositiveRate, true, btreeFields, 
compressorDecompressorFactory, true, nullTypeTraits,
-                nullIntrospector, isSecondaryNoIncrementalMaintenance);
+                nullIntrospector, isSecondaryNoIncrementalMaintenance, atomic);
         this.columnManagerFactory = columnManagerFactory;
     }
 
@@ -66,6 +66,6 @@ public class LSMColumnBTreeLocalResourceFactory extends 
LSMBTreeLocalResourceFac
                 bloomFilterFalsePositiveRate, fileRef.getRelativePath(), 
storageManager, mergePolicyFactory,
                 mergePolicyProperties, btreeFields, opTrackerProvider, 
ioOpCallbackFactory, pageWriteCallbackFactory,
                 metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, 
compressorDecompressorFactory,
-                nullTypeTraits, nullIntrospector, 
isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+                nullTypeTraits, nullIntrospector, 
isSecondaryNoIncrementalMaintenance, columnManagerFactory, atomic);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
index 048d9dedd1..e95b3808ca 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
@@ -70,11 +70,13 @@ public class LSMColumnBTree extends LSMBTree {
             double bloomFilterFalsePositiveRate, int fieldCount, 
IBinaryComparatorFactory[] cmpFactories,
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, 
ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
-            int[] btreeFields, ITracer tracer, IColumnManager columnManager) 
throws HyracksDataException {
+            int[] btreeFields, ITracer tracer, IColumnManager columnManager, 
boolean atomic)
+            throws HyracksDataException {
         super(ioManager, virtualBufferCaches, interiorFrameFactory, 
insertLeafFrameFactory, deleteLeafFrameFactory,
                 diskBufferCache, fileManager, componentFactory, 
bulkloadComponentFactory, null, null, null,
                 bloomFilterFalsePositiveRate, fieldCount, cmpFactories, 
mergePolicy, opTracker, ioScheduler,
-                ioOpCallbackFactory, pageWriteCallbackFactory, true, true, 
btreeFields, null, true, false, tracer);
+                ioOpCallbackFactory, pageWriteCallbackFactory, true, true, 
btreeFields, null, true, false, tracer,
+                atomic);
         this.columnManager = columnManager;
         this.mergeComponentFactory = mergeComponentFactory;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 1a554477f6..6ef0dc6f1d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -63,7 +63,7 @@ public class LSMColumnBTreeUtil {
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             int[] btreeFields, IMetadataPageManagerFactory 
freePageManagerFactory, boolean updateAware, ITracer tracer,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
ITypeTraits nullTypeTraits,
-            INullIntrospector nullIntrospector, IColumnManagerFactory 
columnManagerFactory)
+            INullIntrospector nullIntrospector, IColumnManagerFactory 
columnManagerFactory, boolean atomic)
             throws HyracksDataException {
 
         //Tuple writers
@@ -111,6 +111,6 @@ public class LSMColumnBTreeUtil {
                 deleteLeafFrameFactory, diskBufferCache, fileNameManager, 
flushComponentFactory, mergeComponentFactory,
                 bulkLoadComponentFactory, bloomFilterFalsePositiveRate, 
typeTraits.length, cmpFactories, mergePolicy,
                 opTracker, ioScheduler, ioOpCallbackFactory, 
pageWriteCallbackFactory, btreeFields, tracer,
-                columnManagerFactory.createColumnManager());
+                columnManagerFactory.createColumnManager(), atomic);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 0a47fc0137..43555ca126 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -60,6 +60,7 @@ public class LSMBTreeLocalResource extends LsmResource {
     protected final int[] btreeFields;
     protected final ICompressorDecompressorFactory 
compressorDecompressorFactory;
     protected final boolean isSecondaryNoIncrementalMaintenance;
+    protected final boolean atomic;
 
     public LSMBTreeLocalResource(ITypeTraits[] typeTraits, 
IBinaryComparatorFactory[] cmpFactories,
             int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, 
boolean isPrimary, String path,
@@ -71,8 +72,8 @@ public class LSMBTreeLocalResource extends LsmResource {
             IMetadataPageManagerFactory metadataPageManagerFactory, 
IVirtualBufferCacheProvider vbcProvider,
             ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean 
durable,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
boolean hasBloomFilter,
-            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
-            boolean isSecondaryNoIncrementalMaintenance) {
+            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, 
boolean isSecondaryNoIncrementalMaintenance,
+            boolean atomic) {
         super(path, storageManager, typeTraits, cmpFactories, 
filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, 
pageWriteCallbackFactory, metadataPageManagerFactory,
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, 
mergePolicyProperties, durable, nullTypeTraits,
@@ -84,12 +85,13 @@ public class LSMBTreeLocalResource extends LsmResource {
         this.compressorDecompressorFactory = compressorDecompressorFactory;
         this.hasBloomFilter = hasBloomFilter;
         this.isSecondaryNoIncrementalMaintenance = 
isSecondaryNoIncrementalMaintenance;
+        this.atomic = atomic;
     }
 
     protected LSMBTreeLocalResource(IPersistedResourceRegistry registry, 
JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] 
btreeFields,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
boolean hasBloomFilter,
-            boolean isSecondaryNoIncrementalMaintenance) throws 
HyracksDataException {
+            boolean isSecondaryNoIncrementalMaintenance, boolean atomic) 
throws HyracksDataException {
         super(registry, json);
         this.bloomFilterKeyFields = bloomFilterKeyFields;
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
@@ -98,6 +100,7 @@ public class LSMBTreeLocalResource extends LsmResource {
         this.compressorDecompressorFactory = compressorDecompressorFactory;
         this.hasBloomFilter = hasBloomFilter;
         this.isSecondaryNoIncrementalMaintenance = 
isSecondaryNoIncrementalMaintenance;
+        this.atomic = atomic;
     }
 
     @Override
@@ -115,7 +118,7 @@ public class LSMBTreeLocalResource extends LsmResource {
                 opTrackerProvider.getOperationTracker(serviceCtx, this), 
ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, pageWriteCallbackFactory, isPrimary, 
filterTypeTraits, filterCmpFactories,
                 btreeFields, filterFields, durable, 
metadataPageManagerFactory, updateAware, serviceCtx.getTracer(),
-                compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, 
nullIntrospector);
+                compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, 
nullIntrospector, atomic);
     }
 
     public boolean isSecondaryNoIncrementalMaintenance() {
@@ -141,8 +144,9 @@ public class LSMBTreeLocalResource extends LsmResource {
                 .deserializeOrDefault(compressorDecompressorNode, 
NoOpCompressorDecompressorFactory.class);
         boolean isSecondaryNoIncrementalMaintenance =
                 getOrDefaultBoolean(json, 
"isSecondaryNoIncrementalMaintenance", false);
+        boolean atomic = getOrDefaultBoolean(json, "atomic", false);
         return new LSMBTreeLocalResource(registry, json, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate, isPrimary,
-                btreeFields, compDecompFactory, hasBloomFilter, 
isSecondaryNoIncrementalMaintenance);
+                btreeFields, compDecompFactory, hasBloomFilter, 
isSecondaryNoIncrementalMaintenance, atomic);
     }
 
     @Override
@@ -156,6 +160,7 @@ public class LSMBTreeLocalResource extends LsmResource {
         json.putPOJO("btreeFields", btreeFields);
         json.putPOJO("compressorDecompressorFactory", 
compressorDecompressorFactory.toJson(registry));
         json.put("isSecondaryNoIncrementalMaintenance", 
isSecondaryNoIncrementalMaintenance);
+        json.put("atomic", atomic);
     }
 
     protected static boolean getOrDefaultHasBloomFilter(JsonNode json, boolean 
isPrimary) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index 6695e9057e..bb44655d74 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -49,6 +49,7 @@ public class LSMBTreeLocalResourceFactory extends 
LsmResourceFactory {
     protected final int[] btreeFields;
     protected final ICompressorDecompressorFactory 
compressorDecompressorFactory;
     protected final boolean isSecondaryNoIncrementalMaintenance;
+    protected final boolean atomic;
 
     public LSMBTreeLocalResourceFactory(IStorageManager storageManager, 
ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] 
filterTypeTraits,
@@ -60,8 +61,8 @@ public class LSMBTreeLocalResourceFactory extends 
LsmResourceFactory {
             Map<String, String> mergePolicyProperties, boolean durable, int[] 
bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] 
btreeFields,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
boolean hasBloomFilter,
-            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
-            boolean isSecondaryNoIncrementalMaintenance) {
+            ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, 
boolean isSecondaryNoIncrementalMaintenance,
+            boolean atomic) {
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, 
filterCmpFactories, filterFields,
                 opTrackerFactory, ioOpCallbackFactory, 
pageWriteCallbackFactory, metadataPageManagerFactory,
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, 
mergePolicyProperties, durable, nullTypeTraits,
@@ -73,6 +74,7 @@ public class LSMBTreeLocalResourceFactory extends 
LsmResourceFactory {
         this.btreeFields = btreeFields;
         this.compressorDecompressorFactory = compressorDecompressorFactory;
         this.isSecondaryNoIncrementalMaintenance = 
isSecondaryNoIncrementalMaintenance;
+        this.atomic = atomic;
     }
 
     @Override
@@ -82,7 +84,7 @@ public class LSMBTreeLocalResourceFactory extends 
LsmResourceFactory {
                 filterTypeTraits, filterCmpFactories, btreeFields, 
filterFields, opTrackerProvider, ioOpCallbackFactory,
                 pageWriteCallbackFactory, metadataPageManagerFactory, 
vbcProvider, ioSchedulerProvider, durable,
                 compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, 
nullIntrospector,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, atomic);
     }
 
     private void readObject(java.io.ObjectInputStream in) throws IOException, 
ClassNotFoundException {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index eceefc68c5..05e78dd44d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -100,11 +100,11 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, 
ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             boolean needKeyDupCheck, boolean hasBloomFilter, int[] 
btreeFields, int[] filterFields, boolean durable,
-            boolean updateAware, ITracer tracer) throws HyracksDataException {
+            boolean updateAware, ITracer tracer, boolean atomic) throws 
HyracksDataException {
         super(ioManager, virtualBufferCaches, diskBufferCache, fileManager, 
bloomFilterFalsePositiveRate, mergePolicy,
                 opTracker, ioScheduler, ioOpCallbackFactory, 
pageWriteCallbackFactory, componentFactory,
                 bulkLoadComponentFactory, filterFrameFactory, filterManager, 
filterFields, durable, filterHelper,
-                btreeFields, tracer);
+                btreeFields, tracer, atomic);
         this.insertLeafFrameFactory = insertLeafFrameFactory;
         this.deleteLeafFrameFactory = deleteLeafFrameFactory;
         this.cmpFactories = cmpFactories;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index c3ca3d5bde..3106b2924c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -72,6 +72,23 @@ public class LSMBTreeUtil {
             boolean updateAware, ITracer tracer, 
ICompressorDecompressorFactory compressorDecompressorFactory,
             boolean hasBloomFilter, ITypeTraits nullTypeTraits, 
INullIntrospector nullIntrospector)
             throws HyracksDataException {
+        return createLSMTree(ioManager, virtualBufferCaches, file, 
diskBufferCache, typeTraits, cmpFactories,
+                bloomFilterKeyFields, bloomFilterFalsePositiveRate, 
mergePolicy, opTracker, ioScheduler,
+                ioOpCallbackFactory, pageWriteCallbackFactory, 
needKeyDupCheck, filterTypeTraits, filterCmpFactories,
+                btreeFields, filterFields, durable, freePageManagerFactory, 
updateAware, tracer,
+                compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, 
nullIntrospector, false);
+    }
+
+    public static LSMBTree createLSMTree(IIOManager ioManager, 
List<IVirtualBufferCache> virtualBufferCaches,
+            FileReference file, IBufferCache diskBufferCache, ITypeTraits[] 
typeTraits,
+            IBinaryComparatorFactory[] cmpFactories, int[] 
bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
+            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, 
ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+            boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits, 
IBinaryComparatorFactory[] filterCmpFactories,
+            int[] btreeFields, int[] filterFields, boolean durable, 
IMetadataPageManagerFactory freePageManagerFactory,
+            boolean updateAware, ITracer tracer, 
ICompressorDecompressorFactory compressorDecompressorFactory,
+            boolean hasBloomFilter, ITypeTraits nullTypeTraits, 
INullIntrospector nullIntrospector, boolean atomic)
+            throws HyracksDataException {
         LSMBTreeTupleWriterFactory insertTupleWriterFactory = new 
LSMBTreeTupleWriterFactory(typeTraits,
                 cmpFactories.length, false, updateAware, nullTypeTraits, 
nullIntrospector);
         LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new 
LSMBTreeTupleWriterFactory(typeTraits,
@@ -124,6 +141,6 @@ public class LSMBTreeUtil {
                 deleteLeafFrameFactory, diskBufferCache, fileNameManager, 
componentFactory, bulkLoadComponentFactory,
                 filterHelper, filterFrameFactory, filterManager, 
bloomFilterFalsePositiveRate, typeTraits.length,
                 cmpFactories, mergePolicy, opTracker, ioScheduler, 
ioOpCallbackFactory, pageWriteCallbackFactory,
-                needKeyDupCheck, hasBloomFilter, btreeFields, filterFields, 
durable, updateAware, tracer);
+                needKeyDupCheck, hasBloomFilter, btreeFields, filterFields, 
durable, updateAware, tracer, atomic);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 69b954775b..4756921400 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -43,6 +43,27 @@ import org.apache.hyracks.storage.common.ISearchPredicate;
  */
 public interface ILSMIndex extends IIndex {
 
+    boolean isAtomic();
+
+    /**
+     * Commits the atomic statement by adding all the temporary disk 
components generated by an statement to the
+     * list of disk components. Queries after call to this function will be 
able to read all the changes made by
+     * the insert/upsert/delete statement.
+     *
+     * @throws HyracksDataException
+     */
+    void commit() throws HyracksDataException;
+
+    /**
+     * Aborts the ongoing statement by destroying all temporary disk 
components generated by the statement and
+     * resetting the memory components
+     *
+     * @throws HyracksDataException
+     */
+    void abort() throws HyracksDataException;
+
+    ILSMMergePolicy getMergePolicy();
+
     void deactivate(boolean flush) throws HyracksDataException;
 
     @Override
@@ -106,6 +127,8 @@ public interface ILSMIndex extends IIndex {
 
     void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException;
 
+    void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws 
HyracksDataException;
+
     void subsumeMergedComponents(ILSMDiskComponent newComponent, 
List<ILSMComponent> mergedComponents)
             throws HyracksDataException;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index c64e1e1667..2ec59d600d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -19,11 +19,14 @@
 package org.apache.hyracks.storage.am.lsm.common.dataflow;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -31,8 +34,11 @@ import 
org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFrameWriter;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessor;
 
 public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends 
IndexInsertUpdateDeleteOperatorNodePushable
         implements ILSMIndexFrameWriter {
@@ -134,4 +140,12 @@ public class 
LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
         }
         appender.write(writer, true);
     }
+
+    protected void setAtomicOpContextIfAtomic(IIndex index, IIndexAccessor 
accessor) {
+        if (((ILSMIndex) index).isAtomic()) {
+            Map<String, Object> indexAccessorOpContextParameters = new 
HashMap<>();
+            
indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+            ((ILSMIndexAccessor) 
accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2745b44b3a..b10dd5c08f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
@@ -112,6 +113,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
     protected final ILSMDiskComponentFactory bulkLoadComponentFactory;
     protected final ILSMPageWriteCallbackFactory pageWriteCallbackFactory;
     private int numScheduledFlushes = 0;
+    private final boolean atomic;
+    private final List<ILSMDiskComponent> temporaryDiskComponents;
+    private final ILSMMergePolicy mergePolicy;
 
     public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> 
virtualBufferCaches,
             IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, 
double bloomFilterFalsePositiveRate,
@@ -119,8 +123,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             ILSMDiskComponentFactory componentFactory, 
ILSMDiskComponentFactory bulkLoadComponentFactory,
             ILSMComponentFilterFrameFactory filterFrameFactory, 
LSMComponentFilterManager filterManager,
-            int[] filterFields, boolean durable, IComponentFilterHelper 
filterHelper, int[] treeFields, ITracer tracer)
-            throws HyracksDataException {
+            int[] filterFields, boolean durable, IComponentFilterHelper 
filterHelper, int[] treeFields, ITracer tracer,
+            boolean atomic) throws HyracksDataException {
         this.ioManager = ioManager;
         this.virtualBufferCaches = virtualBufferCaches;
         this.diskBufferCache = diskBufferCache;
@@ -139,6 +143,10 @@ public abstract class AbstractLSMIndex implements 
ILSMIndex {
         this.inactiveMemoryComponents = new ArrayList<>();
         this.durable = durable;
         this.tracer = tracer;
+        this.atomic = atomic;
+        this.temporaryDiskComponents = new ArrayList<>();
+        this.mergePolicy = mergePolicy;
+
         fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
         lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, 
diskBufferCache.isReplicationEnabled(),
                 tracer);
@@ -152,6 +160,25 @@ public abstract class AbstractLSMIndex implements 
ILSMIndex {
         }
     }
 
+    public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> 
virtualBufferCaches,
+            IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, 
double bloomFilterFalsePositiveRate,
+            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, 
ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+            ILSMDiskComponentFactory componentFactory, 
ILSMDiskComponentFactory bulkLoadComponentFactory,
+            ILSMComponentFilterFrameFactory filterFrameFactory, 
LSMComponentFilterManager filterManager,
+            int[] filterFields, boolean durable, IComponentFilterHelper 
filterHelper, int[] treeFields, ITracer tracer)
+            throws HyracksDataException {
+        this(ioManager, virtualBufferCaches, diskBufferCache, fileManager, 
bloomFilterFalsePositiveRate, mergePolicy,
+                opTracker, ioScheduler, ioOpCallbackFactory, 
pageWriteCallbackFactory, componentFactory,
+                bulkLoadComponentFactory, filterFrameFactory, filterManager, 
filterFields, durable, filterHelper,
+                treeFields, tracer, false);
+    }
+
+    @Override
+    public boolean isAtomic() {
+        return atomic;
+    }
+
     @Override
     public synchronized void create() throws HyracksDataException {
         if (isActive) {
@@ -223,6 +250,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
         for (ILSMDiskComponent c : diskComponents) {
             c.deactivateAndPurge();
         }
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            c.deactivateAndPurge();
+        }
     }
 
     private void deallocateMemoryComponents() throws HyracksDataException {
@@ -247,6 +277,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
         for (ILSMDiskComponent c : diskComponents) {
             c.destroy();
         }
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            c.destroy();
+        }
     }
 
     @Override
@@ -265,6 +298,15 @@ public abstract class AbstractLSMIndex implements 
ILSMIndex {
         diskComponents.clear();
     }
 
+    @Override
+    public void abort() throws HyracksDataException {
+        resetMemoryComponents();
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            c.deactivateAndDestroy();
+        }
+        temporaryDiskComponents.clear();
+    }
+
     private void resetMemoryComponents() throws HyracksDataException {
         if (memoryComponentsAllocated && memoryComponents != null) {
             for (ILSMMemoryComponent c : memoryComponents) {
@@ -299,7 +341,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
                 operationalComponents.addAll(diskComponents);
                 break;
             case SEARCH:
-                if (memoryComponentsAllocated) {
+                // search should include memory components for datasets with 
atomic statements not enabled or search to
+                // check duplicate key while inserts/upserts on datasets with 
atomic statements enabled
+                if (memoryComponentsAllocated && (!atomic || 
isAtomicOpContext(ctx))) {
                     addOperationalMemoryComponents(operationalComponents, 
false);
                 }
                 if (filterManager != null) {
@@ -315,6 +359,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
                 } else {
                     operationalComponents.addAll(diskComponents);
                 }
+                if (isAtomicOpContext(ctx)) {
+                    operationalComponents.addAll(temporaryDiskComponents);
+                }
 
                 break;
             case REPLICATE:
@@ -328,6 +375,11 @@ public abstract class AbstractLSMIndex implements 
ILSMIndex {
         }
     }
 
+    private boolean isAtomicOpContext(ILSMIndexOperationContext ctx) {
+        Map<String, Object> ctxParameters = ctx.getParameters();
+        return ctxParameters != null && (boolean) 
ctxParameters.getOrDefault(HyracksConstants.ATOMIC_OP_CONTEXT, false);
+    }
+
     @Override
     public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor 
cursor) throws HyracksDataException {
         throw 
HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
@@ -553,12 +605,35 @@ public abstract class AbstractLSMIndex implements 
ILSMIndex {
 
     @Override
     public void addDiskComponent(ILSMDiskComponent c) throws 
HyracksDataException {
+        if (c != EmptyComponent.INSTANCE) {
+            if (atomic) {
+                temporaryDiskComponents.add(c);
+                LOGGER.debug("Adding new temporary disk component to index {}; 
current count: {}", c,
+                        temporaryDiskComponents.size());
+            } else {
+                diskComponents.add(0, c);
+            }
+        }
+        validateComponentIds();
+    }
+
+    @Override
+    public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws 
HyracksDataException {
         if (c != EmptyComponent.INSTANCE) {
             diskComponents.add(0, c);
         }
         validateComponentIds();
     }
 
+    @Override
+    public void commit() throws HyracksDataException {
+        for (ILSMDiskComponent c : temporaryDiskComponents) {
+            diskComponents.add(0, c);
+        }
+        temporaryDiskComponents.clear();
+        validateComponentIds();
+    }
+
     @Override
     public void subsumeMergedComponents(ILSMDiskComponent newComponent, 
List<ILSMComponent> mergedComponents)
             throws HyracksDataException {
@@ -879,4 +954,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex 
{
         return pageWriteCallbackFactory;
     }
 
+    @Override
+    public ILSMMergePolicy getMergePolicy() {
+        return mergePolicy;
+    }
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e2c9fabb48..9fcce8b361 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -190,7 +190,9 @@ public class LSMHarness implements ILSMHarness {
                     if (opType == LSMOperationType.FLUSH) {
                         opTracker.notifyAll();
                         if (!failedOperation) {
-                            waitForLaggingMerge();
+                            if (!lsmIndex.isAtomic()) {
+                                waitForLaggingMerge();
+                            }
                         }
                     } else if (opType == LSMOperationType.MERGE) {
                         opTracker.notifyAll();
@@ -299,7 +301,9 @@ public class LSMHarness implements ILSMHarness {
                         componentsToBeReplicated.add(newComponent);
                         triggerReplication(componentsToBeReplicated, opType);
                     }
-                    mergePolicy.diskComponentAdded(lsmIndex, false);
+                    if (!lsmIndex.isAtomic()) {
+                        mergePolicy.diskComponentAdded(lsmIndex, false);
+                    }
                 }
                 break;
             case MERGE:
@@ -639,7 +643,7 @@ public class LSMHarness implements ILSMHarness {
             throw HyracksDataException.create(ioOperation.getFailure());
         }
         synchronized (opTracker) {
-            lsmIndex.addDiskComponent(c);
+            lsmIndex.addBulkLoadedDiskComponent(c);
             if (replicationEnabled) {
                 componentsToBeReplicated.clear();
                 componentsToBeReplicated.add(c);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 2d0f079120..94d344742c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -95,7 +95,7 @@ public class TestLsmBtree extends LSMBTree {
                 diskBufferCache, fileManager, componentFactory, 
bulkLoadComponentFactory, filterHelper,
                 filterFrameFactory, filterManager, 
bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
                 opTracker, ioScheduler, ioOperationCallbackFactory, 
pageWriteCallbackFactory, needKeyDupCheck,
-                hasBloomFilter, btreeFields, filterFields, durable, 
updateAware, tracer);
+                hasBloomFilter, btreeFields, filterFields, durable, 
updateAware, tracer, false);
         addModifyCallback(AllowTestOpCallback.INSTANCE);
         addSearchCallback(AllowTestOpCallback.INSTANCE);
         addFlushCallback(AllowTestOpCallback.INSTANCE);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index 039b244437..04fa11f1f0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -63,14 +63,14 @@ public class TestLsmBtreeLocalResource extends 
LSMBTreeLocalResource {
                 btreeFields, filterFields, opTrackerProvider, 
ioOpCallbackFactory, pageWriteCallbackFactory,
                 metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, 
durable,
                 NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, 
null, null,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, false);
     }
 
     protected TestLsmBtreeLocalResource(IPersistedResourceRegistry registry, 
JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] 
btreeFields, boolean hasBloomFilter,
             boolean isSecondaryNoIncrementalMaintenance) throws 
HyracksDataException {
         super(registry, json, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate, isPrimary, btreeFields,
-                NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, 
isSecondaryNoIncrementalMaintenance);
+                NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, 
isSecondaryNoIncrementalMaintenance, false);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
index 48adf91826..54b786ba91 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -53,7 +53,7 @@ public class TestLsmBtreeLocalResourceFactory extends 
LSMBTreeLocalResourceFacto
                 vbcProvider, ioSchedulerProvider, mergePolicyFactory, 
mergePolicyProperties, durable,
                 bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, 
btreeFields,
                 NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, 
null, null,
-                isSecondaryNoIncrementalMaintenance);
+                isSecondaryNoIncrementalMaintenance, false);
     }
 
     @Override

Reply via email to