Repository: asterixdb Updated Branches: refs/heads/master 3c6dfc9da -> fae8224e2
[NO ISSUE][TX] Ensure NC Max Txn ID is Initialized on Startup - user model changes: no - storage format changes: no - interface changes: no Details: - Initialize the value of max txn id on the NC to the value stored in the latest checkpoint or the maximum txn id encountered during recovery. This ensures that the correct max value is reported to the CC after NC startup. Change-Id: Ib529d41c5c219f4b761752a68398127388286d67 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2161 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fae8224e Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fae8224e Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fae8224e Branch: refs/heads/master Commit: fae8224e2b728211a82fa107ff6bec3f35412d2a Parents: 3c6dfc9 Author: Murtadha Hubail <[email protected]> Authored: Sat Nov 18 15:25:19 2017 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Sat Nov 18 04:28:15 2017 -0800 ---------------------------------------------------------------------- .../org/apache/asterix/app/nc/RecoveryManager.java | 13 +++++++------ .../apache/asterix/app/nc/TransactionSubsystem.java | 11 +++++++---- .../apache/asterix/common/transactions/Checkpoint.java | 2 +- .../common/transactions/ITransactionManager.java | 7 +++++++ .../service/transaction/TransactionManager.java | 3 ++- 5 files changed, 24 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fae8224e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 2435b60..7bc5697 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -280,7 +280,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException { int redoCount = 0; - long jobId; + long txnId = 0; long resourceId; long maxDiskLastLsn; @@ -307,16 +307,16 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { LOGGER.info(logRecord.getLogRecordForDisplay()); } lsn = logRecord.getLSN(); - jobId = logRecord.getTxnId(); + txnId = logRecord.getTxnId(); foundWinner = false; switch (logRecord.getLogType()) { case LogType.UPDATE: if (partitions.contains(logRecord.getResourcePartition())) { - if (winnerTxnSet.contains(jobId)) { + if (winnerTxnSet.contains(txnId)) { foundWinner = true; - } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) { - jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); - tempKeyTxnEntityId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + } else if (jobId2WinnerEntitiesMap.containsKey(txnId)) { + jobEntityWinners = jobId2WinnerEntitiesMap.get(txnId); + tempKeyTxnEntityId.setTxnId(txnId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(), logRecord.getPKValueSize()); if (jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnEntityId)) { foundWinner = true; @@ -396,6 +396,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } LOGGER.info("Logs REDO phase completed. Redo logs count: " + redoCount); } finally { + txnSubsystem.getTransactionManager().ensureMaxTxnId(txnId); //close all indexes Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet(); for (long r : resourceIdList) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fae8224e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index 4752481..f922832 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -80,10 +80,13 @@ public class TransactionSubsystem implements ITransactionSubsystem { } checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled); final Checkpoint latestCheckpoint = checkpointManager.getLatest(); - if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { - throw new IllegalStateException( - String.format("Storage version mismatch. Current version (%s). On disk version: (%s)", - StorageConstants.VERSION, latestCheckpoint.getStorageVersion())); + if (latestCheckpoint != null) { + if (latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { + throw new IllegalStateException( + String.format("Storage version mismatch. Current version (%s). On disk version: (%s)", + StorageConstants.VERSION, latestCheckpoint.getStorageVersion())); + } + transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId()); } if (replicationEnabled) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fae8224e/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java index cb278a7..825c7d1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java @@ -57,7 +57,7 @@ public class Checkpoint implements Comparable<Checkpoint> { return minMCTFirstLsn; } - public long getMaxJobId() { + public long getMaxTxnId() { return maxTxnId; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fae8224e/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java index 9603ce3..396d3f6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java @@ -101,4 +101,11 @@ public interface ITransactionManager { */ long getMaxTxnId(); + /** + * Sets the maximum txn id to the bigger value of {@code txnId} and its current value. + * + * @param txnId + */ + void ensureMaxTxnId(long txnId); + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fae8224e/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java index c03369b..6b414b8 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java @@ -140,7 +140,8 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon dumpTxnContext(os); } - private void ensureMaxTxnId(long txnId) { + @Override + public void ensureMaxTxnId(long txnId) { maxTxnId.updateAndGet(current -> Math.max(current, txnId)); }
