Repository: asterixdb Updated Branches: refs/heads/master 9eead00d0 -> 592af6545
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java index 36a91dc..aad2a19 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java @@ -174,7 +174,7 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager { protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException { ILogManager logMgr = txnSubsystem.getLogManager(); ITransactionManager txnMgr = txnSubsystem.getTransactionManager(); - Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxJobId(), + Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxTxnId(), System.currentTimeMillis(), sharp, StorageConstants.VERSION); persist(checkpointObject); cleanup(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java new file mode 100644 index 0000000..af74b13 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.recovery; + +import java.nio.ByteBuffer; + +import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +public class TxnEntityId { + public boolean isByteArrayPKValue; + public long txnId; + public int datasetId; + public int pkHashValue; + public int pkSize; + public byte[] byteArrayPKValue; + public ITupleReference tupleReferencePKValue; + + public TxnEntityId(long txnId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize, + boolean isByteArrayPKValue) { + this.txnId = txnId; + this.datasetId = datasetId; + this.pkHashValue = pkHashValue; + this.pkSize = pkSize; + this.isByteArrayPKValue = isByteArrayPKValue; + if (isByteArrayPKValue) { + this.byteArrayPKValue = new byte[pkSize]; + readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue); + } else { + this.tupleReferencePKValue = pkValue; + } + } + + public TxnEntityId() { + } + + private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) { + int readOffset = pkValue.getFieldStart(0); + byte[] readBuffer = pkValue.getFieldData(0); + for (int i = 0; i < pkSize; i++) { + byteArrayPKValue[i] = readBuffer[readOffset + i]; + } + } + + public void setTxnId(long txnId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) { + this.txnId = txnId; + this.datasetId = datasetId; + this.pkHashValue = pkHashValue; + this.tupleReferencePKValue = pkValue; + this.pkSize = pkSize; + this.isByteArrayPKValue = false; + } + + @Override + public String toString() { + return "[" + txnId + "," + datasetId + "," + pkHashValue + "," + pkSize + "]"; + } + + @Override + public int hashCode() { + return pkHashValue; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof TxnEntityId)) { + return false; + } + TxnEntityId txnEntityId = (TxnEntityId) o; + return (txnEntityId.pkHashValue == pkHashValue && txnEntityId.datasetId == datasetId + && txnEntityId.txnId == txnId && pkSize == txnEntityId.pkSize && isEqualTo(txnEntityId)); + } + + private boolean isEqualTo(TxnEntityId txnEntityId) { + if (isByteArrayPKValue && txnEntityId.isByteArrayPKValue) { + return isEqual(byteArrayPKValue, txnEntityId.byteArrayPKValue, pkSize); + } else if (isByteArrayPKValue && (!txnEntityId.isByteArrayPKValue)) { + return isEqual(byteArrayPKValue, txnEntityId.tupleReferencePKValue, pkSize); + } else if ((!isByteArrayPKValue) && txnEntityId.isByteArrayPKValue) { + return isEqual(txnEntityId.byteArrayPKValue, tupleReferencePKValue, pkSize); + } else { + return isEqual(tupleReferencePKValue, txnEntityId.tupleReferencePKValue, pkSize); + } + } + + private static boolean isEqual(byte[] a, byte[] b, int size) { + for (int i = 0; i < size; i++) { + if (a[i] != b[i]) { + return false; + } + } + return true; + } + + private static boolean isEqual(byte[] a, ITupleReference b, int size) { + int readOffset = b.getFieldStart(0); + byte[] readBuffer = b.getFieldData(0); + for (int i = 0; i < size; i++) { + if (a[i] != readBuffer[readOffset + i]) { + return false; + } + } + return true; + } + + private static boolean isEqual(ITupleReference a, ITupleReference b, int size) { + int aOffset = a.getFieldStart(0); + byte[] aBuffer = a.getFieldData(0); + int bOffset = b.getFieldStart(0); + byte[] bBuffer = b.getFieldData(0); + for (int i = 0; i < size; i++) { + if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) { + return false; + } + } + return true; + } + + public void serialize(ByteBuffer buffer) { + buffer.putLong(txnId); + buffer.putInt(datasetId); + buffer.putInt(pkHashValue); + buffer.putInt(pkSize); + buffer.put((byte) (isByteArrayPKValue ? 1 : 0)); + if (isByteArrayPKValue) { + buffer.put(byteArrayPKValue); + } + } + + public static TxnEntityId deserialize(ByteBuffer buffer) { + TxnEntityId txnEntityId = new TxnEntityId(); + txnEntityId.txnId = buffer.getLong(); + txnEntityId.datasetId = buffer.getInt(); + txnEntityId.pkHashValue = buffer.getInt(); + txnEntityId.pkSize = buffer.getInt(); + txnEntityId.isByteArrayPKValue = (buffer.get() == 1); + if (txnEntityId.isByteArrayPKValue) { + byte[] byteArrayPKValue = new byte[txnEntityId.pkSize]; + buffer.get(byteArrayPKValue); + txnEntityId.byteArrayPKValue = byteArrayPKValue; + } + return txnEntityId; + } + + public int getCurrentSize() { + //txn id, dataset id, pkHashValue, arraySize, isByteArrayPKValue + int size = TxnId.BYTES + ILogRecord.DS_LEN + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES; + //byte arraySize + if (isByteArrayPKValue && byteArrayPKValue != null) { + size += byteArrayPKValue.length; + } + return size; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java deleted file mode 100644 index 9cb54af..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.recovery; - -import java.nio.ByteBuffer; - -import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.common.transactions.JobId; -import org.apache.asterix.common.transactions.LogRecord; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; - -public class TxnId { - public boolean isByteArrayPKValue; - public int jobId; - public int datasetId; - public int pkHashValue; - public int pkSize; - public byte[] byteArrayPKValue; - public ITupleReference tupleReferencePKValue; - - public TxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize, - boolean isByteArrayPKValue) { - this.jobId = jobId; - this.datasetId = datasetId; - this.pkHashValue = pkHashValue; - this.pkSize = pkSize; - this.isByteArrayPKValue = isByteArrayPKValue; - if (isByteArrayPKValue) { - this.byteArrayPKValue = new byte[pkSize]; - readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue); - } else { - this.tupleReferencePKValue = pkValue; - } - } - - public TxnId() { - } - - private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) { - int readOffset = pkValue.getFieldStart(0); - byte[] readBuffer = pkValue.getFieldData(0); - for (int i = 0; i < pkSize; i++) { - byteArrayPKValue[i] = readBuffer[readOffset + i]; - } - } - - public void setTxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) { - this.jobId = jobId; - this.datasetId = datasetId; - this.pkHashValue = pkHashValue; - this.tupleReferencePKValue = pkValue; - this.pkSize = pkSize; - this.isByteArrayPKValue = false; - } - - @Override - public String toString() { - return "[" + jobId + "," + datasetId + "," + pkHashValue + "," + pkSize + "]"; - } - - @Override - public int hashCode() { - return pkHashValue; - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof TxnId)) { - return false; - } - TxnId txnId = (TxnId) o; - return (txnId.pkHashValue == pkHashValue && txnId.datasetId == datasetId && txnId.jobId == jobId - && pkSize == txnId.pkSize && isEqualTo(txnId)); - } - - private boolean isEqualTo(TxnId txnId) { - if (isByteArrayPKValue && txnId.isByteArrayPKValue) { - return isEqual(byteArrayPKValue, txnId.byteArrayPKValue, pkSize); - } else if (isByteArrayPKValue && (!txnId.isByteArrayPKValue)) { - return isEqual(byteArrayPKValue, txnId.tupleReferencePKValue, pkSize); - } else if ((!isByteArrayPKValue) && txnId.isByteArrayPKValue) { - return isEqual(txnId.byteArrayPKValue, tupleReferencePKValue, pkSize); - } else { - return isEqual(tupleReferencePKValue, txnId.tupleReferencePKValue, pkSize); - } - } - - private static boolean isEqual(byte[] a, byte[] b, int size) { - for (int i = 0; i < size; i++) { - if (a[i] != b[i]) { - return false; - } - } - return true; - } - - private static boolean isEqual(byte[] a, ITupleReference b, int size) { - int readOffset = b.getFieldStart(0); - byte[] readBuffer = b.getFieldData(0); - for (int i = 0; i < size; i++) { - if (a[i] != readBuffer[readOffset + i]) { - return false; - } - } - return true; - } - - private static boolean isEqual(ITupleReference a, ITupleReference b, int size) { - int aOffset = a.getFieldStart(0); - byte[] aBuffer = a.getFieldData(0); - int bOffset = b.getFieldStart(0); - byte[] bBuffer = b.getFieldData(0); - for (int i = 0; i < size; i++) { - if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) { - return false; - } - } - return true; - } - - public void serialize(ByteBuffer buffer) { - buffer.putInt(jobId); - buffer.putInt(datasetId); - buffer.putInt(pkHashValue); - buffer.putInt(pkSize); - buffer.put((byte) (isByteArrayPKValue ? 1 : 0)); - if (isByteArrayPKValue) { - buffer.put(byteArrayPKValue); - } - } - - public static TxnId deserialize(ByteBuffer buffer) { - TxnId txnId = new TxnId(); - txnId.jobId = buffer.getInt(); - txnId.datasetId = buffer.getInt(); - txnId.pkHashValue = buffer.getInt(); - txnId.pkSize = buffer.getInt(); - txnId.isByteArrayPKValue = (buffer.get() == 1); - if (txnId.isByteArrayPKValue) { - byte[] byteArrayPKValue = new byte[txnId.pkSize]; - buffer.get(byteArrayPKValue); - txnId.byteArrayPKValue = byteArrayPKValue; - } - return txnId; - } - - public int getCurrentSize() { - //job id, dataset id, pkHashValue, arraySize, isByteArrayPKValue - int size = JobId.BYTES + ILogRecord.DS_LEN + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES; - //byte arraySize - if (isByteArrayPKValue && byteArrayPKValue != null) { - size += byteArrayPKValue.length; - } - return size; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java deleted file mode 100644 index 6e0af1c..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/JobIdFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.transaction; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.asterix.common.transactions.JobId; - -/** - * Represents a factory to generate unique transaction IDs. - */ -public class JobIdFactory { - private static final AtomicInteger Id = new AtomicInteger(); - - public static JobId generateJobId() { - return new JobId(Id.incrementAndGet()); - } - - public static void initJobId(int id) { - Id.set(id); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java index 3159e6b..1681a27 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java @@ -30,8 +30,8 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.AbstractOperationCallback; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; @@ -39,7 +39,7 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback; /* * An object of TransactionContext is created and accessed(read/written) by multiple threads which work for - * a single job identified by a jobId. Thus, the member variables in the object can be read/written + * a single job identified by a txnId. Thus, the member variables in the object can be read/written * concurrently. Please see each variable declaration to know which one is accessed concurrently and * which one is not. */ @@ -47,8 +47,8 @@ public class TransactionContext implements ITransactionContext { private static final long serialVersionUID = -6105616785783310111L; - // jobId is set once and read concurrently. - private final JobId jobId; + // txnId is set once and read concurrently. + private final TxnId txnId; // There are no concurrent writers on both firstLSN and lastLSN // since both values are updated by serialized log appenders. @@ -95,8 +95,8 @@ public class TransactionContext implements ITransactionContext { // creations. // also, the pool can throttle the number of concurrent active jobs at every // moment. - public TransactionContext(JobId jobId) throws ACIDException { - this.jobId = jobId; + public TransactionContext(TxnId txnId) throws ACIDException { + this.txnId = txnId; firstLSN = new AtomicLong(-1); lastLSN = new AtomicLong(-1); txnState = new AtomicInteger(ITransactionManager.ACTIVE); @@ -180,8 +180,8 @@ public class TransactionContext implements ITransactionContext { } @Override - public JobId getJobId() { - return jobId; + public TxnId getTxnId() { + return txnId; } @Override @@ -206,7 +206,7 @@ public class TransactionContext implements ITransactionContext { @Override public int hashCode() { - return jobId.getId(); + return Long.hashCode(txnId.getId()); } @Override @@ -227,7 +227,7 @@ public class TransactionContext implements ITransactionContext { @Override public String prettyPrint() { StringBuilder sb = new StringBuilder(); - sb.append("\n" + jobId + "\n"); + sb.append("\n" + txnId + "\n"); sb.append("isWriteTxn: " + isWriteTxn + "\n"); sb.append("firstLSN: " + firstLSN.get() + "\n"); sb.append("lastLSN: " + lastLSN.get() + "\n"); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java index c9a1bad..1799ea1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java @@ -22,7 +22,7 @@ import java.io.OutputStream; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,7 +31,7 @@ import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; @@ -45,8 +45,8 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon public static final boolean IS_DEBUG_MODE = false;//true private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName()); private final ITransactionSubsystem txnSubsystem; - private Map<JobId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<>(); - private AtomicInteger maxJobId = new AtomicInteger(0); + private Map<TxnId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<>(); + private AtomicLong maxTxnId = new AtomicLong(0); public TransactionManager(ITransactionSubsystem provider) { this.txnSubsystem = provider; @@ -74,30 +74,30 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon } finally { txnCtx.complete(); txnSubsystem.getLockManager().releaseLocks(txnCtx); - transactionContextRepository.remove(txnCtx.getJobId()); + transactionContextRepository.remove(txnCtx.getTxnId()); } } @Override - public ITransactionContext beginTransaction(JobId jobId) throws ACIDException { - return getTransactionContext(jobId, true); + public ITransactionContext beginTransaction(TxnId txnId) throws ACIDException { + return getTransactionContext(txnId, true); } @Override - public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException { - setMaxJobId(jobId.getId()); - ITransactionContext txnCtx = transactionContextRepository.get(jobId); + public ITransactionContext getTransactionContext(TxnId txnId, boolean createIfNotExist) throws ACIDException { + setMaxTxnId(txnId.getId()); + ITransactionContext txnCtx = transactionContextRepository.get(txnId); if (txnCtx == null) { if (createIfNotExist) { synchronized (this) { - txnCtx = transactionContextRepository.get(jobId); + txnCtx = transactionContextRepository.get(txnId); if (txnCtx == null) { - txnCtx = new TransactionContext(jobId); - transactionContextRepository.put(jobId, txnCtx); + txnCtx = new TransactionContext(txnId); + transactionContextRepository.put(txnId, txnCtx); } } } else { - throw new ACIDException("TransactionContext of " + jobId + " doesn't exist."); + throw new ACIDException("TransactionContext of " + txnId + " doesn't exist."); } } return txnCtx; @@ -115,13 +115,13 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon } } catch (Exception ae) { if (LOGGER.isLoggable(Level.SEVERE)) { - LOGGER.severe(" caused exception in commit !" + txnCtx.getJobId()); + LOGGER.severe(" caused exception in commit !" + txnCtx.getTxnId()); } throw ae; } finally { txnCtx.complete(); txnSubsystem.getLockManager().releaseLocks(txnCtx); - transactionContextRepository.remove(txnCtx.getJobId()); + transactionContextRepository.remove(txnCtx.getTxnId()); txnCtx.setTxnState(ITransactionManager.COMMITTED); } } @@ -141,16 +141,16 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon return txnSubsystem; } - public void setMaxJobId(int jobId) { - int maxId = maxJobId.get(); - if (jobId > maxId) { - maxJobId.compareAndSet(maxId, jobId); + public void setMaxTxnId(long txnId) { + long maxId = maxTxnId.get(); + if (txnId > maxId) { + maxTxnId.compareAndSet(maxId, txnId); } } @Override - public int getMaxJobId() { - return maxJobId.get(); + public long getMaxTxnId() { + return maxTxnId.get(); } @Override @@ -172,19 +172,19 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon } private void dumpTxnContext(OutputStream os) { - JobId jobId; + TxnId txnId; ITransactionContext txnCtx; StringBuilder sb = new StringBuilder(); try { sb.append("\n>>dump_begin\t>>----- [ConfVars] -----"); - Set<Map.Entry<JobId, ITransactionContext>> entrySet = transactionContextRepository.entrySet(); + Set<Map.Entry<TxnId, ITransactionContext>> entrySet = transactionContextRepository.entrySet(); if (entrySet != null) { - for (Map.Entry<JobId, ITransactionContext> entry : entrySet) { + for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) { if (entry != null) { - jobId = entry.getKey(); - if (jobId != null) { - sb.append("\n" + jobId); + txnId = entry.getKey(); + if (txnId != null) { + sb.append("\n" + txnId); } else { sb.append("\nJID:null"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java new file mode 100644 index 0000000..71d7f56 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.transaction; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.asterix.common.transactions.TxnId; + +/** + * Represents a factory to generate unique transaction IDs. + */ +public class TxnIdFactory { + + private static final AtomicLong id = new AtomicLong(); + + private TxnIdFactory() { + } + + public static TxnId create() { + return new TxnId(id.incrementAndGet()); + } + + public static void ensureMinimumId(long id) { + TxnIdFactory.id.set(id); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java index 14e4020..64ac3cb 100644 --- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java +++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java @@ -36,7 +36,7 @@ import java.util.logging.Logger; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ILockManager; import org.apache.asterix.common.transactions.ITransactionContext; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.service.locking.Request.Kind; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; import org.junit.After; @@ -300,7 +300,7 @@ public class LockManagerUnitTest { * @return throwable for said error */ private static Throwable getError(Map<String, Throwable> errors, ITransactionContext txnCtx) { - return errors.get(txnCtx.getJobId().toString()); + return errors.get(txnCtx.getTxnId().toString()); } /** @@ -318,7 +318,7 @@ public class LockManagerUnitTest { Throwable error = getError(errors, txnCtx); if (error == null) { throw new AssertionError( - "expected " + clazz.getSimpleName() + " for " + txnCtx.getJobId() + ", got no " + "exception"); + "expected " + clazz.getSimpleName() + " for " + txnCtx.getTxnId() + ", got no " + "exception"); } if (!clazz.isInstance(error)) { throw new AssertionError(error); @@ -354,7 +354,7 @@ public class LockManagerUnitTest { private ITransactionContext j(int jId) { if (!jobId2TxnCtxMap.containsKey(jId)) { ITransactionContext mockTxnContext = mock(ITransactionContext.class); - when(mockTxnContext.getJobId()).thenReturn(new JobId(jId)); + when(mockTxnContext.getTxnId()).thenReturn(new TxnId(jId)); jobId2TxnCtxMap.put(jId, mockTxnContext); } return jobId2TxnCtxMap.get(jId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java index 97b4f8a..ef7d40e 100644 --- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java +++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java @@ -71,7 +71,7 @@ class Locker implements Runnable { */ Locker(ILockManager lockMgr, ITransactionContext txnCtx, List<Request> allRequests, AtomicInteger time, PrintStream err) { - this.name = txnCtx == null ? "admin" : txnCtx.getJobId().toString(); + this.name = txnCtx == null ? "admin" : txnCtx.getTxnId().toString(); this.lockMgr = lockMgr; this.requests = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java index fd4dae5..112dc5f 100644 --- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java +++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java @@ -58,7 +58,7 @@ abstract class Request { String asString(final Kind kind, final ITransactionContext txnCtx, final DatasetId dsId, final int hashValue, final byte lockMode) { - return txnCtx.getJobId() + ":" + kind.name() + ":" + dsId.getId() + ":" + hashValue + ":" + return txnCtx.getTxnId() + ":" + kind.name() + ":" + dsId.getId() + ":" + hashValue + ":" + TransactionManagementConstants.LockManagerConstants.LockMode.toString(lockMode); } @@ -147,7 +147,7 @@ abstract class Request { @Override public String toString() { - return txnCtx.getJobId().toString() + ":" + kind.name(); + return txnCtx.getTxnId().toString() + ":" + kind.name(); } }; }
