http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
new file mode 100644
index 0000000..b71f4b8
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.logging.RemoteLogRecord;
+import org.apache.asterix.replication.logging.RemoteLogsProcessor;
+import org.apache.asterix.replication.management.ReplicationChannel;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A task to replicate transaction logs from master replica
+ */
+public class ReplicateLogsTask implements IReplicaTask {
+
+    public static final int END_REPLICATION_LOG_SIZE = 1;
+    private final String nodeId;
+
+    public ReplicateLogsTask(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationWorker 
worker) {
+        final ReplicationChannel replicationChannel = (ReplicationChannel) 
appCtx.getReplicationChannel();
+        final RemoteLogsProcessor logsProcessor = 
replicationChannel.getRemoteLogsProcessor();
+        final ILogManager logManager = 
appCtx.getTransactionSubsystem().getLogManager();
+        final RemoteLogRecord reusableLog = new RemoteLogRecord();
+        final SocketChannel channel = worker.getChannel();
+        ByteBuffer logsBuffer = 
ByteBuffer.allocate(logManager.getLogPageSize());
+        try {
+            while (true) {
+                // read a batch of logs
+                logsBuffer = ReplicationProtocol.readRequest(channel, 
logsBuffer);
+                // check if it is end of handshake
+                if (logsBuffer.remaining() == END_REPLICATION_LOG_SIZE) {
+                    break;
+                }
+                logsProcessor.process(logsBuffer, reusableLog, worker);
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.REPLICATE_LOGS;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(nodeId);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static ReplicateLogsTask create(DataInput input) throws IOException 
{
+        final String node = input.readUTF();
+        return new ReplicateLogsTask(node);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
new file mode 100644
index 0000000..280a2d4
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicationMessage;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
+import org.apache.hyracks.util.StorageUtil;
+
+public class ReplicationProtocol {
+
+    /**
+     * All replication messages start with ReplicationRequestType (4 bytes), 
then the length of the request in bytes
+     */
+    public static final String LOG_REPLICATION_ACK = "$";
+    public static final int INITIAL_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
+    private static final int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+    private static final int REPLICATION_REQUEST_HEADER_SIZE = 
REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+
+    public enum ReplicationRequestType {
+        GOODBYE,
+        ACK,
+        PARTITION_RESOURCES_REQUEST,
+        PARTITION_RESOURCES_RESPONSE,
+        REPLICATE_RESOURCE_FILE,
+        DELETE_RESOURCE_FILE,
+        CHECKPOINT_PARTITION,
+        LSM_COMPONENT_MASK,
+        MARK_COMPONENT_VALID,
+        DROP_INDEX,
+        REPLICATE_LOGS
+    }
+
+    private static final Map<Integer, ReplicationRequestType> TYPES = new 
HashMap<>();
+
+    static {
+        Stream.of(ReplicationRequestType.values()).forEach(type -> 
TYPES.put(type.ordinal(), type));
+    }
+
+    public static ByteBuffer readRequest(SocketChannel socketChannel, 
ByteBuffer dataBuffer) throws IOException {
+        // read request size
+        NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
+        final int requestSize = dataBuffer.getInt();
+        final ByteBuffer buf = ensureSize(dataBuffer, requestSize);
+        // read request
+        NetworkingUtil.readBytes(socketChannel, buf, requestSize);
+        return dataBuffer;
+    }
+
+    public static ReplicationRequestType getRequestType(SocketChannel 
socketChannel, ByteBuffer byteBuffer)
+            throws IOException {
+        // read replication request type
+        NetworkingUtil.readBytes(socketChannel, byteBuffer, 
REPLICATION_REQUEST_TYPE_SIZE);
+        return TYPES.get(byteBuffer.getInt());
+    }
+
+    private static ByteBuffer getGoodbyeBuffer() {
+        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+        bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
+        bb.flip();
+        return bb;
+    }
+
+    public static int getTxnIdFromLogAckMessage(String msg) {
+        return Integer.parseInt(msg.substring(msg.indexOf(LOG_REPLICATION_ACK) 
+ 1));
+    }
+
+    public static void sendGoodbye(SocketChannel socketChannel) throws 
IOException {
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+        NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
+    }
+
+    public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
+        try {
+            buf.clear();
+            buf.putInt(ReplicationRequestType.ACK.ordinal());
+            buf.flip();
+            NetworkingUtil.transferBufferToChannel(socketChannel, buf);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public static void waitForAck(PartitionReplica replica) throws IOException 
{
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.getReusableBuffer();
+        ReplicationRequestType responseFunction = 
ReplicationProtocol.getRequestType(channel, buf);
+        if (responseFunction != ReplicationRequestType.ACK) {
+            throw new IllegalStateException("Unexpected response while waiting 
for ack.");
+        }
+    }
+
+    public static void sendTo(PartitionReplica replica, IReplicationMessage 
task) {
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.getReusableBuffer();
+        sendTo(channel, task, buf);
+    }
+
+    public static void sendTo(SocketChannel channel, IReplicationMessage task, 
ByteBuffer buf) {
+        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            task.serialize(oos);
+            final int requestSize = REPLICATION_REQUEST_HEADER_SIZE + 
oos.size();
+            final ByteBuffer requestBuffer = ensureSize(buf, requestSize);
+            requestBuffer.putInt(task.getMessageType().ordinal());
+            requestBuffer.putInt(oos.size());
+            requestBuffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
+            requestBuffer.flip();
+            NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public static IReplicationMessage read(SocketChannel socketChannel, 
ByteBuffer buffer) throws IOException {
+        final ReplicationRequestType type = getRequestType(socketChannel, 
buffer);
+        return readMessage(type, socketChannel, buffer);
+    }
+
+    public static IReplicationMessage readMessage(ReplicationRequestType type, 
SocketChannel socketChannel,
+            ByteBuffer buffer) {
+        try {
+            ReplicationProtocol.readRequest(socketChannel, buffer);
+            final ByteArrayInputStream input =
+                    new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
+            try (DataInputStream dis = new DataInputStream(input)) {
+                switch (type) {
+                    case PARTITION_RESOURCES_REQUEST:
+                        return PartitionResourcesListTask.create(dis);
+                    case PARTITION_RESOURCES_RESPONSE:
+                        return PartitionResourcesListResponse.create(dis);
+                    case REPLICATE_RESOURCE_FILE:
+                        return ReplicateFileTask.create(dis);
+                    case DELETE_RESOURCE_FILE:
+                        return DeleteFileTask.create(dis);
+                    case CHECKPOINT_PARTITION:
+                        return CheckpointPartitionIndexesTask.create(dis);
+                    case LSM_COMPONENT_MASK:
+                        return ComponentMaskTask.create(dis);
+                    case DROP_INDEX:
+                        return DropIndexTask.create(dis);
+                    case MARK_COMPONENT_VALID:
+                        return MarkComponentValidTask.create(dis);
+                    case REPLICATE_LOGS:
+                        return ReplicateLogsTask.create(dis);
+                    default:
+                        throw new IllegalStateException("Unrecognized 
replication message");
+                }
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public static ByteBuffer getEndLogReplicationBuffer() {
+        final int logsBatchSize = 1;
+        final ByteBuffer endLogRepBuffer =
+                ByteBuffer.allocate(Integer.BYTES + 
ReplicateLogsTask.END_REPLICATION_LOG_SIZE);
+        endLogRepBuffer.putInt(logsBatchSize);
+        endLogRepBuffer.put((byte) 0);
+        endLogRepBuffer.flip();
+        return endLogRepBuffer;
+    }
+
+    private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
+        if (buffer == null || buffer.capacity() < size) {
+            return ByteBuffer.allocate(size);
+        }
+        buffer.clear();
+        return buffer;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
deleted file mode 100644
index 8aa4487..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.recovery;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.messaging.DeleteFileTask;
-import org.apache.asterix.replication.messaging.ReplicateFileTask;
-import org.apache.asterix.replication.storage.PartitionReplica;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
-
-public class FileSynchronizer {
-
-    private final INcApplicationContext appCtx;
-    private final PartitionReplica replica;
-
-    public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica 
replica) {
-        this.appCtx = appCtx;
-        this.replica = replica;
-    }
-
-    public void replicate(String file) {
-        try {
-            final IIOManager ioManager = appCtx.getIoManager();
-            final SocketChannel channel = replica.getChannel();
-            final FileReference filePath = ioManager.resolve(file);
-            ReplicateFileTask task = new ReplicateFileTask(file, 
filePath.getFile().length());
-            ReplicationProtocol.sendTo(replica, task);
-            // send the file itself
-            try (RandomAccessFile fromFile = new 
RandomAccessFile(filePath.getFile(),
-                    "r"); FileChannel fileChannel = fromFile.getChannel()) {
-                NetworkingUtil.sendFile(fileChannel, channel);
-            }
-            ReplicationProtocol.waitForAck(replica);
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    public void delete(String file) {
-        try {
-            final DeleteFileTask task = new DeleteFileTask(file);
-            ReplicationProtocol.sendTo(replica, task);
-            ReplicationProtocol.waitForAck(replica);
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
deleted file mode 100644
index 5d044b4..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.recovery;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.replication.IReplicationManager;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class RemoteRecoveryManager implements IRemoteRecoveryManager {
-
-    private final IReplicationManager replicationManager;
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final INcApplicationContext runtimeContext;
-    private final ReplicationProperties replicationProperties;
-    private Map<String, Set<String>> failbackRecoveryReplicas;
-    private IReplicationStrategy replicationStrategy;
-
-    public RemoteRecoveryManager(IReplicationManager replicationManager, 
INcApplicationContext runtimeContext,
-            ReplicationProperties replicationProperties) {
-        this.replicationManager = replicationManager;
-        this.runtimeContext = runtimeContext;
-        this.replicationProperties = replicationProperties;
-        this.replicationStrategy = replicationManager.getReplicationStrategy();
-    }
-
-    private Map<String, Set<String>> constructRemoteRecoveryPlan() {
-        //1. identify which replicas reside in this node
-        String localNodeId = runtimeContext.getTransactionSubsystem().getId();
-
-        Set<Replica> replicas = 
replicationStrategy.getRemoteReplicasAndSelf(localNodeId);
-        Map<String, Set<String>> recoveryCandidates = new HashMap<>();
-        Map<String, Integer> candidatesScore = new HashMap<>();
-
-        //2. identify which nodes has backup per lost node data
-        for (Replica node : replicas) {
-            Set<Replica> locations = 
replicationStrategy.getRemoteReplicasAndSelf(node.getId());
-
-            //since the local node just started, remove it from candidates
-            locations.remove(new Replica(localNodeId, "", -1));
-
-            //remove any dead replicas
-            Set<String> deadReplicas = replicationManager.getDeadReplicasIds();
-            for (String deadReplica : deadReplicas) {
-                locations.remove(new Replica(deadReplica, "", -1));
-            }
-
-            //no active replicas to recover from
-            if (locations.isEmpty()) {
-                throw new IllegalStateException("Could not find any ACTIVE 
replica to recover " + node + " data.");
-            }
-
-            for (Replica locationRep : locations) {
-                String location = locationRep.getId();
-                if (candidatesScore.containsKey(location)) {
-                    candidatesScore.put(location, 
candidatesScore.get(location) + 1);
-                } else {
-                    candidatesScore.put(location, 1);
-                }
-            }
-            recoveryCandidates.put(node.getId(), 
locations.stream().map(Replica::getId).collect(Collectors.toSet()));
-        }
-
-        Map<String, Set<String>> recoveryList = new HashMap<>();
-
-        //3. find best candidate to recover from per lost replica data
-        recoveryCandidates.forEach((key, value) -> {
-            int winnerScore = -1;
-            String winner = "";
-            for (String node : value) {
-
-                int nodeScore = candidatesScore.get(node);
-
-                if (nodeScore > winnerScore) {
-                    winnerScore = nodeScore;
-                    winner = node;
-                }
-            }
-
-            if (recoveryList.containsKey(winner)) {
-                recoveryList.get(winner).add(key);
-            } else {
-                Set<String> nodesToRecover = new HashSet<>();
-                nodesToRecover.add(key);
-                recoveryList.put(winner, nodesToRecover);
-            }
-
-        });
-
-        return recoveryList;
-    }
-
-    @Override
-    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean 
flush) throws HyracksDataException {
-        ILogManager logManager = 
runtimeContext.getTransactionSubsystem().getLogManager();
-        long minLSN = 
runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
-        long readableSmallestLSN = logManager.getReadableSmallestLSN();
-        if (minLSN < readableSmallestLSN) {
-            minLSN = readableSmallestLSN;
-        }
-
-        //replay logs > minLSN that belong to these partitions
-        IRecoveryManager recoveryManager = 
runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        try {
-            recoveryManager.replayPartitionsLogs(partitions, 
logManager.getLogReader(true), minLSN);
-            if (flush) {
-                runtimeContext.getDatasetLifecycleManager().flushAllDatasets();
-            }
-        } catch (IOException | ACIDException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void takeoverPartitons(Integer[] partitions) throws IOException, 
ACIDException {
-        /*
-         * TODO even though the takeover is always expected to succeed,
-         * in case of any failure during the takeover, the CC should be
-         * notified that the takeover failed.
-         */
-        Set<Integer> partitionsToTakeover = new 
HashSet<>(Arrays.asList(partitions));
-        replayReplicaPartitionLogs(partitionsToTakeover, false);
-
-        //mark these partitions as active in this node
-        PersistentLocalResourceRepository resourceRepository = 
(PersistentLocalResourceRepository) runtimeContext
-                .getLocalResourceRepository();
-        for (Integer patitionId : partitions) {
-            resourceRepository.addActivePartition(patitionId);
-        }
-    }
-
-    @Override
-    public void startFailbackProcess() {
-        int maxRecoveryAttempts = 
replicationProperties.getMaxRemoteRecoveryAttempts();
-        PersistentLocalResourceRepository resourceRepository = 
(PersistentLocalResourceRepository) runtimeContext
-                .getLocalResourceRepository();
-        IDatasetLifecycleManager datasetLifeCycleManager = 
runtimeContext.getDatasetLifecycleManager();
-        Map<String, ClusterPartition[]> nodePartitions = 
runtimeContext.getMetadataProperties().getNodePartitions();
-
-        while (true) {
-            //start recovery steps
-            try {
-                if (maxRecoveryAttempts <= 0) {
-                    //to avoid infinite loop in case of unexpected behavior.
-                    throw new IllegalStateException("Failed to perform remote 
recovery.");
-                }
-
-                /*** Prepare for Recovery ***/
-                //1. check remote replicas states
-                replicationManager.initializeReplicasState();
-                int activeReplicasCount = 
replicationManager.getActiveReplicasCount();
-
-                if (activeReplicasCount == 0) {
-                    throw new IllegalStateException("no ACTIVE remote 
replica(s) exists to perform remote recovery");
-                }
-
-                //2. clean any memory data that could've existed from previous 
failed recovery attempt
-                datasetLifeCycleManager.closeAllDatasets();
-
-                //3. remove any existing storage data and initialize storage 
metadata
-                resourceRepository.deleteStorageData();
-
-                //4. select remote replicas to recover from per lost replica 
data
-                failbackRecoveryReplicas = constructRemoteRecoveryPlan();
-
-                /*** Start Recovery Per Lost Replica ***/
-                for (Entry<String, Set<String>> remoteReplica : 
failbackRecoveryReplicas.entrySet()) {
-                    String replicaId = remoteReplica.getKey();
-                    Set<String> ncsToRecoverFor = remoteReplica.getValue();
-                    Set<Integer> partitionsIds = new HashSet<>();
-                    for (String node : ncsToRecoverFor) {
-                        
partitionsIds.addAll((Arrays.asList(nodePartitions.get(node))).stream()
-                                
.map(ClusterPartition::getPartitionId).collect(Collectors.toList()));
-                    }
-
-                    //1. Request indexes metadata and LSM components
-                    replicationManager.requestReplicaFiles(replicaId, 
partitionsIds, new HashSet<String>());
-                }
-                break;
-            } catch (IOException e) {
-                LOGGER.warn("Failed during remote recovery. Attempting 
again...", e);
-                maxRecoveryAttempts--;
-            }
-        }
-    }
-
-    @Override
-    public void completeFailbackProcess() throws IOException, 
InterruptedException {
-        ILogManager logManager = 
runtimeContext.getTransactionSubsystem().getLogManager();
-        ReplicaResourcesManager replicaResourcesManager = 
(ReplicaResourcesManager) runtimeContext
-                .getReplicaResourcesManager();
-        Map<String, ClusterPartition[]> nodePartitions = 
runtimeContext.getMetadataProperties().getNodePartitions();
-
-        /*
-         * for each lost partition, get the remaining files from replicas
-         * to complete the failback process.
-         */
-        try {
-            for (Entry<String, Set<String>> remoteReplica : 
failbackRecoveryReplicas.entrySet()) {
-                String replicaId = remoteReplica.getKey();
-                Set<String> NCsDataToRecover = remoteReplica.getValue();
-                Set<String> existingFiles = new HashSet<>();
-                Set<Integer> partitionsToRecover = new HashSet<>();
-                for (String nodeId : NCsDataToRecover) {
-                    //get partitions that will be recovered from this node
-                    ClusterPartition[] replicaPartitions = 
nodePartitions.get(nodeId);
-                    for (ClusterPartition partition : replicaPartitions) {
-                        existingFiles.addAll(
-                                
replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), 
true));
-                        partitionsToRecover.add(partition.getPartitionId());
-                    }
-                }
-
-                //Request remaining indexes files
-                replicationManager.requestReplicaFiles(replicaId, 
partitionsToRecover, existingFiles);
-            }
-        } catch (IOException e) {
-            /*
-             * in case of failure during failback completion process we need 
to construct a new plan
-             * and get all the files from the start since the remote replicas 
will change in the new plan.
-             */
-            LOGGER.warn("Failed during completing failback. Restarting 
failback process...", e);
-            startFailbackProcess();
-        }
-
-        //get max LSN from selected remote replicas
-        long maxRemoteLSN = 
replicationManager.getMaxRemoteLSN(failbackRecoveryReplicas.keySet());
-
-        //6. force LogManager to start from a partition > maxLSN in selected 
remote replicas
-        logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
-
-        //start replication service after failback completed
-        runtimeContext.getReplicationChannel().start();
-        runtimeContext.getReplicationManager().startReplicationThreads();
-
-        failbackRecoveryReplicas = null;
-    }
-
-    //TODO refactor common code between remote recovery and failback process
-    @Override
-    public void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) 
throws HyracksDataException {
-        int maxRecoveryAttempts = 
replicationProperties.getMaxRemoteRecoveryAttempts();
-        PersistentLocalResourceRepository resourceRepository = 
(PersistentLocalResourceRepository) runtimeContext
-                .getLocalResourceRepository();
-        IDatasetLifecycleManager datasetLifeCycleManager = 
runtimeContext.getDatasetLifecycleManager();
-        ILogManager logManager = 
runtimeContext.getTransactionSubsystem().getLogManager();
-        while (true) {
-            //start recovery steps
-            try {
-                if (maxRecoveryAttempts <= 0) {
-                    //to avoid infinite loop in case of unexpected behavior.
-                    throw new IllegalStateException("Failed to perform remote 
recovery.");
-                }
-
-                /*** Prepare for Recovery ***/
-                //1. clean any memory data that could've existed from previous 
failed recovery attempt
-                datasetLifeCycleManager.closeAllDatasets();
-
-                //2. remove any existing storage data and initialize storage 
metadata
-                resourceRepository.deleteStorageData();
-
-                /*** Start Recovery Per Lost Replica ***/
-                for (Entry<String, Set<Integer>> remoteReplica : 
recoveryPlan.entrySet()) {
-                    String replicaId = remoteReplica.getKey();
-                    Set<Integer> partitionsToRecover = 
remoteReplica.getValue();
-
-                    //Request indexes metadata and LSM components
-                    replicationManager.requestReplicaFiles(replicaId, 
partitionsToRecover, new HashSet<String>());
-                }
-
-                //get max LSN from selected remote replicas
-                long maxRemoteLSN = 
replicationManager.getMaxRemoteLSN(recoveryPlan.keySet());
-
-                //6. force LogManager to start from a partition > maxLSN in 
selected remote replicas
-                logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
-                break;
-            } catch (IOException e) {
-                LOGGER.warn("Failed during remote recovery. Attempting 
again...", e);
-                maxRecoveryAttempts--;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
deleted file mode 100644
index 2021cee..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.recovery;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
-import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
-import org.apache.asterix.replication.storage.PartitionReplica;
-
-/**
- * Ensures that the files between master and a replica are synchronized
- */
-public class ReplicaFilesSynchronizer {
-
-    private final PartitionReplica replica;
-    private final INcApplicationContext appCtx;
-
-    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, 
PartitionReplica replica) {
-        this.appCtx = appCtx;
-        this.replica = replica;
-    }
-
-    public void sync() throws IOException {
-        final int partition = replica.getIdentifier().getPartition();
-        final Set<String> replicaFiles = getReplicaFiles(partition);
-        final Set<String> masterFiles =
-                
appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, 
false).stream()
-                        
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
-        // find files on master and not on replica
-        final List<String> replicaMissingFiles =
-                masterFiles.stream().filter(file -> 
!replicaFiles.contains(file)).collect(Collectors.toList());
-        replicateMissingFiles(replicaMissingFiles);
-        // find files on replica and not on master
-        final List<String> replicaInvalidFiles =
-                replicaFiles.stream().filter(file -> 
!masterFiles.contains(file)).collect(Collectors.toList());
-        deleteInvalidFiles(replicaInvalidFiles);
-    }
-
-    private Set<String> getReplicaFiles(int partition) throws IOException {
-        final PartitionResourcesListTask replicaFilesRequest = new 
PartitionResourcesListTask(partition);
-        final SocketChannel channel = replica.getChannel();
-        final ByteBuffer reusableBuffer = replica.gerReusableBuffer();
-        ReplicationProtocol.sendTo(replica, replicaFilesRequest);
-        final PartitionResourcesListResponse response =
-                (PartitionResourcesListResponse) 
ReplicationProtocol.read(channel, reusableBuffer);
-        return new HashSet<>(response.getResources());
-    }
-
-    private void replicateMissingFiles(List<String> files) {
-        final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
-        files.forEach(sync::replicate);
-    }
-
-    private void deleteInvalidFiles(List<String> files) {
-        final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
-        files.forEach(sync::delete);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
deleted file mode 100644
index 1fa3246..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.recovery;
-
-import java.io.IOException;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
-import org.apache.asterix.replication.storage.PartitionReplica;
-
-/**
- * Performs the steps required to ensure any newly added replica
- * will be in-sync with master
- */
-public class ReplicaSynchronizer {
-
-    private final INcApplicationContext appCtx;
-    private final PartitionReplica replica;
-
-    public ReplicaSynchronizer(INcApplicationContext appCtx, PartitionReplica 
replica) {
-        this.appCtx = appCtx;
-        this.replica = replica;
-    }
-
-    public void sync() throws IOException {
-        syncFiles();
-        checkpointReplicaIndexes();
-        appCtx.getReplicationManager().register(replica);
-    }
-
-    private void syncFiles() throws IOException {
-        final ReplicaFilesSynchronizer fileSync = new 
ReplicaFilesSynchronizer(appCtx, replica);
-        fileSync.sync();
-        // flush replicated dataset to generate disk component for any 
remaining in-memory components
-        final IReplicationStrategy replStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
-        appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
-        // sync any newly generated files
-        fileSync.sync();
-    }
-
-    private void checkpointReplicaIndexes() throws IOException {
-        CheckpointPartitionIndexesTask task =
-                new 
CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition());
-        ReplicationProtocol.sendTo(replica, task);
-        ReplicationProtocol.waitForAck(replica);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
deleted file mode 100644
index 08c0ec7..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-public class LSMComponentLSNSyncTask {
-
-    private String componentFilePath;
-    private String componentId;
-
-    public LSMComponentLSNSyncTask(String componentId, String 
componentFilePath) {
-        this.componentId = componentId;
-        this.componentFilePath = componentFilePath;
-    }
-
-    public String getComponentFilePath() {
-        return componentFilePath;
-    }
-
-    public String getComponentId() {
-        return componentId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
deleted file mode 100644
index bf987d0..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Paths;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import org.apache.asterix.common.storage.ResourceReference;
-import org.apache.asterix.replication.logging.TxnLogUtil;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
-
-public class LSMComponentProperties {
-
-    private AtomicInteger numberOfFiles;
-    private String componentId;
-    private long lsnOffset;
-    private long originalLSN;
-    private String nodeId;
-    private Long replicaLSN;
-    private String maskPath = null;
-    private String replicaPath = null;
-    private LSMOperationType opType;
-
-    public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) {
-        this.nodeId = nodeId;
-        componentId = LSMComponentProperties.getLSMComponentID((String) 
job.getJobFiles().toArray()[0]);
-        numberOfFiles = new AtomicInteger(job.getJobFiles().size());
-        opType = job.getLSMOpType();
-        originalLSN = opType == LSMOperationType.FLUSH ?
-                LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) 
job.getLSMIndex(),
-                        job.getLSMIndexOperationContext()) : 0;
-    }
-
-    public LSMComponentProperties() {
-    }
-
-    public static long getLSMComponentLSN(AbstractLSMIndex lsmIndex, 
ILSMIndexOperationContext ctx) {
-        long componentLSN = -1;
-        try {
-            componentLSN = ((AbstractLSMIOOperationCallback) 
lsmIndex.getIOOperationCallback())
-                    .getComponentLSN(ctx.getComponentsToBeReplicated());
-        } catch (HyracksDataException e) {
-            e.printStackTrace();
-        }
-        if (componentLSN < 0) {
-            componentLSN = 0;
-        }
-        return componentLSN;
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        dos.writeUTF(componentId);
-        dos.writeUTF(nodeId);
-        dos.writeInt(numberOfFiles.get());
-        dos.writeLong(originalLSN);
-        dos.writeLong(lsnOffset);
-        dos.writeInt(opType.ordinal());
-    }
-
-    public static LSMComponentProperties create(DataInput input) throws 
IOException {
-        LSMComponentProperties lsmCompProp = new LSMComponentProperties();
-        lsmCompProp.componentId = input.readUTF();
-        lsmCompProp.nodeId = input.readUTF();
-        lsmCompProp.numberOfFiles = new AtomicInteger(input.readInt());
-        lsmCompProp.originalLSN = input.readLong();
-        lsmCompProp.lsnOffset = input.readLong();
-        lsmCompProp.opType = LSMOperationType.values()[input.readInt()];
-        return lsmCompProp;
-    }
-
-    public String getMaskPath(ReplicaResourcesManager resourceManager) throws 
HyracksDataException {
-        if (maskPath == null) {
-            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
-            maskPath = getReplicaComponentPath(resourceManager) + 
File.separator + afp.getFileName()
-                    + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
-        }
-        return maskPath;
-    }
-
-    public String getReplicaComponentPath(ReplicaResourcesManager 
resourceManager) throws HyracksDataException {
-        if (replicaPath == null) {
-            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
-            replicaPath = resourceManager.getIndexPath(afp);
-        }
-        return replicaPath;
-    }
-
-    /***
-     * @param filePath
-     *            any file of the LSM component
-     * @return a unique id based on the timestamp of the component
-     */
-    public static String getLSMComponentID(String filePath) {
-        final ResourceReference ref = ResourceReference.of(filePath);
-        final String fileUniqueTimestamp =
-                ref.getName().substring(0, 
ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
-        return Paths.get(ref.getRelativePath().toString(), 
fileUniqueTimestamp).toString();
-    }
-
-    public String getComponentId() {
-        return componentId;
-    }
-
-    public long getOriginalLSN() {
-        return originalLSN;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public int markFileComplete() {
-        return numberOfFiles.decrementAndGet();
-    }
-
-    public Long getReplicaLSN() {
-        return replicaLSN;
-    }
-
-    public void setReplicaLSN(Long replicaLSN) {
-        this.replicaLSN = replicaLSN;
-    }
-
-    public LSMOperationType getOpType() {
-        return opType;
-    }
-
-    public String getNodeUniqueLSN() {
-        return TxnLogUtil.getNodeUniqueLSN(nodeId, originalLSN);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
deleted file mode 100644
index 2ebf2cb..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Paths;
-
-public class LSMIndexFileProperties {
-
-    private long fileSize;
-    private String nodeId;
-    private boolean lsmComponentFile;
-    private String filePath;
-    private boolean requiresAck = false;
-
-    public LSMIndexFileProperties() {
-    }
-
-    public LSMIndexFileProperties(String filePath, long fileSize, String 
nodeId, boolean lsmComponentFile,
-            boolean requiresAck) {
-        initialize(filePath, fileSize, nodeId, lsmComponentFile, requiresAck);
-    }
-
-    public LSMIndexFileProperties(LSMComponentProperties 
lsmComponentProperties) {
-        initialize(lsmComponentProperties.getComponentId(), -1, 
lsmComponentProperties.getNodeId(), false, false);
-    }
-
-    public void initialize(String filePath, long fileSize, String nodeId, 
boolean lsmComponentFile,
-            boolean requiresAck) {
-        this.filePath = filePath;
-        this.fileSize = fileSize;
-        this.nodeId = nodeId;
-        this.lsmComponentFile = lsmComponentFile;
-        this.requiresAck = requiresAck;
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        dos.writeUTF(nodeId);
-        dos.writeUTF(filePath);
-        dos.writeLong(fileSize);
-        dos.writeBoolean(lsmComponentFile);
-        dos.writeBoolean(requiresAck);
-    }
-
-    public static LSMIndexFileProperties create(DataInput input) throws 
IOException {
-        String nodeId = input.readUTF();
-        String filePath = input.readUTF();
-        long fileSize = input.readLong();
-        boolean lsmComponentFile = input.readBoolean();
-        boolean requiresAck = input.readBoolean();
-        LSMIndexFileProperties fileProp =
-                new LSMIndexFileProperties(filePath, fileSize, nodeId, 
lsmComponentFile, requiresAck);
-        return fileProp;
-    }
-
-    public String getFilePath() {
-        return filePath;
-    }
-
-    public long getFileSize() {
-        return fileSize;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public boolean isLSMComponentFile() {
-        return lsmComponentFile;
-    }
-
-    public boolean requiresAck() {
-        return requiresAck;
-    }
-
-    public String getFileName() {
-        return Paths.get(filePath).toFile().getName();
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("File Path: " + filePath + "  ");
-        sb.append("File Size: " + fileSize + "  ");
-        sb.append("Node ID: " + nodeId + "  ");
-        sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
deleted file mode 100644
index b7fa49d..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP;
-import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
-import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.IPartitionReplica;
-import org.apache.asterix.common.storage.ReplicaIdentifier;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.recovery.ReplicaSynchronizer;
-import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.annotations.ThreadSafe;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-@ThreadSafe
-public class PartitionReplica implements IPartitionReplica {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final int INITIAL_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
-    private final INcApplicationContext appCtx;
-    private final ReplicaIdentifier id;
-    private ByteBuffer reusbaleBuf;
-    private PartitionReplicaStatus status = DISCONNECTED;
-    private SocketChannel sc;
-
-    public PartitionReplica(ReplicaIdentifier id, INcApplicationContext 
appCtx) {
-        this.id = id;
-        this.appCtx = appCtx;
-    }
-
-    @Override
-    public synchronized PartitionReplicaStatus getStatus() {
-        return status;
-    }
-
-    @Override
-    public ReplicaIdentifier getIdentifier() {
-        return id;
-    }
-
-    public synchronized void sync() {
-        if (status == IN_SYNC || status == CATCHING_UP) {
-            return;
-        }
-        setStatus(CATCHING_UP);
-        appCtx.getThreadExecutor().execute(() -> {
-            try {
-                new ReplicaSynchronizer(appCtx, this).sync();
-                setStatus(IN_SYNC);
-            } catch (Exception e) {
-                LOGGER.error(() -> "Failed to sync replica " + this, e);
-                setStatus(DISCONNECTED);
-            } finally {
-                close();
-            }
-        });
-    }
-
-    public synchronized SocketChannel getChannel() {
-        try {
-            if (sc == null || !sc.isOpen() || !sc.isConnected()) {
-                sc = SocketChannel.open();
-                sc.configureBlocking(true);
-                sc.connect(id.getLocation());
-            }
-            return sc;
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    public synchronized void close() {
-        try {
-            if (sc != null && sc.isOpen()) {
-                ReplicationProtocol.sendGoodbye(sc);
-                sc.close();
-                sc = null;
-            }
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    public synchronized ByteBuffer gerReusableBuffer() {
-        if (reusbaleBuf == null) {
-            reusbaleBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
-        }
-        return reusbaleBuf;
-    }
-
-    private JsonNode asJson() {
-        ObjectNode json = OBJECT_MAPPER.createObjectNode();
-        json.put("id", id.toString());
-        json.put("state", status.name());
-        return json;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        PartitionReplica that = (PartitionReplica) o;
-        return id.equals(that.id);
-    }
-
-    @Override
-    public int hashCode() {
-        return id.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        try {
-            return JSONUtil.convertNode(asJson());
-        } catch (JsonProcessingException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    private synchronized void setStatus(PartitionReplicaStatus status) {
-        LOGGER.info(() -> "Replica " + this + " status changing: " + 
this.status + " -> " + status);
-        this.status = status;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
deleted file mode 100644
index 398f97d..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.replication.IReplicaResourcesManager;
-import org.apache.asterix.common.storage.DatasetResourceReference;
-import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.utils.StorageConstants;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.LocalResource;
-
-public class ReplicaResourcesManager implements IReplicaResourcesManager {
-    public static final String LSM_COMPONENT_MASK_SUFFIX = "_mask";
-    private final PersistentLocalResourceRepository localRepository;
-    private final Map<String, ClusterPartition[]> nodePartitions;
-    private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
-
-    public ReplicaResourcesManager(ILocalResourceRepository localRepository, 
MetadataProperties metadataProperties,
-            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
-        this.localRepository = (PersistentLocalResourceRepository) 
localRepository;
-        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
-        nodePartitions = metadataProperties.getNodePartitions();
-    }
-
-    public void deleteIndexFile(LSMIndexFileProperties afp) throws 
HyracksDataException {
-        String indexPath = getIndexPath(afp);
-        if (indexPath != null) {
-            if (afp.isLSMComponentFile()) {
-                //delete index file
-                String indexFilePath = indexPath + File.separator + 
afp.getFileName();
-                File destFile = new File(indexFilePath);
-                FileUtils.deleteQuietly(destFile);
-            } else {
-                //delete index directory
-                FileUtils.deleteQuietly(new File(indexPath));
-            }
-        }
-    }
-
-    public String getIndexPath(LSMIndexFileProperties fileProperties) throws 
HyracksDataException {
-        final FileReference indexPath = 
localRepository.getIndexPath(Paths.get(fileProperties.getFilePath()));
-        if (!indexPath.getFile().exists()) {
-            indexPath.getFile().mkdirs();
-        }
-        return indexPath.toString();
-    }
-
-    public void createRemoteLSMComponentMask(LSMComponentProperties 
lsmComponentProperties) throws IOException {
-        String maskPath = lsmComponentProperties.getMaskPath(this);
-        Path path = Paths.get(maskPath);
-        if (!Files.exists(path)) {
-            File maskFile = new File(maskPath);
-            maskFile.createNewFile();
-        }
-    }
-
-    public void markLSMComponentReplicaAsValid(LSMComponentProperties 
lsmComponentProperties) throws IOException {
-        //remove mask to mark component as valid
-        String maskPath = lsmComponentProperties.getMaskPath(this);
-        Path path = Paths.get(maskPath);
-        Files.deleteIfExists(path);
-    }
-
-    public Set<File> getReplicaIndexes(String replicaId) throws 
HyracksDataException {
-        Set<File> remoteIndexesPaths = new HashSet<File>();
-        ClusterPartition[] partitions = nodePartitions.get(replicaId);
-        for (ClusterPartition partition : partitions) {
-            
remoteIndexesPaths.addAll(localRepository.getPartitionIndexes(partition.getPartitionId()));
-        }
-        return remoteIndexesPaths;
-    }
-
-    @Override
-    public long getPartitionsMinLSN(Set<Integer> partitions) throws 
HyracksDataException {
-        long minRemoteLSN = Long.MAX_VALUE;
-        for (Integer partition : partitions) {
-            final List<DatasetResourceReference> partitionResources = 
localRepository.getResources(resource -> {
-                DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
-                return dsResource.getPartition() == partition;
-            
}).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
-            for (DatasetResourceReference indexRef : partitionResources) {
-                long remoteIndexMaxLSN = 
indexCheckpointManagerProvider.get(indexRef).getLowWatermark();
-                minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
-            }
-        }
-        return minRemoteLSN;
-    }
-
-    public Map<Long, DatasetResourceReference> 
getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
-            throws HyracksDataException {
-        Map<Long, DatasetResourceReference> laggingReplicaIndexes = new 
HashMap<>();
-        final List<Integer> replicaPartitions =
-                
Arrays.stream(nodePartitions.get(replicaId)).map(ClusterPartition::getPartitionId)
-                        .collect(Collectors.toList());
-        for (int patition : replicaPartitions) {
-            final Map<Long, LocalResource> partitionResources = 
localRepository.getPartitionResources(patition);
-            final List<DatasetResourceReference> indexesRefs =
-                    
partitionResources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
-            for (DatasetResourceReference ref : indexesRefs) {
-                if (indexCheckpointManagerProvider.get(ref).getLowWatermark() 
< targetLSN) {
-                    laggingReplicaIndexes.put(ref.getResourceId(), ref);
-                }
-            }
-        }
-        return laggingReplicaIndexes;
-    }
-
-    public void cleanInvalidLSMComponents(String replicaId) {
-        //for every index in replica
-        Set<File> remoteIndexes = null;
-        try {
-            remoteIndexes = getReplicaIndexes(replicaId);
-        } catch (HyracksDataException e) {
-            throw new IllegalStateException(e);
-        }
-        for (File remoteIndexFile : remoteIndexes) {
-            //search for any mask
-            File[] masks = 
remoteIndexFile.listFiles(LSM_COMPONENTS_MASKS_FILTER);
-
-            for (File mask : masks) {
-                //delete all files belonging to this mask
-                deleteLSMComponentFilesForMask(mask);
-                //delete the mask itself
-                mask.delete();
-            }
-        }
-    }
-
-    private static void deleteLSMComponentFilesForMask(File maskFile) {
-        String lsmComponentTimeStamp = maskFile.getName().substring(0,
-                maskFile.getName().length() - 
LSM_COMPONENT_MASK_SUFFIX.length());
-        File indexFolder = maskFile.getParentFile();
-        File[] lsmComponentsFiles = 
indexFolder.listFiles(LSM_COMPONENTS_NON_MASKS_FILTER);
-        for (File lsmComponentFile : lsmComponentsFiles) {
-            if (lsmComponentFile.getName().contains(lsmComponentTimeStamp)) {
-                //match based on time stamp
-                lsmComponentFile.delete();
-            }
-        }
-    }
-
-    /**
-     * @param partition
-     * @return Absolute paths to all partition files
-     */
-    @Override
-    public List<String> getPartitionIndexesFiles(int partition, boolean 
relativePath) throws HyracksDataException {
-        List<String> partitionFiles = new ArrayList<String>();
-        Set<File> partitionIndexes = 
localRepository.getPartitionIndexes(partition);
-        for (File indexDir : partitionIndexes) {
-            if (indexDir.isDirectory()) {
-                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
-                if (indexFiles != null) {
-                    for (File file : indexFiles) {
-                        if (!relativePath) {
-                            partitionFiles.add(file.getAbsolutePath());
-                        } else {
-                            
partitionFiles.add(StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
-                        }
-                    }
-                }
-            }
-        }
-        return partitionFiles;
-    }
-
-    private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new 
FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-            return name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
-        }
-    };
-
-    private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new 
FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-            return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
-        }
-    };
-
-    private static final FilenameFilter LSM_INDEX_FILES_FILTER = new 
FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-            return name.equalsIgnoreCase(StorageConstants.METADATA_FILE_NAME) 
|| !name.startsWith(".");
-        }
-    };
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
new file mode 100644
index 0000000..e1649b3
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.sync;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class FileSynchronizer {
+
+    private final INcApplicationContext appCtx;
+    private final PartitionReplica replica;
+
+    public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica 
replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    public void replicate(String file) {
+        replicate(file, false);
+    }
+
+    public void replicate(String file, boolean metadata) {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            final SocketChannel channel = replica.getChannel();
+            final FileReference filePath = ioManager.resolve(file);
+            ReplicateFileTask task = new ReplicateFileTask(file, 
filePath.getFile().length(), metadata);
+            ReplicationProtocol.sendTo(replica, task);
+            // send the file itself
+            try (RandomAccessFile fromFile = new 
RandomAccessFile(filePath.getFile(),
+                    "r"); FileChannel fileChannel = fromFile.getChannel()) {
+                NetworkingUtil.sendFile(fileChannel, channel);
+            }
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public void delete(String file) {
+        try {
+            final DeleteFileTask task = new DeleteFileTask(file);
+            ReplicationProtocol.sendTo(replica, task);
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
new file mode 100644
index 0000000..74f38e2
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.sync;
+
+import static 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation.DELETE;
+import static 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation.REPLICATE;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ComponentMaskTask;
+import org.apache.asterix.replication.messaging.DropIndexTask;
+import org.apache.asterix.replication.messaging.MarkComponentValidTask;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class IndexSynchronizer {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IReplicationJob job;
+    private final INcApplicationContext appCtx;
+
+    public IndexSynchronizer(IReplicationJob job, INcApplicationContext 
appCtx) {
+        this.job = job;
+        this.appCtx = appCtx;
+    }
+
+    public void sync(PartitionReplica replica) throws IOException {
+        switch (job.getJobType()) {
+            case LSM_COMPONENT:
+                syncComponent(replica);
+                break;
+            case METADATA:
+                syncMetadata(replica);
+                break;
+            default:
+                throw new IllegalStateException("unrecognized job type: " + 
job.getJobType().name());
+        }
+    }
+
+    private void syncComponent(PartitionReplica replica) throws IOException {
+        if (job.getOperation() == REPLICATE) {
+            replicateComponent(replica);
+        } else if (job.getOperation() == DELETE) {
+            deleteComponent(replica);
+        }
+    }
+
+    private void syncMetadata(PartitionReplica replica) throws IOException {
+        if (job.getOperation() == REPLICATE) {
+            replicateIndexMetadata(replica);
+        } else if (job.getOperation() == DELETE) {
+            deleteIndexMetadata(replica);
+        }
+    }
+
+    private void replicateComponent(PartitionReplica replica) throws 
IOException {
+        // send component header
+        final String anyFile = job.getAnyFile();
+        final String lsmComponentID = getComponentId(anyFile);
+        final String indexFile = StoragePathUtil.getFileRelativePath(anyFile);
+        final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile, 
lsmComponentID);
+        ReplicationProtocol.sendTo(replica, maskTask);
+        ReplicationProtocol.waitForAck(replica);
+        // send component files
+        final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, 
replica);
+        
job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
+        // send mark component valid
+        MarkComponentValidTask markValidTask = new 
MarkComponentValidTask(indexFile, getReplicatedComponentLsn());
+        ReplicationProtocol.sendTo(replica, markValidTask);
+        ReplicationProtocol.waitForAck(replica);
+        LOGGER.debug("Replicated component ({}) to replica {}", indexFile, 
replica);
+    }
+
+    private void deleteComponent(PartitionReplica replica) {
+        FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, 
replica);
+        
job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::delete);
+    }
+
+    private void replicateIndexMetadata(PartitionReplica replica) {
+        // send the index metadata file
+        final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, 
replica);
+        job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath)
+                .forEach(file -> fileSynchronizer.replicate(file, true));
+    }
+
+    private void deleteIndexMetadata(PartitionReplica replica) throws 
IOException {
+        final String file = 
StoragePathUtil.getFileRelativePath(job.getAnyFile());
+        final DropIndexTask task = new DropIndexTask(file);
+        ReplicationProtocol.sendTo(replica, task);
+        ReplicationProtocol.waitForAck(replica);
+    }
+
+    private long getReplicatedComponentLsn() throws HyracksDataException {
+        final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) 
job;
+        if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
+            return AbstractLSMIOOperationCallback.INVALID;
+        }
+        final ILSMIndex lsmIndex = indexReplJob.getLSMIndex();
+        final ILSMIndexOperationContext ctx = 
indexReplJob.getLSMIndexOperationContext();
+        return ((AbstractLSMIOOperationCallback) 
lsmIndex.getIOOperationCallback())
+                .getComponentLSN(ctx.getComponentsToBeReplicated());
+    }
+
+    private static String getComponentId(String filePath) {
+        final ResourceReference ref = ResourceReference.of(filePath);
+        final String fileUniqueTimestamp =
+                ref.getName().substring(0, 
ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
+        return Paths.get(ref.getRelativePath().toString(), 
fileUniqueTimestamp).toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
new file mode 100644
index 0000000..5658779
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.sync;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+
+/**
+ * Ensures that the files between master and a replica are synchronized
+ */
+public class ReplicaFilesSynchronizer {
+
+    private final PartitionReplica replica;
+    private final INcApplicationContext appCtx;
+
+    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, 
PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    public void sync() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
+        final Set<String> replicaFiles = getReplicaFiles(partition);
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
+        final Set<String> masterFiles = 
localResourceRepository.getPartitionIndexesFiles(partition).stream()
+                
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        // find files on master and not on replica
+        final List<String> replicaMissingFiles =
+                masterFiles.stream().filter(file -> 
!replicaFiles.contains(file)).collect(Collectors.toList());
+        replicateMissingFiles(replicaMissingFiles);
+        // find files on replica and not on master
+        final List<String> replicaInvalidFiles =
+                replicaFiles.stream().filter(file -> 
!masterFiles.contains(file)).collect(Collectors.toList());
+        deleteInvalidFiles(replicaInvalidFiles);
+    }
+
+    private Set<String> getReplicaFiles(int partition) throws IOException {
+        final PartitionResourcesListTask replicaFilesRequest = new 
PartitionResourcesListTask(partition);
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer reusableBuffer = replica.getReusableBuffer();
+        ReplicationProtocol.sendTo(replica, replicaFilesRequest);
+        final PartitionResourcesListResponse response =
+                (PartitionResourcesListResponse) 
ReplicationProtocol.read(channel, reusableBuffer);
+        return new HashSet<>(response.getResources());
+    }
+
+    private void replicateMissingFiles(List<String> files) {
+        final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
+        files.forEach(sync::replicate);
+    }
+
+    private void deleteInvalidFiles(List<String> files) {
+        final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
+        files.forEach(sync::delete);
+    }
+}
\ No newline at end of file

Reply via email to