This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 5f076a8347df9a45634914c76b4117b38b5731a6 Author: Murtadha Hubail <[email protected]> AuthorDate: Mon Jul 10 16:38:47 2023 -0700 [ASTERIXDB-3221][REPL] Use IO scheduler for replication ops - user model changes: no - storage format changes: no - interface changes: no Details: - To allow concurrent replication operations, use the IO scheduler to schedule the operations using the exact same scheduling logic as the flush operations. - Maximum concurrent replication operations = maximum concurrent flush operations. - Allow connections to replicas to be recycled and reused by the replication operations and close them when there are no more pending operations. - Do not halt on replication operation failures, since such failures are expected and replicas can be re-synced. Change-Id: I82aeb60381f90a254ca99274f7e9a38f64bc7a46 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17635 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- .../org/apache/asterix/app/nc/HaltCallback.java | 4 +- .../replication/api/ReplicationDestination.java | 32 +++++++ .../management/IndexReplicationManager.java | 99 +++++++++---------- .../management/ReplicationOperation.java | 106 +++++++++++++++++++++ .../impls/AbstractAsynchronousScheduler.java | 52 +++++++++- 5 files changed, 241 insertions(+), 52 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java index 98020015dc..bb689ac5d0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java @@ -41,6 +41,8 @@ public class HaltCallback implements IIoOperationFailedCallback { @Override public void operationFailed(ILSMIOOperation operation, Throwable t) { LOGGER.error("Operation {} has failed", operation, t); - 
ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED); + if (operation.getIOOpertionType() != ILSMIOOperation.LSMIOOperationType.REPLICATE) { + ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED); + } } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java index d8037569e8..f7a739bb38 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java @@ -20,7 +20,10 @@ package org.apache.asterix.replication.api; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayDeque; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -29,6 +32,7 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.replication.IReplicationDestination; +import org.apache.asterix.common.storage.ReplicaIdentifier; import org.apache.asterix.replication.management.NetworkingUtil; import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.hyracks.api.network.ISocketChannel; @@ -41,6 +45,7 @@ public class ReplicationDestination implements IReplicationDestination { private static final Logger LOGGER = LogManager.getLogger(); private final Set<IPartitionReplica> replicas = new HashSet<>(); private final InetSocketAddress inputLocation; + private final Map<ReplicaIdentifier, ArrayDeque<PartitionReplica>> replicasConnPool = new HashMap<>(); private InetSocketAddress resolvedLocation; private ISocketChannel logRepChannel; @@ -64,6 +69,11 @@ public 
class ReplicationDestination implements IReplicationDestination { @Override public synchronized void remove(IPartitionReplica replica) { replicas.remove(replica); + ArrayDeque<PartitionReplica> partitionConnections = replicasConnPool.remove(replica.getIdentifier()); + if (partitionConnections != null) { + partitionConnections.forEach(PartitionReplica::close); + partitionConnections.clear(); + } } @Override @@ -138,4 +148,26 @@ public class ReplicationDestination implements IReplicationDestination { public int hashCode() { return Objects.hash(inputLocation); } + + public synchronized PartitionReplica getPartitionReplicaConnection(ReplicaIdentifier identifier, + INcApplicationContext appCtx) { + ArrayDeque<PartitionReplica> partitionReplicas = + replicasConnPool.computeIfAbsent(identifier, k -> new ArrayDeque<>()); + if (!partitionReplicas.isEmpty()) { + return partitionReplicas.remove(); + } + return new PartitionReplica(identifier, appCtx); + } + + public synchronized void recycleConnection(PartitionReplica partitionReplica) { + ArrayDeque<PartitionReplica> partitionReplicas = replicasConnPool.get(partitionReplica.getIdentifier()); + if (partitionReplicas != null) { + partitionReplicas.add(partitionReplica); + } + } + + public synchronized void closeConnections() { + replicasConnPool + .forEach(((identifier, partitionReplicas) -> partitionReplicas.forEach(PartitionReplica::close))); + } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java index 063709aa10..8c514bd694 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java @@ -21,22 +21,21 @@ package 
org.apache.asterix.replication.management; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; -import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.replication.IReplicationDestination; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.storage.DatasetResourceReference; -import org.apache.asterix.common.storage.ResourceReference; -import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.api.ReplicationDestination; -import org.apache.asterix.replication.sync.IndexSynchronizer; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.replication.IReplicationJob; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,13 +44,15 @@ public class IndexReplicationManager { private static final Logger LOGGER = LogManager.getLogger(); private final IReplicationManager replicationManager; - private final Set<ReplicationDestination> destinations = new HashSet<>(); + private final Set<ReplicationDestination> destinations = ConcurrentHashMap.newKeySet(); private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>(); private final IReplicationStrategy replicationStrategy; private final PersistentLocalResourceRepository resourceRepository; 
private final INcApplicationContext appCtx; + private final ILSMIOOperationScheduler ioScheduler; private final Object transferLock = new Object(); private final Set<ReplicationDestination> failedDest = new HashSet<>(); + private final AtomicInteger pendingRepOpsCount = new AtomicInteger(); public IndexReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) { this.appCtx = appCtx; @@ -59,6 +60,8 @@ public class IndexReplicationManager { this.resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); replicationStrategy = replicationManager.getReplicationStrategy(); appCtx.getThreadExecutor().execute(new ReplicationJobsProcessor()); + ioScheduler = appCtx.getStorageComponentProvider().getIoOperationSchedulerProvider() + .getIoScheduler(appCtx.getServiceContext()); } public void register(ReplicationDestination dest) { @@ -72,12 +75,18 @@ public class IndexReplicationManager { public void unregister(IReplicationDestination dest) { synchronized (transferLock) { LOGGER.info(() -> "unregister " + dest); + for (ReplicationDestination existingDest : destinations) { + if (existingDest.equals(dest)) { + existingDest.closeConnections(); + break; + } + } destinations.remove(dest); failedDest.remove(dest); } } - private void handleFailure(ReplicationDestination dest, Exception e) { + public void handleFailure(ReplicationDestination dest, Exception e) { synchronized (transferLock) { if (failedDest.contains(dest)) { return; @@ -87,6 +96,7 @@ public class IndexReplicationManager { LOGGER.error("replica at {} failed", dest); failedDest.add(dest); } + dest.closeConnections(); replicationManager.notifyFailure(dest, e); } } @@ -99,71 +109,62 @@ public class IndexReplicationManager { process(job); } + public Set<ReplicationDestination> getDestinations() { + synchronized (transferLock) { + return destinations; + } + } + private void process(IReplicationJob job) { - try { - if (skip(job)) { - return; - } - synchronized 
(transferLock) { - if (destinations.isEmpty()) { - return; - } - final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx); - final int indexPartition = getJobPartition(job); - for (ReplicationDestination dest : destinations) { - try { - Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition); - if (!partitionReplica.isPresent()) { - continue; - } - PartitionReplica replica = (PartitionReplica) partitionReplica.get(); - synchronizer.sync(replica); - } catch (Exception e) { - handleFailure(dest, e); - } - } - closeChannels(); - } - } finally { + pendingRepOpsCount.incrementAndGet(); + Optional<DatasetResourceReference> jobIndexRefOpt = getJobIndexRef(job); + if (jobIndexRefOpt.isEmpty()) { + LOGGER.warn("skipping replication of {} due to missing dataset resource reference", job.getAnyFile()); afterReplication(job); + return; + } + ReplicationOperation rp = new ReplicationOperation(appCtx, jobIndexRefOpt.get(), job, this); + if (job.getExecutionType() == IReplicationJob.ReplicationExecutionType.SYNC) { + rp.call(); + } else { + try { + ioScheduler.scheduleOperation(rp); + } catch (HyracksDataException e) { + throw new ReplicationException(e); + } } } - private boolean skip(IReplicationJob job) { + public boolean skip(DatasetResourceReference indexRef) { + return !replicationStrategy.isMatch(indexRef.getDatasetId()); + } + + public Optional<DatasetResourceReference> getJobIndexRef(IReplicationJob job) { + final String fileToReplicate = job.getAnyFile(); try { - final String fileToReplicate = job.getAnyFile(); - final Optional<DatasetResourceReference> indexFileRefOpt = - resourceRepository.getLocalResourceReference(fileToReplicate); - if (!indexFileRefOpt.isPresent()) { - LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate); - return true; - } - return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId()); + return 
resourceRepository.getLocalResourceReference(fileToReplicate); } catch (HyracksDataException e) { throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e); } } - private int getJobPartition(IReplicationJob job) { - return ResourceReference.of(job.getAnyFile()).getPartitionNum(); - } - private void closeChannels() { - if (!replicationJobsQ.isEmpty()) { - return; - } LOGGER.trace("no pending replication jobs; closing connections to replicas"); for (ReplicationDestination dest : destinations) { - dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close); + dest.closeConnections(); } } - private static void afterReplication(IReplicationJob job) { + public void afterReplication(IReplicationJob job) { try { + int pendingOps = pendingRepOpsCount.decrementAndGet(); if (job.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE && job instanceof ILSMIndexReplicationJob) { ((ILSMIndexReplicationJob) job).endReplication(); } + if (pendingOps == 0 && replicationJobsQ.isEmpty()) { + closeChannels(); + } } catch (HyracksDataException e) { throw new ReplicationException(e); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java new file mode 100644 index 0000000000..258f24a2de --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.management; + +import java.util.Optional; +import java.util.Set; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.replication.IPartitionReplica; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.asterix.replication.api.ReplicationDestination; +import org.apache.asterix.replication.sync.IndexSynchronizer; +import org.apache.hyracks.api.replication.IReplicationJob; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractIoOperation; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; +import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallbackFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ReplicationOperation extends AbstractIoOperation { + + private static final Logger LOGGER = LogManager.getLogger(); + + private static final ILSMIOOperationCallback INSTANCE = + NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(null); + private final INcApplicationContext appCtx; + private final DatasetResourceReference indexRef; + private final IReplicationJob job; + private final IndexReplicationManager indexReplicationManager; + + public ReplicationOperation(INcApplicationContext appCtx, DatasetResourceReference indexRef, IReplicationJob job, + IndexReplicationManager 
indexReplicationManager) { + super(null, null, INSTANCE, indexRef.getRelativePath().toString()); + this.appCtx = appCtx; + this.indexRef = indexRef; + this.job = job; + this.indexReplicationManager = indexReplicationManager; + } + + @Override + public LSMIOOperationType getIOOpertionType() { + return LSMIOOperationType.REPLICATE; + } + + @Override + public LSMIOOperationStatus call() { + try { + Set<ReplicationDestination> destinations = indexReplicationManager.getDestinations(); + if (destinations.isEmpty() || indexReplicationManager.skip(indexRef)) { + return LSMIOOperationStatus.SUCCESS; + } + LOGGER.debug("started replicate operation on index {}", indexRef); + final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx); + final int indexPartition = indexRef.getPartitionId(); + for (ReplicationDestination dest : destinations) { + Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition); + if (partitionReplica.isEmpty()) { + continue; + } + PartitionReplica destReplica = null; + try { + destReplica = dest.getPartitionReplicaConnection(partitionReplica.get().getIdentifier(), appCtx); + synchronizer.sync(destReplica); + dest.recycleConnection(destReplica); + } catch (Exception e) { + if (destReplica != null) { + destReplica.close(); + } + indexReplicationManager.handleFailure(dest, e); + } + } + LOGGER.debug("completed replicate operation on index {}", indexRef); + return LSMIOOperationStatus.SUCCESS; + } finally { + indexReplicationManager.afterReplication(job); + } + } + + @Override + protected LSMComponentFileReferences getComponentFiles() { + return null; + } + + @Override + public long getRemainingPages() { + return 0; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java index e266a6fc09..049da38fa1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java @@ -38,9 +38,10 @@ public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationSc private final int maxNumFlushes; protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>(); + protected final Map<String, ILSMIOOperation> runningReplicateOperations = new HashMap<>(); protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>(); protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>(); - + protected final Deque<ILSMIOOperation> waitingReplicateOperations = new ArrayDeque<>(); protected final Map<String, Throwable> failedGroups = new HashMap<>(); public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback, @@ -58,8 +59,11 @@ public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationSc case MERGE: scheduleMerge(operation); break; + case REPLICATE: + scheduleReplicate(operation); + break; case NOOP: - return; + break; default: // this should never happen // just guard here to avoid silent failures in case of future extensions @@ -75,6 +79,10 @@ public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationSc break; case MERGE: completeMerge(operation); + break; + case REPLICATE: + completeReplicate(operation); + break; case NOOP: return; default: @@ -149,6 +157,46 @@ public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationSc } } + private void scheduleReplicate(ILSMIOOperation 
operation) { + String id = operation.getIndexIdentifier(); + synchronized (executor) { + if (runningReplicateOperations.size() >= maxNumFlushes || runningReplicateOperations.containsKey(id)) { + waitingReplicateOperations.add(operation); + } else { + runningReplicateOperations.put(id, operation); + executor.submit(operation); + } + } + } + + private void completeReplicate(ILSMIOOperation operation) { + String id = operation.getIndexIdentifier(); + synchronized (executor) { + runningReplicateOperations.remove(id); + // Schedule replicate in FIFO order. Must make sure that there is at most one scheduled replicate for each index. + for (ILSMIOOperation replicateOp : waitingReplicateOperations) { + String replicateOpId = replicateOp.getIndexIdentifier(); + if (runningReplicateOperations.size() < maxNumFlushes) { + if (!runningReplicateOperations.containsKey(replicateOpId) && !replicateOp.isCompleted()) { + runningReplicateOperations.put(replicateOpId, replicateOp); + executor.submit(replicateOp); + } + } else { + break; + } + } + // cleanup scheduled replicate + while (!waitingReplicateOperations.isEmpty()) { + ILSMIOOperation top = waitingReplicateOperations.peek(); + if (top.isCompleted() || runningReplicateOperations.get(top.getIndexIdentifier()) == top) { + waitingReplicateOperations.poll(); + } else { + break; + } + } + } + } + @Override public void close() throws IOException { executor.shutdown();
