Repository: asterixdb
Updated Branches:
  refs/heads/master 893d385f7 -> 5070d633e


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
index 0b96164..972668a 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
@@ -32,6 +32,6 @@ public class SecondaryIndexSearchOperationCallbackFactory 
implements ISearchOper
     @Override
     public ISearchOperationCallback createSearchOperationCallback(long 
resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
-        return new SecondaryIndexSearchOperationCallback();
+        return new SecondaryIndexSearchOperationCallback(resourceId);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 79ce788..735d7ea 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -67,12 +66,12 @@ public class 
TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
+            txnCtx.register(resource.getId(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 8a27914..b744606 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -69,11 +68,11 @@ public class 
TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             IModificationOperationCallback modCallback = new 
TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, false);
+            txnCtx.register(resource.getId(), index, modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index dfd3eb1..da4aab8 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -22,7 +22,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -66,11 +65,11 @@ public class UpsertOperationCallbackFactory extends 
AbstractOperationCallbackFac
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             IModificationOperationCallback modCallback = new 
UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
+            txnCtx.register(resource.getId(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index fe758e1..672e881 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -83,7 +83,7 @@ public class CommitRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime
     @Override
     public void open() throws HyracksDataException {
         try {
-            transactionContext = 
transactionManager.getTransactionContext(txnId, false);
+            transactionContext = 
transactionManager.getTransactionContext(txnId);
             transactionContext.setWriteTxn(isWriteTransaction);
             ILogMarkerCallback callback = 
TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
             logRecord = new LogRecord(callback);
@@ -111,9 +111,7 @@ public class CommitRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime
                  * active operation count of PrimaryIndexOptracker. By 
maintaining the count correctly and only allowing
                  * flushing when the count is 0, it can guarantee the no-steal 
policy for temporary datasets, too.
                  */
-                // TODO: Fix this for upserts. an upsert tuple right now 
expect to notify the opTracker twice (one for
-                // delete and one for insert)
-                transactionContext.notifyOptracker(false);
+                transactionContext.notifyEntityCommitted();
             } else {
                 tRef.reset(tAccess, t);
                 try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 3d78ad9..6ebf52c 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -234,27 +234,26 @@ public class LogBuffer implements ILogBuffer {
     private void batchUnlock(int beginOffset, int endOffset) throws 
ACIDException {
         if (endOffset > beginOffset) {
             logBufferTailReader.initializeScan(beginOffset, endOffset);
-
             ITransactionContext txnCtx;
-
             LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogSource() == LogSource.LOCAL) {
                     if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                         reusableTxnId.setId(logRecord.getTxnId());
                         reusableDatasetId.setId(logRecord.getDatasetId());
-                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, 
false);
+                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
                         
txnSubsystem.getLockManager().unlock(reusableDatasetId, 
logRecord.getPKHashValue(),
                                 LockMode.ANY, txnCtx);
-                        txnCtx.notifyOptracker(false);
+                        txnCtx.notifyEntityCommitted();
                         if 
(txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
                             txnSubsystem.incrementEntityCommitCount();
                         }
+                    } else if (logRecord.getLogType() == LogType.UPDATE) {
+                        reusableTxnId.setId(logRecord.getTxnId());
+                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
+                        
txnCtx.notifyUpdateCommitted(logRecord.getResourceId());
                     } else if (logRecord.getLogType() == LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT) {
-                        reusableTxnId.setId(logRecord.getTxnId());
-                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, 
false);
-                        txnCtx.notifyOptracker(true);
                         notifyJobTermination();
                     } else if (logRecord.getLogType() == LogType.FLUSH) {
                         notifyFlushTermination();
@@ -266,7 +265,6 @@ public class LogBuffer implements ILogBuffer {
                         notifyReplicationTermination();
                     }
                 }
-
                 logRecord = logBufferTailReader.next();
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
new file mode 100644
index 0000000..43fe266
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.context.ITransactionOperationTracker;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Base {@link ITransactionContext} implementation that keeps the state common to all
+ * transaction contexts: transaction id, first/last LSN, transaction state, write flag,
+ * timeout flag and the operation tracker of every resource passed to {@link #register}.
+ */
+@ThreadSafe
+public abstract class AbstractTransactionContext implements ITransactionContext {
+
+    protected final TxnId txnId;
+    // resource id -> operation tracker; accessed here only under synchronized (txnOpTrackers)
+    protected final Map<Long, ITransactionOperationTracker> txnOpTrackers;
+    private final AtomicLong firstLSN;
+    private final AtomicLong lastLSN;
+    private final AtomicInteger txnState;
+    private final AtomicBoolean isWriteTxn;
+    // guarded by "this": accessed only through the synchronized setTimeout()/isTimeout()
+    private boolean isTimeout = false;
+
+    /**
+     * Creates a context in the {@link ITransactionManager#ACTIVE} state with both LSNs unset (-1).
+     *
+     * @param txnId id of the transaction this context represents
+     */
+    protected AbstractTransactionContext(TxnId txnId) {
+        this.txnId = txnId;
+        firstLSN = new AtomicLong(-1);
+        lastLSN = new AtomicLong(-1);
+        txnState = new AtomicInteger(ITransactionManager.ACTIVE);
+        isWriteTxn = new AtomicBoolean();
+        txnOpTrackers = new HashMap<>();
+    }
+
+    @Override
+    public long getFirstLSN() {
+        return firstLSN.get();
+    }
+
+    /**
+     * Records {@code newValue} as the first LSN if none was recorded yet, and raises the last
+     * LSN to {@code newValue} when it is larger than the current value.
+     */
+    @Override
+    public void setLastLSN(long newValue) {
+        firstLSN.compareAndSet(-1, newValue);
+        // single atomic max; a separate get() + set() could let a racing caller with a
+        // smaller LSN overwrite a larger one
+        lastLSN.accumulateAndGet(newValue, Math::max);
+    }
+
+    @Override
+    public void setTxnState(int txnState) {
+        this.txnState.set(txnState);
+    }
+
+    @Override
+    public int getTxnState() {
+        return txnState.get();
+    }
+
+    @Override
+    public TxnId getTxnId() {
+        return txnId;
+    }
+
+    @Override
+    public synchronized void setTimeout(boolean isTimeout) {
+        this.isTimeout = isTimeout;
+    }
+
+    @Override
+    public synchronized boolean isTimeout() {
+        return isTimeout;
+    }
+
+    @Override
+    public void setWriteTxn(boolean isWriteTxn) {
+        this.isWriteTxn.set(isWriteTxn);
+    }
+
+    @Override
+    public boolean isWriteTxn() {
+        return isWriteTxn.get();
+    }
+
+    @Override
+    public long getLastLSN() {
+        return lastLSN.get();
+    }
+
+    /**
+     * Finishes the transaction: runs {@link #cleanupForAbort()} when the state is
+     * {@link ITransactionManager#ABORTED}, then calls {@code afterTransaction} on every
+     * registered operation tracker, even if the cleanup throws.
+     */
+    @Override
+    public void complete() {
+        try {
+            if (txnState.get() == ITransactionManager.ABORTED) {
+                cleanupForAbort();
+            }
+        } finally {
+            synchronized (txnOpTrackers) {
+                txnOpTrackers.forEach((resource, opTracker) -> opTracker.afterTransaction(resource));
+            }
+        }
+    }
+
+    /**
+     * Registers the operation tracker of {@code index} under {@code resourceId} and calls its
+     * {@code beforeTransaction}; a resource that is already registered is left untouched.
+     * {@code callback} and {@code primaryIndex} are not used here; subclasses consume them.
+     */
+    @Override
+    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+            boolean primaryIndex) {
+        synchronized (txnOpTrackers) {
+            if (!txnOpTrackers.containsKey(resourceId)) {
+                final ITransactionOperationTracker txnOpTracker =
+                        (ITransactionOperationTracker) index.getOperationTracker();
+                txnOpTrackers.put(resourceId, txnOpTracker);
+                txnOpTracker.beforeTransaction(resourceId);
+            }
+        }
+    }
+
+    /**
+     * @return a multi-line, human-readable dump of this context's state
+     */
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append('\n').append(txnId).append('\n');
+        sb.append("isWriteTxn: ").append(isWriteTxn).append('\n');
+        sb.append("firstLSN: ").append(firstLSN.get()).append('\n');
+        sb.append("lastLSN: ").append(lastLSN.get()).append('\n');
+        sb.append("TransactionState: ").append(txnState).append('\n');
+        // go through the synchronized accessor; a bare field read here would be unguarded
+        sb.append("isTimeout: ").append(isTimeout()).append('\n');
+        return sb.toString();
+    }
+
+    /** Invoked by {@link #complete()} when the transaction ended in the ABORTED state. */
+    protected abstract void cleanupForAbort();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
new file mode 100644
index 0000000..1d132a8
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Transaction context for atomic transactions. For every resource registered as a primary
+ * index it keeps the operation tracker, the modification callback and a pending-operation
+ * counter; each committed update completes one operation on the tracker of its resource.
+ */
+@ThreadSafe
+public class AtomicTransactionContext extends AbstractTransactionContext {
+
+    // All three maps are keyed by resource id and guarded by synchronized (txnOpTrackers).
+    private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>();
+    private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>();
+    private final Map<Long, IModificationOperationCallback> callbacks = new HashMap<>();
+
+    public AtomicTransactionContext(TxnId txnId) {
+        super(txnId);
+    }
+
+    /**
+     * Registers the resource with the base class and, the first time a primary index is seen,
+     * records its operation tracker, callback and a zeroed pending-operation counter.
+     */
+    @Override
+    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+            boolean primaryIndex) {
+        super.register(resourceId, index, callback, primaryIndex);
+        synchronized (txnOpTrackers) {
+            if (primaryIndex && !opTrackers.containsKey(resourceId)) {
+                opTrackers.put(resourceId, index.getOperationTracker());
+                callbacks.put(resourceId, callback);
+                indexPendingOps.put(resourceId, new AtomicInteger(0));
+            }
+        }
+    }
+
+    /**
+     * Completes one MODIFICATION operation on the tracker registered for {@code resourceId}.
+     * NOTE(review): throws NullPointerException when {@code resourceId} was never registered
+     * as a primary index; confirm callers only pass registered primary-index resources.
+     *
+     * @throws ACIDException wrapping any HyracksDataException raised by the tracker
+     */
+    @Override
+    public void notifyUpdateCommitted(long resourceId) {
+        final ILSMOperationTracker opTracker;
+        final IModificationOperationCallback callback;
+        // look both entries up under the lock register() writes them with, but call the
+        // tracker outside of it so foreign code never runs while the lock is held
+        synchronized (txnOpTrackers) {
+            opTracker = opTrackers.get(resourceId);
+            callback = callbacks.get(resourceId);
+        }
+        try {
+            opTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, callback);
+        } catch (HyracksDataException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    /** Always fails: entity-level commits are not expected in an atomic transaction. */
+    @Override
+    public void notifyEntityCommitted() {
+        throw new IllegalStateException("Unexpected entity commit in atomic transaction");
+    }
+
+    /** Increments the pending-operation counter of {@code resourceId}. */
+    @Override
+    public void beforeOperation(long resourceId) {
+        pendingOpsOf(resourceId).incrementAndGet();
+    }
+
+    /** Decrements the pending-operation counter of {@code resourceId}. */
+    @Override
+    public void afterOperation(long resourceId) {
+        pendingOpsOf(resourceId).decrementAndGet();
+    }
+
+    /**
+     * Hands every registered tracker the number of operations still pending on its resource
+     * via {@code cleanupNumActiveOperationsForAbortedJob}.
+     */
+    @Override
+    public void cleanupForAbort() {
+        final Map<Long, ILSMOperationTracker> trackers;
+        final Map<Long, AtomicInteger> pending;
+        // snapshot under the lock, then notify the trackers without holding it
+        synchronized (txnOpTrackers) {
+            trackers = new HashMap<>(opTrackers);
+            pending = new HashMap<>(indexPendingOps);
+        }
+        // each opTracker should be cleaned
+        trackers.forEach((resId, opTracker) -> ((PrimaryIndexOperationTracker) opTracker)
+                .cleanupNumActiveOperationsForAbortedJob(pending.get(resId).get()));
+    }
+
+    // Returns the pending-operation counter registered for resourceId, or null if there is none.
+    private AtomicInteger pendingOpsOf(long resourceId) {
+        synchronized (txnOpTrackers) {
+            return indexPendingOps.get(resourceId);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(txnId.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AtomicTransactionContext that = (AtomicTransactionContext) o;
+        return this.txnId.equals(that.txnId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
new file mode 100644
index 0000000..e195451
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * Transaction context for entity-level transactions. It remembers the operation tracker and
+ * callback of the first primary index registered and keeps one pending-operation counter;
+ * every entity commit completes one operation on that tracker.
+ */
+@ThreadSafe
+public class EntityLevelTransactionContext extends AbstractTransactionContext {
+
+    // Written once in register() under the txnOpTrackers lock and read without it by
+    // notifyEntityCommitted()/cleanupForAbort(); volatile makes the write visible to readers.
+    private volatile PrimaryIndexOperationTracker primaryIndexOpTracker;
+    private volatile IModificationOperationCallback primaryIndexCallback;
+    private final AtomicInteger pendingOps;
+
+    public EntityLevelTransactionContext(TxnId txnId) {
+        super(txnId);
+        pendingOps = new AtomicInteger(0);
+    }
+
+    /**
+     * Registers the resource with the base class and remembers the tracker and callback of the
+     * first primary index registered; later primary indexes do not replace them.
+     */
+    @Override
+    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+            boolean primaryIndex) {
+        super.register(resourceId, index, callback, primaryIndex);
+        synchronized (txnOpTrackers) {
+            if (primaryIndex && primaryIndexOpTracker == null) {
+                // set the callback first: a reader that sees the tracker also sees the callback
+                primaryIndexCallback = callback;
+                primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+            }
+        }
+    }
+
+    /** Increments the pending-operation counter; {@code resourceId} is not used. */
+    @Override
+    public void beforeOperation(long resourceId) {
+        pendingOps.incrementAndGet();
+    }
+
+    @Override
+    public void notifyUpdateCommitted(long resourceId) {
+        // no op
+    }
+
+    /**
+     * Completes one MODIFICATION operation on the primary index tracker.
+     * NOTE(review): throws NullPointerException when no primary index has been registered,
+     * whereas cleanupForAbort() tolerates that case; confirm which behavior is intended.
+     *
+     * @throws ACIDException wrapping any HyracksDataException raised by the tracker
+     */
+    @Override
+    public void notifyEntityCommitted() {
+        // read the tracker before the callback, mirroring the write order in register()
+        final PrimaryIndexOperationTracker opTracker = primaryIndexOpTracker;
+        final IModificationOperationCallback callback = primaryIndexCallback;
+        try {
+            opTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, callback);
+        } catch (HyracksDataException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    /** Decrements the pending-operation counter; {@code resourceId} is not used. */
+    @Override
+    public void afterOperation(long resourceId) {
+        pendingOps.decrementAndGet();
+    }
+
+    /**
+     * Hands the primary index tracker, if one was registered, the number of operations that
+     * are still pending via {@code cleanupNumActiveOperationsForAbortedJob}.
+     */
+    @Override
+    protected void cleanupForAbort() {
+        final PrimaryIndexOperationTracker opTracker = primaryIndexOpTracker;
+        if (opTracker != null) {
+            opTracker.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(txnId.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
+        return this.txnId.equals(that.txnId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
deleted file mode 100644
index c408f1d..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.transaction.management.service.transaction;
-
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public class FieldsHashValueGenerator {
-    public static int computeFieldsHashValue(ITupleReference tuple, int[] fieldIndexes,
-            IBinaryHashFunction[] fieldHashFunctions) throws HyracksDataException {
-        int h = 0;
-        for (int i = 0; i < fieldIndexes.length; i++) {
-            int primaryKeyFieldIdx = fieldIndexes[i];
-            int fh = fieldHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx),
-                    tuple.getFieldStart(primaryKeyFieldIdx), tuple.getFieldLength(primaryKeyFieldIdx));
-            h = h * 31 + fh;
-            if (h < 0) {
-                h = h * (-1);
-            }
-        }
-        return h;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java
deleted file mode 100644
index c8e134c..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-public class MutableResourceId {
-    long id;
-
-    public MutableResourceId(long id) {
-        this.id = id;
-    }
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public long getId() {
-        return id;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) id;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if ((o == null) || !(o instanceof MutableResourceId)) {
-            return false;
-        }
-        return ((MutableResourceId) o).id == this.id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
deleted file mode 100644
index 1681a27..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.asterix.common.context.ITransactionOperationTracker;
-import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.TxnId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.common.IModificationOperationCallback;
-
-/*
- * An object of TransactionContext is created and accessed(read/written) by multiple threads which work for
- * a single job identified by a txnId. Thus, the member variables in the object can be read/written
- * concurrently. Please see each variable declaration to know which one is accessed concurrently and
- * which one is not.
- */
-public class TransactionContext implements ITransactionContext {
-
-    private static final long serialVersionUID = -6105616785783310111L;
-
-    // txnId is set once and read concurrently.
-    private final TxnId txnId;
-
-    // There are no concurrent writers on both firstLSN and lastLSN
-    // since both values are updated by serialized log appenders.
-    // But readers and writers can be different threads,
-    // so both LSNs are atomic variables in order to be read and written
-    // atomically.
-    private final AtomicLong firstLSN;
-    private final AtomicLong lastLSN;
-
-    // txnState is read and written concurrently.
-    private final AtomicInteger txnState;
-
-    // isTimeout is read and written under the lockMgr's tableLatch
-    // Thus, no other synchronization is required separately.
-    private boolean isTimeout;
-
-    // isWriteTxn can be set concurrently by multiple threads.
-    private final AtomicBoolean isWriteTxn;
-
-    // isMetadataTxn is accessed by a single thread since the metadata is not
-    // partitioned
-    private boolean isMetadataTxn;
-
-    // indexMap is concurrently accessed by multiple threads,
-    // so those threads are synchronized on indexMap object itself
-    private final Map<Long, ITransactionOperationTracker> indexMap;
-
-    // TODO: fix ComponentLSNs' issues.
-    // primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be
-    // modified accordingly
-    // when the issues of componentLSNs are fixed.
-    private ILSMIndex primaryIndex;
-    private AbstractOperationCallback primaryIndexCallback;
-    private PrimaryIndexOperationTracker primaryIndexOpTracker;
-
-    // The following three variables are used as temporary variables in order to
-    // avoid object creations.
-    // Those are used in synchronized methods.
-    private final LogRecord logRecord;
-
-    private final AtomicInteger transactorNumActiveOperations;
-
-    // TODO: implement transactionContext pool in order to avoid object
-    // creations.
-    // also, the pool can throttle the number of concurrent active jobs at every
-    // moment.
-    public TransactionContext(TxnId txnId) throws ACIDException {
-        this.txnId = txnId;
-        firstLSN = new AtomicLong(-1);
-        lastLSN = new AtomicLong(-1);
-        txnState = new AtomicInteger(ITransactionManager.ACTIVE);
-        isTimeout = false;
-        isWriteTxn = new AtomicBoolean(false);
-        isMetadataTxn = false;
-        indexMap = new HashMap<>();
-        primaryIndex = null;
-        logRecord = new LogRecord();
-        transactorNumActiveOperations = new AtomicInteger(0);
-    }
-
-    @Override
-    public void registerIndexAndCallback(long resourceId, ILSMIndex index, AbstractOperationCallback callback,
-            boolean isPrimaryIndex) {
-        synchronized (indexMap) {
-            if (isPrimaryIndex && primaryIndex == null) {
-                primaryIndex = index;
-                primaryIndexCallback = callback;
-                primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
-            }
-            if (!indexMap.containsKey(resourceId)) {
-                final ITransactionOperationTracker txnOpTracker =
-                        (ITransactionOperationTracker) index.getOperationTracker();
-                indexMap.put(resourceId, txnOpTracker);
-                txnOpTracker.beforeTransaction(resourceId);
-            }
-        }
-    }
-
-    public PrimaryIndexOperationTracker getPrimaryIndexOpTracker() {
-        synchronized (indexMap) {
-            return primaryIndexOpTracker;
-        }
-    }
-
-    // [Notice]
-    // This method is called sequentially by the LogAppender threads.
-    @Override
-    public void setLastLSN(long LSN) {
-        firstLSN.compareAndSet(-1, LSN);
-        lastLSN.set(Math.max(lastLSN.get(), LSN));
-    }
-
-    @Override
-    public void notifyOptracker(boolean isJobLevelCommit) {
-        try {
-            /**
-             * in case of transaction abort {@link TransactionContext#cleanupForAbort()} will
-             * clean the primaryIndexOpTracker state.
-             */
-            if (isJobLevelCommit && isMetadataTxn && txnState.get() != ITransactionManager.ABORTED) {
-                primaryIndexOpTracker.exclusiveJobCommitted();
-            } else if (!isJobLevelCommit) {
-                primaryIndexOpTracker.completeOperation(null, LSMOperationType.MODIFICATION, null,
-                        (IModificationOperationCallback) primaryIndexCallback);
-            }
-        } catch (HyracksDataException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public void setWriteTxn(boolean isWriteTxn) {
-        this.isWriteTxn.set(isWriteTxn);
-    }
-
-    @Override
-    public boolean isWriteTxn() {
-        return isWriteTxn.get();
-    }
-
-    @Override
-    public long getFirstLSN() {
-        return firstLSN.get();
-    }
-
-    @Override
-    public long getLastLSN() {
-        return lastLSN.get();
-    }
-
-    @Override
-    public TxnId getTxnId() {
-        return txnId;
-    }
-
-    @Override
-    public void setTimeout(boolean isTimeout) {
-        this.isTimeout = isTimeout;
-    }
-
-    @Override
-    public boolean isTimeout() {
-        return isTimeout;
-    }
-
-    @Override
-    public void setTxnState(int txnState) {
-        this.txnState.set(txnState);
-    }
-
-    @Override
-    public int getTxnState() {
-        return txnState.get();
-    }
-
-    @Override
-    public int hashCode() {
-        return Long.hashCode(txnId.getId());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        return (o == this);
-    }
-
-    @Override
-    public void setMetadataTransaction(boolean isMetadataTxn) {
-        this.isMetadataTxn = isMetadataTxn;
-    }
-
-    @Override
-    public boolean isMetadataTransaction() {
-        return isMetadataTxn;
-    }
-
-    @Override
-    public String prettyPrint() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("\n" + txnId + "\n");
-        sb.append("isWriteTxn: " + isWriteTxn + "\n");
-        sb.append("firstLSN: " + firstLSN.get() + "\n");
-        sb.append("lastLSN: " + lastLSN.get() + "\n");
-        sb.append("TransactionState: " + txnState + "\n");
-        sb.append("isTimeout: " + isTimeout + "\n");
-        return sb.toString();
-    }
-
-    public LogRecord getLogRecord() {
-        return logRecord;
-    }
-
-    @Override
-    public void incrementNumActiveOperations() {
-        transactorNumActiveOperations.incrementAndGet();
-    }
-
-    @Override
-    public void decrementNumActiveOperations() {
-        transactorNumActiveOperations.decrementAndGet();
-    }
-
-    @Override
-    public void complete() {
-        try {
-            if (txnState.get() == ITransactionManager.ABORTED) {
-                cleanupForAbort();
-            }
-        } finally {
-            synchronized (indexMap) {
-                indexMap.forEach((resource, opTracker) -> opTracker.afterTransaction(resource));
-            }
-        }
-    }
-
-    private void cleanupForAbort() {
-        if (primaryIndexOpTracker != null) {
-            primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(transactorNumActiveOperations.get());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
new file mode 100644
index 0000000..4a465a4
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+
+public class TransactionContextFactory {
+
+    private TransactionContextFactory() {
+    }
+
+    public static ITransactionContext create(TxnId txnId, TransactionOptions options) {
+        final AtomicityLevel atomicityLevel = options.getAtomicityLevel();
+        switch (atomicityLevel) {
+            case ATOMIC:
+                return new AtomicTransactionContext(txnId);
+            case ENTITY_LEVEL:
+                return new EntityLevelTransactionContext(txnId);
+            default:
+                throw new IllegalStateException("Unknown transaction context type: " + atomicityLevel);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 1799ea1..c03369b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Map;
 import java.util.Set;
@@ -27,124 +28,93 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.util.annotations.ThreadSafe;
 
-/**
- * An implementation of the @see ITransactionManager interface that provides
- * implementation of APIs for governing the lifecycle of a transaction.
- */
+@ThreadSafe
 public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
 
-    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
     private final ITransactionSubsystem txnSubsystem;
-    private Map<TxnId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<>();
-    private AtomicLong maxTxnId = new AtomicLong(0);
+    private final Map<TxnId, ITransactionContext> txnCtxRepository = new ConcurrentHashMap<>();
+    private final AtomicLong maxTxnId = new AtomicLong(0);
 
     public TransactionManager(ITransactionSubsystem provider) {
         this.txnSubsystem = provider;
     }
 
     @Override
-    public void abortTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
-        if (txnCtx.getTxnState() != ITransactionManager.ABORTED) {
-            txnCtx.setTxnState(ITransactionManager.ABORTED);
-        }
-        try {
-            if (txnCtx.isWriteTxn()) {
-                LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
-                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
-                txnSubsystem.getLogManager().log(logRecord);
-                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
-            }
-        } catch (Exception ae) {
-            String msg = "Could not complete rollback! System is in an inconsistent state";
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.severe(msg);
-            }
-            ae.printStackTrace();
-            throw new ACIDException(msg, ae);
-        } finally {
-            txnCtx.complete();
-            txnSubsystem.getLockManager().releaseLocks(txnCtx);
-            transactionContextRepository.remove(txnCtx.getTxnId());
+    public synchronized ITransactionContext beginTransaction(TxnId txnId, TransactionOptions options)
+            throws ACIDException {
+        ITransactionContext txnCtx = txnCtxRepository.get(txnId);
+        if (txnCtx != null) {
+            throw new ACIDException("Transaction with the same (" + txnId + ") already exists");
         }
+        txnCtx = TransactionContextFactory.create(txnId, options);
+        txnCtxRepository.put(txnId, txnCtx);
+        ensureMaxTxnId(txnId.getId());
+        return txnCtx;
     }
 
     @Override
-    public ITransactionContext beginTransaction(TxnId txnId) throws ACIDException {
-        return getTransactionContext(txnId, true);
-    }
-
-    @Override
-    public ITransactionContext getTransactionContext(TxnId txnId, boolean createIfNotExist) throws ACIDException {
-        setMaxTxnId(txnId.getId());
-        ITransactionContext txnCtx = transactionContextRepository.get(txnId);
+    public ITransactionContext getTransactionContext(TxnId txnId) throws ACIDException {
+        ITransactionContext txnCtx = txnCtxRepository.get(txnId);
         if (txnCtx == null) {
-            if (createIfNotExist) {
-                synchronized (this) {
-                    txnCtx = transactionContextRepository.get(txnId);
-                    if (txnCtx == null) {
-                        txnCtx = new TransactionContext(txnId);
-                        transactionContextRepository.put(txnId, txnCtx);
-                    }
-                }
-            } else {
-                throw new ACIDException("TransactionContext of " + txnId + " doesn't exist.");
-            }
+            throw new ACIDException("Transaction " + txnId + " doesn't exist.");
         }
         return txnCtx;
     }
 
     @Override
-    public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal)
-            throws ACIDException {
-        //Only job-level commits call this method.
+    public void commitTransaction(TxnId txnId) throws ACIDException {
+        final ITransactionContext txnCtx = getTransactionContext(txnId);
         try {
             if (txnCtx.isWriteTxn()) {
-                LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
+                LogRecord logRecord = new LogRecord();
                 TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, true);
                 txnSubsystem.getLogManager().log(logRecord);
+                txnCtx.setTxnState(ITransactionManager.COMMITTED);
             }
-        } catch (Exception ae) {
+        } catch (Exception e) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
                 LOGGER.severe(" caused exception in commit !" + txnCtx.getTxnId());
             }
-            throw ae;
+            throw e;
         } finally {
             txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
-            transactionContextRepository.remove(txnCtx.getTxnId());
-            txnCtx.setTxnState(ITransactionManager.COMMITTED);
+            txnCtxRepository.remove(txnCtx.getTxnId());
         }
     }
 
     @Override
-    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal,
-            boolean success) throws ACIDException {
-        if (!success) {
-            abortTransaction(txnContext, datasetId, PKHashVal);
-        } else {
-            commitTransaction(txnContext, datasetId, PKHashVal);
-        }
-    }
-
-    @Override
-    public ITransactionSubsystem getTransactionSubsystem() {
-        return txnSubsystem;
-    }
-
-    public void setMaxTxnId(long txnId) {
-        long maxId = maxTxnId.get();
-        if (txnId > maxId) {
-            maxTxnId.compareAndSet(maxId, txnId);
+    public void abortTransaction(TxnId txnId) throws ACIDException {
+        final ITransactionContext txnCtx = getTransactionContext(txnId);
+        try {
+            if (txnCtx.isWriteTxn()) {
+                LogRecord logRecord = new LogRecord();
+                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+                txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+                txnCtx.setTxnState(ITransactionManager.ABORTED);
+            }
+        } catch (ACIDException e) {
+            String msg = "Could not complete rollback! System is in an inconsistent state";
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, msg, e);
+            }
+            throw new ACIDException(msg, e);
+        } finally {
+            txnCtx.complete();
+            txnSubsystem.getLockManager().releaseLocks(txnCtx);
+            txnCtxRepository.remove(txnCtx.getTxnId());
         }
     }
 
@@ -167,45 +137,41 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
 
     @Override
     public void dumpState(OutputStream os) {
-        //#. dump TxnContext
         dumpTxnContext(os);
     }
 
+    private void ensureMaxTxnId(long txnId) {
+        maxTxnId.updateAndGet(current -> Math.max(current, txnId));
+    }
+
     private void dumpTxnContext(OutputStream os) {
         TxnId txnId;
         ITransactionContext txnCtx;
         StringBuilder sb = new StringBuilder();
-
         try {
             sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
-            Set<Map.Entry<TxnId, ITransactionContext>> entrySet = transactionContextRepository.entrySet();
-            if (entrySet != null) {
-                for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) {
-                    if (entry != null) {
-                        txnId = entry.getKey();
-                        if (txnId != null) {
-                            sb.append("\n" + txnId);
-                        } else {
-                            sb.append("\nJID:null");
-                        }
-
-                        txnCtx = entry.getValue();
-                        if (txnCtx != null) {
-                            sb.append(txnCtx.prettyPrint());
-                        } else {
-                            sb.append("\nTxnCtx:null");
-                        }
+            Set<Map.Entry<TxnId, ITransactionContext>> entrySet = txnCtxRepository.entrySet();
+            for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) {
+                if (entry != null) {
+                    txnId = entry.getKey();
+                    if (txnId != null) {
+                        sb.append("\n" + txnId);
+                    } else {
+                        sb.append("\nJID:null");
+                    }
+
+                    txnCtx = entry.getValue();
+                    if (txnCtx != null) {
+                        sb.append(((AbstractTransactionContext) txnCtx).prettyPrint());
+                    } else {
+                        sb.append("\nTxnCtx:null");
                     }
                 }
             }
-
             sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
             os.write(sb.toString().getBytes());
-        } catch (Exception e) {
-            //ignore exception and continue dumping as much as possible.
-            if (IS_DEBUG_MODE) {
-                e.printStackTrace();
-            }
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "exception while dumping state", e);
         }
     }
 }

Reply via email to