This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit cc7d89ccbde59c209fdbe8a819c95fa567ea9224 Author: Murtadha Hubail <[email protected]> AuthorDate: Sun Jul 9 08:28:45 2023 -0700 [ASTERIXDB-3220][REPL] Allow waiting for IO on specific dataset partition - user model changes: no - storage format changes: no - interface changes: yes Details: - Add API to wait for IO on a specific dataset partition. - When waiting for a partition replica IO ops to finish, only wait for the replica partition rather than all partitions. Change-Id: I90f311f602b3c8526556f64d7b25672981fac320 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../common/api/IDatasetLifecycleManager.java | 6 +++-- .../common/context/BaseOperationTracker.java | 8 +++--- .../apache/asterix/common/context/DatasetInfo.java | 31 ++++++++++++++++++++-- .../common/context/DatasetLifecycleManager.java | 4 +-- .../context/PrimaryIndexOperationTracker.java | 4 +-- .../ioopcallbacks/LSMIOOperationCallback.java | 6 +++-- .../ioopcallbacks/LSMIOOperationCallbackTest.java | 7 +++++ .../replication/sync/ReplicaSynchronizer.java | 2 +- .../SecondaryIndexOperationTrackerFactory.java | 4 ++- 9 files changed, 56 insertions(+), 16 deletions(-) diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 1c2a047d86..c7eee2163e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -161,12 +161,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException; /** - * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}. + * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy} and + * {@code partition}. * * @param replicationStrategy + * @param partition * @throws HyracksDataException */ - void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException; + void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException; /** * @return the current datasets io stats diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java index 5964bb473c..7d3dba4ead 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java @@ -33,17 +33,19 @@ public class BaseOperationTracker implements ITransactionOperationTracker { private static final Logger LOGGER = LogManager.getLogger(); protected final int datasetID; protected final DatasetInfo dsInfo; + protected final int partition; - public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) { + public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int partition) { this.datasetID = datasetID; this.dsInfo = dsInfo; + this.partition = partition; } @Override public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback) throws HyracksDataException { if (opType == LSMOperationType.REPLICATE) { - dsInfo.declareActiveIOOperation(REPLICATE); + dsInfo.declareActiveIOOperation(REPLICATE, partition); } } @@ -59,7 +61,7 @@ public class BaseOperationTracker implements ITransactionOperationTracker { public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback) throws HyracksDataException { if (opType == LSMOperationType.REPLICATE) { - dsInfo.undeclareActiveIOOperation(REPLICATE); + dsInfo.undeclareActiveIOOperation(REPLICATE, partition); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index d15d9be927..87a3c2f02f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -33,12 +33,16 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; + public class DatasetInfo extends Info implements Comparable<DatasetInfo> { private static final Logger LOGGER = LogManager.getLogger(); // partition -> index private final Map<Integer, Set<IndexInfo>> partitionIndexes; // resourceID -> index private final Map<Long, IndexInfo> indexes; + private final Int2IntMap partitionPendingIO; private final int datasetID; private final ILogManager logManager; private final LogRecord waitLog = new LogRecord(); @@ -54,6 +58,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { public DatasetInfo(int datasetID, ILogManager logManager) { this.partitionIndexes = new HashMap<>(); this.indexes = new HashMap<>(); + this.partitionPendingIO = new Int2IntOpenHashMap(); this.setLastAccess(-1); this.datasetID = datasetID; this.setRegistered(false); @@ -74,7 +79,8 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { setLastAccess(System.currentTimeMillis()); } - public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) { + public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) { + partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) + 1); numActiveIOOps++; switch (opType) { case FLUSH: @@ -91,7 +97,8 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { } } - public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) { + public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) { + partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) - 1); numActiveIOOps--; switch (opType) { case FLUSH: @@ -253,6 +260,26 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { } } + public void waitForIO(int partition) throws HyracksDataException { + logManager.log(waitLog); + synchronized (this) { + while (partitionPendingIO.getOrDefault(partition, 0) > 0) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + if (partitionPendingIO.getOrDefault(partition, 0) < 0) { + LOGGER.error("number of IO operations cannot be negative for dataset {}, partition {}", this, + partition); + throw new IllegalStateException( + "Number of IO operations cannot be negative: " + this + ", partition " + partition); + } + } + } + public synchronized int getPendingFlushes() { return pendingFlushes; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 117b4fc4d0..4fc9dd6728 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -552,10 +552,10 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } @Override - public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException { + public void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) { - dsr.getDatasetInfo().waitForIO(); + dsr.getDatasetInfo().waitForIO(partition); } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index b0d8e02af7..a4ad7cf232 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -59,7 +59,6 @@ import org.apache.logging.log4j.Logger; @NotThreadSafe public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener { private static final Logger LOGGER = LogManager.getLogger(); - private final int partition; // Number of active operations on an ILSMIndex instance. private final AtomicInteger numActiveOperations; private final ILogManager logManager; @@ -71,8 +70,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo, ILSMComponentIdGenerator idGenerator) { - super(datasetID, dsInfo); - this.partition = partition; + super(datasetID, dsInfo, partition); this.logManager = logManager; this.numActiveOperations = new AtomicInteger(); this.idGenerator = idGenerator; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java index 1189b51960..f56e5c0a16 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java @@ -74,6 +74,7 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; protected final DatasetInfo dsInfo; protected final ILSMIndex lsmIndex; + private final int partition; private long firstLsnForCurrentMemoryComponent = 0L; private long persistenceLsn = 0L; private int pendingFlushes = 0; @@ -84,6 +85,7 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { this.dsInfo = dsInfo; this.lsmIndex = lsmIndex; this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; + this.partition = ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum(); componentIds.add(componentId); } @@ -259,7 +261,7 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { @Override public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException { - dsInfo.declareActiveIOOperation(operation.getIOOpertionType()); + dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), partition); if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) { pendingFlushes++; FlushOperation flush = (FlushOperation) operation; @@ -282,7 +284,7 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN); } } - dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType()); + dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), partition); } public synchronized boolean hasPendingFlush() { diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java index db0911bc17..33d513f7e4 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java @@ -77,6 +77,7 @@ public class LSMIOOperationCallbackTest extends TestCase { String indexId = "mockIndexId"; Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); + Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath()); DatasetInfo dsInfo = new DatasetInfo(101, null); LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), @@ -140,6 +141,7 @@ public class LSMIOOperationCallbackTest extends TestCase { ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); + Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath()); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), @@ -161,6 +163,7 @@ public class LSMIOOperationCallbackTest extends TestCase { ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); + Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath()); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), @@ -221,4 +224,8 @@ public class LSMIOOperationCallbackTest extends TestCase { Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any()); return indexCheckpointManagerProvider; } + + private static String getIndexPath() { + return "storage/partition_0/dataverse/dataset/0/index"; + } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 459ff01245..68ccd54e50 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -103,6 +103,6 @@ public class ReplicaSynchronizer { private void waitForReplicatedDatasetsIO() throws HyracksDataException { // wait for IO operations to ensure replicated datasets files won't change during replica sync final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); - appCtx.getDatasetLifecycleManager().waitForIO(replStrategy); + appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, replica.getIdentifier().getPartition()); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java index a104ae39fc..827b7131fc 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java @@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.opcallbacks; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.context.BaseOperationTracker; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IJsonSerializable; @@ -46,7 +47,8 @@ public class SecondaryIndexOperationTrackerFactory implements ILSMOperationTrack public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) { IDatasetLifecycleManager dslcManager = ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager(); - return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId)); + int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); + return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId), partition); } @Override
