Repository: asterixdb
Updated Branches:
  refs/heads/master 54249a8a9 -> cc7d2f0ce


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 220b089..f59914d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -35,6 +35,7 @@ public class StorageConstants {
      */
     public static final String INDEX_CHECKPOINT_FILE_PREFIX = 
".idx_checkpoint_";
     public static final String METADATA_FILE_NAME = ".metadata";
+    public static final String MASK_FILE_PREFIX = ".mask_";
     public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index b93ccb5..d2c5ad7 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -90,13 +90,21 @@ public class StoragePathUtil {
 
     /**
      * @param fileAbsolutePath
-     * @return the file relative path starting from the partition directory
+     * @return the file's index relative path starting from the storage 
directory
      */
     public static String getIndexFileRelativePath(String fileAbsolutePath) {
         return 
ResourceReference.of(fileAbsolutePath).getRelativePath().toString();
     }
 
     /**
+     * Resolves {@code fileAbsolutePath} through {@link ResourceReference} and returns the string form of
+     * the reference's file relative path.
+     *
+     * @param fileAbsolutePath
+     *            the absolute path of the file
+     * @return the file's relative path starting from the storage directory
+     */
+    public static String getFileRelativePath(String fileAbsolutePath) {
+        return 
ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
+    }
+
+    /**
      * Create a file
      * Note: this method is not thread safe. It is the responsibility of the 
caller to ensure no path conflict when
      * creating files simultaneously

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
new file mode 100644
index 0000000..001d41f
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link IReplicationMessage} that, in addition to being serializable, can be executed on the
+ * replica that receives it.
+ */
+public interface IReplicaTask extends IReplicationMessage {
+
+    /**
+     * Performs the task on the replica
+     *
+     * @param appCtx
+     *            the application context of the node executing the task
+     * @param worker
+     *            the replication thread that received the task
+     * @throws HyracksDataException
+     *             if the task fails
+     */
+    void perform(INcApplicationContext appCtx, IReplicationThread worker) 
throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
new file mode 100644
index 0000000..2e1cb8a
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import java.io.OutputStream;
+
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A message exchanged over the replication protocol. Each message reports its
+ * {@link ReplicationProtocol.ReplicationRequestType} and knows how to write itself to a stream.
+ */
+public interface IReplicationMessage {
+
+    /**
+     * @return the message type
+     */
+    ReplicationProtocol.ReplicationRequestType getMessageType();
+
+    /**
+     * Serializes {@link IReplicationMessage} to {@code out}
+     *
+     * @param out
+     *            the stream to write the message to
+     * @throws HyracksDataException
+     *             if the message cannot be written
+     */
+    void serialize(OutputStream out) throws HyracksDataException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 8a52529..8094548 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -25,10 +25,18 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
+import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.replication.api.IReplicationMessage;
 import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+import org.apache.asterix.replication.storage.PartitionReplica;
 import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
 
 public class ReplicationProtocol {
@@ -38,8 +46,8 @@ public class ReplicationProtocol {
      */
     public static final String JOB_REPLICATION_ACK = "$";
 
-    public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
-    public final static int REPLICATION_REQUEST_HEADER_SIZE = 
REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+    public static final  int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+    private static final  int REPLICATION_REQUEST_HEADER_SIZE = 
REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
 
     /*
      * ReplicationRequestType:
@@ -64,7 +72,12 @@ public class ReplicationProtocol {
         REPLICA_EVENT,
         LSM_COMPONENT_PROPERTIES,
         ACK,
-        FLUSH_INDEX
+        FLUSH_INDEX,
+        PARTITION_RESOURCES_REQUEST,
+        PARTITION_RESOURCES_RESPONSE,
+        REPLICATE_RESOURCE_FILE,
+        DELETE_RESOURCE_FILE,
+        CHECKPOINT_PARTITION
     }
 
     public static ByteBuffer readRequest(SocketChannel socketChannel, 
ByteBuffer dataBuffer) throws IOException {
@@ -256,4 +269,86 @@ public class ReplicationProtocol {
         ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
         NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
     }
-}
+
+    /**
+     * Sends a {@link ReplicationRequestType#ACK} to {@code socketChannel}, staging it in {@code buf}.
+     * The buffer is cleared first, so whatever it held before the call is overwritten.
+     *
+     * @param socketChannel
+     *            the channel to write the ack to
+     * @param buf
+     *            the buffer used to stage the ack
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised while writing to the channel
+     */
+    public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
+        try {
+            buf.clear();
+            // the ack consists only of the ordinal of the ACK request type
+            buf.putInt(ReplicationRequestType.ACK.ordinal());
+            buf.flip();
+            NetworkingUtil.transferBufferToChannel(socketChannel, buf);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * Reads the next request type from {@code replica}'s channel, using the replica's reusable buffer,
+     * and verifies that it is an {@link ReplicationRequestType#ACK}.
+     *
+     * @param replica
+     *            the replica an ack is expected from
+     * @throws IOException
+     *             if reading the request type fails
+     * @throws IllegalStateException
+     *             if the response is anything other than an ack
+     */
+    public static void waitForAck(PartitionReplica replica) throws IOException 
{
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.gerReusableBuffer();
+        ReplicationRequestType responseFunction = 
ReplicationProtocol.getRequestType(channel, buf);
+        if (responseFunction != ReplicationRequestType.ACK) {
+            throw new IllegalStateException("Unexpected response while waiting 
for ack.");
+        }
+    }
+
+    /**
+     * Sends {@code task} to {@code replica} over the replica's channel, staging the request in the
+     * replica's reusable buffer. Delegates to
+     * {@link #sendTo(SocketChannel, IReplicationMessage, ByteBuffer)}.
+     *
+     * @param replica
+     *            the destination replica
+     * @param task
+     *            the message to send
+     */
+    public static void sendTo(PartitionReplica replica, IReplicationMessage 
task) {
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.gerReusableBuffer();
+        sendTo(channel, task, buf);
+    }
+
+    /**
+     * Serializes {@code task} and writes it to {@code channel} as a single request laid out as:
+     * the message type ordinal (int), the serialized size (int), then the serialized bytes.
+     *
+     * @param channel
+     *            the channel to write the request to
+     * @param task
+     *            the message to send
+     * @param buf
+     *            a buffer to stage the request in; it is cleared and reused when its capacity is
+     *            sufficient, otherwise a new buffer is allocated for this request only
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised while serializing or sending
+     */
+    public static void sendTo(SocketChannel channel, IReplicationMessage task, 
ByteBuffer buf) {
+        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            task.serialize(oos);
+            // header (type + size) followed by the serialized message
+            final int requestSize = REPLICATION_REQUEST_HEADER_SIZE + 
oos.size();
+            final ByteBuffer requestBuffer = ensureSize(buf, requestSize);
+            requestBuffer.putInt(task.getMessageType().ordinal());
+            requestBuffer.putInt(oos.size());
+            requestBuffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
+            requestBuffer.flip();
+            NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * Reads the next message from {@code socketChannel}: first its request type, then the message
+     * itself via {@link #readMessage(ReplicationRequestType, SocketChannel, ByteBuffer)}.
+     *
+     * @param socketChannel
+     *            the channel to read from
+     * @param buffer
+     *            the buffer used while reading
+     * @return the deserialized message
+     * @throws IOException
+     *             if reading the request type fails
+     */
+    public static IReplicationMessage read(SocketChannel socketChannel, 
ByteBuffer buffer) throws IOException {
+        final ReplicationRequestType type = getRequestType(socketChannel, 
buffer);
+        return readMessage(type, socketChannel, buffer);
+    }
+
+    /**
+     * Reads the body of a message whose type has already been consumed from {@code socketChannel}
+     * and deserializes it into the matching {@link IReplicationMessage}.
+     *
+     * @param type
+     *            the type of the message to deserialize
+     * @param socketChannel
+     *            the channel to read the message body from
+     * @param buffer
+     *            the buffer offered to {@link #readRequest(SocketChannel, ByteBuffer)} for reading
+     * @return the deserialized message
+     * @throws IllegalStateException
+     *             if {@code type} is not one of the message types handled here
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised while reading or deserializing
+     */
+    public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
+            ByteBuffer buffer) {
+        try {
+            // readRequest hands back the buffer holding the request; use that one rather than assuming
+            // it is always the buffer that was passed in
+            final ByteBuffer requestBuffer = ReplicationProtocol.readRequest(socketChannel, buffer);
+            // ByteArrayInputStream takes (array, offset, length): pass the number of readable bytes, not
+            // the limit, so nothing beyond the limit is exposed when the position is not 0
+            final ByteArrayInputStream input = new ByteArrayInputStream(requestBuffer.array(),
+                    requestBuffer.arrayOffset() + requestBuffer.position(), requestBuffer.remaining());
+            try (DataInputStream dis = new DataInputStream(input)) {
+                switch (type) {
+                    case PARTITION_RESOURCES_REQUEST:
+                        return PartitionResourcesListTask.create(dis);
+                    case PARTITION_RESOURCES_RESPONSE:
+                        return PartitionResourcesListResponse.create(dis);
+                    case REPLICATE_RESOURCE_FILE:
+                        return ReplicateFileTask.create(dis);
+                    case DELETE_RESOURCE_FILE:
+                        return DeleteFileTask.create(dis);
+                    case CHECKPOINT_PARTITION:
+                        return CheckpointPartitionIndexesTask.create(dis);
+                    default:
+                        throw new IllegalStateException("Unrecognized replication message");
+                }
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * Returns a buffer that can hold at least {@code size} bytes and is ready to be written to.
+     *
+     * @param buffer
+     *            the buffer to reuse when it is large enough
+     * @param size
+     *            the required capacity in bytes
+     * @return {@code buffer} itself, cleared, when its capacity is at least {@code size}; otherwise a
+     *         newly allocated buffer of exactly {@code size} bytes ({@code buffer} is left untouched)
+     */
+    private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
+        if (buffer.capacity() < size) {
+            return ByteBuffer.allocate(size);
+        }
+        buffer.clear();
+        return buffer;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index dfc23d3..ba24e07 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -24,10 +24,10 @@ import java.io.RandomAccessFile;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +71,10 @@ import 
org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import 
org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.RemoteLogMapping;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
 import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
@@ -108,12 +112,12 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
     private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping;
     private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
     private final LSMComponentsSyncService lsmComponentLSNMappingService;
-    private final Set<Integer> nodeHostedPartitions;
     private final ReplicationNotifier replicationNotifier;
     private final Object flushLogslock = new Object();
     private final IDatasetLifecycleManager dsLifecycleManager;
     private final PersistentLocalResourceRepository localResourceRep;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
+    private final INcApplicationContext appCtx;
 
     public ReplicationChannel(String nodeId, ReplicationProperties 
replicationProperties, ILogManager logManager,
             IReplicaResourcesManager replicaResoucesManager, 
IReplicationManager replicationManager,
@@ -135,19 +139,8 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = 
Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
-        Map<String, ClusterPartition[]> nodePartitions =
-                
asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
-        Set<String> nodeReplicationClients = 
replicationProperties.getRemotePrimaryReplicasIds(nodeId);
-        List<Integer> clientsPartitions = new ArrayList<>();
-        for (String clientId : nodeReplicationClients) {
-            for (ClusterPartition clusterPartition : 
nodePartitions.get(clientId)) {
-                clientsPartitions.add(clusterPartition.getPartitionId());
-            }
-        }
-        nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
-        nodeHostedPartitions.addAll(clientsPartitions);
-        this.indexCheckpointManagerProvider =
-                ((INcApplicationContext) 
ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
+        this.appCtx = (INcApplicationContext) 
ncServiceContext.getApplicationContext();
+        this.indexCheckpointManagerProvider = 
appCtx.getIndexCheckpointManagerProvider();
     }
 
     @Override
@@ -167,12 +160,14 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
             LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " 
+ nodeIP + ":" + dataPort);
 
             //start accepting replication requests
-            while (true) {
+            while (serverSocketChannel.isOpen()) {
                 SocketChannel socketChannel = serverSocketChannel.accept();
                 socketChannel.configureBlocking(true);
                 //start a new thread to handle the request
                 replicationThreads.execute(new 
ReplicationThread(socketChannel));
             }
+        } catch (AsynchronousCloseException e) {
+            LOGGER.log(Level.WARNING, "Replication channel closed", e);
         } catch (IOException e) {
             throw new IllegalStateException(
                     "Could not open replication channel @ IP Address: " + 
nodeIP + ":" + dataPort, e);
@@ -209,10 +204,8 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
 
     @Override
     public void close() throws IOException {
-        if (!serverSocketChannel.isOpen()) {
-            serverSocketChannel.close();
-            LOGGER.log(Level.INFO, "Replication channel closed.");
-        }
+        serverSocketChannel.close();
+        LOGGER.log(Level.INFO, "Replication channel closed.");
     }
 
     /**
@@ -263,6 +256,18 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                         case FLUSH_INDEX:
                             handleFlushIndex();
                             break;
+                        case PARTITION_RESOURCES_REQUEST:
+                            handleGetPartitionResources();
+                            break;
+                        case REPLICATE_RESOURCE_FILE:
+                            handleReplicateResourceFile();
+                            break;
+                        case DELETE_RESOURCE_FILE:
+                            handleDeleteResourceFile();
+                            break;
+                        case CHECKPOINT_PARTITION:
+                            handleCheckpointPartition();
+                            break;
                         default:
                             throw new IllegalStateException("Unknown 
replication request");
                     }
@@ -476,10 +481,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                 switch (remoteLog.getLogType()) {
                     case LogType.UPDATE:
                     case LogType.ENTITY_COMMIT:
-                        //if the log partition belongs to a partitions hosted 
on this node, replicated it
-                        if 
(nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
-                            logManager.log(remoteLog);
-                        }
+                        logManager.log(remoteLog);
                         break;
                     case LogType.JOB_COMMIT:
                     case LogType.ABORT:
@@ -542,10 +544,15 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         }
 
         @Override
-        public SocketChannel getReplicationClientSocket() {
+        public SocketChannel getChannel() {
             return socketChannel;
         }
 
+        /**
+         * @return this thread's {@code outBuffer}
+         */
+        @Override
+        public ByteBuffer getReusableBuffer() {
+            return outBuffer;
+        }
+
         private void checkpointReplicaIndexes(RemoteLogMapping 
remoteLogMapping, int datasetId) {
             try {
                 Predicate<LocalResource> replicaIndexesPredicate = lr -> {
@@ -568,6 +575,30 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                 LOGGER.log(Level.SEVERE, "Failed to checkpoint replica 
indexes", e);
             }
         }
+
+        /**
+         * Reads a {@link PartitionResourcesListTask} from this thread's socket channel and performs it
+         * with this thread as the worker.
+         *
+         * @throws IOException
+         *             declared for failures while handling the request
+         */
+        private void handleGetPartitionResources() throws IOException {
+            final PartitionResourcesListTask task = 
(PartitionResourcesListTask) ReplicationProtocol
+                    
.readMessage(ReplicationRequestType.PARTITION_RESOURCES_REQUEST, socketChannel, 
inBuffer);
+            task.perform(appCtx, this);
+        }
+
+        /**
+         * Reads a {@link ReplicateFileTask} from this thread's socket channel and performs it with
+         * this thread as the worker.
+         *
+         * @throws HyracksDataException
+         *             if performing the task fails
+         */
+        private void handleReplicateResourceFile() throws HyracksDataException 
{
+            ReplicateFileTask task = (ReplicateFileTask) ReplicationProtocol
+                    
.readMessage(ReplicationRequestType.REPLICATE_RESOURCE_FILE, socketChannel, 
inBuffer);
+            task.perform(appCtx, this);
+        }
+
+        /**
+         * Reads a {@link DeleteFileTask} from this thread's socket channel and performs it with this
+         * thread as the worker.
+         *
+         * @throws HyracksDataException
+         *             declared for failures while handling the request
+         */
+        private void handleDeleteResourceFile() throws HyracksDataException {
+            DeleteFileTask task = (DeleteFileTask) ReplicationProtocol
+                    .readMessage(ReplicationRequestType.DELETE_RESOURCE_FILE, 
socketChannel, inBuffer);
+            task.perform(appCtx, this);
+        }
+
+        /**
+         * Reads a {@link CheckpointPartitionIndexesTask} from this thread's socket channel and performs
+         * it with this thread as the worker.
+         *
+         * @throws HyracksDataException
+         *             if performing the task fails
+         */
+        private void handleCheckpointPartition() throws HyracksDataException {
+            CheckpointPartitionIndexesTask task = 
(CheckpointPartitionIndexesTask) ReplicationProtocol
+                    .readMessage(ReplicationRequestType.CHECKPOINT_PARTITION, 
socketChannel, inBuffer);
+            task.perform(appCtx, this);
+        }
     }
 
     /**
@@ -581,7 +612,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                 try {
                     LogRecord logRecord = 
pendingNotificationRemoteLogsQ.take();
                     //send ACK to requester
-                    
logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+                    
logRecord.getReplicationThread().getChannel().socket().getOutputStream()
                             .write((localNodeID + 
ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getTxnId()
                                     + System.lineSeparator()).getBytes());
                 } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 5cf7eab..b933db8 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -40,6 +40,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -53,7 +54,9 @@ import java.util.logging.Logger;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -1177,6 +1180,25 @@ public class ReplicationManager implements 
IReplicationManager {
         buffer.position(buffer.limit());
     }
 
+    /**
+     * Registers {@code replica} with this manager: looks up the cluster node whose cluster ip and
+     * replication port match the replica's location, records a {@code Replica} for that node if none
+     * is recorded yet, and adds the replica's partition to the set of partitions tracked for it.
+     *
+     * @param replica
+     *            the partition replica to register
+     * @throws IllegalStateException
+     *             if no cluster node matches the replica's location
+     */
+    @Override
+    public void register(IPartitionReplica replica) {
+        // find the replica node based on ip and replication port
+        final Optional<Node> replicaNode = 
ClusterProperties.INSTANCE.getCluster().getNode().stream()
+                .filter(node -> 
node.getClusterIp().equals(replica.getIdentifier().getLocation().getHostString())
+                        && node.getReplicationPort().intValue() == 
replica.getIdentifier().getLocation().getPort())
+                .findAny();
+        if (!replicaNode.isPresent()) {
+            throw new IllegalStateException("Couldn't find node for replica: " 
+ replica);
+        }
+        Replica replicaRef = new Replica(replicaNode.get());
+        final String replicaId = replicaRef.getId();
+        // keep an already registered Replica instance for this id, if any
+        replicas.putIfAbsent(replicaId, replicaRef);
+        // make sure a partition set exists for the replica before adding to it
+        replica2PartitionsMap.computeIfAbsent(replicaId, k -> new HashSet<>());
+        
replica2PartitionsMap.get(replicaId).add(replica.getIdentifier().getPartition());
+        // NOTE(review): both helpers are defined outside this hunk; presumably they refresh the
+        // replica's connection info and re-evaluate its state -- confirm against their definitions
+        updateReplicaInfo(replicaRef);
+        checkReplicaState(replicaId, false, true);
+    }
+
     //supporting classes
     /**
      * This class is responsible for processing replica events.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
new file mode 100644
index 0000000..2c1937b
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * A task to initialize the checkpoints for all indexes in a partition with 
the replica's current LSN
+ */
+public class CheckpointPartitionIndexesTask implements IReplicaTask {
+
+    // the partition whose indexes are checkpointed
+    private final int partition;
+
+    /**
+     * @param partition
+     *            the partition whose indexes should be checkpointed
+     */
+    public CheckpointPartitionIndexesTask(int partition) {
+        this.partition = partition;
+    }
+
+    /**
+     * For every local resource of the partition, deletes the index checkpoint and re-initializes it
+     * with the log manager's current append LSN, then sends an ack on the worker's channel.
+     *
+     * @param appCtx
+     *            supplies the checkpoint manager provider, the local resource repository and the
+     *            log manager
+     * @param worker
+     *            the replication thread whose channel and reusable buffer are used for the ack
+     * @throws HyracksDataException
+     *             if checkpointing fails
+     */
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread 
worker) throws HyracksDataException {
+        final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                appCtx.getIndexCheckpointManagerProvider();
+        PersistentLocalResourceRepository resRepo =
+                (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
+        final Collection<LocalResource> partitionResources = 
resRepo.getPartitionResources(partition).values();
+        // the LSN is read once so that all indexes are initialized with the same value
+        final long currentLSN = 
appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
+        for (LocalResource ls : partitionResources) {
+            final IIndexCheckpointManager indexCheckpointManager =
+                    
indexCheckpointManagerProvider.get(DatasetResourceReference.of(ls));
+            // drop whatever checkpoint exists before initializing a fresh one
+            indexCheckpointManager.delete();
+            indexCheckpointManager.init(currentLSN);
+        }
+        ReplicationProtocol.sendAck(worker.getChannel(), 
worker.getReusableBuffer());
+    }
+
+    /**
+     * @return {@link ReplicationProtocol.ReplicationRequestType#CHECKPOINT_PARTITION}
+     */
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.CHECKPOINT_PARTITION;
+    }
+
+    /**
+     * Writes the partition id to {@code out} as a single int.
+     *
+     * @param out
+     *            the stream to write to; it is wrapped but not closed here
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised while writing
+     */
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeInt(partition);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Reads a task previously written by {@link #serialize(OutputStream)}.
+     *
+     * @param input
+     *            the input to read the partition id (a single int) from
+     * @return the deserialized task
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised while reading
+     */
+    public static CheckpointPartitionIndexesTask create(DataInput input) 
throws HyracksDataException {
+        try {
+            int partition = input.readInt();
+            return new CheckpointPartitionIndexesTask(partition);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
new file mode 100644
index 0000000..ea43ee9
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A task to delete a file on a replica, if the file exists
+ */
+public class DeleteFileTask implements IReplicaTask {
+
+    private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName());
+    private final String file;
+
+    /**
+     * @param file
+     *            the path of the file to delete; it is resolved through the IO manager when the task is performed
+     */
+    public DeleteFileTask(String file) {
+        this.file = file;
+    }
+
+    /**
+     * Resolves {@code file} through the application's IO manager, deletes it if it is present and then
+     * sends an ack on the worker's channel. A missing file is only logged as a warning; the ack is still sent.
+     *
+     * @param appCtx
+     *            the application context providing the IO manager
+     * @param worker
+     *            the replication thread whose channel and buffer are used to send the ack
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised while resolving, deleting or acknowledging
+     */
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            final File localFile = ioManager.resolve(file).getFile();
+            // check and delete in a single call: with a separate exists() test, a file that disappears
+            // between the test and the delete would fail the whole task with NoSuchFileException
+            if (Files.deleteIfExists(localFile.toPath())) {
+                LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
+            } else {
+                LOGGER.warning(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
+            }
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.DELETE_RESOURCE_FILE;
+    }
+
+    /**
+     * Writes the file path to {@code out} using {@link DataOutputStream#writeUTF(String)}.
+     *
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised while writing
+     */
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Builds a task from its serialized form, which is a single UTF string holding the file path.
+     *
+     * @param input
+     *            the source to read the file path from
+     * @return a new task for the path that was read
+     * @throws IOException
+     *             if the path cannot be read
+     */
+    public static DeleteFileTask create(DataInput input) throws IOException {
+        return new DeleteFileTask(input.readUTF());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
new file mode 100644
index 0000000..85b7bb9
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.replication.api.IReplicationMessage;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A message carrying a partition number together with a list of resource names of that partition
+ */
+public class PartitionResourcesListResponse implements IReplicationMessage {
+
+    private final int partition;
+    private final List<String> resources;
+
+    public PartitionResourcesListResponse(int partition, List<String> resources) {
+        this.partition = partition;
+        this.resources = resources;
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_RESPONSE;
+    }
+
+    /**
+     * Writes the partition number, the number of resources and then each resource as a UTF string.
+     *
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised while writing
+     */
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            final DataOutputStream output = new DataOutputStream(out);
+            output.writeInt(partition);
+            output.writeInt(resources.size());
+            for (String resource : resources) {
+                output.writeUTF(resource);
+            }
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    public List<String> getResources() {
+        return resources;
+    }
+
+    /**
+     * Reads a response in the layout produced by {@link #serialize(OutputStream)}.
+     *
+     * @param input
+     *            the source to read from
+     * @return the deserialized response
+     * @throws IOException
+     *             if reading fails
+     */
+    public static PartitionResourcesListResponse create(DataInput input) throws IOException {
+        final int partitionId = input.readInt();
+        int remaining = input.readInt();
+        final List<String> names = new ArrayList<>();
+        while (remaining > 0) {
+            names.add(input.readUTF());
+            remaining--;
+        }
+        return new PartitionResourcesListResponse(partitionId, names);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
new file mode 100644
index 0000000..b2b8ac6
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A task that asks a replica for the list of files it holds for a partition
+ */
+public class PartitionResourcesListTask implements IReplicaTask {
+
+    private final int partition;
+
+    public PartitionResourcesListTask(int partition) {
+        this.partition = partition;
+    }
+
+    /**
+     * Collects the index files of the partition, maps each one through
+     * {@link StoragePathUtil#getFileRelativePath} and sends the result back on the worker's channel as a
+     * {@link PartitionResourcesListResponse}.
+     */
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        //TODO delete any invalid files with masks
+        final List<String> indexFiles = appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false);
+        final List<String> relativeFiles =
+                indexFiles.stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        final PartitionResourcesListResponse reply = new PartitionResourcesListResponse(partition, relativeFiles);
+        ReplicationProtocol.sendTo(worker.getChannel(), reply, worker.getReusableBuffer());
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_REQUEST;
+    }
+
+    /**
+     * Writes the partition number to {@code out} as a single int.
+     */
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        final DataOutputStream output = new DataOutputStream(out);
+        try {
+            output.writeInt(partition);
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    /**
+     * Builds a task from its serialized form, which is a single int holding the partition number.
+     */
+    public static PartitionResourcesListTask create(DataInput input) throws HyracksDataException {
+        try {
+            return new PartitionResourcesListTask(input.readInt());
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
new file mode 100644
index 0000000..14e9180
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A task to replicate a file from a master replica
+ */
+public class ReplicateFileTask implements IReplicaTask {
+
+    // the logger must be named after this class; it was previously created with DeleteFileTask's name,
+    // which attributed every record logged here to the wrong class
+    private static final Logger LOGGER = Logger.getLogger(ReplicateFileTask.class.getName());
+    private final String file;
+    private final long size;
+
+    /**
+     * @param file
+     *            the path of the file to replicate; it is resolved through the IO manager when the task is performed
+     * @param size
+     *            the length in bytes that the received file is set to before its content is downloaded
+     */
+    public ReplicateFileTask(String file, long size) {
+        this.file = file;
+        this.size = size;
+    }
+
+    /**
+     * Receives the file from the worker's channel. The steps are: create the parent directories, create a
+     * mask file named {@link StorageConstants#MASK_FILE_PREFIX} followed by the file name, create the file
+     * and set its length to {@code size}, download the content, force it to disk, delete the mask and send
+     * an ack.
+     * <p>
+     * The mask is only deleted after the download has completed, so it is left behind when any step fails.
+     * NOTE(review): presumably the leftover mask marks the file as incomplete -- confirm against the code
+     * that consumes masks.
+     *
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised by one of the steps
+     */
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            // resolve path
+            final FileReference localPath = ioManager.resolve(file);
+            final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+            // create mask
+            final Path maskPath = Paths.get(resourceDir.toString(),
+                    StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName());
+            Files.createFile(maskPath);
+
+            // receive actual file
+            final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName());
+            Files.createFile(filePath);
+            try (RandomAccessFile fileOutputStream = new RandomAccessFile(filePath.toFile(), "rw");
+                    FileChannel fileChannel = fileOutputStream.getChannel()) {
+                fileOutputStream.setLength(size);
+                NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
+                fileChannel.force(true);
+            }
+            //delete mask
+            Files.delete(maskPath);
+            LOGGER.info(() -> "Replicated file: " + localPath);
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE;
+    }
+
+    /**
+     * Writes the file path as a UTF string followed by the file size as a long.
+     *
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised while writing
+     */
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+            dos.writeLong(size);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Reads a task in the layout produced by {@link #serialize(OutputStream)}.
+     *
+     * @param input
+     *            the source to read from
+     * @return the deserialized task
+     * @throws IOException
+     *             if reading fails
+     */
+    public static ReplicateFileTask create(DataInput input) throws IOException {
+        final String file = input.readUTF();
+        final long size = input.readLong();
+        return new ReplicateFileTask(file, size);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
new file mode 100644
index 0000000..8aa4487
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * Sends file replication and file deletion requests to a single replica and waits for the replica's ack
+ */
+public class FileSynchronizer {
+
+    private final INcApplicationContext appCtx;
+    private final PartitionReplica replica;
+
+    public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    /**
+     * Sends a {@link ReplicateFileTask} for {@code file}, streams the file's content on the replica's
+     * channel and then waits for the ack.
+     *
+     * @param file
+     *            the path of the file to send, resolved through the IO manager
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised along the way
+     */
+    public void replicate(String file) {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            final SocketChannel replicaChannel = replica.getChannel();
+            final FileReference localRef = ioManager.resolve(file);
+            final long length = localRef.getFile().length();
+            ReplicationProtocol.sendTo(replica, new ReplicateFileTask(file, length));
+            // send the file itself
+            try (RandomAccessFile source = new RandomAccessFile(localRef.getFile(), "r");
+                    FileChannel sourceChannel = source.getChannel()) {
+                NetworkingUtil.sendFile(sourceChannel, replicaChannel);
+            }
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException ioe) {
+            throw new ReplicationException(ioe);
+        }
+    }
+
+    /**
+     * Sends a {@link DeleteFileTask} for {@code file} to the replica and waits for the ack.
+     *
+     * @param file
+     *            the path of the file the replica is asked to delete
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised along the way
+     */
+    public void delete(String file) {
+        try {
+            ReplicationProtocol.sendTo(replica, new DeleteFileTask(file));
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException ioe) {
+            throw new ReplicationException(ioe);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
new file mode 100644
index 0000000..2021cee
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+/**
+ * Brings the files of a replica in line with the files of the master: files found only on the master are
+ * sent to the replica, files found only on the replica are deleted from it
+ */
+public class ReplicaFilesSynchronizer {
+
+    private final PartitionReplica replica;
+    private final INcApplicationContext appCtx;
+
+    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    /**
+     * Fetches the replica's file list, computes the local (master) file list for the same partition, then
+     * replicates the files the replica is missing and finally deletes the files only the replica has.
+     *
+     * @throws IOException
+     *             if talking to the replica or listing the local files fails
+     */
+    public void sync() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
+        final Set<String> onReplica = getReplicaFiles(partition);
+        final List<String> localFiles = appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false);
+        final Set<String> onMaster =
+                localFiles.stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        // find files on master and not on replica
+        replicateMissingFiles(difference(onMaster, onReplica));
+        // find files on replica and not on master
+        deleteInvalidFiles(difference(onReplica, onMaster));
+    }
+
+    /**
+     * @return the elements of {@code from} that are not contained in {@code excluded}
+     */
+    private static List<String> difference(Set<String> from, Set<String> excluded) {
+        return from.stream().filter(file -> !excluded.contains(file)).collect(Collectors.toList());
+    }
+
+    /**
+     * Sends a {@link PartitionResourcesListTask} to the replica and reads back the
+     * {@link PartitionResourcesListResponse}.
+     */
+    private Set<String> getReplicaFiles(int partition) throws IOException {
+        final PartitionResourcesListTask request = new PartitionResourcesListTask(partition);
+        final SocketChannel replicaChannel = replica.getChannel();
+        final ByteBuffer buffer = replica.gerReusableBuffer();
+        ReplicationProtocol.sendTo(replica, request);
+        final PartitionResourcesListResponse reply =
+                (PartitionResourcesListResponse) ReplicationProtocol.read(replicaChannel, buffer);
+        return new HashSet<>(reply.getResources());
+    }
+
+    private void replicateMissingFiles(List<String> files) {
+        final FileSynchronizer fileSync = new FileSynchronizer(appCtx, replica);
+        for (String file : files) {
+            fileSync.replicate(file);
+        }
+    }
+
+    private void deleteInvalidFiles(List<String> files) {
+        final FileSynchronizer fileSync = new FileSynchronizer(appCtx, replica);
+        for (String file : files) {
+            fileSync.delete(file);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
new file mode 100644
index 0000000..5c88460
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+/**
+ * Runs the steps that bring a newly added replica in sync with the master: synchronize the files,
+ * checkpoint the replica's indexes and register the replica with the replication manager
+ */
+public class ReplicaSynchronizer {
+
+    private final INcApplicationContext appCtx;
+    private final PartitionReplica replica;
+
+    public ReplicaSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    /**
+     * Performs the synchronization steps in order; the replica is registered only after the first two
+     * steps have completed without throwing.
+     *
+     * @throws IOException
+     *             if any step fails
+     */
+    public void sync() throws IOException {
+        syncFiles();
+        checkpointReplicaIndexes();
+        appCtx.getReplicationManager().register(replica);
+    }
+
+    /**
+     * Synchronizes the files twice, with a flush of the replicated datasets in between.
+     */
+    private void syncFiles() throws IOException {
+        final ReplicaFilesSynchronizer filesSynchronizer = new ReplicaFilesSynchronizer(appCtx, replica);
+        filesSynchronizer.sync();
+        // flush replicated dataset to generate disk component for any remaining in-memory components
+        final ReplicationProperties properties = appCtx.getReplicationProperties();
+        final IReplicationStrategy strategy = properties.getReplicationStrategy();
+        appCtx.getDatasetLifecycleManager().flushDataset(strategy);
+        // sync any newly generated files
+        filesSynchronizer.sync();
+    }
+
+    /**
+     * Sends a {@link CheckpointPartitionIndexesTask} for the replica's partition and waits for the ack.
+     */
+    private void checkpointReplicaIndexes() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
+        ReplicationProtocol.sendTo(replica, new CheckpointPartitionIndexesTask(partition));
+        ReplicationProtocol.waitForAck(replica);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
index c6d1b60..d9ce75e 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -22,12 +22,20 @@ import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionR
 import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
 import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.recovery.ReplicaSynchronizer;
 import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -38,12 +46,18 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 @ThreadSafe
 public class PartitionReplica implements IPartitionReplica {
 
+    private static final Logger LOGGER = 
Logger.getLogger(PartitionReplica.class.getName());
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final int INITIAL_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
+    private final INcApplicationContext appCtx;
     private final ReplicaIdentifier id;
+    private ByteBuffer reusbaleBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
+    private SocketChannel sc;
 
-    public PartitionReplica(ReplicaIdentifier id) {
+    public PartitionReplica(ReplicaIdentifier id, INcApplicationContext 
appCtx) {
         this.id = id;
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -60,9 +74,53 @@ public class PartitionReplica implements IPartitionReplica {
         if (status == IN_SYNC || status == CATCHING_UP) {
             return;
         }
+        setStatus(CATCHING_UP);
+        appCtx.getThreadExecutor().execute(() -> {
+            try {
+                new ReplicaSynchronizer(appCtx, this).sync();
+                setStatus(IN_SYNC);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, e, () -> "Failed to sync replica " + 
this);
+                setStatus(DISCONNECTED);
+            } finally {
+                close();
+            }
+        });
     }
 
-    public JsonNode asJson() {
+    /**
+     * Returns the channel to this replica, opening and connecting a new blocking channel to
+     * {@code id.getLocation()} when there is no channel yet or the current one is no longer open and connected.
+     *
+     * @return an open, connected channel
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised while opening or connecting
+     */
+    public synchronized SocketChannel getChannel() {
+        if (sc != null && sc.isOpen() && sc.isConnected()) {
+            return sc;
+        }
+        SocketChannel newChannel = null;
+        try {
+            newChannel = SocketChannel.open();
+            newChannel.configureBlocking(true);
+            newChannel.connect(id.getLocation());
+        } catch (IOException e) {
+            // do not leave a half-initialized channel behind: it would otherwise stay open and be
+            // overwritten, unclosed, by the next call
+            if (newChannel != null) {
+                try {
+                    newChannel.close();
+                } catch (IOException closeFailure) {
+                    e.addSuppressed(closeFailure);
+                }
+            }
+            throw new ReplicationException(e);
+        }
+        // publish the channel only once it is fully connected
+        sc = newChannel;
+        return sc;
+    }
+
+    /**
+     * Sends a goodbye on the current channel, if it is open, and closes it. The channel is closed and the
+     * reference to it is dropped even when sending the goodbye fails.
+     *
+     * @throws ReplicationException
+     *             wrapping any {@link IOException} raised while sending the goodbye or closing
+     */
+    public synchronized void close() {
+        if (sc == null || !sc.isOpen()) {
+            return;
+        }
+        // try-with-resources guarantees the socket is closed when sendGoodbye throws; previously a
+        // failed goodbye skipped sc.close() and leaked the socket
+        try (SocketChannel channel = sc) {
+            ReplicationProtocol.sendGoodbye(channel);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        } finally {
+            sc = null;
+        }
+    }
+
+    /**
+     * Returns this replica's buffer, allocating it with {@code INITIAL_BUFFER_SIZE} bytes on first use.
+     *
+     * @return the same buffer instance on every call
+     */
+    public synchronized ByteBuffer gerReusableBuffer() {
+        if (reusbaleBuf != null) {
+            return reusbaleBuf;
+        }
+        reusbaleBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        return reusbaleBuf;
+    }
+
+    private JsonNode asJson() {
         ObjectNode json = OBJECT_MAPPER.createObjectNode();
         json.put("id", id.toString());
         json.put("state", status.name());
@@ -94,4 +152,9 @@ public class PartitionReplica implements IPartitionReplica {
             throw new ReplicationException(e);
         }
     }
+
+    /**
+     * Logs the transition from the current status to {@code status} and then stores the new status.
+     *
+     * @param status
+     *            the status to switch to
+     */
+    private synchronized void setStatus(PartitionReplicaStatus status) {
+        final PartitionReplicaStatus previous = this.status;
+        LOGGER.info(() -> "Replica " + this + " status changing: " + previous + " -> " + status);
+        this.status = status;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cc7d2f0c/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 2ff74a8..398f97d 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -182,6 +182,7 @@ public class ReplicaResourcesManager implements 
IReplicaResourcesManager {
      * @param partition
      * @return Absolute paths to all partition files
      */
+    @Override
     public List<String> getPartitionIndexesFiles(int partition, boolean 
relativePath) throws HyracksDataException {
         List<String> partitionFiles = new ArrayList<String>();
         Set<File> partitionIndexes = 
localRepository.getPartitionIndexes(partition);

Reply via email to