http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java new file mode 100644 index 0000000..b71f4b8 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.replication.logging.RemoteLogRecord;
+import org.apache.asterix.replication.logging.RemoteLogsProcessor;
+import org.apache.asterix.replication.management.ReplicationChannel;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Replica-side task that receives batches of transaction logs sent by the master replica and hands
+ * each batch to the {@link RemoteLogsProcessor} until the end-of-replication marker arrives.
+ */
+public class ReplicateLogsTask implements IReplicaTask {
+
+    /**
+     * Size, in bytes, of the request body that marks the end of log replication
+     * (see {@link ReplicationProtocol#getEndLogReplicationBuffer()}).
+     */
+    public static final int END_REPLICATION_LOG_SIZE = 1;
+    private final String nodeId;
+
+    public ReplicateLogsTask(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Reads log batches from the worker's channel and processes them until the end marker is read.
+     *
+     * @throws ReplicationException wrapping any {@link IOException} raised inside the read/process loop
+     */
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
+        final ReplicationChannel replicationChannel = (ReplicationChannel) appCtx.getReplicationChannel();
+        final RemoteLogsProcessor processor = replicationChannel.getRemoteLogsProcessor();
+        final ILogManager txnLogManager = appCtx.getTransactionSubsystem().getLogManager();
+        final RemoteLogRecord logRecord = new RemoteLogRecord();
+        final SocketChannel socket = worker.getChannel();
+        // start with a log-page-sized buffer and keep using whatever buffer readRequest returns
+        ByteBuffer batch = ByteBuffer.allocate(txnLogManager.getLogPageSize());
+        try {
+            batch = ReplicationProtocol.readRequest(socket, batch);
+            while (!isEndOfReplication(batch)) {
+                processor.process(batch, logRecord, worker);
+                batch = ReplicationProtocol.readRequest(socket, batch);
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    // A request whose body is exactly END_REPLICATION_LOG_SIZE bytes signals that no more logs follow.
+    private static boolean isEndOfReplication(ByteBuffer batch) {
+        return batch.remaining() == END_REPLICATION_LOG_SIZE;
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.REPLICATE_LOGS;
+    }
+
+    /** Writes the node id with {@link DataOutputStream#writeUTF}, the format read back by {@link #create}. */
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        final DataOutputStream dataOut = new DataOutputStream(out);
+        try {
+            dataOut.writeUTF(nodeId);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /** Rebuilds a task from the node id written by {@link #serialize(OutputStream)}. */
+    public static ReplicateLogsTask create(DataInput input) throws IOException {
+        return new ReplicateLogsTask(input.readUTF());
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java new file mode 100644 index 0000000..280a2d4 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicationMessage;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
+import org.apache.hyracks.util.StorageUtil;
+
+/**
+ * Static helpers implementing the replication wire protocol.
+ * <p>
+ * All replication messages start with their {@link ReplicationRequestType} ordinal (4 bytes), then the
+ * length of the request body in bytes (4 bytes), then the body itself.
+ */
+public class ReplicationProtocol {
+
+    /** Marker in log replication acknowledgement messages; the transaction id follows it. */
+    public static final String LOG_REPLICATION_ACK = "$";
+    public static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
+    private static final int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+    private static final int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+
+    public enum ReplicationRequestType {
+        GOODBYE,
+        ACK,
+        PARTITION_RESOURCES_REQUEST,
+        PARTITION_RESOURCES_RESPONSE,
+        REPLICATE_RESOURCE_FILE,
+        DELETE_RESOURCE_FILE,
+        CHECKPOINT_PARTITION,
+        LSM_COMPONENT_MASK,
+        MARK_COMPONENT_VALID,
+        DROP_INDEX,
+        REPLICATE_LOGS
+    }
+
+    private static final Map<Integer, ReplicationRequestType> TYPES = new HashMap<>();
+
+    static {
+        Stream.of(ReplicationRequestType.values()).forEach(type -> TYPES.put(type.ordinal(), type));
+    }
+
+    /**
+     * Reads one length-prefixed request body from {@code socketChannel}.
+     * <p>
+     * If {@code dataBuffer} is too small for the request, a larger buffer is allocated. Callers must
+     * therefore always continue with the returned buffer rather than the one they passed in.
+     *
+     * @param socketChannel channel to read from
+     * @param dataBuffer    buffer to read into when it is large enough
+     * @return the buffer holding the request body
+     * @throws IOException if reading from the channel fails
+     */
+    public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
+        // read request size
+        NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
+        final int requestSize = dataBuffer.getInt();
+        final ByteBuffer buf = ensureSize(dataBuffer, requestSize);
+        // read request
+        NetworkingUtil.readBytes(socketChannel, buf, requestSize);
+        // return buf, not dataBuffer: ensureSize may have allocated a new, larger buffer
+        return buf;
+    }
+
+    /**
+     * Reads a request type header from the channel.
+     *
+     * @return the matching type, or {@code null} if the ordinal read is not a known type
+     */
+    public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
+            throws IOException {
+        // read replication request type
+        NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
+        return TYPES.get(byteBuffer.getInt());
+    }
+
+    private static ByteBuffer getGoodbyeBuffer() {
+        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+        bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
+        bb.flip();
+        return bb;
+    }
+
+    /** Parses the transaction id that follows {@link #LOG_REPLICATION_ACK} in {@code msg}. */
+    public static int getTxnIdFromLogAckMessage(String msg) {
+        return Integer.parseInt(msg.substring(msg.indexOf(LOG_REPLICATION_ACK) + 1));
+    }
+
+    public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+        NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
+    }
+
+    /** Writes an ACK type header using {@code buf}, which is cleared first. */
+    public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
+        try {
+            buf.clear();
+            buf.putInt(ReplicationRequestType.ACK.ordinal());
+            buf.flip();
+            NetworkingUtil.transferBufferToChannel(socketChannel, buf);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * Blocks until the replica responds.
+     *
+     * @throws IllegalStateException if the response is not an ACK
+     */
+    public static void waitForAck(PartitionReplica replica) throws IOException {
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.getReusableBuffer();
+        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(channel, buf);
+        if (responseFunction != ReplicationRequestType.ACK) {
+            throw new IllegalStateException("Unexpected response while waiting for ack.");
+        }
+    }
+
+    public static void sendTo(PartitionReplica replica, IReplicationMessage task) {
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.getReusableBuffer();
+        sendTo(channel, task, buf);
+    }
+
+    /**
+     * Serializes {@code task} and writes it as [type][body length][body].
+     * A larger buffer than {@code buf} is allocated if the message does not fit.
+     */
+    public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) {
+        ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            task.serialize(oos);
+            final int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+            final ByteBuffer requestBuffer = ensureSize(buf, requestSize);
+            requestBuffer.putInt(task.getMessageType().ordinal());
+            requestBuffer.putInt(oos.size());
+            requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
+            requestBuffer.flip();
+            NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException {
+        final ReplicationRequestType type = getRequestType(socketChannel, buffer);
+        return readMessage(type, socketChannel, buffer);
+    }
+
+    /**
+     * Reads the body of a request whose type header was already consumed and deserializes it.
+     *
+     * @throws IllegalStateException if {@code type} is {@code null} or not a message type
+     * @throws ReplicationException  if reading or deserializing fails
+     */
+    public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
+            ByteBuffer buffer) {
+        try {
+            // use the returned buffer: readRequest may have allocated a larger one for this request
+            final ByteBuffer request = ReplicationProtocol.readRequest(socketChannel, buffer);
+            // ByteArrayInputStream takes (offset, length), so pass the readable region explicitly
+            final ByteArrayInputStream input = new ByteArrayInputStream(request.array(),
+                    request.arrayOffset() + request.position(), request.remaining());
+            try (DataInputStream dis = new DataInputStream(input)) {
+                if (type == null) {
+                    // getRequestType returns null for unknown ordinals; switching on null would NPE
+                    throw new IllegalStateException("Unrecognized replication message");
+                }
+                switch (type) {
+                    case PARTITION_RESOURCES_REQUEST:
+                        return PartitionResourcesListTask.create(dis);
+                    case PARTITION_RESOURCES_RESPONSE:
+                        return PartitionResourcesListResponse.create(dis);
+                    case REPLICATE_RESOURCE_FILE:
+                        return ReplicateFileTask.create(dis);
+                    case DELETE_RESOURCE_FILE:
+                        return DeleteFileTask.create(dis);
+                    case CHECKPOINT_PARTITION:
+                        return CheckpointPartitionIndexesTask.create(dis);
+                    case LSM_COMPONENT_MASK:
+                        return ComponentMaskTask.create(dis);
+                    case DROP_INDEX:
+                        return DropIndexTask.create(dis);
+                    case MARK_COMPONENT_VALID:
+                        return MarkComponentValidTask.create(dis);
+                    case REPLICATE_LOGS:
+                        return ReplicateLogsTask.create(dis);
+                    default:
+                        throw new IllegalStateException("Unrecognized replication message");
+                }
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /** Builds the marker batch (size prefix 1, one zero byte) that ends log replication. */
+    public static ByteBuffer getEndLogReplicationBuffer() {
+        final int logsBatchSize = 1;
+        final ByteBuffer endLogRepBuffer =
+                ByteBuffer.allocate(Integer.BYTES + ReplicateLogsTask.END_REPLICATION_LOG_SIZE);
+        endLogRepBuffer.putInt(logsBatchSize);
+        endLogRepBuffer.put((byte) 0);
+        endLogRepBuffer.flip();
+        return endLogRepBuffer;
+    }
+
+    /**
+     * Returns {@code buffer} cleared if it can hold {@code size} bytes; otherwise returns a newly
+     * allocated heap buffer of exactly {@code size} bytes.
+     */
+    private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
+        if (buffer == null || buffer.capacity() < size) {
+            return ByteBuffer.allocate(size);
+        }
+        buffer.clear();
+        return buffer;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
deleted file mode 100644
index 8aa4487..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.recovery;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.messaging.DeleteFileTask;
-import org.apache.asterix.replication.messaging.ReplicateFileTask;
-import org.apache.asterix.replication.storage.PartitionReplica;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
-
-/**
- * Copies individual files to, or deletes them from, a single partition replica. Each operation waits
- * for the replica's acknowledgement before returning.
- */
-public class FileSynchronizer {
-
-    private final INcApplicationContext appCtx;
-    private final PartitionReplica replica;
-
-    public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
-        this.appCtx = appCtx;
-        this.replica = replica;
-    }
-
-    /**
-     * Announces {@code file} (with its length) to the replica, streams its contents, then waits for an ack.
-     *
-     * @param file path resolved through the application's IO manager
-     * @throws ReplicationException wrapping any {@link IOException}
-     */
-    public void replicate(String file) {
-        try {
-            final IIOManager io = appCtx.getIoManager();
-            final SocketChannel socket = replica.getChannel();
-            final FileReference source = io.resolve(file);
-            ReplicationProtocol.sendTo(replica, new ReplicateFileTask(file, source.getFile().length()));
-            streamFile(source, socket);
-            ReplicationProtocol.waitForAck(replica);
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    // Sends the raw bytes of source over destination, closing the file afterwards.
-    private static void streamFile(FileReference source, SocketChannel destination) throws IOException {
-        try (RandomAccessFile raf = new RandomAccessFile(source.getFile(), "r");
-                FileChannel contents = raf.getChannel()) {
-            NetworkingUtil.sendFile(contents, destination);
-        }
-    }
-
-    /**
-     * Asks the replica to delete {@code file} and waits for its ack.
-     *
-     * @throws ReplicationException wrapping any {@link IOException}
-     */
-    public void delete(String file) {
-        try {
-            ReplicationProtocol.sendTo(replica, new DeleteFileTask(file));
-            ReplicationProtocol.waitForAck(replica);
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
deleted file mode 100644
index 5d044b4..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.recovery;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.replication.IReplicationManager;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * Recovers this node's storage from remote replicas: builds a recovery plan, requests index files from
- * the selected replicas, replays replicated logs, and restarts the log manager past the replicas' max LSN.
- */
-public class RemoteRecoveryManager implements IRemoteRecoveryManager {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final IReplicationManager replicationManager;
-    private final INcApplicationContext runtimeContext;
-    private final ReplicationProperties replicationProperties;
-    private final IReplicationStrategy replicationStrategy;
-    // remote replica id -> ids of the nodes whose data is recovered from it;
-    // set by startFailbackProcess and cleared at the end of completeFailbackProcess
-    private Map<String, Set<String>> failbackRecoveryReplicas;
-
-    public RemoteRecoveryManager(IReplicationManager replicationManager, INcApplicationContext runtimeContext,
-            ReplicationProperties replicationProperties) {
-        this.replicationManager = replicationManager;
-        this.runtimeContext = runtimeContext;
-        this.replicationProperties = replicationProperties;
-        this.replicationStrategy = replicationManager.getReplicationStrategy();
-    }
-
-    /**
-     * Builds a plan mapping each chosen remote replica id to the ids of the nodes whose data will be
-     * recovered from it. Every live candidate is scored by how many nodes it can serve, and each node
-     * is assigned to its highest-scoring candidate.
-     *
-     * @throws IllegalStateException if some node's data has no active replica to recover from
-     */
-    private Map<String, Set<String>> constructRemoteRecoveryPlan() {
-        //1. identify which replicas reside in this node
-        final String localNodeId = runtimeContext.getTransactionSubsystem().getId();
-        final Set<Replica> replicas = replicationStrategy.getRemoteReplicasAndSelf(localNodeId);
-        final Map<String, Set<String>> recoveryCandidates = new HashMap<>();
-        final Map<String, Integer> candidatesScore = new HashMap<>();
-        // loop-invariant: read the dead replicas once so every node is filtered against the same snapshot
-        final Set<String> deadReplicas = replicationManager.getDeadReplicasIds();
-
-        //2. identify which nodes have a backup of each lost node's data
-        for (Replica node : replicas) {
-            final Set<Replica> locations = replicationStrategy.getRemoteReplicasAndSelf(node.getId());
-
-            //since the local node just started, remove it from candidates
-            locations.remove(new Replica(localNodeId, "", -1));
-
-            //remove any dead replicas
-            for (String deadReplica : deadReplicas) {
-                locations.remove(new Replica(deadReplica, "", -1));
-            }
-
-            //no active replicas to recover from
-            if (locations.isEmpty()) {
-                throw new IllegalStateException("Could not find any ACTIVE replica to recover " + node + " data.");
-            }
-
-            for (Replica locationRep : locations) {
-                candidatesScore.merge(locationRep.getId(), 1, Integer::sum);
-            }
-            recoveryCandidates.put(node.getId(),
-                    locations.stream().map(Replica::getId).collect(Collectors.toSet()));
-        }
-
-        //3. find the best candidate to recover from per lost replica data
-        final Map<String, Set<String>> recoveryList = new HashMap<>();
-        recoveryCandidates.forEach((lostNode, candidates) -> {
-            int winnerScore = -1;
-            String winner = "";
-            for (String candidate : candidates) {
-                final int candidateScore = candidatesScore.get(candidate);
-                if (candidateScore > winnerScore) {
-                    winnerScore = candidateScore;
-                    winner = candidate;
-                }
-            }
-            recoveryList.computeIfAbsent(winner, k -> new HashSet<>()).add(lostNode);
-        });
-        return recoveryList;
-    }
-
-    /**
-     * Replays the logs of {@code partitions} starting from the partitions' minimum LSN, but no earlier
-     * than the smallest LSN the log manager can still read.
-     *
-     * @param flush whether to flush all datasets after replaying
-     * @throws HyracksDataException if replaying or flushing fails
-     */
-    @Override
-    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
-        final ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
-        final long minLSN = Math.max(runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions),
-                logManager.getReadableSmallestLSN());
-
-        //replay logs > minLSN that belong to these partitions
-        final IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        try {
-            recoveryManager.replayPartitionsLogs(partitions, logManager.getLogReader(true), minLSN);
-            if (flush) {
-                runtimeContext.getDatasetLifecycleManager().flushAllDatasets();
-            }
-        } catch (IOException | ACIDException e) {
-            // create() avoids re-wrapping an exception that already is a HyracksDataException
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    /**
-     * Replays the logs of {@code partitions} (without flushing) and marks them active on this node.
-     * (The method name's spelling is dictated by {@link IRemoteRecoveryManager}.)
-     */
-    @Override
-    public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException {
-        /*
-         * TODO even though the takeover is always expected to succeed,
-         * in case of any failure during the takeover, the CC should be
-         * notified that the takeover failed.
-         */
-        final Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions));
-        replayReplicaPartitionLogs(partitionsToTakeover, false);
-
-        //mark these partitions as active in this node
-        final PersistentLocalResourceRepository resourceRepository =
-                (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
-        for (Integer partitionId : partitions) {
-            resourceRepository.addActivePartition(partitionId);
-        }
-    }
-
-    /**
-     * Wipes local storage, builds a new recovery plan and requests all index files from the chosen
-     * replicas. Retries on {@link IOException} up to the configured maximum number of attempts.
-     *
-     * @throws IllegalStateException if the attempts are exhausted or no remote replica is active
-     */
-    @Override
-    public void startFailbackProcess() {
-        int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts();
-        final PersistentLocalResourceRepository resourceRepository =
-                (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
-        final IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
-        final Map<String, ClusterPartition[]> nodePartitions =
-                runtimeContext.getMetadataProperties().getNodePartitions();
-
-        while (true) {
-            //start recovery steps
-            try {
-                if (maxRecoveryAttempts <= 0) {
-                    //to avoid infinite loop in case of unexpected behavior.
-                    throw new IllegalStateException("Failed to perform remote recovery.");
-                }
-
-                /*** Prepare for Recovery ***/
-                //1. check remote replicas states
-                replicationManager.initializeReplicasState();
-                final int activeReplicasCount = replicationManager.getActiveReplicasCount();
-                if (activeReplicasCount == 0) {
-                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
-                }
-
-                //2. clean any memory data that could've existed from previous failed recovery attempt
-                datasetLifeCycleManager.closeAllDatasets();
-
-                //3. remove any existing storage data and initialize storage metadata
-                resourceRepository.deleteStorageData();
-
-                //4. select remote replicas to recover from per lost replica data
-                failbackRecoveryReplicas = constructRemoteRecoveryPlan();
-
-                /*** Start Recovery Per Lost Replica ***/
-                for (Entry<String, Set<String>> remoteReplica : failbackRecoveryReplicas.entrySet()) {
-                    final String replicaId = remoteReplica.getKey();
-                    final Set<Integer> partitionsIds = new HashSet<>();
-                    for (String node : remoteReplica.getValue()) {
-                        for (ClusterPartition partition : nodePartitions.get(node)) {
-                            partitionsIds.add(partition.getPartitionId());
-                        }
-                    }
-
-                    //5. request indexes metadata and LSM components
-                    replicationManager.requestReplicaFiles(replicaId, partitionsIds, new HashSet<>());
-                }
-                break;
-            } catch (IOException e) {
-                LOGGER.warn("Failed during remote recovery. Attempting again...", e);
-                maxRecoveryAttempts--;
-            }
-        }
-    }
-
-    /**
-     * Requests the files still missing for the plan built by {@link #startFailbackProcess()}, then
-     * restarts the log manager past the replicas' max LSN and starts the replication services.
-     */
-    @Override
-    public void completeFailbackProcess() throws IOException, InterruptedException {
-        final ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
-        final ReplicaResourcesManager replicaResourcesManager =
-                (ReplicaResourcesManager) runtimeContext.getReplicaResourcesManager();
-        final Map<String, ClusterPartition[]> nodePartitions =
-                runtimeContext.getMetadataProperties().getNodePartitions();
-
-        /*
-         * for each lost partition, get the remaining files from replicas
-         * to complete the failback process.
-         */
-        try {
-            for (Entry<String, Set<String>> remoteReplica : failbackRecoveryReplicas.entrySet()) {
-                final String replicaId = remoteReplica.getKey();
-                final Set<String> existingFiles = new HashSet<>();
-                final Set<Integer> partitionsToRecover = new HashSet<>();
-                for (String nodeId : remoteReplica.getValue()) {
-                    //get partitions that will be recovered from this node
-                    for (ClusterPartition partition : nodePartitions.get(nodeId)) {
-                        existingFiles.addAll(
-                                replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), true));
-                        partitionsToRecover.add(partition.getPartitionId());
-                    }
-                }
-
-                //request remaining indexes files
-                replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, existingFiles);
-            }
-        } catch (IOException e) {
-            /*
-             * in case of failure during failback completion process we need to construct a new plan
-             * and get all the files from the start since the remote replicas will change in the new plan.
-             */
-            LOGGER.warn("Failed during completing failback. Restarting failback process...", e);
-            startFailbackProcess();
-        }
-
-        //get max LSN from selected remote replicas
-        final long maxRemoteLSN = replicationManager.getMaxRemoteLSN(failbackRecoveryReplicas.keySet());
-
-        //force LogManager to start from a partition > maxLSN in selected remote replicas
-        logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
-
-        //start replication service after failback completed
-        runtimeContext.getReplicationChannel().start();
-        runtimeContext.getReplicationManager().startReplicationThreads();
-
-        failbackRecoveryReplicas = null;
-    }
-
-    /**
-     * Executes a recovery plan supplied by the caller (remote replica id -> partitions to recover).
-     * It wipes local storage, requests the files, and restarts the log manager past the replicas' max
-     * LSN. It retries on {@link IOException} up to the configured maximum number of attempts.
-     */
-    //TODO refactor common code between remote recovery and failback process
-    @Override
-    public void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) throws HyracksDataException {
-        int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts();
-        final PersistentLocalResourceRepository resourceRepository =
-                (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
-        final IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
-        final ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
-        while (true) {
-            //start recovery steps
-            try {
-                if (maxRecoveryAttempts <= 0) {
-                    //to avoid infinite loop in case of unexpected behavior.
-                    throw new IllegalStateException("Failed to perform remote recovery.");
-                }
-
-                /*** Prepare for Recovery ***/
-                //1. clean any memory data that could've existed from previous failed recovery attempt
-                datasetLifeCycleManager.closeAllDatasets();
-
-                //2. remove any existing storage data and initialize storage metadata
-                resourceRepository.deleteStorageData();
-
-                /*** Start Recovery Per Lost Replica ***/
-                for (Entry<String, Set<Integer>> remoteReplica : recoveryPlan.entrySet()) {
-                    //3. request indexes metadata and LSM components
-                    replicationManager.requestReplicaFiles(remoteReplica.getKey(), remoteReplica.getValue(),
-                            new HashSet<>());
-                }
-
-                //4. get max LSN from selected remote replicas
-                final long maxRemoteLSN = replicationManager.getMaxRemoteLSN(recoveryPlan.keySet());
-
-                //5. force LogManager to start from a partition > maxLSN in selected remote replicas
-                logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
-                break;
-            } catch (IOException e) {
-                LOGGER.warn("Failed during remote recovery. Attempting again...", e);
-                maxRecoveryAttempts--;
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
deleted file mode 100644
index 2021cee..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.recovery;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
-import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
-import org.apache.asterix.replication.storage.PartitionReplica;
-
-/**
- * Brings a replica's partition files in line with the master's. Files only the master has are
- * replicated to the replica, and files only the replica has are deleted from it.
- */
-public class ReplicaFilesSynchronizer {
-
-    private final PartitionReplica replica;
-    private final INcApplicationContext appCtx;
-
-    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
-        this.appCtx = appCtx;
-        this.replica = replica;
-    }
-
-    /**
-     * Compares the master's index files of the replica's partition (as relative paths) with the files the
-     * replica reports, then ships missing files and deletes extra ones.
-     *
-     * @throws IOException if communicating with the replica fails
-     */
-    public void sync() throws IOException {
-        final int partition = replica.getIdentifier().getPartition();
-        final Set<String> onReplica = getReplicaFiles(partition);
-        final Set<String> onMaster = appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false)
-                .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
-        // files on master and not on replica are shipped to the replica
-        replicateMissingFiles(filesNotIn(onMaster, onReplica));
-        // files on replica and not on master are removed from the replica
-        deleteInvalidFiles(filesNotIn(onReplica, onMaster));
-    }
-
-    // Entries of candidates absent from reference, in candidates' iteration order.
-    private static List<String> filesNotIn(Set<String> candidates, Set<String> reference) {
-        return candidates.stream().filter(file -> !reference.contains(file)).collect(Collectors.toList());
-    }
-
-    // Asks the replica for its resource list of the given partition and waits for the response.
-    private Set<String> getReplicaFiles(int partition) throws IOException {
-        final PartitionResourcesListTask listRequest = new PartitionResourcesListTask(partition);
-        final SocketChannel socket = replica.getChannel();
-        final ByteBuffer buffer = replica.gerReusableBuffer();
-        ReplicationProtocol.sendTo(replica, listRequest);
-        final PartitionResourcesListResponse listResponse =
-                (PartitionResourcesListResponse) ReplicationProtocol.read(socket, buffer);
-        return new HashSet<>(listResponse.getResources());
-    }
-
-    private void replicateMissingFiles(List<String> files) {
-        final FileSynchronizer fileSync = new FileSynchronizer(appCtx, replica);
-        for (String file : files) {
-            fileSync.replicate(file);
-        }
-    }
-
-    private void deleteInvalidFiles(List<String> files) {
-        final FileSynchronizer fileSync = new FileSynchronizer(appCtx, replica);
-        for (String file : files) {
-            fileSync.delete(file);
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
deleted file mode 100644
index 1fa3246..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.replication.recovery; - -import java.io.IOException; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.replication.functions.ReplicationProtocol; -import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; -import org.apache.asterix.replication.storage.PartitionReplica; - -/** - * Performs the steps required to ensure any newly added replica - * will be in-sync with master - */ -public class ReplicaSynchronizer { - - private final INcApplicationContext appCtx; - private final PartitionReplica replica; - - public ReplicaSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) { - this.appCtx = appCtx; - this.replica = replica; - } - - public void sync() throws IOException { - syncFiles(); - checkpointReplicaIndexes(); - appCtx.getReplicationManager().register(replica); - } - - private void syncFiles() throws IOException { - final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica); - fileSync.sync(); - // flush replicated dataset to generate disk component for any remaining in-memory components - final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); - appCtx.getDatasetLifecycleManager().flushDataset(replStrategy); - // sync any newly generated files - fileSync.sync(); - } - - private void checkpointReplicaIndexes() throws IOException { - 
CheckpointPartitionIndexesTask task = - new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition()); - ReplicationProtocol.sendTo(replica, task); - ReplicationProtocol.waitForAck(replica); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java deleted file mode 100644 index 08c0ec7..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.replication.storage; - -public class LSMComponentLSNSyncTask { - - private String componentFilePath; - private String componentId; - - public LSMComponentLSNSyncTask(String componentId, String componentFilePath) { - this.componentId = componentId; - this.componentFilePath = componentFilePath; - } - - public String getComponentFilePath() { - return componentFilePath; - } - - public String getComponentId() { - return componentId; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java deleted file mode 100644 index bf987d0..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.replication.storage; - -import java.io.DataInput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.file.Paths; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; -import org.apache.asterix.common.storage.ResourceReference; -import org.apache.asterix.replication.logging.TxnLogUtil; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; - -public class LSMComponentProperties { - - private AtomicInteger numberOfFiles; - private String componentId; - private long lsnOffset; - private long originalLSN; - private String nodeId; - private Long replicaLSN; - private String maskPath = null; - private String replicaPath = null; - private LSMOperationType opType; - - public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) { - this.nodeId = nodeId; - componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]); - numberOfFiles = new AtomicInteger(job.getJobFiles().size()); - opType = job.getLSMOpType(); - originalLSN = opType == LSMOperationType.FLUSH ? 
- LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), - job.getLSMIndexOperationContext()) : 0; - } - - public LSMComponentProperties() { - } - - public static long getLSMComponentLSN(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx) { - long componentLSN = -1; - try { - componentLSN = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) - .getComponentLSN(ctx.getComponentsToBeReplicated()); - } catch (HyracksDataException e) { - e.printStackTrace(); - } - if (componentLSN < 0) { - componentLSN = 0; - } - return componentLSN; - } - - public void serialize(OutputStream out) throws IOException { - DataOutputStream dos = new DataOutputStream(out); - dos.writeUTF(componentId); - dos.writeUTF(nodeId); - dos.writeInt(numberOfFiles.get()); - dos.writeLong(originalLSN); - dos.writeLong(lsnOffset); - dos.writeInt(opType.ordinal()); - } - - public static LSMComponentProperties create(DataInput input) throws IOException { - LSMComponentProperties lsmCompProp = new LSMComponentProperties(); - lsmCompProp.componentId = input.readUTF(); - lsmCompProp.nodeId = input.readUTF(); - lsmCompProp.numberOfFiles = new AtomicInteger(input.readInt()); - lsmCompProp.originalLSN = input.readLong(); - lsmCompProp.lsnOffset = input.readLong(); - lsmCompProp.opType = LSMOperationType.values()[input.readInt()]; - return lsmCompProp; - } - - public String getMaskPath(ReplicaResourcesManager resourceManager) throws HyracksDataException { - if (maskPath == null) { - LSMIndexFileProperties afp = new LSMIndexFileProperties(this); - maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName() - + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX; - } - return maskPath; - } - - public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) throws HyracksDataException { - if (replicaPath == null) { - LSMIndexFileProperties afp = new LSMIndexFileProperties(this); - replicaPath = 
resourceManager.getIndexPath(afp); - } - return replicaPath; - } - - /*** - * @param filePath - * any file of the LSM component - * @return a unique id based on the timestamp of the component - */ - public static String getLSMComponentID(String filePath) { - final ResourceReference ref = ResourceReference.of(filePath); - final String fileUniqueTimestamp = - ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)); - return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString(); - } - - public String getComponentId() { - return componentId; - } - - public long getOriginalLSN() { - return originalLSN; - } - - public String getNodeId() { - return nodeId; - } - - public int markFileComplete() { - return numberOfFiles.decrementAndGet(); - } - - public Long getReplicaLSN() { - return replicaLSN; - } - - public void setReplicaLSN(Long replicaLSN) { - this.replicaLSN = replicaLSN; - } - - public LSMOperationType getOpType() { - return opType; - } - - public String getNodeUniqueLSN() { - return TxnLogUtil.getNodeUniqueLSN(nodeId, originalLSN); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java deleted file mode 100644 index 2ebf2cb..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.replication.storage; - -import java.io.DataInput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.file.Paths; - -public class LSMIndexFileProperties { - - private long fileSize; - private String nodeId; - private boolean lsmComponentFile; - private String filePath; - private boolean requiresAck = false; - - public LSMIndexFileProperties() { - } - - public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, - boolean requiresAck) { - initialize(filePath, fileSize, nodeId, lsmComponentFile, requiresAck); - } - - public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) { - initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, false); - } - - public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, - boolean requiresAck) { - this.filePath = filePath; - this.fileSize = fileSize; - this.nodeId = nodeId; - this.lsmComponentFile = lsmComponentFile; - this.requiresAck = requiresAck; - } - - public void serialize(OutputStream out) throws IOException { - DataOutputStream dos = new DataOutputStream(out); - dos.writeUTF(nodeId); - 
dos.writeUTF(filePath); - dos.writeLong(fileSize); - dos.writeBoolean(lsmComponentFile); - dos.writeBoolean(requiresAck); - } - - public static LSMIndexFileProperties create(DataInput input) throws IOException { - String nodeId = input.readUTF(); - String filePath = input.readUTF(); - long fileSize = input.readLong(); - boolean lsmComponentFile = input.readBoolean(); - boolean requiresAck = input.readBoolean(); - LSMIndexFileProperties fileProp = - new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile, requiresAck); - return fileProp; - } - - public String getFilePath() { - return filePath; - } - - public long getFileSize() { - return fileSize; - } - - public String getNodeId() { - return nodeId; - } - - public boolean isLSMComponentFile() { - return lsmComponentFile; - } - - public boolean requiresAck() { - return requiresAck; - } - - public String getFileName() { - return Paths.get(filePath).toFile().getName(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("File Path: " + filePath + " "); - sb.append("File Size: " + fileSize + " "); - sb.append("Node ID: " + nodeId + " "); - sb.append("isLSMComponentFile : " + lsmComponentFile + " "); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java deleted file mode 100644 index b7fa49d..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.replication.storage; - -import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP; -import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED; -import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.exceptions.ReplicationException; -import org.apache.asterix.common.replication.IPartitionReplica; -import org.apache.asterix.common.storage.ReplicaIdentifier; -import org.apache.asterix.replication.functions.ReplicationProtocol; -import org.apache.asterix.replication.recovery.ReplicaSynchronizer; -import org.apache.hyracks.util.JSONUtil; -import org.apache.hyracks.util.StorageUtil; -import org.apache.hyracks.util.annotations.ThreadSafe; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; 
-import com.fasterxml.jackson.databind.node.ObjectNode; - -@ThreadSafe -public class PartitionReplica implements IPartitionReplica { - - private static final Logger LOGGER = LogManager.getLogger(); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE); - private final INcApplicationContext appCtx; - private final ReplicaIdentifier id; - private ByteBuffer reusbaleBuf; - private PartitionReplicaStatus status = DISCONNECTED; - private SocketChannel sc; - - public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) { - this.id = id; - this.appCtx = appCtx; - } - - @Override - public synchronized PartitionReplicaStatus getStatus() { - return status; - } - - @Override - public ReplicaIdentifier getIdentifier() { - return id; - } - - public synchronized void sync() { - if (status == IN_SYNC || status == CATCHING_UP) { - return; - } - setStatus(CATCHING_UP); - appCtx.getThreadExecutor().execute(() -> { - try { - new ReplicaSynchronizer(appCtx, this).sync(); - setStatus(IN_SYNC); - } catch (Exception e) { - LOGGER.error(() -> "Failed to sync replica " + this, e); - setStatus(DISCONNECTED); - } finally { - close(); - } - }); - } - - public synchronized SocketChannel getChannel() { - try { - if (sc == null || !sc.isOpen() || !sc.isConnected()) { - sc = SocketChannel.open(); - sc.configureBlocking(true); - sc.connect(id.getLocation()); - } - return sc; - } catch (IOException e) { - throw new ReplicationException(e); - } - } - - public synchronized void close() { - try { - if (sc != null && sc.isOpen()) { - ReplicationProtocol.sendGoodbye(sc); - sc.close(); - sc = null; - } - } catch (IOException e) { - throw new ReplicationException(e); - } - } - - public synchronized ByteBuffer gerReusableBuffer() { - if (reusbaleBuf == null) { - reusbaleBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); - } - return reusbaleBuf; - } - - private 
JsonNode asJson() { - ObjectNode json = OBJECT_MAPPER.createObjectNode(); - json.put("id", id.toString()); - json.put("state", status.name()); - return json; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PartitionReplica that = (PartitionReplica) o; - return id.equals(that.id); - } - - @Override - public int hashCode() { - return id.hashCode(); - } - - @Override - public String toString() { - try { - return JSONUtil.convertNode(asJson()); - } catch (JsonProcessingException e) { - throw new ReplicationException(e); - } - } - - private synchronized void setStatus(PartitionReplicaStatus status) { - LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status); - this.status = status; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java deleted file mode 100644 index 398f97d..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.replication.storage; - -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.MetadataProperties; -import org.apache.asterix.common.dataflow.DatasetLocalResource; -import org.apache.asterix.common.replication.IReplicaResourcesManager; -import org.apache.asterix.common.storage.DatasetResourceReference; -import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; -import org.apache.asterix.common.utils.StorageConstants; -import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; -import org.apache.commons.io.FileUtils; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.storage.common.ILocalResourceRepository; -import org.apache.hyracks.storage.common.LocalResource; - -public class ReplicaResourcesManager implements IReplicaResourcesManager { - public static final String LSM_COMPONENT_MASK_SUFFIX = "_mask"; - private final PersistentLocalResourceRepository localRepository; - private final Map<String, 
ClusterPartition[]> nodePartitions; - private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; - - public ReplicaResourcesManager(ILocalResourceRepository localRepository, MetadataProperties metadataProperties, - IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { - this.localRepository = (PersistentLocalResourceRepository) localRepository; - this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; - nodePartitions = metadataProperties.getNodePartitions(); - } - - public void deleteIndexFile(LSMIndexFileProperties afp) throws HyracksDataException { - String indexPath = getIndexPath(afp); - if (indexPath != null) { - if (afp.isLSMComponentFile()) { - //delete index file - String indexFilePath = indexPath + File.separator + afp.getFileName(); - File destFile = new File(indexFilePath); - FileUtils.deleteQuietly(destFile); - } else { - //delete index directory - FileUtils.deleteQuietly(new File(indexPath)); - } - } - } - - public String getIndexPath(LSMIndexFileProperties fileProperties) throws HyracksDataException { - final FileReference indexPath = localRepository.getIndexPath(Paths.get(fileProperties.getFilePath())); - if (!indexPath.getFile().exists()) { - indexPath.getFile().mkdirs(); - } - return indexPath.toString(); - } - - public void createRemoteLSMComponentMask(LSMComponentProperties lsmComponentProperties) throws IOException { - String maskPath = lsmComponentProperties.getMaskPath(this); - Path path = Paths.get(maskPath); - if (!Files.exists(path)) { - File maskFile = new File(maskPath); - maskFile.createNewFile(); - } - } - - public void markLSMComponentReplicaAsValid(LSMComponentProperties lsmComponentProperties) throws IOException { - //remove mask to mark component as valid - String maskPath = lsmComponentProperties.getMaskPath(this); - Path path = Paths.get(maskPath); - Files.deleteIfExists(path); - } - - public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException { - Set<File> 
remoteIndexesPaths = new HashSet<File>(); - ClusterPartition[] partitions = nodePartitions.get(replicaId); - for (ClusterPartition partition : partitions) { - remoteIndexesPaths.addAll(localRepository.getPartitionIndexes(partition.getPartitionId())); - } - return remoteIndexesPaths; - } - - @Override - public long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException { - long minRemoteLSN = Long.MAX_VALUE; - for (Integer partition : partitions) { - final List<DatasetResourceReference> partitionResources = localRepository.getResources(resource -> { - DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); - return dsResource.getPartition() == partition; - }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); - for (DatasetResourceReference indexRef : partitionResources) { - long remoteIndexMaxLSN = indexCheckpointManagerProvider.get(indexRef).getLowWatermark(); - minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); - } - } - return minRemoteLSN; - } - - public Map<Long, DatasetResourceReference> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) - throws HyracksDataException { - Map<Long, DatasetResourceReference> laggingReplicaIndexes = new HashMap<>(); - final List<Integer> replicaPartitions = - Arrays.stream(nodePartitions.get(replicaId)).map(ClusterPartition::getPartitionId) - .collect(Collectors.toList()); - for (int patition : replicaPartitions) { - final Map<Long, LocalResource> partitionResources = localRepository.getPartitionResources(patition); - final List<DatasetResourceReference> indexesRefs = - partitionResources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); - for (DatasetResourceReference ref : indexesRefs) { - if (indexCheckpointManagerProvider.get(ref).getLowWatermark() < targetLSN) { - laggingReplicaIndexes.put(ref.getResourceId(), ref); - } - } - } - return laggingReplicaIndexes; - } - - public void 
cleanInvalidLSMComponents(String replicaId) { - //for every index in replica - Set<File> remoteIndexes = null; - try { - remoteIndexes = getReplicaIndexes(replicaId); - } catch (HyracksDataException e) { - throw new IllegalStateException(e); - } - for (File remoteIndexFile : remoteIndexes) { - //search for any mask - File[] masks = remoteIndexFile.listFiles(LSM_COMPONENTS_MASKS_FILTER); - - for (File mask : masks) { - //delete all files belonging to this mask - deleteLSMComponentFilesForMask(mask); - //delete the mask itself - mask.delete(); - } - } - } - - private static void deleteLSMComponentFilesForMask(File maskFile) { - String lsmComponentTimeStamp = maskFile.getName().substring(0, - maskFile.getName().length() - LSM_COMPONENT_MASK_SUFFIX.length()); - File indexFolder = maskFile.getParentFile(); - File[] lsmComponentsFiles = indexFolder.listFiles(LSM_COMPONENTS_NON_MASKS_FILTER); - for (File lsmComponentFile : lsmComponentsFiles) { - if (lsmComponentFile.getName().contains(lsmComponentTimeStamp)) { - //match based on time stamp - lsmComponentFile.delete(); - } - } - } - - /** - * @param partition - * @return Absolute paths to all partition files - */ - @Override - public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException { - List<String> partitionFiles = new ArrayList<String>(); - Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition); - for (File indexDir : partitionIndexes) { - if (indexDir.isDirectory()) { - File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); - if (indexFiles != null) { - for (File file : indexFiles) { - if (!relativePath) { - partitionFiles.add(file.getAbsolutePath()); - } else { - partitionFiles.add(StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath())); - } - } - } - } - } - return partitionFiles; - } - - private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new FilenameFilter() { - @Override - public boolean accept(File dir, 
String name) { - return name.endsWith(LSM_COMPONENT_MASK_SUFFIX); - } - }; - - private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX); - } - }; - - private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.equalsIgnoreCase(StorageConstants.METADATA_FILE_NAME) || !name.startsWith("."); - } - }; -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java new file mode 100644 index 0000000..e1649b3 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.sync; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.SocketChannel; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.asterix.replication.management.NetworkingUtil; +import org.apache.asterix.replication.messaging.DeleteFileTask; +import org.apache.asterix.replication.messaging.ReplicateFileTask; +import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; + +/** Replicates single files to, or deletes single files on, one {@link PartitionReplica} through {@link ReplicationProtocol}; every request waits for the replica's ack before returning. */ public class FileSynchronizer { + + private final INcApplicationContext appCtx; + private final PartitionReplica replica; + + public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) { + this.appCtx = appCtx; + this.replica = replica; + } + + /** Replicates {@code file} with the metadata flag unset; same as {@code replicate(file, false)}. */ public void replicate(String file) { + replicate(file, false); + } + + /** Resolves {@code file} through the IO manager, sends a ReplicateFileTask carrying the path, the current file length and the {@code metadata} flag, streams the file bytes over the replica's socket channel, then waits for the replica's ack. IOExceptions are rethrown wrapped in a ReplicationException. NOTE(review): the length is read before the file is opened, so this presumably assumes the file is not modified concurrently; confirm. 
*/ public void replicate(String file, boolean metadata) { + try { + final IIOManager ioManager = appCtx.getIoManager(); + final SocketChannel channel = replica.getChannel(); + final FileReference filePath = ioManager.resolve(file); + ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata); + ReplicationProtocol.sendTo(replica, task); + // send the file itself + try (RandomAccessFile fromFile = new RandomAccessFile(filePath.getFile(), + "r"); FileChannel fileChannel = fromFile.getChannel()) { + NetworkingUtil.sendFile(fileChannel, channel); + } + ReplicationProtocol.waitForAck(replica); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + /** Sends a DeleteFileTask for {@code file} to the replica and waits for its ack; IOExceptions are rethrown wrapped in a ReplicationException. */ public void delete(String file) { + try { + final DeleteFileTask task = new DeleteFileTask(file); 
+ ReplicationProtocol.sendTo(replica, task); + ReplicationProtocol.waitForAck(replica); + } catch (IOException e) { + throw new ReplicationException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java new file mode 100644 index 0000000..74f38e2 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.sync; + +import static org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation.DELETE; +import static org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation.REPLICATE; + +import java.io.IOException; +import java.nio.file.Paths; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.storage.ResourceReference; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.asterix.replication.messaging.ComponentMaskTask; +import org.apache.asterix.replication.messaging.DropIndexTask; +import org.apache.asterix.replication.messaging.MarkComponentValidTask; +import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.replication.IReplicationJob; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** Sends one {@link IReplicationJob} to a {@link PartitionReplica}: LSM_COMPONENT jobs replicate or delete component files, METADATA jobs replicate the index metadata files or send a DropIndexTask. */ public class IndexSynchronizer { + + private static final Logger LOGGER = LogManager.getLogger(); + private final IReplicationJob job; + private final INcApplicationContext appCtx; + + public IndexSynchronizer(IReplicationJob job, INcApplicationContext appCtx) { + this.job = job; + this.appCtx = appCtx; + } + + /** Dispatches on {@code job.getJobType()}; any job type other than LSM_COMPONENT or METADATA raises IllegalStateException. 
*/ public void sync(PartitionReplica replica) throws IOException { + switch (job.getJobType()) { + case LSM_COMPONENT: + syncComponent(replica); + break; + case METADATA: + syncMetadata(replica); 
+ break; + default: + throw new IllegalStateException("unrecognized job type: " + job.getJobType().name()); + } + } + + /** Routes an LSM component job by operation. NOTE(review): operations other than REPLICATE and DELETE fall through silently here, whereas sync() throws for unknown job types; confirm that is intended. */ private void syncComponent(PartitionReplica replica) throws IOException { + if (job.getOperation() == REPLICATE) { + replicateComponent(replica); + } else if (job.getOperation() == DELETE) { + deleteComponent(replica); + } + } + + /** Routes a METADATA job by operation: REPLICATE sends the metadata files, DELETE sends a DropIndexTask; any other operation is ignored. */ private void syncMetadata(PartitionReplica replica) throws IOException { + if (job.getOperation() == REPLICATE) { + replicateIndexMetadata(replica); + } else if (job.getOperation() == DELETE) { + deleteIndexMetadata(replica); + } + } + + /** Three acknowledged steps: a ComponentMaskTask for the component, every job file through FileSynchronizer (each waits for an ack), then a MarkComponentValidTask carrying the component LSN. */ private void replicateComponent(PartitionReplica replica) throws IOException { + // send component header + final String anyFile = job.getAnyFile(); + final String lsmComponentID = getComponentId(anyFile); + final String indexFile = StoragePathUtil.getFileRelativePath(anyFile); + final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile, lsmComponentID); + ReplicationProtocol.sendTo(replica, maskTask); + ReplicationProtocol.waitForAck(replica); + // send component files + final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica); + job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate); + // send mark component valid + MarkComponentValidTask markValidTask = new MarkComponentValidTask(indexFile, getReplicatedComponentLsn()); + ReplicationProtocol.sendTo(replica, markValidTask); + ReplicationProtocol.waitForAck(replica); + LOGGER.debug("Replicated component ({}) to replica {}", indexFile, replica); + } + + /** Asks the replica to delete each job file, one FileSynchronizer.delete call per relative path. 
*/ private void deleteComponent(PartitionReplica replica) { + FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica); + job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::delete); + } + + /** Replicates each job file with the metadata flag set. */ private void replicateIndexMetadata(PartitionReplica replica) { + // send the index metadata file + final FileSynchronizer fileSynchronizer = new 
FileSynchronizer(appCtx, replica); + job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath) + .forEach(file -> fileSynchronizer.replicate(file, true)); + } + + /** Sends a DropIndexTask for the relative path of {@code job.getAnyFile()} and waits for the replica's ack. */ private void deleteIndexMetadata(PartitionReplica replica) throws IOException { + final String file = StoragePathUtil.getFileRelativePath(job.getAnyFile()); + final DropIndexTask task = new DropIndexTask(file); + ReplicationProtocol.sendTo(replica, task); + ReplicationProtocol.waitForAck(replica); + } + + /** Returns the component LSN reported by the index's AbstractLSMIOOperationCallback for the operation context's components to be replicated, or AbstractLSMIOOperationCallback.INVALID when the job's LSM operation is not a FLUSH. NOTE(review): {@code job} is cast to ILSMIndexReplicationJob unconditionally, so this assumes every LSM_COMPONENT job implements it; confirm. */ private long getReplicatedComponentLsn() throws HyracksDataException { + final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job; + if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) { + return AbstractLSMIOOperationCallback.INVALID; + } + final ILSMIndex lsmIndex = indexReplJob.getLSMIndex(); + final ILSMIndexOperationContext ctx = indexReplJob.getLSMIndexOperationContext(); + return ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) + .getComponentLSN(ctx.getComponentsToBeReplicated()); + } + + /** Builds the component id by joining the file's ResourceReference relative path with the file-name prefix that precedes the last AbstractLSMIndexFileManager.DELIMITER. NOTE(review): if the name contains no DELIMITER, lastIndexOf returns -1 and substring throws StringIndexOutOfBoundsException. 
*/ private static String getComponentId(String filePath) { + final ResourceReference ref = ResourceReference.of(filePath); + final String fileUniqueTimestamp = + ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)); + return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java new file mode 100644 index 0000000..5658779 --- /dev/null +++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.sync; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; +import org.apache.asterix.replication.messaging.PartitionResourcesListTask; +import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; + +/** + * Ensures that the files between master and a replica are synchronized: files that exist only on the master are replicated to the replica, and files that exist only on the replica are deleted from it. 
+ */ +public class ReplicaFilesSynchronizer { + + private final PartitionReplica replica; + private final INcApplicationContext appCtx; + + public ReplicaFilesSynchronizer(INcApplicationContext appCtx, 
PartitionReplica replica) { + this.appCtx = appCtx; + this.replica = replica; + } + + /** For the replica's partition, compares the replica's reported file list with the master's partition index files (converted to relative paths), replicates the files the replica lacks, then asks the replica to delete the files the master does not have. NOTE(review): the sets are compared as plain strings, so this assumes the replica reports paths in the same form StoragePathUtil.getFileRelativePath produces; confirm. */ public void sync() throws IOException { + final int partition = replica.getIdentifier().getPartition(); + final Set<String> replicaFiles = getReplicaFiles(partition); + final PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + final Set<String> masterFiles = localResourceRepository.getPartitionIndexesFiles(partition).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); + // find files on master and not on replica + final List<String> replicaMissingFiles = + masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList()); + replicateMissingFiles(replicaMissingFiles); + // find files on replica and not on master + final List<String> replicaInvalidFiles = + replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList()); + deleteInvalidFiles(replicaInvalidFiles); + } + + /** Sends a PartitionResourcesListTask for {@code partition}, reads the PartitionResourcesListResponse from the replica's channel using the replica's reusable buffer, and returns the listed resources as a new HashSet. */ private Set<String> getReplicaFiles(int partition) throws IOException { + final PartitionResourcesListTask replicaFilesRequest = new PartitionResourcesListTask(partition); + final SocketChannel channel = replica.getChannel(); + final ByteBuffer reusableBuffer = replica.getReusableBuffer(); + ReplicationProtocol.sendTo(replica, replicaFilesRequest); + final PartitionResourcesListResponse response = + (PartitionResourcesListResponse) ReplicationProtocol.read(channel, reusableBuffer); + return new HashSet<>(response.getResources()); + } + + /** Replicates each file through FileSynchronizer.replicate(String), i.e. with the metadata flag unset. 
NOTE(review): this applies even to index metadata files, which IndexSynchronizer replicates with the flag set; confirm the replica handles both the same way. */ private void replicateMissingFiles(List<String> files) { + final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); + files.forEach(sync::replicate); + } + + /** Asks the replica to delete each file through FileSynchronizer.delete. */ private void deleteInvalidFiles(List<String> files) { + final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); + files.forEach(sync::delete); + } +} \ No newline at end of file
