http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java index 077e431..3626f16 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java @@ -22,7 +22,7 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -208,14 +208,14 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp @Override public JobSpecification buildLoadingJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = - DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, jobId); + DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, txnId); IOperatorDescriptor sourceOp = primaryScanOp; boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java index b11b527..613df21 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; @@ -201,11 +201,11 @@ public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperations if (dataset.getDatasetType() == DatasetType.INTERNAL) { // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); - JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, - jobId); + txnId); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java index 40623bd..54d94a9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java @@ -21,8 +21,7 @@ package org.apache.asterix.metadata.valueextractors; import java.rmi.RemoteException; -import org.apache.asterix.common.exceptions.MetadataException; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.api.IMetadataEntityTupleTranslator; import org.apache.asterix.metadata.api.IValueExtractor; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -40,7 +39,7 @@ public class MetadataEntityValueExtractor<T> implements IValueExtractor<T> { } @Override - public T getValue(JobId jobId, ITupleReference tuple) + public T getValue(TxnId txnId, ITupleReference tuple) throws AlgebricksException, HyracksDataException, RemoteException { return tupleReaderWriter.getMetadataEntityFromTuple(tuple); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java index 22aea26..cf02930 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java @@ -24,7 +24,7 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.api.IValueExtractor; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -49,7 +49,7 @@ public class NestedDatatypeNameValueExtractor implements IValueExtractor<String> private final UTF8StringReader reader = new UTF8StringReader(); @Override - public String getValue(JobId jobId, ITupleReference tuple) throws AlgebricksException, HyracksDataException { + public String getValue(TxnId txnId, ITupleReference tuple) throws AlgebricksException, HyracksDataException { byte[] serRecord = tuple.getFieldData(2); int recordStartOffset = tuple.getFieldStart(2); int recordLength = tuple.getFieldLength(2); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java index 5f16543..8960ba6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java @@ -21,7 +21,7 @@ package org.apache.asterix.metadata.valueextractors; import java.nio.ByteBuffer; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.api.IValueExtractor; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.dataflow.value.ITypeTraits; @@ -48,7 +48,7 @@ public class TupleCopyValueExtractor implements IValueExtractor<ITupleReference> } @Override - public ITupleReference getValue(JobId jobId, ITupleReference tuple) + public ITupleReference getValue(TxnId txnId, ITupleReference tuple) throws AlgebricksException, HyracksDataException { int numBytes = tupleWriter.bytesRequired(tuple); tupleBytes = new byte[numBytes]; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 5e58802..9d8c351 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -470,7 +470,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { case LogType.JOB_COMMIT: case LogType.ABORT: LogRecord jobTerminationLog = new LogRecord(); - TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(), + TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getTxnId(), remoteLog.getLogType() == LogType.JOB_COMMIT); jobTerminationLog.setReplicationThread(this); jobTerminationLog.setLogSource(LogSource.REMOTE); @@ -523,7 +523,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { LogRecord logRecord = pendingNotificationRemoteLogsQ.take(); //send ACK to requester logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream() - .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId() + .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getTxnId() + System.lineSeparator()).getBytes()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index 126114b..b0aa0fb 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -99,8 +99,8 @@ public class ReplicationManager implements IReplicationManager { private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000; private final String nodeId; private ExecutorService replicationListenerThreads; - private final Map<Integer, Set<String>> jobCommitAcks; - private final Map<Integer, ILogRecord> replicationJobsPendingAcks; + private final Map<Long, Set<String>> txnCommitAcks; + private final Map<Long, ILogRecord> replicationTxnsPendingAcks; private ByteBuffer dataBuffer; private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ; private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ; @@ -157,8 +157,8 @@ public class ReplicationManager implements IReplicationManager { terminateJobsReplication = new AtomicBoolean(false); jobsReplicationSuspended = new AtomicBoolean(true); replicationSuspended = new AtomicBoolean(true); - jobCommitAcks = new ConcurrentHashMap<>(); - replicationJobsPendingAcks = new ConcurrentHashMap<>(); + txnCommitAcks = new ConcurrentHashMap<>(); + replicationTxnsPendingAcks = new ConcurrentHashMap<>(); shuttingDownReplicaIds = new HashSet<>(); dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); replicationMonitor = new ReplicasEventsMonitor(); @@ -228,7 +228,7 @@ public class ReplicationManager implements IReplicationManager { } Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>()); replicaIds.add(nodeId); - jobCommitAcks.put(logRecord.getJobId(), replicaIds); + txnCommitAcks.put(logRecord.getTxnId(), replicaIds); } appendToLogBuffer(logRecord); @@ -580,16 +580,16 @@ public class ReplicationManager implements IReplicationManager { //wait for any ACK to arrive before closing sockets. if (logsRepSockets != null) { - synchronized (jobCommitAcks) { + synchronized (txnCommitAcks) { try { long waitStartTime = System.currentTimeMillis(); - while (!jobCommitAcks.isEmpty()) { - jobCommitAcks.wait(1000); + while (!txnCommitAcks.isEmpty()) { + txnCommitAcks.wait(1000); long waitDuration = System.currentTimeMillis() - waitStartTime; if (waitDuration > MAX_JOB_COMMIT_ACK_WAIT) { LOGGER.log(Level.SEVERE, - "Timeout before receving all job ACKs from replicas. Pending jobs (" - + jobCommitAcks.keySet().toString() + ")"); + "Timeout before receving all job ACKs from replicas. Pending txns (" + + txnCommitAcks.keySet().toString() + ")"); break; } } @@ -747,9 +747,9 @@ public class ReplicationManager implements IReplicationManager { if (newState == ReplicaState.DEAD) { //assume the dead replica ACK has been received for all pending jobs - synchronized (jobCommitAcks) { - for (Integer jobId : jobCommitAcks.keySet()) { - addAckToJob(jobId, replicaId); + synchronized (txnCommitAcks) { + for (Long txnId : txnCommitAcks.keySet()) { + addAckToJob(txnId, replicaId); } } } @@ -777,27 +777,27 @@ public class ReplicationManager implements IReplicationManager { /** * When an ACK for a JOB_COMMIT is received, it is added to the corresponding job. * - * @param jobId + * @param txnId * @param replicaId * The remote replica id the ACK received from. */ - private void addAckToJob(int jobId, String replicaId) { - synchronized (jobCommitAcks) { + private void addAckToJob(long txnId, String replicaId) { + synchronized (txnCommitAcks) { //add ACK to the job - if (jobCommitAcks.containsKey(jobId)) { - Set<String> replicaIds = jobCommitAcks.get(jobId); + if (txnCommitAcks.containsKey(txnId)) { + Set<String> replicaIds = txnCommitAcks.get(txnId); replicaIds.add(replicaId); } else { if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Invalid job replication ACK received for jobId(" + jobId + ")"); + LOGGER.warning("Invalid job replication ACK received for txnId(" + txnId + ")"); } return; } //if got ACKs from all remote replicas, notify pending jobs if any - if (jobCommitAcks.get(jobId).size() == replicationFactor && replicationJobsPendingAcks.containsKey(jobId)) { - ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId); + if (txnCommitAcks.get(txnId).size() == replicationFactor && replicationTxnsPendingAcks.containsKey(txnId)) { + ILogRecord pendingLog = replicationTxnsPendingAcks.get(txnId); synchronized (pendingLog) { pendingLog.notifyAll(); } @@ -807,23 +807,23 @@ public class ReplicationManager implements IReplicationManager { @Override public boolean hasBeenReplicated(ILogRecord logRecord) { - int jobId = logRecord.getJobId(); - if (jobCommitAcks.containsKey(jobId)) { - synchronized (jobCommitAcks) { + long txnId = logRecord.getTxnId(); + if (txnCommitAcks.containsKey(txnId)) { + synchronized (txnCommitAcks) { //check if all ACKs have been received - if (jobCommitAcks.get(jobId).size() == replicationFactor) { - jobCommitAcks.remove(jobId); + if (txnCommitAcks.get(txnId).size() == replicationFactor) { + txnCommitAcks.remove(txnId); //remove from pending jobs if exists - replicationJobsPendingAcks.remove(jobId); + replicationTxnsPendingAcks.remove(txnId); //notify any threads waiting for all jobs to finish - if (jobCommitAcks.size() == 0) { - jobCommitAcks.notifyAll(); + if (txnCommitAcks.size() == 0) { + txnCommitAcks.notifyAll(); } return true; } else { - replicationJobsPendingAcks.putIfAbsent(jobId, logRecord); + replicationTxnsPendingAcks.putIfAbsent(txnId, logRecord); return false; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java index 2749b5a..1422e42 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java @@ -23,7 +23,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.job.IJobletEventListener; import org.apache.hyracks.api.job.IJobletEventListenerFactory; @@ -32,16 +32,16 @@ import org.apache.hyracks.api.job.JobStatus; public class JobEventListenerFactory implements IJobletEventListenerFactory { private static final long serialVersionUID = 1L; - private final JobId jobId; + private final TxnId txnId; private final boolean transactionalWrite; - public JobEventListenerFactory(JobId jobId, boolean transactionalWrite) { - this.jobId = jobId; + public JobEventListenerFactory(TxnId txnId, boolean transactionalWrite) { + this.txnId = txnId; this.transactionalWrite = transactionalWrite; } - public JobId getJobId() { - return jobId; + public TxnId getTxnId() { + return txnId; } @Override @@ -53,7 +53,7 @@ public class JobEventListenerFactory implements IJobletEventListenerFactory { try { ITransactionManager txnManager = ((INcApplicationContext) jobletContext.getServiceContext() .getApplicationContext()).getTransactionSubsystem().getTransactionManager(); - ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false); + ITransactionContext txnContext = txnManager.getTransactionContext(txnId, false); txnContext.setWriteTxn(transactionalWrite); txnManager.completedTransaction(txnContext, DatasetId.NULL, -1, !(jobStatus == JobStatus.FAILURE)); @@ -66,7 +66,7 @@ public class JobEventListenerFactory implements IJobletEventListenerFactory { public void jobletStart() { try { ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext()) - .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true); + .getTransactionSubsystem().getTransactionManager().getTransactionContext(txnId, true); } catch (ACIDException e) { throw new Error(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java index f41f326..a63f3ca 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java @@ -25,7 +25,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.job.IJobletEventListener; import org.apache.hyracks.api.job.IJobletEventListenerFactory; @@ -38,11 +38,11 @@ import org.apache.hyracks.api.job.JobStatus; public class MultiTransactionJobletEventListenerFactory implements IJobletEventListenerFactory { private static final long serialVersionUID = 1L; - private final List<JobId> jobIds; + private final List<TxnId> txnIds; private final boolean transactionalWrite; - public MultiTransactionJobletEventListenerFactory(List<JobId> jobIds, boolean transactionalWrite) { - this.jobIds = jobIds; + public MultiTransactionJobletEventListenerFactory(List<TxnId> txnIds, boolean transactionalWrite) { + this.txnIds = txnIds; this.transactionalWrite = transactionalWrite; } @@ -56,8 +56,8 @@ public class MultiTransactionJobletEventListenerFactory implements IJobletEventL ITransactionManager txnManager = ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext()) .getTransactionSubsystem().getTransactionManager(); - for (JobId jobId : jobIds) { - ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false); + for (TxnId txnId : txnIds) { + ITransactionContext txnContext = txnManager.getTransactionContext(txnId, false); txnContext.setWriteTxn(transactionalWrite); txnManager.completedTransaction(txnContext, DatasetId.NULL, -1, !(jobStatus == JobStatus.FAILURE)); @@ -70,9 +70,9 @@ public class MultiTransactionJobletEventListenerFactory implements IJobletEventL @Override public void jobletStart() { try { - for (JobId jobId : jobIds) { + for (TxnId txnId : txnIds) { ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext()) - .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true); + .getTransactionSubsystem().getTransactionManager().getTransactionContext(txnId, true); } } catch (ACIDException e) { throw new Error(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java index f2deb74..6f7287b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java @@ -28,7 +28,7 @@ import org.apache.asterix.common.transactions.ILockManager; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ImmutableDatasetId; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -40,12 +40,12 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePu public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; - private final JobId jobId; + private final TxnId txnId; private final DatasetId datasetId; - public FlushDatasetOperatorDescriptor(IOperatorDescriptorRegistry spec, JobId jobId, int datasetId) { + public FlushDatasetOperatorDescriptor(IOperatorDescriptorRegistry spec, TxnId txnId, int datasetId) { super(spec, 1, 0); - this.jobId = jobId; + this.txnId = txnId; this.datasetId = new ImmutableDatasetId(datasetId); } @@ -78,7 +78,7 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat ILockManager lockManager = appCtx.getTransactionSubsystem().getLockManager(); ITransactionManager txnManager = appCtx.getTransactionSubsystem().getTransactionManager(); // get the local transaction - ITransactionContext txnCtx = txnManager.getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnManager.getTransactionContext(txnId, false); // lock the dataset granule lockManager.lock(datasetId, -1, LockMode.S, txnCtx); // flush the dataset synchronously http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java index 818444e..09a898a 100644 --- a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java +++ b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java @@ -46,7 +46,7 @@ import org.apache.asterix.common.annotations.RecordDataGenAnnotation; import org.apache.asterix.common.annotations.TypeDataGen; import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.lang.aql.parser.AQLParserFactory; import org.apache.asterix.lang.aql.parser.ParseException; import org.apache.asterix.lang.common.base.IParser; @@ -941,7 +941,7 @@ public class AdmDataGen { List<Statement> statements = parser.parse(); aql.close(); // TODO: Need to fix how to use transactions here. - MetadataTransactionContext mdTxnCtx = new MetadataTransactionContext(new JobId(-1)); + MetadataTransactionContext mdTxnCtx = new MetadataTransactionContext(new TxnId(-1)); ADGenDmlTranslator dmlt = new ADGenDmlTranslator(mdTxnCtx, statements); dmlt.translate(); typeMap = dmlt.getTypeMap(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java index 1acc235..3a2f195 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java @@ -85,7 +85,7 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac logRecord = new LogRecord(); logRecord.setTxnCtx(txnCtx); logRecord.setLogType(LogType.UPDATE); - logRecord.setJobId(txnCtx.getJobId().getId()); + logRecord.setTxnId(txnCtx.getTxnId().getId()); logRecord.setDatasetId(datasetId.getId()); logRecord.setResourceId(resourceId); logRecord.setResourcePartition(resourcePartition); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java index f76cb89..c97fb1b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java @@ -55,7 +55,7 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i logRecord.setTxnCtx(txnCtx); logRecord.setLogSource(LogSource.LOCAL); logRecord.setLogType(LogType.WAIT); - logRecord.setJobId(txnCtx.getJobId().getId()); + logRecord.setTxnId(txnCtx.getTxnId().getId()); logRecord.computeAndSetLogSize(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java index b3d4f03..73b9b41 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java @@ -24,7 +24,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -35,9 +35,9 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal private static final long serialVersionUID = 1L; - public LockThenSearchOperationCallbackFactory(JobId jobId, int datasetId, int[] entityIdFields, + public LockThenSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - super(jobId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); } @Override @@ -45,7 +45,7 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal IOperatorNodePushable operatorNodePushable) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); return new LockThenSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx, operatorNodePushable); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java index 0d447a9..dbf58e4 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java @@ -25,7 +25,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -37,9 +37,9 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO private static final long serialVersionUID = 1L; - public PrimaryIndexInstantSearchOperationCallbackFactory(JobId jobId, int datasetId, int[] entityIdFields, + public PrimaryIndexInstantSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - super(jobId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); } @Override @@ -47,7 +47,7 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem.getLockManager(), txnCtx); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index 932c925..b8c6084 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -27,7 +27,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -48,9 +48,9 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp private static final long serialVersionUID = 1L; private final Operation indexOp; - public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields, + public PrimaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -66,7 +66,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java index 8bdbb9e..49490a1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java @@ -25,7 +25,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -37,9 +37,9 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio private static final long serialVersionUID = 1L; - public PrimaryIndexSearchOperationCallbackFactory(JobId jobId, int datasetId, int[] entityIdFields, + public PrimaryIndexSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - super(jobId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); } @Override @@ -47,7 +47,7 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio IOperatorNodePushable operatorNodePushable) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem.getLockManager(), txnCtx); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index b339d27..1847e80 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -27,7 +27,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -44,9 +44,9 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract private static final long serialVersionUID = 1L; private final Operation indexOp; - public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields, + public SecondaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -62,7 +62,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java index 7b7eff6..38adf2b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java @@ -27,7 +27,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -44,10 +44,10 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends private static final long serialVersionUID = 1L; private final Operation indexOp; - public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, + public TempDatasetPrimaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -63,7 +63,7 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java index 1dc1c4e..5e499fc 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java @@ -27,7 +27,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -45,10 +45,10 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten private static final long serialVersionUID = 1L; private final Operation indexOp; - public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, + public TempDatasetSecondaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -65,7 +65,7 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java index 15b4344..be75ab9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java @@ -26,7 +26,7 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -43,9 +43,9 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac private static final long serialVersionUID = 1L; protected final Operation indexOp; - public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields, + public UpsertOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -62,7 +62,7 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java index 1e22458..fe758e1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java @@ -27,7 +27,7 @@ import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogMarkerCallback; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.utils.TransactionUtil; @@ -50,7 +50,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime protected final ITransactionManager transactionManager; protected final ILogManager logMgr; - protected final JobId jobId; + protected final TxnId txnId; protected final int datasetId; protected final int[] primaryKeyFields; protected final boolean isTemporaryDatasetWriteJob; @@ -62,14 +62,14 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime protected LogRecord logRecord; protected final boolean isSink; - public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields, + public CommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) { this.ctx = ctx; INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); this.transactionManager = appCtx.getTransactionSubsystem().getTransactionManager(); this.logMgr = appCtx.getTransactionSubsystem().getLogManager(); - this.jobId = jobId; + this.txnId = txnId; this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; this.tRef = new FrameTupleReference(); @@ -83,7 +83,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime @Override public void open() throws HyracksDataException { try { - transactionContext = transactionManager.getTransactionContext(jobId, false); + transactionContext = transactionManager.getTransactionContext(txnId, false); transactionContext.setWriteTxn(isWriteTransaction); ILogMarkerCallback callback = TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx); logRecord = new LogRecord(callback); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 1b32d89..2e43957 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -19,7 +19,7 @@ package org.apache.asterix.transaction.management.runtime; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -29,7 +29,7 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory { private static final long serialVersionUID = 1L; - protected final JobId jobId; + protected final TxnId txnId; protected final int datasetId; protected final int[] primaryKeyFields; protected final boolean isTemporaryDatasetWriteJob; @@ -37,9 +37,9 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory { protected int[] datasetPartitions; protected final boolean isSink; - public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob, + public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int[] datasetPartitions, boolean isSink) { - this.jobId = jobId; + this.txnId = txnId; this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; @@ -55,7 +55,7 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory { @Override public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, + return new CommitRuntime(ctx, txnId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java index f386b39..3aa2578 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java @@ -57,7 +57,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent private ResourceArenaManager resArenaMgr; private RequestArenaManager reqArenaMgr; private JobArenaManager jobArenaMgr; - private ConcurrentHashMap<Integer, Long> jobId2JobSlotMap; + private ConcurrentHashMap<Long, Long> txnId2TxnSlotMap; private LockManagerStats stats = new LockManagerStats(10000); enum LockAction { @@ -96,7 +96,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer); reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer); jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer); - jobId2JobSlotMap = new ConcurrentHashMap<>(); + txnId2TxnSlotMap = new ConcurrentHashMap<>(); } @Override @@ -105,8 +105,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext); stats.lock(); - final int jobId = txnContext.getJobId().getId(); - final long jobSlot = findOrAllocJobSlot(jobId); + final long txnId = txnContext.getTxnId().getId(); + final long jobSlot = findOrAllocJobSlot(txnId); final ResourceGroup group = table.get(datasetId.getId(), entityHashValue); group.getLatch(); try { @@ -328,7 +328,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext); stats.instantLock(); - final int jobId = txnContext.getJobId().getId(); + final long txnId = txnContext.getTxnId().getId(); final ResourceGroup group = table.get(datasetId.getId(), entityHashValue); if (group.firstResourceIndex.get() == NILL) { validateJob(txnContext); @@ -349,7 +349,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent return; } - final long jobSlot = findOrAllocJobSlot(jobId); + final long jobSlot = findOrAllocJobSlot(txnId); while (true) { final LockAction act = determineLockAction(resSlot, jobSlot, lockMode); @@ -389,8 +389,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext); stats.tryLock(); - final int jobId = txnContext.getJobId().getId(); - final long jobSlot = findOrAllocJobSlot(jobId); + final long txnId = txnContext.getTxnId().getId(); + final long jobSlot = findOrAllocJobSlot(txnId); final ResourceGroup group = table.get(datasetId.getId(), entityHashValue); group.getLatch(); @@ -425,7 +425,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext); stats.instantTryLock(); - final int jobId = txnContext.getJobId().getId(); + final long txnId = txnContext.getTxnId().getId(); final ResourceGroup group = table.get(datasetId.getId(), entityHashValue); if (group.firstResourceIndex.get() == NILL) { validateJob(txnContext); @@ -444,7 +444,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent return true; } - final long jobSlot = findOrAllocJobSlot(jobId); + final long jobSlot = findOrAllocJobSlot(txnId); LockAction act = determineLockAction(resSlot, jobSlot, lockMode); switch (act) { @@ -467,8 +467,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException { log("unlock", datasetId.getId(), entityHashValue, lockMode, txnContext); - final int jobId = txnContext.getJobId().getId(); - final long jobSlot = jobId2JobSlotMap.get(jobId); + final long txnId = txnContext.getTxnId().getId(); + final long jobSlot = txnId2TxnSlotMap.get(txnId); unlock(datasetId.getId(), entityHashValue, lockMode, jobSlot); } @@ -528,8 +528,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent log("releaseLocks", NIL, NIL, LockMode.ANY, txnContext); stats.releaseLocks(); - int jobId = txnContext.getJobId().getId(); - Long jobSlot = jobId2JobSlotMap.get(jobId); + long txnId = txnContext.getTxnId().getId(); + Long jobSlot = txnId2TxnSlotMap.get(txnId); if (jobSlot == null) { // we don't know the job, so there are no locks for it - we're done return; @@ -557,19 +557,19 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent LOGGER.finer("del job slot " + TypeUtil.Global.toString(jobSlot)); } jobArenaMgr.deallocate(jobSlot); - jobId2JobSlotMap.remove(jobId); + txnId2TxnSlotMap.remove(txnId); stats.logCounters(LOGGER, Level.FINE, true); } - private long findOrAllocJobSlot(int jobId) { - Long jobSlot = jobId2JobSlotMap.get(jobId); + private long findOrAllocJobSlot(long txnId) { + Long jobSlot = txnId2TxnSlotMap.get(txnId); if (jobSlot == null) { jobSlot = new Long(jobArenaMgr.allocate()); if (DEBUG_MODE) { - LOGGER.finer("new job slot " + TypeUtil.Global.toString(jobSlot) + " (" + jobId + ")"); + LOGGER.finer("new job slot " + TypeUtil.Global.toString(jobSlot) + " (" + txnId + ")"); } - jobArenaMgr.setJobId(jobSlot, jobId); - Long oldSlot = jobId2JobSlotMap.putIfAbsent(jobId, jobSlot); + jobArenaMgr.setTxnId(jobSlot, txnId); + Long oldSlot = txnId2TxnSlotMap.putIfAbsent(txnId, jobSlot); if (oldSlot != null) { // if another thread allocated a slot for this jobThreadId between // get(..) and putIfAbsent(..), we'll use that slot and @@ -917,7 +917,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent private void validateJob(ITransactionContext txnContext) throws ACIDException { if (txnContext.getTxnState() == ITransactionManager.ABORTED) { - throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state."); + throw new ACIDException("" + txnContext.getTxnId() + " is in ABORTED state."); } else if (txnContext.isTimeout()) { requestAbort(txnContext, "timeout"); } @@ -926,7 +926,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent private void requestAbort(ITransactionContext txnContext, String msg) throws ACIDException { txnContext.setTimeout(true); throw new ACIDException( - "Transaction " + txnContext.getJobId() + " should abort (requested by the Lock Manager)" + ":\n" + msg); + "Transaction " + txnContext.getTxnId() + " should abort (requested by the Lock Manager)" + ":\n" + msg); } /* @@ -949,7 +949,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent sb.append(" , mode : ").append(LockMode.toString(lockMode)); } if (txnContext != null) { - sb.append(" , jobId : ").append(txnContext.getJobId()); + sb.append(" , txnId : ").append(txnContext.getTxnId()); } sb.append(" , thread : ").append(Thread.currentThread().getName()); sb.append(" }"); @@ -970,8 +970,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent while (reqSlot != NILL) { byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot); long jobSlot = reqArenaMgr.getJobSlot(reqSlot); - int jobId = jobArenaMgr.getJobId(jobSlot); - assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId); + long txnId = jobArenaMgr.getTxnId(jobSlot); + assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, txnId); reqSlot = reqArenaMgr.getNextRequest(reqSlot); } resSlot = resArenaMgr.getNext(resSlot); @@ -988,10 +988,10 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } } - private void assertLockCanBeFoundInJobQueue(int dsId, int entityHashValue, byte lockMode, int jobId) { - if (findLockInJobQueue(dsId, entityHashValue, jobId, lockMode) == NILL) { + private void assertLockCanBeFoundInJobQueue(int dsId, int entityHashValue, byte lockMode, long txnId) { + if (findLockInJobQueue(dsId, entityHashValue, txnId, lockMode) == NILL) { String msg = "request for " + LockMode.toString(lockMode) + " lock on dataset " + dsId + " entity " - + entityHashValue + " not found for job " + jobId + " in thread " + + entityHashValue + " not found for txn " + txnId + " in thread " + Thread.currentThread().getName(); LOGGER.severe(msg); throw new IllegalStateException(msg); @@ -1005,14 +1005,14 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent * dataset id * @param entityHashValue * primary key hash value - * @param jobId + * @param txnId * job id * @param lockMode * lock mode * @return the slot of the request, if the lock request is found, NILL otherwise */ - private long findLockInJobQueue(final int dsId, final int entityHashValue, final int jobId, byte lockMode) { - Long jobSlot = jobId2JobSlotMap.get(jobId); + private long findLockInJobQueue(final int dsId, final int entityHashValue, final long txnId, byte lockMode) { + Long jobSlot = txnId2TxnSlotMap.get(txnId); if (jobSlot == null) { return NILL; } @@ -1040,7 +1040,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent } private TablePrinter getDumpTablePrinter() { - return new DumpTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr, jobId2JobSlotMap); + return new DumpTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr, txnId2TxnSlotMap); } public String printByResource() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java index a75f756..26261c2 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java @@ -26,15 +26,15 @@ public class DumpTablePrinter implements TablePrinter { private ResourceArenaManager resArenaMgr; private RequestArenaManager reqArenaMgr; private JobArenaManager jobArenaMgr; - private ConcurrentHashMap<Integer, Long> jobId2JobSlotMap; + private ConcurrentHashMap<Long, Long> txnIdToJobSlotMap; DumpTablePrinter(ResourceGroupTable table, ResourceArenaManager resArenaMgr, RequestArenaManager reqArenaMgr, - JobArenaManager jobArenaMgr, ConcurrentHashMap<Integer, Long> jobId2JobSlotMap) { + JobArenaManager jobArenaMgr, ConcurrentHashMap<Long, Long> txnIdToJobSlotMap) { this.table = table; this.resArenaMgr = resArenaMgr; this.reqArenaMgr = reqArenaMgr; this.jobArenaMgr = jobArenaMgr; - this.jobId2JobSlotMap = jobId2JobSlotMap; + this.txnIdToJobSlotMap = txnIdToJobSlotMap; } public StringBuilder append(StringBuilder sb) { @@ -52,10 +52,10 @@ public class DumpTablePrinter implements TablePrinter { reqArenaMgr.append(sb); sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n"); - sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n"); - for (Integer i : jobId2JobSlotMap.keySet()) { + sb.append(">>dump_begin\t>>----- [txnIdSlotMap] -----\n"); + for (Long i : txnIdToJobSlotMap.keySet()) { sb.append(i).append(" : "); - TypeUtil.Global.append(sb, jobId2JobSlotMap.get(i)); + TypeUtil.Global.append(sb, txnIdToJobSlotMap.get(i)); sb.append("\n"); } sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n"); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/Job.json ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/Job.json b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/Job.json index a649b7c..5e0f588 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/Job.json +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/Job.json @@ -17,8 +17,8 @@ "initial" : "-1" }, { - "name" : "job id", - "type" : "INT" + "name" : "txn id", + "type" : "GLOBAL" } ] } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java index eff9e21..e8ef2c8 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java @@ -99,7 +99,7 @@ public class ResourceTablePrinter implements TablePrinter { StringBuilder appendRequest(StringBuilder sb, long req) { long job = reqArenaMgr.getJobSlot(req); - sb.append("{ \"job\": ").append(jobArenaMgr.getJobId(job)); + sb.append("{ \"job\": ").append(jobArenaMgr.getTxnId(job)); sb.append(", \"mode\": \"").append(string(reqArenaMgr.getLockMode(req))); return sb.append("\" }"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 668eab1..3d78ad9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -33,11 +33,11 @@ import org.apache.asterix.common.transactions.ILogBuffer; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.MutableLong; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -61,7 +61,7 @@ public class LogBuffer implements ILogBuffer { protected final LinkedBlockingQueue<ILogRecord> remoteJobsQ; private FileChannel fileChannel; private boolean stop; - private final JobId reusableJobId; + private final MutableTxnId reusableTxnId; private final DatasetId reusableDatasetId; public LogBuffer(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) { @@ -79,7 +79,7 @@ public class LogBuffer implements ILogBuffer { syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE); flushQ = new LinkedBlockingQueue<>(); remoteJobsQ = new LinkedBlockingQueue<>(); - reusableJobId = new JobId(-1); + reusableTxnId = new MutableTxnId(-1); reusableDatasetId = new DatasetId(-1); } @@ -241,9 +241,9 @@ public class LogBuffer implements ILogBuffer { while (logRecord != null) { if (logRecord.getLogSource() == LogSource.LOCAL) { if (logRecord.getLogType() == LogType.ENTITY_COMMIT) { - reusableJobId.setId(logRecord.getJobId()); + reusableTxnId.setId(logRecord.getTxnId()); reusableDatasetId.setId(logRecord.getDatasetId()); - txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false); + txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, false); txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx); txnCtx.notifyOptracker(false); @@ -252,8 +252,8 @@ public class LogBuffer implements ILogBuffer { } } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { - reusableJobId.setId(logRecord.getJobId()); - txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false); + reusableTxnId.setId(logRecord.getTxnId()); + txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, false); txnCtx.notifyOptracker(true); notifyJobTermination(); } else if (logRecord.getLogType() == LogType.FLUSH) { @@ -341,4 +341,15 @@ public class LogBuffer implements ILogBuffer { public int getLogPageSize() { return logPageSize; } + + private class MutableTxnId extends TxnId { + + public MutableTxnId(long id) { + super(id); + } + + public void setId(long id) { + this.id = id; + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 5f9369d..4d671f3 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -159,7 +159,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { ITransactionContext txnCtx = logRecord.getTxnCtx(); if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) { throw new ACIDException( - "Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record."); + "Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record."); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index dd8fb6e..3f94749 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -35,7 +35,7 @@ public class LogManagerWithReplication extends LogManager { private IReplicationManager replicationManager; private final IReplicationStrategy replicationStrategy; - private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet(); + private final Set<Long> replicatedTxn = ConcurrentHashMap.newKeySet(); public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, IReplicationStrategy replicationStrategy) { super(txnSubsystem); @@ -51,13 +51,13 @@ public class LogManagerWithReplication extends LogManager { case LogType.UPDATE: case LogType.FLUSH: shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId()); - if (shouldReplicate && !replicatedJob.contains(logRecord.getJobId())) { - replicatedJob.add(logRecord.getJobId()); + if (shouldReplicate && !replicatedTxn.contains(logRecord.getTxnId())) { + replicatedTxn.add(logRecord.getTxnId()); } break; case LogType.JOB_COMMIT: case LogType.ABORT: - shouldReplicate = replicatedJob.remove(logRecord.getJobId()); + shouldReplicate = replicatedTxn.remove(logRecord.getTxnId()); break; default: shouldReplicate = false; @@ -120,7 +120,7 @@ public class LogManagerWithReplication extends LogManager { ITransactionContext txnCtx = logRecord.getTxnCtx(); if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) { throw new ACIDException( - "Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record."); + "Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record."); } }
