Repository: asterixdb
Updated Branches:
  refs/heads/master 9eead00d0 -> 592af6545


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 36a91dc..aad2a19 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -174,7 +174,7 @@ public abstract class AbstractCheckpointManager implements 
ICheckpointManager {
     protected void capture(long minMCTFirstLSN, boolean sharp) throws 
HyracksDataException {
         ILogManager logMgr = txnSubsystem.getLogManager();
         ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
-        Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), 
minMCTFirstLSN, txnMgr.getMaxJobId(),
+        Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), 
minMCTFirstLSN, txnMgr.getMaxTxnId(),
                 System.currentTimeMillis(), sharp, StorageConstants.VERSION);
         persist(checkpointObject);
         cleanup();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java
new file mode 100644
index 0000000..af74b13
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Key describing one entity touched by a transaction: the transaction id, the dataset id, the hash of the
+ * primary key, the primary key size in bytes and the primary key bytes themselves.
+ * The primary key bytes are either copied into {@link #byteArrayPKValue} (when {@link #isByteArrayPKValue}
+ * is set) or read on demand from field 0 of {@link #tupleReferencePKValue}.
+ */
+public class TxnEntityId {
+    public boolean isByteArrayPKValue;
+    public long txnId;
+    public int datasetId;
+    public int pkHashValue;
+    public int pkSize;
+    public byte[] byteArrayPKValue;
+    public ITupleReference tupleReferencePKValue;
+
+    /**
+     * Creates a fully initialized id.
+     *
+     * @param isByteArrayPKValue
+     *            when true, the first {@code pkSize} bytes of field 0 of {@code pkValue} are copied into a
+     *            private array; otherwise only the reference to {@code pkValue} is kept
+     */
+    public TxnEntityId(long txnId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize,
+            boolean isByteArrayPKValue) {
+        this.txnId = txnId;
+        this.datasetId = datasetId;
+        this.pkHashValue = pkHashValue;
+        this.pkSize = pkSize;
+        this.isByteArrayPKValue = isByteArrayPKValue;
+        if (!isByteArrayPKValue) {
+            this.tupleReferencePKValue = pkValue;
+            return;
+        }
+        this.byteArrayPKValue = new byte[pkSize];
+        readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue);
+    }
+
+    /**
+     * Creates an empty id whose fields are filled in later (see {@link #setTxnId} and {@link #deserialize}).
+     */
+    public TxnEntityId() {
+    }
+
+    /**
+     * Copies {@code pkSize} bytes of field 0 of {@code pkValue} into the start of {@code byteArrayPKValue}.
+     */
+    private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
+        final int start = pkValue.getFieldStart(0);
+        final byte[] source = pkValue.getFieldData(0);
+        int written = 0;
+        while (written < pkSize) {
+            byteArrayPKValue[written] = source[start + written];
+            written++;
+        }
+    }
+
+    /**
+     * Re-points this id at another entity, keeping only a reference to {@code pkValue} (no bytes are copied).
+     */
+    public void setTxnId(long txnId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) {
+        this.isByteArrayPKValue = false;
+        this.tupleReferencePKValue = pkValue;
+        this.pkSize = pkSize;
+        this.pkHashValue = pkHashValue;
+        this.datasetId = datasetId;
+        this.txnId = txnId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append("[").append(txnId);
+        text.append(",").append(datasetId);
+        text.append(",").append(pkHashValue);
+        text.append(",").append(pkSize);
+        return text.append("]").toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return pkHashValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof TxnEntityId)) {
+            return false;
+        }
+        final TxnEntityId other = (TxnEntityId) o;
+        // compare the cheap scalar parts first; the primary key bytes are only compared when all of them match
+        if (other.pkHashValue != pkHashValue || other.datasetId != datasetId) {
+            return false;
+        }
+        if (other.txnId != txnId || pkSize != other.pkSize) {
+            return false;
+        }
+        return isEqualTo(other);
+    }
+
+    /**
+     * Compares the primary key bytes of this id and {@code other}, whichever representation each one uses.
+     */
+    private boolean isEqualTo(TxnEntityId other) {
+        if (isByteArrayPKValue) {
+            return other.isByteArrayPKValue ? isEqual(byteArrayPKValue, other.byteArrayPKValue, pkSize)
+                    : isEqual(byteArrayPKValue, other.tupleReferencePKValue, pkSize);
+        }
+        return other.isByteArrayPKValue ? isEqual(other.byteArrayPKValue, tupleReferencePKValue, pkSize)
+                : isEqual(tupleReferencePKValue, other.tupleReferencePKValue, pkSize);
+    }
+
+    private static boolean isEqual(byte[] a, byte[] b, int size) {
+        return sameBytes(a, 0, b, 0, size);
+    }
+
+    private static boolean isEqual(byte[] a, ITupleReference b, int size) {
+        final int bStart = b.getFieldStart(0);
+        final byte[] bData = b.getFieldData(0);
+        return sameBytes(a, 0, bData, bStart, size);
+    }
+
+    private static boolean isEqual(ITupleReference a, ITupleReference b, int size) {
+        final int aStart = a.getFieldStart(0);
+        final byte[] aData = a.getFieldData(0);
+        final int bStart = b.getFieldStart(0);
+        final byte[] bData = b.getFieldData(0);
+        return sameBytes(aData, aStart, bData, bStart, size);
+    }
+
+    /**
+     * Returns true when the {@code size} bytes of {@code a} starting at {@code aStart} equal the {@code size}
+     * bytes of {@code b} starting at {@code bStart}.
+     */
+    private static boolean sameBytes(byte[] a, int aStart, byte[] b, int bStart, int size) {
+        int i = 0;
+        while (i < size) {
+            if (a[aStart + i] != b[bStart + i]) {
+                return false;
+            }
+            i++;
+        }
+        return true;
+    }
+
+    /**
+     * Writes txn id, dataset id, pk hash, pk size and the representation flag to {@code buffer}, followed by
+     * the primary key bytes only when they are held in {@link #byteArrayPKValue}.
+     */
+    public void serialize(ByteBuffer buffer) {
+        buffer.putLong(txnId).putInt(datasetId).putInt(pkHashValue).putInt(pkSize);
+        if (isByteArrayPKValue) {
+            buffer.put((byte) 1).put(byteArrayPKValue);
+        } else {
+            buffer.put((byte) 0);
+        }
+    }
+
+    /**
+     * Reads an id from {@code buffer} in the layout produced by {@link #serialize(ByteBuffer)}.
+     */
+    public static TxnEntityId deserialize(ByteBuffer buffer) {
+        final TxnEntityId result = new TxnEntityId();
+        result.txnId = buffer.getLong();
+        result.datasetId = buffer.getInt();
+        result.pkHashValue = buffer.getInt();
+        result.pkSize = buffer.getInt();
+        result.isByteArrayPKValue = buffer.get() == 1;
+        if (result.isByteArrayPKValue) {
+            result.byteArrayPKValue = new byte[result.pkSize];
+            buffer.get(result.byteArrayPKValue);
+        }
+        return result;
+    }
+
+    /**
+     * Returns the size computed from the fixed-length parts plus the length of the primary key array when
+     * one is held.
+     */
+    public int getCurrentSize() {
+        //txn id, dataset id, pkHashValue, arraySize, isByteArrayPKValue
+        final int fixedPart =
+                TxnId.BYTES + ILogRecord.DS_LEN + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
+        //byte arraySize
+        final boolean holdsPkBytes = isByteArrayPKValue && byteArrayPKValue != null;
+        return holdsPkBytes ? fixedPart + byteArrayPKValue.length : fixedPart;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
deleted file mode 100644
index 9cb54af..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.recovery;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public class TxnId {
-    public boolean isByteArrayPKValue;
-    public int jobId;
-    public int datasetId;
-    public int pkHashValue;
-    public int pkSize;
-    public byte[] byteArrayPKValue;
-    public ITupleReference tupleReferencePKValue;
-
-    public TxnId(int jobId, int datasetId, int pkHashValue, ITupleReference 
pkValue, int pkSize,
-            boolean isByteArrayPKValue) {
-        this.jobId = jobId;
-        this.datasetId = datasetId;
-        this.pkHashValue = pkHashValue;
-        this.pkSize = pkSize;
-        this.isByteArrayPKValue = isByteArrayPKValue;
-        if (isByteArrayPKValue) {
-            this.byteArrayPKValue = new byte[pkSize];
-            readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue);
-        } else {
-            this.tupleReferencePKValue = pkValue;
-        }
-    }
-
-    public TxnId() {
-    }
-
-    private static void readPKValueIntoByteArray(ITupleReference pkValue, int 
pkSize, byte[] byteArrayPKValue) {
-        int readOffset = pkValue.getFieldStart(0);
-        byte[] readBuffer = pkValue.getFieldData(0);
-        for (int i = 0; i < pkSize; i++) {
-            byteArrayPKValue[i] = readBuffer[readOffset + i];
-        }
-    }
-
-    public void setTxnId(int jobId, int datasetId, int pkHashValue, 
ITupleReference pkValue, int pkSize) {
-        this.jobId = jobId;
-        this.datasetId = datasetId;
-        this.pkHashValue = pkHashValue;
-        this.tupleReferencePKValue = pkValue;
-        this.pkSize = pkSize;
-        this.isByteArrayPKValue = false;
-    }
-
-    @Override
-    public String toString() {
-        return "[" + jobId + "," + datasetId + "," + pkHashValue + "," + 
pkSize + "]";
-    }
-
-    @Override
-    public int hashCode() {
-        return pkHashValue;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof TxnId)) {
-            return false;
-        }
-        TxnId txnId = (TxnId) o;
-        return (txnId.pkHashValue == pkHashValue && txnId.datasetId == 
datasetId && txnId.jobId == jobId
-                && pkSize == txnId.pkSize && isEqualTo(txnId));
-    }
-
-    private boolean isEqualTo(TxnId txnId) {
-        if (isByteArrayPKValue && txnId.isByteArrayPKValue) {
-            return isEqual(byteArrayPKValue, txnId.byteArrayPKValue, pkSize);
-        } else if (isByteArrayPKValue && (!txnId.isByteArrayPKValue)) {
-            return isEqual(byteArrayPKValue, txnId.tupleReferencePKValue, 
pkSize);
-        } else if ((!isByteArrayPKValue) && txnId.isByteArrayPKValue) {
-            return isEqual(txnId.byteArrayPKValue, tupleReferencePKValue, 
pkSize);
-        } else {
-            return isEqual(tupleReferencePKValue, txnId.tupleReferencePKValue, 
pkSize);
-        }
-    }
-
-    private static boolean isEqual(byte[] a, byte[] b, int size) {
-        for (int i = 0; i < size; i++) {
-            if (a[i] != b[i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private static boolean isEqual(byte[] a, ITupleReference b, int size) {
-        int readOffset = b.getFieldStart(0);
-        byte[] readBuffer = b.getFieldData(0);
-        for (int i = 0; i < size; i++) {
-            if (a[i] != readBuffer[readOffset + i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private static boolean isEqual(ITupleReference a, ITupleReference b, int 
size) {
-        int aOffset = a.getFieldStart(0);
-        byte[] aBuffer = a.getFieldData(0);
-        int bOffset = b.getFieldStart(0);
-        byte[] bBuffer = b.getFieldData(0);
-        for (int i = 0; i < size; i++) {
-            if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public void serialize(ByteBuffer buffer) {
-        buffer.putInt(jobId);
-        buffer.putInt(datasetId);
-        buffer.putInt(pkHashValue);
-        buffer.putInt(pkSize);
-        buffer.put((byte) (isByteArrayPKValue ? 1 : 0));
-        if (isByteArrayPKValue) {
-            buffer.put(byteArrayPKValue);
-        }
-    }
-
-    public static TxnId deserialize(ByteBuffer buffer) {
-        TxnId txnId = new TxnId();
-        txnId.jobId = buffer.getInt();
-        txnId.datasetId = buffer.getInt();
-        txnId.pkHashValue = buffer.getInt();
-        txnId.pkSize = buffer.getInt();
-        txnId.isByteArrayPKValue = (buffer.get() == 1);
-        if (txnId.isByteArrayPKValue) {
-            byte[] byteArrayPKValue = new byte[txnId.pkSize];
-            buffer.get(byteArrayPKValue);
-            txnId.byteArrayPKValue = byteArrayPKValue;
-        }
-        return txnId;
-    }
-
-    public int getCurrentSize() {
-        //job id, dataset id, pkHashValue, arraySize, isByteArrayPKValue
-        int size = JobId.BYTES + ILogRecord.DS_LEN + LogRecord.PKHASH_LEN + 
LogRecord.PKSZ_LEN + Byte.BYTES;
-        //byte arraySize
-        if (isByteArrayPKValue && byteArrayPKValue != null) {
-            size += byteArrayPKValue.length;
-        }
-        return size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java
deleted file mode 100644
index 6e0af1c..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.common.transactions.JobId;
-
-/**
- * Represents a factory to generate unique transaction IDs.
- */
-public class JobIdFactory {
-    private static final AtomicInteger Id = new AtomicInteger();
-
-    public static JobId generateJobId() {
-        return new JobId(Id.incrementAndGet());
-    }
-
-    public static void initJobId(int id) {
-        Id.set(id);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index 3159e6b..1681a27 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -30,8 +30,8 @@ import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -39,7 +39,7 @@ import 
org.apache.hyracks.storage.common.IModificationOperationCallback;
 
 /*
  * An object of TransactionContext is created and accessed(read/written) by 
multiple threads which work for
- * a single job identified by a jobId. Thus, the member variables in the 
object can be read/written
+ * a single job identified by a txnId. Thus, the member variables in the 
object can be read/written
  * concurrently. Please see each variable declaration to know which one is 
accessed concurrently and
  * which one is not.
  */
@@ -47,8 +47,8 @@ public class TransactionContext implements 
ITransactionContext {
 
     private static final long serialVersionUID = -6105616785783310111L;
 
-    // jobId is set once and read concurrently.
-    private final JobId jobId;
+    // txnId is set once and read concurrently.
+    private final TxnId txnId;
 
     // There are no concurrent writers on both firstLSN and lastLSN
     // since both values are updated by serialized log appenders.
@@ -95,8 +95,8 @@ public class TransactionContext implements 
ITransactionContext {
     // creations.
     // also, the pool can throttle the number of concurrent active jobs at 
every
     // moment.
-    public TransactionContext(JobId jobId) throws ACIDException {
-        this.jobId = jobId;
+    public TransactionContext(TxnId txnId) throws ACIDException {
+        this.txnId = txnId;
         firstLSN = new AtomicLong(-1);
         lastLSN = new AtomicLong(-1);
         txnState = new AtomicInteger(ITransactionManager.ACTIVE);
@@ -180,8 +180,8 @@ public class TransactionContext implements 
ITransactionContext {
     }
 
     @Override
-    public JobId getJobId() {
-        return jobId;
+    public TxnId getTxnId() {
+        return txnId;
     }
 
     @Override
@@ -206,7 +206,7 @@ public class TransactionContext implements 
ITransactionContext {
 
     @Override
     public int hashCode() {
-        return jobId.getId();
+        return Long.hashCode(txnId.getId());
     }
 
     @Override
@@ -227,7 +227,7 @@ public class TransactionContext implements 
ITransactionContext {
     @Override
     public String prettyPrint() {
         StringBuilder sb = new StringBuilder();
-        sb.append("\n" + jobId + "\n");
+        sb.append("\n" + txnId + "\n");
         sb.append("isWriteTxn: " + isWriteTxn + "\n");
         sb.append("firstLSN: " + firstLSN.get() + "\n");
         sb.append("lastLSN: " + lastLSN.get() + "\n");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index c9a1bad..1799ea1 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -22,7 +22,7 @@ import java.io.OutputStream;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -31,7 +31,7 @@ import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -45,8 +45,8 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = 
Logger.getLogger(TransactionManager.class.getName());
     private final ITransactionSubsystem txnSubsystem;
-    private Map<JobId, ITransactionContext> transactionContextRepository = new 
ConcurrentHashMap<>();
-    private AtomicInteger maxJobId = new AtomicInteger(0);
+    private Map<TxnId, ITransactionContext> transactionContextRepository = new 
ConcurrentHashMap<>();
+    private AtomicLong maxTxnId = new AtomicLong(0);
 
     public TransactionManager(ITransactionSubsystem provider) {
         this.txnSubsystem = provider;
@@ -74,30 +74,30 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
         } finally {
             txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
-            transactionContextRepository.remove(txnCtx.getJobId());
+            transactionContextRepository.remove(txnCtx.getTxnId());
         }
     }
 
     @Override
-    public ITransactionContext beginTransaction(JobId jobId) throws 
ACIDException {
-        return getTransactionContext(jobId, true);
+    public ITransactionContext beginTransaction(TxnId txnId) throws 
ACIDException {
+        return getTransactionContext(txnId, true);
     }
 
     @Override
-    public ITransactionContext getTransactionContext(JobId jobId, boolean 
createIfNotExist) throws ACIDException {
-        setMaxJobId(jobId.getId());
-        ITransactionContext txnCtx = transactionContextRepository.get(jobId);
+    public ITransactionContext getTransactionContext(TxnId txnId, boolean 
createIfNotExist) throws ACIDException {
+        setMaxTxnId(txnId.getId());
+        ITransactionContext txnCtx = transactionContextRepository.get(txnId);
         if (txnCtx == null) {
             if (createIfNotExist) {
                 synchronized (this) {
-                    txnCtx = transactionContextRepository.get(jobId);
+                    txnCtx = transactionContextRepository.get(txnId);
                     if (txnCtx == null) {
-                        txnCtx = new TransactionContext(jobId);
-                        transactionContextRepository.put(jobId, txnCtx);
+                        txnCtx = new TransactionContext(txnId);
+                        transactionContextRepository.put(txnId, txnCtx);
                     }
                 }
             } else {
-                throw new ACIDException("TransactionContext of " + jobId + " 
doesn't exist.");
+                throw new ACIDException("TransactionContext of " + txnId + " 
doesn't exist.");
             }
         }
         return txnCtx;
@@ -115,13 +115,13 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
             }
         } catch (Exception ae) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.severe(" caused exception in commit !" + 
txnCtx.getJobId());
+                LOGGER.severe(" caused exception in commit !" + 
txnCtx.getTxnId());
             }
             throw ae;
         } finally {
             txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
-            transactionContextRepository.remove(txnCtx.getJobId());
+            transactionContextRepository.remove(txnCtx.getTxnId());
             txnCtx.setTxnState(ITransactionManager.COMMITTED);
         }
     }
@@ -141,16 +141,16 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
         return txnSubsystem;
     }
 
-    public void setMaxJobId(int jobId) {
-        int maxId = maxJobId.get();
-        if (jobId > maxId) {
-            maxJobId.compareAndSet(maxId, jobId);
+    /**
+     * Raises the recorded maximum transaction id to {@code txnId} when {@code txnId} is larger than the
+     * value currently recorded; otherwise leaves it unchanged.
+     *
+     * @param txnId
+     *            a transaction id that has been observed
+     */
+    public void setMaxTxnId(long txnId) {
+        long maxId = maxTxnId.get();
+        // A failed CAS means another thread changed the maximum after it was read here. Re-read and retry
+        // so that a larger txnId is never dropped; stop as soon as the recorded maximum is >= txnId.
+        while (txnId > maxId && !maxTxnId.compareAndSet(maxId, txnId)) {
+            maxId = maxTxnId.get();
         }
     }
 
     @Override
-    public int getMaxJobId() {
-        return maxJobId.get();
+    public long getMaxTxnId() {
+        return maxTxnId.get();
     }
 
     @Override
@@ -172,19 +172,19 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
     }
 
     private void dumpTxnContext(OutputStream os) {
-        JobId jobId;
+        TxnId txnId;
         ITransactionContext txnCtx;
         StringBuilder sb = new StringBuilder();
 
         try {
             sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
-            Set<Map.Entry<JobId, ITransactionContext>> entrySet = 
transactionContextRepository.entrySet();
+            Set<Map.Entry<TxnId, ITransactionContext>> entrySet = 
transactionContextRepository.entrySet();
             if (entrySet != null) {
-                for (Map.Entry<JobId, ITransactionContext> entry : entrySet) {
+                for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) {
                     if (entry != null) {
-                        jobId = entry.getKey();
-                        if (jobId != null) {
-                            sb.append("\n" + jobId);
+                        txnId = entry.getKey();
+                        if (txnId != null) {
+                            sb.append("\n" + txnId);
                         } else {
                             sb.append("\nJID:null");
                         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
new file mode 100644
index 0000000..71d7f56
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.transactions.TxnId;
+
+/**
+ * Represents a factory to generate unique transaction IDs.
+ * All state is held in a single static {@link AtomicLong} counter.
+ */
+public class TxnIdFactory {
+
+    private static final AtomicLong id = new AtomicLong();
+
+    private TxnIdFactory() {
+    }
+
+    /**
+     * Creates a new transaction id from the next value of the counter.
+     *
+     * @return a {@link TxnId} wrapping the incremented counter value
+     */
+    public static TxnId create() {
+        return new TxnId(id.incrementAndGet());
+    }
+
+    /**
+     * Makes sure the counter is at least {@code id}, so that ids created afterwards are greater than
+     * {@code id}. The counter is never moved backwards: a value below the current counter is ignored,
+     * which keeps already issued ids from being handed out again.
+     *
+     * @param id
+     *            the lower bound for the counter
+     */
+    public static void ensureMinimumId(long id) {
+        // atomic max: a plain set(id) would rewind the counter when id is smaller than the current value
+        TxnIdFactory.id.updateAndGet(current -> Math.max(current, id));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
index 14e4020..64ac3cb 100644
--- 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
+++ 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
@@ -36,7 +36,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.service.locking.Request.Kind;
 import 
org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import org.junit.After;
@@ -300,7 +300,7 @@ public class LockManagerUnitTest {
      * @return throwable for said error
      */
     private static Throwable getError(Map<String, Throwable> errors, 
ITransactionContext txnCtx) {
-        return errors.get(txnCtx.getJobId().toString());
+        return errors.get(txnCtx.getTxnId().toString());
     }
 
     /**
@@ -318,7 +318,7 @@ public class LockManagerUnitTest {
         Throwable error = getError(errors, txnCtx);
         if (error == null) {
             throw new AssertionError(
-                    "expected " + clazz.getSimpleName() + " for " + 
txnCtx.getJobId() + ", got no " + "exception");
+                    "expected " + clazz.getSimpleName() + " for " + 
txnCtx.getTxnId() + ", got no " + "exception");
         }
         if (!clazz.isInstance(error)) {
             throw new AssertionError(error);
@@ -354,7 +354,7 @@ public class LockManagerUnitTest {
     private ITransactionContext j(int jId) {
         if (!jobId2TxnCtxMap.containsKey(jId)) {
             ITransactionContext mockTxnContext = 
mock(ITransactionContext.class);
-            when(mockTxnContext.getJobId()).thenReturn(new JobId(jId));
+            when(mockTxnContext.getTxnId()).thenReturn(new TxnId(jId));
             jobId2TxnCtxMap.put(jId, mockTxnContext);
         }
         return jobId2TxnCtxMap.get(jId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
index 97b4f8a..ef7d40e 100644
--- 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
+++ 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
@@ -71,7 +71,7 @@ class Locker implements Runnable {
      */
     Locker(ILockManager lockMgr, ITransactionContext txnCtx, List<Request> 
allRequests, AtomicInteger time,
             PrintStream err) {
-        this.name = txnCtx == null ? "admin" : txnCtx.getJobId().toString();
+        this.name = txnCtx == null ? "admin" : txnCtx.getTxnId().toString();
         this.lockMgr = lockMgr;
 
         this.requests = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
index fd4dae5..112dc5f 100644
--- 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
+++ 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
@@ -58,7 +58,7 @@ abstract class Request {
 
     String asString(final Kind kind, final ITransactionContext txnCtx, final 
DatasetId dsId, final int hashValue,
             final byte lockMode) {
-        return txnCtx.getJobId() + ":" + kind.name() + ":" + dsId.getId() + 
":" + hashValue + ":"
+        return txnCtx.getTxnId() + ":" + kind.name() + ":" + dsId.getId() + 
":" + hashValue + ":"
                 + 
TransactionManagementConstants.LockManagerConstants.LockMode.toString(lockMode);
     }
 
@@ -147,7 +147,7 @@ abstract class Request {
 
                 @Override
                 public String toString() {
-                    return txnCtx.getJobId().toString() + ":" + kind.name();
+                    return txnCtx.getTxnId().toString() + ":" + kind.name();
                 }
             };
         }

Reply via email to