[NO ISSUE][REPL] Ensure Valid Component ID is Initialized On Replica Sync - user model changes: no - storage format changes: no - interface changes: yes
Details: - Currently, the first time a replica is synchronized from master, the valid component id on each replicated index's initial checkpoint will be the initial value of a component id (-1). This value is fixed when the replica receives a flushed component from the index. However, if the master fails before any component is flushed to a replica and that replica is promoted to master, it will start from an invalid component id. This change ensures that the initial checkpoint of replicated indexes is initialized to the maximum component id that appears on master. This will ensure that if the replica is promoted, it will at least start from a component that wasn't previously used on master. - Replace the assertion-based validation of component ids with a check that throws IllegalStateException. Change-Id: I85395ad823a630725c4cab4bead1c61546dc61ae Reviewed-on: https://asterix-gerrit.ics.uci.edu/2973 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/bd728afe Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/bd728afe Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/bd728afe Branch: refs/heads/master Commit: bd728afe9be8c245e1de24936f8084ba3f537d05 Parents: f7d634c Author: Murtadha Hubail <[email protected]> Authored: Thu Sep 20 20:56:01 2018 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Thu Sep 20 12:30:12 2018 -0700 ---------------------------------------------------------------------- .../asterix/app/nc/IndexCheckpointManager.java | 5 +++-- .../common/storage/IIndexCheckpointManager.java | 3 ++- .../asterix/common/storage/IndexCheckpoint.java | 5 ++--- .../CheckpointPartitionIndexesTask.java | 10 +++++++--- .../messaging/ReplicateFileTask.java | 4 +++- .../replication/sync/ReplicaSynchronizer.java | 12 +++++++++++- 
.../PersistentLocalResourceRepository.java | 20 ++++++++++++++++++-- .../am/lsm/common/impls/AbstractLSMIndex.java | 10 +++++----- 8 files changed, 51 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java index 7b08bad..420585a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java @@ -55,7 +55,8 @@ public class IndexCheckpointManager implements IIndexCheckpointManager { } @Override - public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException { + public synchronized void init(long validComponentSequence, long lsn, long validComponentId) + throws HyracksDataException { List<IndexCheckpoint> checkpoints; try { checkpoints = getCheckpoints(); @@ -66,7 +67,7 @@ public class IndexCheckpointManager implements IIndexCheckpointManager { LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath); delete(); } - IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn); + IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId); persist(firstCheckpoint); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java index dd9ede5..2f0eddf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java @@ -29,9 +29,10 @@ public interface IIndexCheckpointManager { * * @param validComponentSequence * @param lsn + * @param validComponentId * @throws HyracksDataException */ - void init(long validComponentSequence, long lsn) throws HyracksDataException; + void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException; /** * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java index 9654473..cb34600 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,12 +41,12 @@ public class IndexCheckpoint { private long lastComponentId; private Map<Long, Long> masterNodeFlushMap; - public static 
IndexCheckpoint first(long lastComponentSequence, long lowWatermark) { + public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) { IndexCheckpoint firstCheckpoint = new IndexCheckpoint(); firstCheckpoint.id = INITIAL_CHECKPOINT_ID; firstCheckpoint.lowWatermark = lowWatermark; firstCheckpoint.validComponentSequence = lastComponentSequence; - firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(); + firstCheckpoint.lastComponentId = validComponentId; firstCheckpoint.masterNodeFlushMap = new HashMap<>(); return firstCheckpoint; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java index 448613b..e778cce 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java @@ -45,9 +45,11 @@ import org.apache.hyracks.storage.common.LocalResource; public class CheckpointPartitionIndexesTask implements IReplicaTask { private final int partition; + private final long maxComponentId; - public CheckpointPartitionIndexesTask(int partition) { + public CheckpointPartitionIndexesTask(int partition, long maxComponentId) { this.partition = partition; + this.maxComponentId = maxComponentId; } @Override @@ -75,7 +77,7 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask { maxComponentSequence = Math.max(maxComponentSequence, 
IndexComponentFileReference.of(file).getSequenceEnd()); } - indexCheckpointManager.init(maxComponentSequence, currentLSN); + indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId); } ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); } @@ -90,6 +92,7 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask { try { DataOutputStream dos = new DataOutputStream(out); dos.writeInt(partition); + dos.writeLong(maxComponentId); } catch (IOException e) { throw HyracksDataException.create(e); } @@ -98,7 +101,8 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask { public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException { try { int partition = input.readInt(); - return new CheckpointPartitionIndexesTask(partition); + long maxComponentId = input.readLong(); + return new CheckpointPartitionIndexesTask(partition, maxComponentId); } catch (IOException e) { throw HyracksDataException.create(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java index f53d448..ae36c13 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java @@ -40,6 +40,7 @@ import org.apache.asterix.replication.management.NetworkingUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; +import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -98,7 +99,8 @@ public class ReplicateFileTask implements IReplicaTask { final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef); final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN(); indexCheckpointManager.delete(); - indexCheckpointManager.init(Long.MIN_VALUE, currentLSN); + indexCheckpointManager.init(Long.MIN_VALUE, currentLSN, + LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId()); LOGGER.info(() -> "Checkpoint index: " + indexRef); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index ef85977..09f1205 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -25,6 +25,8 @@ import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * Performs the steps required to ensure any newly added replica @@ -60,9 +62,17 @@ public class ReplicaSynchronizer { } private void 
checkpointReplicaIndexes() throws IOException { + final int partition = replica.getIdentifier().getPartition(); CheckpointPartitionIndexesTask task = - new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition()); + new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition)); ReplicationProtocol.sendTo(replica, task); ReplicationProtocol.waitForAck(replica); } + + private long getPartitionMaxComponentId(int partition) throws HyracksDataException { + final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + final PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index c0da095..8f870c0 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -66,8 +66,8 @@ import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; -import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; import org.apache.hyracks.util.ExitUtil; @@ -196,7 +196,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry)); final Path path = Paths.get(resourceFile.getAbsolutePath()); Files.write(path, bytes); - indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0); + indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0, + LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId()); deleteResourceFileMask(resourceFile); } catch (Exception e) { cleanup(resourceFile); @@ -393,6 +394,21 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return partitionReplicatedFiles; } + public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy) + throws HyracksDataException { + long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID; + final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); + for (LocalResource lr : partitionResources.values()) { + DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); + if (strategy.isMatch(datasetLocalResource.getDatasetId())) { + final IIndexCheckpointManager indexCheckpointManager = + indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr)); + maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId()); + } + } + return maxComponentId; + } + private List<String> getIndexFiles(File indexDir) { final List<String> indexFiles = new 
ArrayList<>(); if (indexDir.isDirectory()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 3d928a4..9199fbb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -577,7 +577,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { if (c != EmptyComponent.INSTANCE) { diskComponents.add(0, c); } - assert checkComponentIds(); + validateComponentIds(); } @Override @@ -588,7 +588,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { if (newComponent != EmptyComponent.INSTANCE) { diskComponents.add(swapIndex, newComponent); } - assert checkComponentIds(); + validateComponentIds(); } /** @@ -597,16 +597,16 @@ public abstract class AbstractLSMIndex implements ILSMIndex { * * @throws HyracksDataException */ - private boolean checkComponentIds() throws HyracksDataException { + private void validateComponentIds() throws HyracksDataException { for (int i = 0; i < diskComponents.size() - 1; i++) { ILSMComponentId id1 = diskComponents.get(i).getId(); ILSMComponentId id2 = diskComponents.get(i + 1).getId(); IdCompareResult cmp = id1.compareTo(id2); if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) { - return false; + throw new IllegalStateException( + "found non-decreasing 
component ids (" + id1 + " -> " + id2 + ") on index " + this); } } - return true; } @Override
