Repository: asterixdb Updated Branches: refs/heads/master 54249a8a9 -> cc7d2f0ce
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index 220b089..f59914d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -35,6 +35,7 @@ public class StorageConstants { */ public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_"; public static final String METADATA_FILE_NAME = ".metadata"; + public static final String MASK_FILE_PREFIX = ".mask_"; public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_"; /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index b93ccb5..d2c5ad7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -90,13 +90,21 @@ public class StoragePathUtil { /** * @param fileAbsolutePath - * @return the file relative path starting from the partition directory + * @return the file's index relative path starting from the storage directory */ public static String getIndexFileRelativePath(String fileAbsolutePath) { return ResourceReference.of(fileAbsolutePath).getRelativePath().toString(); } /** + * @param fileAbsolutePath + * @return the file's relative path starting from the storage directory + */ + public static String getFileRelativePath(String fileAbsolutePath) { + return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString(); + } + + /** * Create a file * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when * creating files simultaneously http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java new file mode 100644 index 0000000..001d41f --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.api; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IReplicaTask extends IReplicationMessage { + + /** + * Performs the task on the replica + * + * @param appCtx + * @param worker + * @throws HyracksDataException + */ + void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java new file mode 100644 index 0000000..2e1cb8a --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.api; + +import java.io.OutputStream; + +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IReplicationMessage { + + /** + * @return the message type + */ + ReplicationProtocol.ReplicationRequestType getMessageType(); + + /** + * Serializes {@link IReplicationMessage} to {@code out} + * + * @param out + * @throws HyracksDataException + */ + void serialize(OutputStream out) throws HyracksDataException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java index 8a52529..8094548 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java @@ -25,10 +25,18 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import org.apache.asterix.common.exceptions.ReplicationException; import org.apache.asterix.common.replication.ReplicaEvent; +import org.apache.asterix.replication.api.IReplicationMessage; import org.apache.asterix.replication.management.NetworkingUtil; +import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; +import org.apache.asterix.replication.messaging.DeleteFileTask; +import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; +import org.apache.asterix.replication.messaging.PartitionResourcesListTask; +import org.apache.asterix.replication.messaging.ReplicateFileTask; import org.apache.asterix.replication.storage.LSMComponentProperties; import org.apache.asterix.replication.storage.LSMIndexFileProperties; +import org.apache.asterix.replication.storage.PartitionReplica; import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream; public class ReplicationProtocol { @@ -38,8 +46,8 @@ public class ReplicationProtocol { */ public static final String JOB_REPLICATION_ACK = "$"; - public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES; - public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES; + public static final int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES; + private static final int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES; /* * ReplicationRequestType: @@ -64,7 +72,12 @@ public class ReplicationProtocol { REPLICA_EVENT, LSM_COMPONENT_PROPERTIES, ACK, - FLUSH_INDEX + FLUSH_INDEX, + PARTITION_RESOURCES_REQUEST, + PARTITION_RESOURCES_RESPONSE, + REPLICATE_RESOURCE_FILE, + DELETE_RESOURCE_FILE, + CHECKPOINT_PARTITION } public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException { @@ -256,4 +269,86 @@ public class ReplicationProtocol { ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer(); NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer); } -} + + public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) { + try { + buf.clear(); + buf.putInt(ReplicationRequestType.ACK.ordinal()); + buf.flip(); + NetworkingUtil.transferBufferToChannel(socketChannel, buf); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + public static void waitForAck(PartitionReplica replica) throws IOException { + final SocketChannel channel = replica.getChannel(); + final ByteBuffer buf = replica.gerReusableBuffer(); + ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(channel, buf); + if (responseFunction != ReplicationRequestType.ACK) { + throw new IllegalStateException("Unexpected response while waiting for ack."); + } + } + + public static void sendTo(PartitionReplica replica, IReplicationMessage task) { + final SocketChannel channel = replica.getChannel(); + final ByteBuffer buf = replica.gerReusableBuffer(); + sendTo(channel, task, buf); + } + + public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) { + ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); + try (DataOutputStream oos = new DataOutputStream(outputStream)) { + task.serialize(oos); + final int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); + final ByteBuffer requestBuffer = ensureSize(buf, requestSize); + requestBuffer.putInt(task.getMessageType().ordinal()); + requestBuffer.putInt(oos.size()); + requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); + requestBuffer.flip(); + NetworkingUtil.transferBufferToChannel(channel, requestBuffer); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException { + final ReplicationRequestType type = getRequestType(socketChannel, buffer); + return readMessage(type, socketChannel, buffer); + } + + public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel, + ByteBuffer buffer) { + try { + ReplicationProtocol.readRequest(socketChannel, buffer); + final ByteArrayInputStream input = + new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); + try (DataInputStream dis = new DataInputStream(input)) { + switch (type) { + case PARTITION_RESOURCES_REQUEST: + return PartitionResourcesListTask.create(dis); + case PARTITION_RESOURCES_RESPONSE: + return PartitionResourcesListResponse.create(dis); + case REPLICATE_RESOURCE_FILE: + return ReplicateFileTask.create(dis); + case DELETE_RESOURCE_FILE: + return DeleteFileTask.create(dis); + case CHECKPOINT_PARTITION: + return CheckpointPartitionIndexesTask.create(dis); + default: + throw new IllegalStateException("Unrecognized replication message"); + } + } + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + private static ByteBuffer ensureSize(ByteBuffer buffer, int size) { + if (buffer.capacity() < size) { + return ByteBuffer.allocate(size); + } + buffer.clear(); + return buffer; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index dfc23d3..ba24e07 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -24,10 +24,10 @@ import java.io.RandomAccessFile; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -71,6 +71,10 @@ import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest; import org.apache.asterix.replication.functions.ReplicationProtocol; import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType; import org.apache.asterix.replication.logging.RemoteLogMapping; +import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; +import org.apache.asterix.replication.messaging.DeleteFileTask; +import org.apache.asterix.replication.messaging.PartitionResourcesListTask; +import org.apache.asterix.replication.messaging.ReplicateFileTask; import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask; import org.apache.asterix.replication.storage.LSMComponentProperties; import org.apache.asterix.replication.storage.LSMIndexFileProperties; @@ -108,12 +112,12 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping; private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping; private final LSMComponentsSyncService lsmComponentLSNMappingService; - private final Set<Integer> nodeHostedPartitions; private final ReplicationNotifier replicationNotifier; private final Object flushLogslock = new Object(); private final IDatasetLifecycleManager dsLifecycleManager; private final PersistentLocalResourceRepository localResourceRep; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; + private final INcApplicationContext appCtx; public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager, IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager, @@ -135,19 +139,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { lsmComponentLSNMappingService = new LSMComponentsSyncService(); replicationNotifier = new ReplicationNotifier(); replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory()); - Map<String, ClusterPartition[]> nodePartitions = - asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions(); - Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(nodeId); - List<Integer> clientsPartitions = new ArrayList<>(); - for (String clientId : nodeReplicationClients) { - for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) { - clientsPartitions.add(clusterPartition.getPartitionId()); - } - } - nodeHostedPartitions = new HashSet<>(clientsPartitions.size()); - nodeHostedPartitions.addAll(clientsPartitions); - this.indexCheckpointManagerProvider = - ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider(); + this.appCtx = (INcApplicationContext) ncServiceContext.getApplicationContext(); + this.indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider(); } @Override @@ -167,12 +160,14 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort); //start accepting replication requests - while (true) { + while (serverSocketChannel.isOpen()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(true); //start a new thread to handle the request replicationThreads.execute(new ReplicationThread(socketChannel)); } + } catch (AsynchronousCloseException e) { + LOGGER.log(Level.WARNING, "Replication channel closed", e); } catch (IOException e) { throw new IllegalStateException( "Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e); @@ -209,10 +204,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { @Override public void close() throws IOException { - if (!serverSocketChannel.isOpen()) { - serverSocketChannel.close(); - LOGGER.log(Level.INFO, "Replication channel closed."); - } + serverSocketChannel.close(); + LOGGER.log(Level.INFO, "Replication channel closed."); } /** @@ -263,6 +256,18 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { case FLUSH_INDEX: handleFlushIndex(); break; + case PARTITION_RESOURCES_REQUEST: + handleGetPartitionResources(); + break; + case REPLICATE_RESOURCE_FILE: + handleReplicateResourceFile(); + break; + case DELETE_RESOURCE_FILE: + handleDeleteResourceFile(); + break; + case CHECKPOINT_PARTITION: + handleCheckpointPartition(); + break; default: throw new IllegalStateException("Unknown replication request"); } @@ -476,10 +481,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { switch (remoteLog.getLogType()) { case LogType.UPDATE: case LogType.ENTITY_COMMIT: - //if the log partition belongs to a partitions hosted on this node, replicated it - if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) { - logManager.log(remoteLog); - } + logManager.log(remoteLog); break; case LogType.JOB_COMMIT: case LogType.ABORT: @@ -542,10 +544,15 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { } @Override - public SocketChannel getReplicationClientSocket() { + public SocketChannel getChannel() { return socketChannel; } + @Override + public ByteBuffer getReusableBuffer() { + return outBuffer; + } + private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) { try { Predicate<LocalResource> replicaIndexesPredicate = lr -> { @@ -568,6 +575,30 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { LOGGER.log(Level.SEVERE, "Failed to checkpoint replica indexes", e); } } + + private void handleGetPartitionResources() throws IOException { + final PartitionResourcesListTask task = (PartitionResourcesListTask) ReplicationProtocol + .readMessage(ReplicationRequestType.PARTITION_RESOURCES_REQUEST, socketChannel, inBuffer); + task.perform(appCtx, this); + } + + private void handleReplicateResourceFile() throws HyracksDataException { + ReplicateFileTask task = (ReplicateFileTask) ReplicationProtocol + .readMessage(ReplicationRequestType.REPLICATE_RESOURCE_FILE, socketChannel, inBuffer); + task.perform(appCtx, this); + } + + private void handleDeleteResourceFile() throws HyracksDataException { + DeleteFileTask task = (DeleteFileTask) ReplicationProtocol + .readMessage(ReplicationRequestType.DELETE_RESOURCE_FILE, socketChannel, inBuffer); + task.perform(appCtx, this); + } + + private void handleCheckpointPartition() throws HyracksDataException { + CheckpointPartitionIndexesTask task = (CheckpointPartitionIndexesTask) ReplicationProtocol + .readMessage(ReplicationRequestType.CHECKPOINT_PARTITION, socketChannel, inBuffer); + task.perform(appCtx, this); + } } /** @@ -581,7 +612,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { try { LogRecord logRecord = pendingNotificationRemoteLogsQ.take(); //send ACK to requester - logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream() + logRecord.getReplicationThread().getChannel().socket().getOutputStream() .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getTxnId() + System.lineSeparator()).getBytes()); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index 5cf7eab..b933db8 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -40,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -53,7 +54,9 @@ import java.util.logging.Logger; import java.util.stream.Collectors; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; @@ -1177,6 +1180,25 @@ public class ReplicationManager implements IReplicationManager { buffer.position(buffer.limit()); } + @Override + public void register(IPartitionReplica replica) { + // find the replica node based on ip and replication port + final Optional<Node> replicaNode = ClusterProperties.INSTANCE.getCluster().getNode().stream() + .filter(node -> node.getClusterIp().equals(replica.getIdentifier().getLocation().getHostString()) + && node.getReplicationPort().intValue() == replica.getIdentifier().getLocation().getPort()) + .findAny(); + if (!replicaNode.isPresent()) { + throw new IllegalStateException("Couldn't find node for replica: " + replica); + } + Replica replicaRef = new Replica(replicaNode.get()); + final String replicaId = replicaRef.getId(); + replicas.putIfAbsent(replicaId, replicaRef); + replica2PartitionsMap.computeIfAbsent(replicaId, k -> new HashSet<>()); + replica2PartitionsMap.get(replicaId).add(replica.getIdentifier().getPartition()); + updateReplicaInfo(replicaRef); + checkReplicaState(replicaId, false, true); + } + //supporting classes /** * This class is responsible for processing replica events. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java new file mode 100644 index 0000000..2c1937b --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManager; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.common.LocalResource; + +/** + * A task to initialize the checkpoints for all indexes in a partition with the replica's current LSN + */ +public class CheckpointPartitionIndexesTask implements IReplicaTask { + + private final int partition; + + public CheckpointPartitionIndexesTask(int partition) { + this.partition = partition; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException { + final IIndexCheckpointManagerProvider indexCheckpointManagerProvider = + appCtx.getIndexCheckpointManagerProvider(); + PersistentLocalResourceRepository resRepo = + (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values(); + final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN(); + for (LocalResource ls : partitionResources) { + final IIndexCheckpointManager indexCheckpointManager = + indexCheckpointManagerProvider.get(DatasetResourceReference.of(ls)); + indexCheckpointManager.delete(); + indexCheckpointManager.init(currentLSN); + } + ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.CHECKPOINT_PARTITION; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + DataOutputStream dos = new DataOutputStream(out); + dos.writeInt(partition); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException { + try { + int partition = input.readInt(); + return new CheckpointPartitionIndexesTask(partition); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java new file mode 100644 index 0000000..ea43ee9 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IIOManager; + +/** + * A task to delete a file on a replica if exists + */ +public class DeleteFileTask implements IReplicaTask { + + private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName()); + private final String file; + + public DeleteFileTask(String file) { + this.file = file; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationThread worker) { + try { + final IIOManager ioManager = appCtx.getIoManager(); + final File localFile = ioManager.resolve(file).getFile(); + if (localFile.exists()) { + Files.delete(localFile.toPath()); + LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath()); + } else { + LOGGER.warning(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath()); + } + ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.DELETE_RESOURCE_FILE; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(file); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static DeleteFileTask create(DataInput input) throws IOException { + return new DeleteFileTask(input.readUTF()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java new file mode 100644 index 0000000..85b7bb9 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.replication.api.IReplicationMessage; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class PartitionResourcesListResponse implements IReplicationMessage { + + private final int partition; + private final List<String> resources; + + public PartitionResourcesListResponse(int partition, List<String> resources) { + this.partition = partition; + this.resources = resources; + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_RESPONSE; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + DataOutputStream dos = new DataOutputStream(out); + dos.writeInt(partition); + dos.writeInt(resources.size()); + for (String file : resources) { + dos.writeUTF(file); + } + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public List<String> getResources() { + return resources; + } + + public static PartitionResourcesListResponse create(DataInput input) throws IOException { + int partition = input.readInt(); + int size = input.readInt(); + List<String> resources = new ArrayList<>(); + for (int i = 0; i < size; i++) { + resources.add(input.readUTF()); + } + return new PartitionResourcesListResponse(partition, resources); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java new file mode 100644 index 0000000..b2b8ac6 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * A task to get the list of the files in a partition on a replica + */ +public class PartitionResourcesListTask implements IReplicaTask { + + private final int partition; + + public PartitionResourcesListTask(int partition) { + this.partition = partition; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException { + //TODO delete any invalid files with masks + final List<String> partitionResources = + appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); + final PartitionResourcesListResponse response = + new PartitionResourcesListResponse(partition, partitionResources); + ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer()); + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_REQUEST; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + DataOutputStream dos = new DataOutputStream(out); + dos.writeInt(partition); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static PartitionResourcesListTask create(DataInput input) throws HyracksDataException { + try { + int partition = input.readInt(); + return new PartitionResourcesListTask(partition); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java new file mode 100644 index 0000000..14e9180 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.replication.management.NetworkingUtil; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; + +/** + * A task to replicate a file from a master replica + */ +public class ReplicateFileTask implements IReplicaTask { + + private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName()); + private final String file; + private final long size; + + public ReplicateFileTask(String file, long size) { + this.file = file; + this.size = size; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException { + try { + final IIOManager ioManager = appCtx.getIoManager(); + // resolve path + final FileReference localPath = ioManager.resolve(file); + final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath()); + // create mask + final Path maskPath = Paths.get(resourceDir.toString(), + StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName()); + Files.createFile(maskPath); + + // receive actual file + final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName()); + Files.createFile(filePath); + try (RandomAccessFile fileOutputStream = new RandomAccessFile(filePath.toFile(), + "rw"); FileChannel fileChannel = fileOutputStream.getChannel()) { + fileOutputStream.setLength(size); + NetworkingUtil.downloadFile(fileChannel, worker.getChannel()); + fileChannel.force(true); + } + //delete mask + Files.delete(maskPath); + LOGGER.info(() -> "Replicated file: " + localPath); + ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(file); + dos.writeLong(size); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static ReplicateFileTask create(DataInput input) throws IOException { + final String s = input.readUTF(); + final long i = input.readLong(); + return new ReplicateFileTask(s, i); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java new file mode 100644 index 0000000..8aa4487 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.recovery; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.SocketChannel; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.replication.management.NetworkingUtil; +import org.apache.asterix.replication.messaging.DeleteFileTask; +import org.apache.asterix.replication.messaging.ReplicateFileTask; +import org.apache.asterix.replication.storage.PartitionReplica; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; + +public class FileSynchronizer { + + private final INcApplicationContext appCtx; + private final PartitionReplica replica; + + public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) { + this.appCtx = appCtx; + this.replica = replica; + } + + public void replicate(String file) { + try { + final IIOManager ioManager = appCtx.getIoManager(); + final SocketChannel channel = replica.getChannel(); + final FileReference filePath = ioManager.resolve(file); + ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length()); + ReplicationProtocol.sendTo(replica, task); + // send the file itself + try (RandomAccessFile fromFile = new RandomAccessFile(filePath.getFile(), + "r"); FileChannel fileChannel = fromFile.getChannel()) { + NetworkingUtil.sendFile(fileChannel, channel); + } + ReplicationProtocol.waitForAck(replica); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + public void delete(String file) { + try { + final DeleteFileTask task = new DeleteFileTask(file); + ReplicationProtocol.sendTo(replica, task); + ReplicationProtocol.waitForAck(replica); + } catch (IOException e) { + throw new ReplicationException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java new file mode 100644 index 0000000..2021cee --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.recovery; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; +import org.apache.asterix.replication.messaging.PartitionResourcesListTask; +import org.apache.asterix.replication.storage.PartitionReplica; + +/** + * Ensures that the files between master and a replica are synchronized + */ +public class ReplicaFilesSynchronizer { + + private final PartitionReplica replica; + private final INcApplicationContext appCtx; + + public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) { + this.appCtx = appCtx; + this.replica = replica; + } + + public void sync() throws IOException { + final int partition = replica.getIdentifier().getPartition(); + final Set<String> replicaFiles = getReplicaFiles(partition); + final Set<String> masterFiles = + appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); + // find files on master and not on replica + final List<String> replicaMissingFiles = + masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList()); + replicateMissingFiles(replicaMissingFiles); + // find files on replica and not on master + final List<String> replicaInvalidFiles = + replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList()); + deleteInvalidFiles(replicaInvalidFiles); + } + + private Set<String> getReplicaFiles(int partition) throws IOException { + final PartitionResourcesListTask replicaFilesRequest = new PartitionResourcesListTask(partition); + final SocketChannel channel = replica.getChannel(); + final ByteBuffer reusableBuffer = replica.gerReusableBuffer(); + ReplicationProtocol.sendTo(replica, replicaFilesRequest); + final PartitionResourcesListResponse response = + (PartitionResourcesListResponse) ReplicationProtocol.read(channel, reusableBuffer); + return new HashSet<>(response.getResources()); + } + + private void replicateMissingFiles(List<String> files) { + final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); + files.forEach(sync::replicate); + } + + private void deleteInvalidFiles(List<String> files) { + final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); + files.forEach(sync::delete); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java new file mode 100644 index 0000000..5c88460 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.recovery; + +import java.io.IOException; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; +import org.apache.asterix.replication.storage.PartitionReplica; + +/** + * Performs the steps required to ensure any newly added replica + * will be in-sync with master + */ +public class ReplicaSynchronizer { + + private final INcApplicationContext appCtx; + private final PartitionReplica replica; + + public ReplicaSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) { + this.appCtx = appCtx; + this.replica = replica; + } + + public void sync() throws IOException { + syncFiles(); + checkpointReplicaIndexes(); + appCtx.getReplicationManager().register(replica); + } + + private void syncFiles() throws IOException { + final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica); + fileSync.sync(); + // flush replicated dataset to generate disk component for any remaining in-memory components + final ReplicationProperties repl = appCtx.getReplicationProperties(); + final IReplicationStrategy replStrategy = repl.getReplicationStrategy(); + appCtx.getDatasetLifecycleManager().flushDataset(replStrategy); + // sync any newly generated files + fileSync.sync(); + } + + private void checkpointReplicaIndexes() throws IOException { + CheckpointPartitionIndexesTask task = + new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition()); + ReplicationProtocol.sendTo(replica, task); + ReplicationProtocol.waitForAck(replica); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java index c6d1b60..d9ce75e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java @@ -22,12 +22,20 @@ import static org.apache.asterix.common.replication.IPartitionReplica.PartitionR import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED; import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC; +import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.storage.ReplicaIdentifier; +import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.replication.recovery.ReplicaSynchronizer; import org.apache.hyracks.util.JSONUtil; +import org.apache.hyracks.util.StorageUtil; import org.apache.hyracks.util.annotations.ThreadSafe; import com.fasterxml.jackson.core.JsonProcessingException; @@ -38,12 +46,18 @@ import com.fasterxml.jackson.databind.node.ObjectNode; @ThreadSafe public class PartitionReplica implements IPartitionReplica { + private static final Logger LOGGER = Logger.getLogger(PartitionReplica.class.getName()); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE); + private final INcApplicationContext appCtx; private final ReplicaIdentifier id; + private ByteBuffer reusbaleBuf; private PartitionReplicaStatus status = DISCONNECTED; + private SocketChannel sc; - public PartitionReplica(ReplicaIdentifier id) { + public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) { this.id = id; + this.appCtx = appCtx; } @Override @@ -60,9 +74,53 @@ public class PartitionReplica implements IPartitionReplica { if (status == IN_SYNC || status == CATCHING_UP) { return; } + setStatus(CATCHING_UP); + appCtx.getThreadExecutor().execute(() -> { + try { + new ReplicaSynchronizer(appCtx, this).sync(); + setStatus(IN_SYNC); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, e, () -> "Failed to sync replica " + this); + setStatus(DISCONNECTED); + } finally { + close(); + } + }); } - public JsonNode asJson() { + public synchronized SocketChannel getChannel() { + try { + if (sc == null || !sc.isOpen() || !sc.isConnected()) { + sc = SocketChannel.open(); + sc.configureBlocking(true); + sc.connect(id.getLocation()); + } + return sc; + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + public synchronized void close() { + try { + if (sc != null && sc.isOpen()) { + ReplicationProtocol.sendGoodbye(sc); + sc.close(); + sc = null; + } + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + public synchronized ByteBuffer gerReusableBuffer() { + if (reusbaleBuf == null) { + reusbaleBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + } + return reusbaleBuf; + } + + private JsonNode asJson() { ObjectNode json = OBJECT_MAPPER.createObjectNode(); json.put("id", id.toString()); json.put("state", status.name()); @@ -94,4 +152,9 @@ public class PartitionReplica implements IPartitionReplica { throw new ReplicationException(e); } } + + private synchronized void setStatus(PartitionReplicaStatus status) { + LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status); + this.status = status; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index 2ff74a8..398f97d 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -182,6 +182,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { * @param partition * @return Absolute paths to all partition files */ + @Override public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException { List<String> partitionFiles = new ArrayList<String>(); Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition);