This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 8f278c0 [NO ISSUE][REP] Add API to perform non-delta recovery for a replica 8f278c0 is described below commit 8f278c042d92135f737cd7b26bd56a3479e11106 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Tue Sep 14 02:12:15 2021 +0300 [NO ISSUE][REP] Add API to perform non-delta recovery for a replica - user model changes: no - storage format changes: no - interface changes: no Details: - Add an option to perform non-delta recovery for a replica. Change-Id: Ib1837e8f1aefdd9e085ccfd62f1c6e6d4eb969e8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13223 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../asterix/replication/api/PartitionReplica.java | 6 +- .../replication/messaging/DeletePartitionTask.java | 75 ++++++++++++++++++++++ .../messaging/PartitionResourcesListResponse.java | 2 +- .../replication/messaging/ReplicationProtocol.java | 5 +- .../replication/sync/ReplicaFilesSynchronizer.java | 14 +++- .../replication/sync/ReplicaSynchronizer.java | 8 +-- .../PersistentLocalResourceRepository.java | 12 ++++ 7 files changed, 112 insertions(+), 10 deletions(-) diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java index e265d03..3b10700 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -79,17 +79,17 @@ public class PartitionReplica implements IPartitionReplica { } public synchronized void sync() { - sync(true); + sync(true, true); } - public synchronized 
void sync(boolean register) { + public synchronized void sync(boolean register, boolean deltaRecovery) { if (status == IN_SYNC || status == CATCHING_UP) { return; } setStatus(CATCHING_UP); appCtx.getThreadExecutor().execute(() -> { try { - new ReplicaSynchronizer(appCtx, this).sync(register); + new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery); setStatus(IN_SYNC); } catch (Exception e) { LOGGER.error(() -> "Failed to sync replica " + this, e); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java new file mode 100644 index 0000000..90139df --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class DeletePartitionTask implements IReplicaTask { + + private static final Logger LOGGER = LogManager.getLogger(); + private final int partitionId; + + public DeletePartitionTask(int partitionId) { + this.partitionId = partitionId; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { + try { + PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + LOGGER.warn("deleting storage partition {}", partitionId); + localResourceRepository.deletePartition(partitionId); + ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); + } catch (Exception e) { + throw new ReplicationException(e); + } + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.DELETE_PARTITION; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + DataOutputStream dos = new DataOutputStream(out); + dos.writeInt(partitionId); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static DeletePartitionTask create(DataInput input) throws IOException { + return new DeletePartitionTask(input.readInt()); + } +} diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java index a9921c6..1a5ba88 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java @@ -33,7 +33,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class PartitionResourcesListResponse implements IReplicationMessage { private final int partition; - private Map<String, Long> partitionReplicatedResources; + private final Map<String, Long> partitionReplicatedResources; private final List<String> files; private final boolean owner; diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java index ed2c93f..5b0c64e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java @@ -61,7 +61,8 @@ public class ReplicationProtocol { LSM_COMPONENT_MASK, MARK_COMPONENT_VALID, DROP_INDEX, - REPLICATE_LOGS + REPLICATE_LOGS, + DELETE_PARTITION } private static final Map<Integer, ReplicationRequestType> TYPES = new HashMap<>(); @@ -177,6 +178,8 @@ public class ReplicationProtocol { return MarkComponentValidTask.create(dis); case REPLICATE_LOGS: return ReplicateLogsTask.create(dis); + case DELETE_PARTITION: + return DeletePartitionTask.create(dis); default: throw new IllegalStateException("Unrecognized replication message"); } diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index faf3f54..735318d 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -35,6 +35,7 @@ import org.apache.asterix.common.storage.IndexCheckpoint; import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.asterix.replication.messaging.DeletePartitionTask; import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; import org.apache.asterix.replication.messaging.PartitionResourcesListTask; import org.apache.asterix.replication.messaging.ReplicationProtocol; @@ -55,14 +56,19 @@ public class ReplicaFilesSynchronizer { private static final Logger LOGGER = LogManager.getLogger(); private final PartitionReplica replica; private final INcApplicationContext appCtx; + private final boolean deltaRecovery; - public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) { + public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica, boolean deltaRecovery) { this.appCtx = appCtx; this.replica = replica; + this.deltaRecovery = deltaRecovery; } public void sync() throws IOException { final int partition = replica.getIdentifier().getPartition(); + if (!deltaRecovery) { + deletePartitionFromReplica(partition); + } PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition); Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources( replicaResourceResponse.getPartitionReplicatedResources(), 
replicaResourceResponse.isOwner()); @@ -82,6 +88,12 @@ public class ReplicaFilesSynchronizer { deleteReplicaExtraFiles(replicaFiles, masterFiles); } + private void deletePartitionFromReplica(int partitionId) throws IOException { + DeletePartitionTask deletePartitionTask = new DeletePartitionTask(partitionId); + ReplicationProtocol.sendTo(replica, deletePartitionTask); + ReplicationProtocol.waitForAck(replica); + } + private void deleteReplicaExtraFiles(Set<String> replicaFiles, Set<String> masterFiles) { final List<String> replicaInvalidFiles = replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList()); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 6030245..05e2e75 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -43,13 +43,13 @@ public class ReplicaSynchronizer { this.replica = replica; } - public void sync(boolean register) throws IOException { + public void sync(boolean register, boolean deltaRecovery) throws IOException { synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) { final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager(); try { // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas checkpointManager.suspend(); - syncFiles(); + syncFiles(deltaRecovery); checkpointReplicaIndexes(); if (register) { appCtx.getReplicationManager().register(replica); @@ -60,8 +60,8 @@ public class ReplicaSynchronizer { } } - private void syncFiles() throws IOException { - final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica); + private void 
syncFiles(boolean deltaRecovery) throws IOException { + final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery); // flush replicated dataset to generate disk component for any remaining in-memory components final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); appCtx.getDatasetLifecycleManager().flushDataset(replStrategy); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index ee5b16e..02f5772 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -695,4 +695,16 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito FileReference resolve = ioManager.resolve(path.toString()); return resolve.getFile().toPath(); } + + public void deletePartition(int partitionId) { + List<File> onDiskPartitions = getOnDiskPartitions(); + for (File onDiskPartition : onDiskPartitions) { + int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath()); + if (partitionNum == partitionId) { + LOGGER.warn("deleting partition {}", partitionNum); + FileUtils.deleteQuietly(onDiskPartition); + return; + } + } + } }