This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 0561d1072b6a217816b5c9564ccc0ae84a931a5c Author: Murtadha Hubail <[email protected]> AuthorDate: Fri Jun 28 04:39:14 2019 +0300 [NO ISSUE][REPL] Suspend Dataset Checkpointing on Replica Sync - user model changes: no - storage format changes: no - interface changes: yes Details: - Before synchronizing replicas, stop datasets checkpointing to prevent new files from being generated due to async IO operations triggered by checkpointing. - Instead of sync'ing current files to replicas then scheduling a flush and sync'ing any newly generated files, just flush datasets before the initial sync then sync all the files in one go. Change-Id: I058fd48bc0fb89a1e16448ce516c3410bb4d681d Reviewed-on: https://asterix-gerrit.ics.uci.edu/3469 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- .../common/transactions/ICheckpointManager.java | 10 ++++++++++ .../replication/sync/ReplicaSynchronizer.java | 17 +++++++++++------ .../service/recovery/CheckpointManager.java | 21 ++++++++++++++++----- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java index 36cea55..954e399 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java @@ -58,4 +58,14 @@ public interface ICheckpointManager extends ILifeCycleComponent { * @param id */ void completed(TxnId id); + + /** + * Suspends checkpointing datasets + */ + void suspend(); + + /** + * Resumes checkpointing datasets + */ + void resume(); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 0f0b5bd..123709b 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; import org.apache.asterix.replication.messaging.ReplicationProtocol; @@ -45,21 +46,25 @@ public class ReplicaSynchronizer { public void sync() throws IOException { final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock(); synchronized (syncLock) { - syncFiles(); - checkpointReplicaIndexes(); - appCtx.getReplicationManager().register(replica); + final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager(); + try { + // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas + checkpointManager.suspend(); + syncFiles(); + checkpointReplicaIndexes(); + appCtx.getReplicationManager().register(replica); + } finally { + checkpointManager.resume(); + } } } private void syncFiles() throws IOException { final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica); - waitForReplicatedDatasetsIO(); - fileSync.sync(); // flush replicated dataset to generate disk component for any remaining in-memory components final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); appCtx.getDatasetLifecycleManager().flushDataset(replStrategy); waitForReplicatedDatasetsIO(); - // sync any newly generated files fileSync.sync(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index ce523db..b85742e 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -18,6 +18,10 @@ */ package org.apache.asterix.transaction.management.service.recovery; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.transactions.CheckpointProperties; import org.apache.asterix.common.transactions.ICheckpointManager; @@ -27,10 +31,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - /** * An implementation of {@link ICheckpointManager} that defines the logic * of checkpoints. @@ -40,6 +40,7 @@ public class CheckpointManager extends AbstractCheckpointManager { private static final Logger LOGGER = LogManager.getLogger(); private static final long NO_SECURED_LSN = -1L; private final Map<TxnId, Long> securedLSNs; + private boolean suspended = false; public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { super(txnSubsystem, checkpointProperties); @@ -76,7 +77,7 @@ public class CheckpointManager extends AbstractCheckpointManager { } final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; - if (!checkpointSucceeded) { + if (!checkpointSucceeded && !suspended) { // Flush datasets with indexes behind target checkpoint LSN IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); @@ -100,6 +101,16 @@ public class CheckpointManager extends AbstractCheckpointManager { securedLSNs.remove(id); } + @Override + public synchronized void suspend() { + suspended = true; + } + + @Override + public synchronized void resume() { + suspended = false; + } + private synchronized long getMinSecuredLSN() { return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values()); }
