[NO ISSUE][TX] Rename Asterix JobId to TxnId - user model changes: no - storage format changes: yes (Txn log jobId changed from int to long) - interface changes: yes (APIs updated to use long TxnId instead of int JobId)
Details: - Rename TxnId -> TxnEntityId. - Rename Asterix JobId -> TxnId. - Rename Asterix JobIdFactory -> TxnIdFactory. - Change TxnId size from int to long and update log sizes accordingly. Change-Id: I0905595a50195b83c1afae5dde88e5502ad21b9f Reviewed-on: https://asterix-gerrit.ics.uci.edu/2149 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/592af654 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/592af654 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/592af654 Branch: refs/heads/master Commit: 592af6545d77a66d60a6fb02965d7ad6747e4e3a Parents: 9eead00 Author: Murtadha Hubail <[email protected]> Authored: Tue Nov 14 04:41:31 2017 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Tue Nov 14 09:41:53 2017 -0800 ---------------------------------------------------------------------- .../operators/physical/CommitPOperator.java | 11 +- .../physical/InvertedIndexPOperator.java | 3 +- .../rules/SetupCommitExtensionOpRule.java | 8 +- .../apache/asterix/api/common/APIFramework.java | 9 +- .../apache/asterix/app/nc/RecoveryManager.java | 143 +++--- .../apache/asterix/utils/FeedOperations.java | 8 +- .../apache/asterix/utils/FlushDatasetUtil.java | 9 +- .../org/apache/asterix/utils/RebalanceUtil.java | 16 +- .../app/bootstrap/TestNodeController.java | 5 +- .../AbstractOperationCallbackFactory.java | 6 +- .../asterix/common/transactions/Checkpoint.java | 12 +- .../asterix/common/transactions/ILogRecord.java | 61 ++- .../common/transactions/IRecoveryManager.java | 6 +- .../transactions/ITransactionContext.java | 2 +- .../transactions/ITransactionManager.java | 14 +- .../asterix/common/transactions/JobId.java | 64 --- .../asterix/common/transactions/LogRecord.java | 30 +- 
.../asterix/common/transactions/TxnId.java | 61 +++ .../asterix/common/utils/TransactionUtil.java | 12 +- .../asterix/metadata/MetadataManager.java | 142 +++--- .../apache/asterix/metadata/MetadataNode.java | 452 +++++++++---------- .../metadata/MetadataTransactionContext.java | 12 +- .../asterix/metadata/api/IMetadataNode.java | 230 +++++----- .../asterix/metadata/api/IValueExtractor.java | 6 +- .../metadata/declared/MetadataProvider.java | 32 +- .../metadata/entities/BuiltinTypeMap.java | 6 +- .../asterix/metadata/entities/Dataset.java | 32 +- .../DatatypeTupleTranslator.java | 16 +- .../IndexTupleTranslator.java | 16 +- .../MetadataTupleTranslatorProvider.java | 10 +- .../asterix/metadata/utils/DatasetUtil.java | 14 +- .../asterix/metadata/utils/IndexUtil.java | 15 +- .../utils/SecondaryBTreeOperationsHelper.java | 6 +- ...econdaryCorrelatedBTreeOperationsHelper.java | 6 +- ...CorrelatedInvertedIndexOperationsHelper.java | 6 +- ...econdaryCorrelatedRTreeOperationsHelper.java | 6 +- ...daryCorrelatedTreeIndexOperationsHelper.java | 8 +- .../SecondaryInvertedIndexOperationsHelper.java | 6 +- .../utils/SecondaryRTreeOperationsHelper.java | 6 +- .../MetadataEntityValueExtractor.java | 5 +- .../NestedDatatypeNameValueExtractor.java | 4 +- .../TupleCopyValueExtractor.java | 4 +- .../management/ReplicationChannel.java | 4 +- .../management/ReplicationManager.java | 60 +-- .../job/listener/JobEventListenerFactory.java | 16 +- ...tiTransactionJobletEventListenerFactory.java | 16 +- .../std/FlushDatasetOperatorDescriptor.java | 10 +- .../asterix/tools/datagen/AdmDataGen.java | 4 +- ...tractIndexModificationOperationCallback.java | 2 +- .../LockThenSearchOperationCallback.java | 2 +- .../LockThenSearchOperationCallbackFactory.java | 8 +- ...exInstantSearchOperationCallbackFactory.java | 8 +- ...dexModificationOperationCallbackFactory.java | 8 +- ...maryIndexSearchOperationCallbackFactory.java | 8 +- ...dexModificationOperationCallbackFactory.java | 8 +- 
...dexModificationOperationCallbackFactory.java | 8 +- ...dexModificationOperationCallbackFactory.java | 8 +- .../UpsertOperationCallbackFactory.java | 8 +- .../management/runtime/CommitRuntime.java | 10 +- .../runtime/CommitRuntimeFactory.java | 10 +- .../service/locking/ConcurrentLockManager.java | 64 +-- .../service/locking/DumpTablePrinter.java | 12 +- .../management/service/locking/Job.json | 4 +- .../service/locking/ResourceTablePrinter.java | 2 +- .../management/service/logging/LogBuffer.java | 25 +- .../management/service/logging/LogManager.java | 2 +- .../logging/LogManagerWithReplication.java | 10 +- .../recovery/AbstractCheckpointManager.java | 2 +- .../service/recovery/TxnEntityId.java | 175 +++++++ .../management/service/recovery/TxnId.java | 175 ------- .../service/transaction/JobIdFactory.java | 38 -- .../service/transaction/TransactionContext.java | 20 +- .../service/transaction/TransactionManager.java | 56 +-- .../service/transaction/TxnIdFactory.java | 42 ++ .../service/locking/LockManagerUnitTest.java | 8 +- .../management/service/locking/Locker.java | 2 +- .../management/service/locking/Request.java | 4 +- 77 files changed, 1178 insertions(+), 1181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java index 2dd4c3d..abd18aa 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java @@ 
-21,7 +21,7 @@ package org.apache.asterix.algebra.operators.physical; import java.util.List; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -39,17 +39,16 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.io.FileSplit; public class CommitPOperator extends AbstractPhysicalOperator { private final List<LogicalVariable> primaryKeyLogicalVars; - private final JobId jobId; + private final TxnId txnId; private final Dataset dataset; private final boolean isSink; - public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) { - this.jobId = jobId; + public CommitPOperator(TxnId txnId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) { + this.txnId = txnId; this.dataset = dataset; this.primaryKeyLogicalVars = primaryKeyLogicalVars; this.isSink = isSink; @@ -88,7 +87,7 @@ public class CommitPOperator extends AbstractPhysicalOperator { int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]); //get dataset splits - IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields, + IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, isSink); builder.contributeMicroOperator(op, runtime, recDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java index 12114f0..c941320 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.algebra.operators.physical; -import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.declared.DataSourceId; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -159,7 +158,7 @@ public class InvertedIndexPOperator extends IndexSearchPOperator { jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory, retainInput, retainMissing, context.getMissingWriterFactory(), dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex, - ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(), + ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getTxnId(), IndexOperation.SEARCH, null), minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys, propagateIndexFilter); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java index 18e5f5e..61339bf 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.asterix.algebra.operators.CommitOperator; import org.apache.asterix.algebra.operators.physical.CommitPOperator; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.declared.DatasetDataSource; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -99,14 +99,14 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule { primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId())); } - //get JobId(TransactorId) + //get TxnId(TransactorId) MetadataProvider mp = (MetadataProvider) context.getMetadataProvider(); - JobId jobId = mp.getJobId(); + TxnId txnId = mp.getTxnId(); //create the logical and physical operator CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink); CommitPOperator commitPOperator = - new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, isSink); + new CommitPOperator(txnId, dataset, primaryKeyLogicalVars, isSink); commitOperator.setPhysicalOperator(commitPOperator); //create ExtensionOperator and put the commitOperator in it. 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 2078288..f7ecefc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -38,6 +38,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.Job; import org.apache.asterix.common.utils.Job.SubmissionMode; import org.apache.asterix.compiler.provider.ILangCompilationProvider; @@ -63,7 +64,7 @@ import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.optimizer.base.FuzzyUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; +import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.SessionConfig; @@ -211,8 +212,8 @@ public class APIFramework { printPlanPostfix(output); } - org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId(); - metadataProvider.setJobId(asterixJobId); + TxnId txnId = TxnIdFactory.create(); + 
metadataProvider.setTxnId(txnId); ILangExpressionToPlanTranslator t = translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter); @@ -351,7 +352,7 @@ public class APIFramework { builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider()); JobEventListenerFactory jobEventListenerFactory = - new JobEventListenerFactory(asterixJobId, metadataProvider.isWriteTransaction()); + new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction()); JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory); // When the top-level statement is a query, the statement parameter is null. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 25385c6..2435b60 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -61,7 +61,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager; -import org.apache.asterix.transaction.management.service.recovery.TxnId; +import org.apache.asterix.transaction.management.service.recovery.TxnEntityId; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; import org.apache.commons.io.FileUtils; import 
org.apache.hyracks.api.application.INCServiceContext; @@ -87,7 +87,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private final LogManager logMgr; private final boolean replicationEnabled; private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp"; - private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null; + private Map<Long, JobEntityCommits> jobId2WinnerEntitiesMap = null; private final long cachedEntityCommitsPerJobSize; private final PersistentLocalResourceRepository localResourceRepository; private final ICheckpointManager checkpointManager; @@ -183,7 +183,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) throws IOException, ACIDException { try { - Set<Integer> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN); + Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN); startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet); } finally { logReader.close(); @@ -191,13 +191,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } } - private synchronized Set<Integer> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader, + private synchronized Set<Long> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) throws IOException, ACIDException { int updateLogCount = 0; int entityCommitLogCount = 0; int jobCommitLogCount = 0; int abortLogCount = 0; - Set<Integer> winnerJobSet = new HashSet<>(); + Set<Long> winnerJobSet = new HashSet<>(); jobId2WinnerEntitiesMap = new HashMap<>(); //set log reader to the lowWaterMarkLsn ILogRecord logRecord; @@ -214,8 +214,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } break; case LogType.JOB_COMMIT: - 
winnerJobSet.add(logRecord.getJobId()); - cleanupJobCommits(logRecord.getJobId()); + winnerJobSet.add(logRecord.getTxnId()); + cleanupTxnCommits(logRecord.getTxnId()); jobCommitLogCount++; break; case LogType.ENTITY_COMMIT: @@ -249,38 +249,38 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { return winnerJobSet; } - private void cleanupJobCommits(int jobId) { - if (jobId2WinnerEntitiesMap.containsKey(jobId)) { - JobEntityCommits jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + private void cleanupTxnCommits(long txnId) { + if (jobId2WinnerEntitiesMap.containsKey(txnId)) { + JobEntityCommits jobEntityWinners = jobId2WinnerEntitiesMap.get(txnId); //to delete any spilled files as well jobEntityWinners.clear(); - jobId2WinnerEntitiesMap.remove(jobId); + jobId2WinnerEntitiesMap.remove(txnId); } } private void analyzeEntityCommitLog(ILogRecord logRecord) throws IOException { - int jobId = logRecord.getJobId(); + long txnId = logRecord.getTxnId(); JobEntityCommits jobEntityWinners; - if (!jobId2WinnerEntitiesMap.containsKey(jobId)) { - jobEntityWinners = new JobEntityCommits(jobId); + if (!jobId2WinnerEntitiesMap.containsKey(txnId)) { + jobEntityWinners = new JobEntityCommits(txnId); if (needToFreeMemory()) { // If we don't have enough memory for one more job, // we will force all jobs to spill their cached entities to disk. // This could happen only when we have many jobs with small // number of records and none of them have job commit. 
- freeJobsCachedEntities(jobId); + freeJobsCachedEntities(txnId); } - jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners); + jobId2WinnerEntitiesMap.put(txnId, jobEntityWinners); } else { - jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + jobEntityWinners = jobId2WinnerEntitiesMap.get(txnId); } jobEntityWinners.add(logRecord); } private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader, - long lowWaterMarkLSN, Set<Integer> winnerJobSet) throws IOException, ACIDException { + long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException { int redoCount = 0; - int jobId = -1; + long jobId; long resourceId; long maxDiskLastLsn; @@ -296,7 +296,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources(); Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>(); - TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); + TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false); ILogRecord logRecord = null; try { @@ -307,18 +307,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { LOGGER.info(logRecord.getLogRecordForDisplay()); } lsn = logRecord.getLSN(); - jobId = logRecord.getJobId(); + jobId = logRecord.getTxnId(); foundWinner = false; switch (logRecord.getLogType()) { case LogType.UPDATE: if (partitions.contains(logRecord.getResourcePartition())) { - if (winnerJobSet.contains(jobId)) { + if (winnerTxnSet.contains(jobId)) { foundWinner = true; } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) { jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); - tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + tempKeyTxnEntityId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(), logRecord.getPKValueSize()); - if (jobEntityWinners.containsEntityCommitForTxnId(lsn, 
tempKeyTxnId)) { + if (jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnEntityId)) { foundWinner = true; } } @@ -449,9 +449,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } @Override - public File createJobRecoveryFile(int jobId, String fileName) throws IOException { + public File createJobRecoveryFile(long txnId, String fileName) throws IOException { String recoveryDirPath = getRecoveryDirPath(); - Path jobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId); + Path jobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + txnId); if (!Files.exists(jobRecoveryFolder)) { Files.createDirectories(jobRecoveryFolder); } @@ -459,10 +459,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { File jobRecoveryFile = new File(jobRecoveryFolder.toString() + File.separator + fileName); if (!jobRecoveryFile.exists()) { if (!jobRecoveryFile.createNewFile()) { - throw new IOException("Failed to create file: " + fileName + " for job id(" + jobId + ")"); + throw new IOException("Failed to create file: " + fileName + " for txn id(" + txnId + ")"); } } else { - throw new IOException("File: " + fileName + " for job id(" + jobId + ") already exists"); + throw new IOException("File: " + fileName + " for txn id(" + txnId + ") already exists"); } return jobRecoveryFile; } @@ -483,11 +483,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { return logDir + RECOVERY_FILES_DIR_NAME; } - private void freeJobsCachedEntities(int requestingJobId) throws IOException { + private void freeJobsCachedEntities(long requestingTxnId) throws IOException { if (jobId2WinnerEntitiesMap != null) { - for (Entry<Integer, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) { + for (Entry<Long, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) { //if the job is not the requester, free its memory - if (jobEntityCommits.getKey() != 
requestingJobId) { + if (jobEntityCommits.getKey() != requestingTxnId) { jobEntityCommits.getValue().spillToDiskAndfreeMemory(); } } @@ -496,7 +496,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { @Override public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException { - int abortedJobId = txnContext.getJobId().getId(); + long abortedTxnId = txnContext.getTxnId().getId(); // Obtain the first/last log record LSNs written by the Job long firstLSN = txnContext.getFirstLSN(); /* @@ -517,7 +517,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { // check if the transaction actually wrote some logs. if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) { if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("no need to roll back as there were no operations by the job " + txnContext.getJobId()); + LOGGER.info("no need to roll back as there were no operations by the txn " + txnContext.getTxnId()); } return; } @@ -527,13 +527,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN); } - Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<>(); - TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); + Map<TxnEntityId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<>(); + TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false); int updateLogCount = 0; int entityCommitLogCount = 0; - int logJobId = -1; + long logTxnId; long currentLSN = -1; - TxnId loserEntity = null; + TxnEntityId loserEntity; List<Long> undoLSNSet = null; //get active partitions on this node Set<Integer> activePartitions = localResourceRepository.getActivePartitions(); @@ -552,19 +552,20 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { LOGGER.info(logRecord.getLogRecordForDisplay()); } } 
- logJobId = logRecord.getJobId(); - if (logJobId != abortedJobId) { + logTxnId = logRecord.getTxnId(); + if (logTxnId != abortedTxnId) { continue; } - tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + tempKeyTxnEntityId.setTxnId(logTxnId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(), logRecord.getPKValueSize()); switch (logRecord.getLogType()) { case LogType.UPDATE: if (activePartitions.contains(logRecord.getResourcePartition())) { - undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId); + undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnEntityId); if (undoLSNSet == null) { - loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize(), true); + loserEntity = + new TxnEntityId(logTxnId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize(), true); undoLSNSet = new LinkedList<>(); jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet); } @@ -572,17 +573,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { updateLogCount++; if (IS_DEBUG_MODE) { LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:" - + tempKeyTxnId); + + tempKeyTxnEntityId); } } break; case LogType.ENTITY_COMMIT: if (activePartitions.contains(logRecord.getResourcePartition())) { - jobLoserEntity2LSNsMap.remove(tempKeyTxnId); + jobLoserEntity2LSNsMap.remove(tempKeyTxnEntityId); entityCommitLogCount++; if (IS_DEBUG_MODE) { LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]" - + tempKeyTxnId); + + tempKeyTxnEntityId); } } break; @@ -601,7 +602,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { if (currentLSN != lastLSN) { throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN - + ") during abort( " + txnContext.getJobId() + ")"); + + 
") during abort( " + txnContext.getTxnId() + ")"); } //undo loserTxn's effect @@ -610,10 +611,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); //TODO sort loser entities by smallest LSN to undo in one pass. - Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); + Iterator<Entry<TxnEntityId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); int undoCount = 0; while (iter.hasNext()) { - Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next(); + Map.Entry<TxnEntityId, List<Long>> loserEntity2LSNsMap = iter.next(); undoLSNSet = loserEntity2LSNsMap.getValue(); // The step below is important since the upsert operations must be done in reverse order. Collections.reverse(undoLSNSet); @@ -622,7 +623,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //read the corresponding log record to be undone. logRecord = logReader.read(undoLSN); if (logRecord == null) { - throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")"); + throw new ACIDException("IllegalState exception during abort( " + txnContext.getTxnId() + ")"); } if (IS_DEBUG_MODE) { LOGGER.info(logRecord.getLogRecordForDisplay()); @@ -713,25 +714,25 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private class JobEntityCommits { private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; - private final int jobId; - private final Set<TxnId> cachedEntityCommitTxns = new HashSet<>(); + private final long txnId; + private final Set<TxnEntityId> cachedEntityCommitTxns = new HashSet<>(); private final List<File> jobEntitCommitOnDiskPartitionsFiles = new ArrayList<>(); //a flag indicating whether all the the commits for this jobs have been added. 
private boolean preparedForSearch = false; - private TxnId winnerEntity = null; + private TxnEntityId winnerEntity = null; private int currentPartitionSize = 0; private long partitionMaxLSN = 0; private String currentPartitonName; - public JobEntityCommits(int jobId) { - this.jobId = jobId; + public JobEntityCommits(long txnId) { + this.txnId = txnId; } public void add(ILogRecord logRecord) throws IOException { if (preparedForSearch) { throw new IOException("Cannot add new entity commits after preparing for search."); } - winnerEntity = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue(), + winnerEntity = new TxnEntityId(logRecord.getTxnId(), logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(), logRecord.getPKValueSize(), true); cachedEntityCommitTxns.add(winnerEntity); //since log file is read sequentially, LSNs are always increasing @@ -772,15 +773,15 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { preparedForSearch = true; } - public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) throws IOException { + public boolean containsEntityCommitForTxnId(long logLSN, TxnEntityId txnEntityId) throws IOException { //if we don't have any partitions on disk, search only from memory if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) { - return cachedEntityCommitTxns.contains(txnId); + return cachedEntityCommitTxns.contains(txnEntityId); } else { //get candidate partitions from disk ArrayList<File> candidatePartitions = getCandidiatePartitions(logLSN); for (File partition : candidatePartitions) { - if (serachPartition(partition, txnId)) { + if (serachPartition(partition, txnEntityId)) { return true; } } @@ -814,17 +815,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { jobEntitCommitOnDiskPartitionsFiles.clear(); } - private boolean serachPartition(File partition, TxnId txnId) throws IOException { + private boolean 
serachPartition(File partition, TxnEntityId txnEntityId) throws IOException { //load partition from disk if it is not already in memory if (!partition.getName().equals(currentPartitonName)) { loadPartitionToMemory(partition, cachedEntityCommitTxns); currentPartitonName = partition.getName(); } - return cachedEntityCommitTxns.contains(txnId); + return cachedEntityCommitTxns.contains(txnEntityId); } private String getPartitionName(long maxLSN) { - return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN; + return txnId + PARTITION_FILE_NAME_SEPARATOR + maxLSN; } private long getPartitionMaxLSNFromName(String partitionName) { @@ -835,18 +836,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //if we don't have enough memory to allocate for this partition, // we will ask recovery manager to free memory if (needToFreeMemory()) { - freeJobsCachedEntities(jobId); + freeJobsCachedEntities(txnId); } //allocate a buffer that can hold the current partition ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize); - for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) { - TxnId txnId = iterator.next(); + for (Iterator<TxnEntityId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) { + TxnEntityId txnEntityId = iterator.next(); //serialize the object and remove it from memory - txnId.serialize(buffer); + txnEntityId.serialize(buffer); iterator.remove(); } //name partition file based on job id and max lsn - File partitionFile = createJobRecoveryFile(jobId, getPartitionName(partitionMaxLSN)); + File partitionFile = createJobRecoveryFile(txnId, getPartitionName(partitionMaxLSN)); //write file to disk try (FileOutputStream fileOutputstream = new FileOutputStream(partitionFile, false); FileChannel fileChannel = fileOutputstream.getChannel()) { @@ -858,11 +859,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { 
jobEntitCommitOnDiskPartitionsFiles.add(partitionFile); } - private void loadPartitionToMemory(File partition, Set<TxnId> partitionTxn) throws IOException { + private void loadPartitionToMemory(File partition, Set<TxnEntityId> partitionTxn) throws IOException { partitionTxn.clear(); //if we don't have enough memory to a load partition, we will ask recovery manager to free memory if (needToFreeMemory()) { - freeJobsCachedEntities(jobId); + freeJobsCachedEntities(txnId); } ByteBuffer buffer = ByteBuffer.allocateDirect((int) partition.length()); //load partition to memory @@ -873,9 +874,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } } buffer.flip(); - TxnId temp = null; + TxnEntityId temp; while (buffer.remaining() != 0) { - temp = TxnId.deserialize(buffer); + temp = TxnEntityId.deserialize(buffer); partitionTxn.add(temp); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index 608769b..e42b5e5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -42,7 +42,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.messaging.api.ICCMessageBroker; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.compiler.provider.SqlppCompilationProvider; import 
org.apache.asterix.external.api.IAdapterFactory; @@ -279,7 +279,7 @@ public class FeedOperations { Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>(); Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>(); Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>(); - List<JobId> jobIds = new ArrayList<>(); + List<TxnId> txnIds = new ArrayList<>(); FeedMetaOperatorDescriptor metaOp; for (int iter1 = 0; iter1 < jobsList.size(); iter1++) { @@ -415,11 +415,11 @@ public class FeedOperations { for (OperatorDescriptorId root : subJob.getRoots()) { jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root))); } - jobIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getJobId()); + txnIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId()); } // jobEventListenerFactory - jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(jobIds, true)); + jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIds, true)); // useConnectorSchedulingPolicy jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling()); // connectorAssignmentPolicy http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java index 958444c..4137fbd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java @@ -20,12 +20,13 @@ package org.apache.asterix.utils; import 
org.apache.asterix.common.config.CompilerProperties; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor; -import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; +import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -59,8 +60,8 @@ public class FlushDatasetUtil { AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1, new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs); - org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId(); - FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId, + TxnId txnId = TxnIdFactory.create(); + FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, txnId, dataset.getDatasetId()); spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0); @@ -72,7 +73,7 @@ public class FlushDatasetUtil { AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource, primaryPartitionConstraint); - JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true); + JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true); spec.setJobletEventListenerFactory(jobEventListenerFactory); JobUtils.runJob(hcc, spec, true); } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index 9e34d70..42f577f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -34,7 +34,7 @@ import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.metadata.MetadataManager; @@ -47,7 +47,7 @@ import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.IndexUtil; import org.apache.asterix.rebalance.IDatasetRebalanceCallback; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; +import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -275,21 +275,21 @@ public class RebalanceUtil { private static void populateDataToRebalanceTarget(Dataset source, Dataset target, MetadataProvider metadataProvider, 
IHyracksClientConnection hcc) throws Exception { JobSpecification spec = new JobSpecification(); - JobId jobId = JobIdFactory.generateJobId(); - JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true); + TxnId txnId = TxnIdFactory.create(); + JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true); spec.setJobletEventListenerFactory(jobEventListenerFactory); // The pipeline starter. IOperatorDescriptor starter = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider); // Creates primary index scan op. - IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, jobId); + IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, txnId); // Creates secondary BTree upsert op. IOperatorDescriptor upsertOp = createPrimaryIndexUpsertOp(spec, metadataProvider, source, target); // The final commit operator. - IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, jobId, target); + IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, txnId, target); // Connects empty-tuple-source and scan. spec.connect(new OneToOneConnectorDescriptor(spec), starter, 0, primaryScanOp, 0); @@ -326,11 +326,11 @@ public class RebalanceUtil { // Creates the commit operator for populating the target dataset. 
private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider, - JobId jobId, Dataset target) throws AlgebricksException { + TxnId txnId, Dataset target) throws AlgebricksException { int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target); return new AlgebricksMetaOperatorDescriptor(spec, 1, 0, new IPushRuntimeFactory[] { - target.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields, true) }, + target.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, true) }, new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) }); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 1810517..df8eef7 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -40,6 +40,7 @@ import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; @@ -152,8 +153,8 @@ public class TestNodeController { ExecutionTestUtil.tearDown(cleanupOnStop); } - public 
org.apache.asterix.common.transactions.JobId getTxnJobId(IHyracksTaskContext ctx) { - return new org.apache.asterix.common.transactions.JobId((int) ctx.getJobletContext().getJobId().getId()); + public TxnId getTxnJobId(IHyracksTaskContext ctx) { + return new TxnId(ctx.getJobletContext().getJobId().getId()); } public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java index d1d869e..d2b1276 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java @@ -26,15 +26,15 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider; public abstract class AbstractOperationCallbackFactory implements Serializable { private static final long serialVersionUID = 1L; - protected final JobId jobId; + protected final TxnId txnId; protected final int datasetId; protected final int[] primaryKeyFields; protected final ITransactionSubsystemProvider txnSubsystemProvider; protected final byte resourceType; - public AbstractOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields, + public AbstractOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - this.jobId = jobId; + this.txnId = txnId; this.datasetId = datasetId; 
this.primaryKeyFields = primaryKeyFields; this.txnSubsystemProvider = txnSubsystemProvider; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java index a4c41df..cb278a7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java @@ -31,19 +31,19 @@ public class Checkpoint implements Comparable<Checkpoint> { private final long checkpointLsn; private final long minMCTFirstLsn; - private final int maxJobId; + private final long maxTxnId; private final long timeStamp; private final boolean sharp; private final int storageVersion; @JsonCreator public Checkpoint(@JsonProperty("checkpointLsn") long checkpointLsn, - @JsonProperty("minMCTFirstLsn") long minMCTFirstLsn, @JsonProperty("maxJobId") int maxJobId, + @JsonProperty("minMCTFirstLsn") long minMCTFirstLsn, @JsonProperty("maxJobId") long maxTxnId, @JsonProperty("timeStamp") long timeStamp, @JsonProperty("sharp") boolean sharp, @JsonProperty("storageVersion") int storageVersion) { this.checkpointLsn = checkpointLsn; this.minMCTFirstLsn = minMCTFirstLsn; - this.maxJobId = maxJobId; + this.maxTxnId = maxTxnId; this.timeStamp = timeStamp; this.sharp = sharp; this.storageVersion = storageVersion; @@ -57,8 +57,8 @@ public class Checkpoint implements Comparable<Checkpoint> { return minMCTFirstLsn; } - public int getMaxJobId() { - return maxJobId; + public long getMaxJobId() { + return maxTxnId; } public long getTimeStamp() { @@ -108,7 +108,7 @@ public class Checkpoint implements Comparable<Checkpoint> { final 
int prime = 31; int result = 1; result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32)); - result = prime * result + maxJobId; + result = prime * result + Long.hashCode(maxTxnId); result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32)); result = prime * result + (sharp ? 1231 : 1237); result = prime * result + storageVersion; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java index 6ee0980..4090b65 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java @@ -32,36 +32,33 @@ public interface ILogRecord { LARGE_RECORD } - public static final int CHKSUM_LEN = Long.BYTES; - public static final int FLDCNT_LEN = Integer.BYTES; - public static final int DS_LEN = Integer.BYTES; - public static final int LOG_SOURCE_LEN = Byte.BYTES; - public static final int LOGRCD_SZ_LEN = Integer.BYTES; - public static final int NEWOP_LEN = Byte.BYTES; - public static final int NEWVALSZ_LEN = Integer.BYTES; - public static final int PKHASH_LEN = Integer.BYTES; - public static final int PKSZ_LEN = Integer.BYTES; - public static final int PRVLSN_LEN = Long.BYTES; - public static final int RS_PARTITION_LEN = Integer.BYTES; - public static final int RSID_LEN = Long.BYTES; - public static final int SEQ_NUM_LEN = Long.BYTES; - public static final int TYPE_LEN = Byte.BYTES; - public static final int UUID_LEN = Long.BYTES; - - public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES; - public static final int 
ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN; - public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN; - public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; - // What are these fields? vvvvv - public static final int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES; - - // How are the following computed? - public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; - public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +? - public static final int UPDATE_LOG_BASE_SIZE = 51; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +? - public static final int FLUSH_LOG_SIZE = 18; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +? - public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; - public static final int MARKER_BASE_LOG_SIZE = + int CHKSUM_LEN = Long.BYTES; + int FLDCNT_LEN = Integer.BYTES; + int DS_LEN = Integer.BYTES; + int LOG_SOURCE_LEN = Byte.BYTES; + int LOGRCD_SZ_LEN = Integer.BYTES; + int NEWOP_LEN = Byte.BYTES; + int NEWVALSZ_LEN = Integer.BYTES; + int PKHASH_LEN = Integer.BYTES; + int PKSZ_LEN = Integer.BYTES; + int PRVLSN_LEN = Long.BYTES; + int RS_PARTITION_LEN = Integer.BYTES; + int RSID_LEN = Long.BYTES; + int SEQ_NUM_LEN = Long.BYTES; + int TYPE_LEN = Byte.BYTES; + int UUID_LEN = Long.BYTES; + + int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES; + int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN; + int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN; + int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; + + int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; + int ENTITY_COMMIT_LOG_BASE_SIZE = ALL_RECORD_HEADER_LEN + ENTITYCOMMIT_UPDATE_HEADER_LEN + CHKSUM_LEN; + int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER; + int FLUSH_LOG_SIZE 
= ALL_RECORD_HEADER_LEN + DatasetId.BYTES + CHKSUM_LEN; + int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; + int MARKER_BASE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN; public RecordReadStatus readLogRecord(ByteBuffer buffer); @@ -80,9 +77,9 @@ public interface ILogRecord { public void setLogType(byte logType); - public int getJobId(); + long getTxnId(); - public void setJobId(int jobId); + void setTxnId(long jobId); public int getDatasetId(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index 7965aa5..dea7a67 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -98,13 +98,13 @@ public interface IRecoveryManager { /** * Creates a temporary file to be used during recovery * - * @param jobId + * @param txnId * @param fileName * @return A file to the created temporary file * @throws IOException - * if the file for the specified {@code jobId} with the {@code fileName} already exists + * if the file for the specified {@code txnId} with the {@code fileName} already exists */ - File createJobRecoveryFile(int jobId, String fileName) throws IOException; + File createJobRecoveryFile(long txnId, String fileName) throws IOException; /** * Deletes all temporary recovery files http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java index f9d924f..3dda5d3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java @@ -25,7 +25,7 @@ public interface ITransactionContext { public void registerIndexAndCallback(long resourceId, ILSMIndex index, AbstractOperationCallback callback, boolean isPrimaryIndex); - public JobId getJobId(); + public TxnId getTxnId(); public void setTimeout(boolean isTimeout); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java index 0123814..77c6a9f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java @@ -42,26 +42,26 @@ public interface ITransactionManager { * Begins a transaction identified by a transaction id and returns the * associated transaction context. * - * @param jobId + * @param txnId * a unique value for the transaction id. 
* @return the transaction context associated with the initiated transaction * @see ITransactionContext * @throws ACIDException */ - public ITransactionContext beginTransaction(JobId jobId) throws ACIDException; + public ITransactionContext beginTransaction(TxnId txnId) throws ACIDException; /** * Returns the transaction context of an active transaction given the * transaction id. * - * @param jobId + * @param txnId * a unique value for the transaction id. * @param createIfNotExist * TODO * @return * @throws ACIDException */ - public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException; + public ITransactionContext getTransactionContext(TxnId txnId, boolean createIfNotExist) throws ACIDException; /** * Commits a transaction. @@ -73,7 +73,7 @@ public interface ITransactionManager { * @param pkHash * TODO * @throws ACIDException - * @see ITransactionContextimport org.apache.hyracks.api.job.JobId; + * @see ITransactionContextimport org.apache.hyracks.api.job.TxnId; * @see ACIDException */ public void commitTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash) @@ -125,8 +125,8 @@ public interface ITransactionManager { public ITransactionSubsystem getTransactionSubsystem(); /** - * @return The current max job id. + * @return The current max txn id. 
*/ - int getMaxJobId(); + long getMaxTxnId(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java deleted file mode 100644 index 9654a92..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.transactions; - -import java.io.Serializable; - -public class JobId implements Serializable { - private static final long serialVersionUID = 1L; - /** - * The number of bytes used to represent {@link JobId} value. 
- */ - public static final int BYTES = Integer.BYTES; - - private int id; - - public JobId(int id) { - this.id = id; - } - - public int getId() { - return id; - } - - @Override - public int hashCode() { - return id; - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof JobId)) { - return false; - } - return ((JobId) o).id == id; - } - - @Override - public String toString() { - return "JID:" + id; - } - - public void setId(int jobId) { - id = jobId; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index e80cfa6..68afb2a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -32,10 +32,10 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; /** * == LogRecordFormat == * --------------------------- - * [Header1] (6 bytes) : for all log types + * [Header1] (10 bytes) : for all log types * LogSource(1) * LogType(1) - * JobId(4) + * TxnId(8) * --------------------------- * [Header2] (16 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types * DatasetId(4) //stored in dataset_dataset in Metadata Node @@ -57,16 +57,6 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; * [Tail] (8 bytes) : for all log types * Checksum(8) * --------------------------- - * = LogSize = - * 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8)) - * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + 
PKValueSize - * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30 - * 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize - * --> UPDATE_LOG_BASE_SIZE = 59 - * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8)) - * 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8)) - * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol - * it also includes LogSource and JobId fields. */ public class LogRecord implements ILogRecord { @@ -74,7 +64,7 @@ public class LogRecord implements ILogRecord { // ------------- fields in a log record (begin) ------------// private byte logSource; private byte logType; - private int jobId; + private long txnId; private int datasetId; private int PKHashValue; private int PKValueSize; @@ -130,7 +120,7 @@ public class LogRecord implements ILogRecord { private void doWriteLogRecord(ByteBuffer buffer) { buffer.put(logSource); buffer.put(logType); - buffer.putInt(jobId); + buffer.putLong(txnId); switch (logType) { case LogType.ENTITY_COMMIT: writeEntityInfo(buffer); @@ -248,7 +238,7 @@ public class LogRecord implements ILogRecord { } logSource = buffer.get(); logType = buffer.get(); - jobId = buffer.getInt(); + txnId = buffer.getLong(); switch (logType) { case LogType.FLUSH: if (buffer.remaining() < ILogRecord.DS_LEN) { @@ -454,7 +444,7 @@ public class LogRecord implements ILogRecord { builder.append(" LSN : ").append(LSN); builder.append(" LogType : ").append(LogType.toString(logType)); builder.append(" LogSize : ").append(logSize); - builder.append(" JobId : ").append(jobId); + builder.append(" TxnId : ").append(txnId); if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) { builder.append(" DatasetId : ").append(datasetId); builder.append(" ResourcePartition : ").append(resourcePartition); @@ -503,13 +493,13 @@ public class LogRecord implements ILogRecord { } @Override - public int getJobId() { - return jobId; + public long getTxnId() { + return txnId; } 
@Override - public void setJobId(int jobId) { - this.jobId = jobId; + public void setTxnId(long jobId) { + this.txnId = jobId; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java new file mode 100644 index 0000000..b0d38b8 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.transactions; + +import java.io.Serializable; + +public class TxnId implements Serializable { + private static final long serialVersionUID = 1L; + /** + * The number of bytes used to represent {@link TxnId} value. 
+ */ + public static final int BYTES = Long.BYTES; + + protected long id; + + public TxnId(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + @Override + public int hashCode() { + return Long.hashCode(id); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof TxnId)) { + return false; + } + return ((TxnId) o).id == id; + } + + @Override + public String toString() { + return "TxnId:" + id; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java index e9f96f9..be4b47f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java @@ -34,21 +34,21 @@ public class TransactionUtil { public static void formJobTerminateLogRecord(ITransactionContext txnCtx, LogRecord logRecord, boolean isCommit) { logRecord.setTxnCtx(txnCtx); - TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getJobId().getId(), isCommit); + TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getTxnId().getId(), isCommit); } - public static void formJobTerminateLogRecord(LogRecord logRecord, int jobId, boolean isCommit) { + public static void formJobTerminateLogRecord(LogRecord logRecord, long txnId, boolean isCommit) { logRecord.setLogType(isCommit ? 
LogType.JOB_COMMIT : LogType.ABORT); logRecord.setDatasetId(-1); logRecord.setPKHashValue(-1); - logRecord.setJobId(jobId); + logRecord.setTxnId(txnId); logRecord.computeAndSetLogSize(); } public static void formFlushLogRecord(LogRecord logRecord, int datasetId, PrimaryIndexOperationTracker opTracker, String nodeId, int numberOfIndexes) { logRecord.setLogType(LogType.FLUSH); - logRecord.setJobId(-1); + logRecord.setTxnId(-1); logRecord.setDatasetId(datasetId); logRecord.setOpTracker(opTracker); logRecord.setNumOfFlushedIndexes(numberOfIndexes); @@ -60,7 +60,7 @@ public class TransactionUtil { int PKHashValue, ITupleReference PKValue, int[] PKFields, int resourcePartition, byte entityCommitType) { logRecord.setTxnCtx(txnCtx); logRecord.setLogType(entityCommitType); - logRecord.setJobId(txnCtx.getJobId().getId()); + logRecord.setTxnId(txnCtx.getTxnId().getId()); logRecord.setDatasetId(datasetId); logRecord.setPKHashValue(PKHashValue); logRecord.setPKFieldCnt(PKFields.length); @@ -76,7 +76,7 @@ public class TransactionUtil { logRecord.setTxnCtx(txnCtx); logRecord.setLogSource(LogSource.LOCAL); logRecord.setLogType(LogType.MARKER); - logRecord.setJobId(txnCtx.getJobId().getId()); + logRecord.setTxnId(txnCtx.getTxnId().getId()); logRecord.setDatasetId(datasetId); logRecord.setResourcePartition(resourcePartition); marker.get(); // read the first byte since it is not part of the marker object
