This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 610f6f5fb4d120e45a94b52ccf9b72000c3a72a5 Merge: f89de3d 0561d10 Author: Murtadha Hubail <[email protected]> AuthorDate: Tue Jul 2 21:26:24 2019 +0300 Merge commit '0561d10' from stabilization-f69489 Change-Id: I18115329ce7ab3501e7fcc9c6dc06d0e87c97688 .../asterix/common/transactions/ICheckpointManager.java | 12 +++++++++++- .../asterix/replication/sync/ReplicaSynchronizer.java | 17 +++++++++++------ .../management/service/recovery/CheckpointManager.java | 16 +++++++++++++++- 3 files changed, 37 insertions(+), 8 deletions(-) diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java index ac652e3,954e399..6926dac --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java @@@ -60,9 -60,12 +60,19 @@@ public interface ICheckpointManager ext void completed(TxnId id); /** + * Checkpoints idle datasets by flushing their in-memory component to disk if needed. + * + * @throws HyracksDataException + */ + void checkpointIdleDatasets() throws HyracksDataException; - } ++ ++ /** + * Suspends checkpointing datasets + */ + void suspend(); + + /** + * Resumes checkpointing datasets + */ + void resume(); -} ++} diff --cc asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index ba945be,b85742e..43758d3 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@@ -43,8 -39,8 +43,9 @@@ public class CheckpointManager extends private static final Logger LOGGER = LogManager.getLogger(); private static final long NO_SECURED_LSN = -1L; + private final long datasetCheckpointInterval; private final Map<TxnId, Long> securedLSNs; + private boolean suspended = false; public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { super(txnSubsystem, checkpointProperties); @@@ -82,10 -77,11 +83,10 @@@ } final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; - if (!checkpointSucceeded) { + if (!checkpointSucceeded && !suspended) { // Flush datasets with indexes behind target checkpoint LSN - IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); - datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN); + final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); + dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN)); } capture(minFirstLSN, false); if (checkpointSucceeded) { @@@ -106,11 -102,15 +107,24 @@@ } @Override + public synchronized void checkpointIdleDatasets() throws HyracksDataException { ++ if (suspended) { ++ return; ++ } + final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); + dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate()); + } + ++ @Override + public synchronized void suspend() { + suspended = true; + } + + @Override + public synchronized void resume() { + suspended = false; + } + private synchronized long getMinSecuredLSN() { return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values()); }
