http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2262437..5733a60 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -18,91 +18,12 @@
  */
 package org.apache.asterix.common.replication;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.hyracks.api.config.IConfigManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.common.config.ConfigManager;
-import org.apache.hyracks.control.common.controllers.NCConfig;
 
 public class MetadataOnlyReplicationStrategy implements IReplicationStrategy {
 
-    private String metadataPrimaryReplicaId;
-    private Replica metadataPrimaryReplica;
-    private Set<Replica> metadataNodeReplicas;
-    MetadataProperties metadataProperties;
-
     @Override
     public boolean isMatch(int datasetId) {
         return datasetId < 
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID && datasetId 
>= 0;
     }
-
-    @Override
-    public Set<Replica> getRemoteReplicas(String nodeId) {
-        if (nodeId.equals(metadataPrimaryReplicaId)) {
-            return metadataNodeReplicas;
-        }
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<Replica> getRemoteReplicasAndSelf(String nodeId) {
-
-        if (nodeId.equals(metadataPrimaryReplicaId)) {
-            Set<Replica> replicasAndSelf = new HashSet<>();
-            replicasAndSelf.addAll(metadataNodeReplicas);
-            replicasAndSelf.add(metadataPrimaryReplica);
-            return replicasAndSelf;
-        }
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
-        if (metadataNodeReplicas.stream().map(Replica::getId).filter(replicaId 
-> replicaId.equals(nodeId))
-                .count() != 0) {
-            return new HashSet<>(Arrays.asList(metadataPrimaryReplica));
-        }
-        return Collections.emptySet();
-    }
-
-    @Override
-    public MetadataOnlyReplicationStrategy from(ReplicationProperties p, 
IConfigManager configManager)
-            throws HyracksDataException {
-        MetadataOnlyReplicationStrategy st = new 
MetadataOnlyReplicationStrategy();
-        st.metadataProperties = p.getMetadataProperties();
-        st.metadataPrimaryReplicaId = 
st.metadataProperties.getMetadataNodeName();
-        st.metadataPrimaryReplica = new Replica(st.metadataPrimaryReplicaId,
-                ((ConfigManager) 
configManager).getNodeEffectiveConfig(st.metadataPrimaryReplicaId)
-                        .getString(NCConfig.Option.REPLICATION_LISTEN_ADDRESS),
-                ((ConfigManager) 
configManager).getNodeEffectiveConfig(st.metadataPrimaryReplicaId)
-                        .getInt(NCConfig.Option.REPLICATION_LISTEN_PORT));
-        final Set<Replica> replicas = new HashSet<>();
-        Set<String> candidateSet = new HashSet<>();
-        candidateSet.addAll(((ConfigManager) (configManager)).getNodeNames());
-        candidateSet.remove(st.metadataPrimaryReplicaId);
-        String[] candidateAry = new String[candidateSet.size()];
-        candidateSet.toArray(candidateAry);
-        for (int i = 0; i < candidateAry.length && i < 
p.getReplicationFactor(); i++) {
-            replicas.add(new Replica(candidateAry[i],
-                    ((ConfigManager) 
configManager).getNodeEffectiveConfig(candidateAry[i])
-                            
.getString(NCConfig.Option.REPLICATION_LISTEN_ADDRESS),
-                    ((ConfigManager) 
configManager).getNodeEffectiveConfig(candidateAry[i])
-                            .getInt(NCConfig.Option.REPLICATION_LISTEN_PORT)));
-        }
-        st.metadataNodeReplicas = replicas;
-        return st;
-    }
-
-    @Override
-    public boolean isParticipant(String nodeId) {
-        return true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index c32ee3c..c3d9ced 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -18,42 +18,10 @@
  */
 package org.apache.asterix.common.replication;
 
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.hyracks.api.config.IConfigManager;
-import org.apache.hyracks.control.common.config.ConfigManager;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-
 public class NoReplicationStrategy implements IReplicationStrategy {
 
     @Override
     public boolean isMatch(int datasetId) {
         return false;
     }
-
-    @Override
-    public boolean isParticipant(String nodeId) {
-        return false;
-    }
-
-    @Override
-    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<Replica> getRemoteReplicas(String node) {
-        return Collections.emptySet();
-    }
-
-    public Set<Replica> getRemoteReplicasAndSelf(String nodeId) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public NoReplicationStrategy from(ReplicationProperties p, IConfigManager 
configManager) {
-        return new NoReplicationStrategy();
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
deleted file mode 100644
index 52dbd25..0000000
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.replication;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class Replica {
-
-    public enum ReplicaState {
-        ACTIVE,
-        DEAD,
-        UNKNOWN
-    }
-
-    private ReplicaState state = ReplicaState.UNKNOWN;
-    String nodeId;
-    String ipAddr;
-    int port;
-
-    public Replica(String id, String ip, int port) {
-        nodeId = id;
-        ipAddr = ip;
-        this.port = port;
-    }
-
-    public ReplicaState getState() {
-        return state;
-    }
-
-    public void setState(ReplicaState state) {
-        this.state = state;
-    }
-
-    public static Replica create(DataInput input) throws IOException {
-        Replica replica = new Replica(null, null, -1);
-        replica.readFields(input);
-        return replica;
-    }
-
-    public String getId() {
-        return nodeId;
-    }
-
-    public void writeFields(DataOutput output) throws IOException {
-        output.writeUTF(nodeId);
-        output.writeUTF(ipAddr);
-        output.writeInt(port);
-        output.writeInt(state.ordinal());
-    }
-
-    public void readFields(DataInput input) throws IOException {
-        this.nodeId = input.readUTF();
-        this.ipAddr = input.readUTF();
-        this.port = input.readInt();
-        this.state = ReplicaState.values()[input.readInt()];
-    }
-
-    public String getClusterIp() {
-        return ipAddr;
-    }
-
-    public void setClusterIp(String ip) {
-        ipAddr = ip;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        writeFields(dos);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof Replica)) {
-            return false;
-        }
-        Replica other = (Replica) o;
-        return nodeId.equals(other.getId());
-    }
-
-    @Override
-    public int hashCode() {
-        return nodeId.hashCode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
deleted file mode 100644
index ae02ca9..0000000
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.replication;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-
-public class ReplicaEvent {
-
-    Replica replica;
-    ClusterEventType eventType;
-
-    public ReplicaEvent(Replica replica, ClusterEventType eventType) {
-        this.replica = replica;
-        this.eventType = eventType;
-    }
-
-    public Replica getReplica() {
-        return replica;
-    }
-
-    public void setReplica(Replica replica) {
-        this.replica = replica;
-    }
-
-    public ClusterEventType getEventType() {
-        return eventType;
-    }
-
-    public void setEventType(ClusterEventType eventType) {
-        this.eventType = eventType;
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        replica.writeFields(dos);
-        dos.writeInt(eventType.ordinal());
-    }
-
-    public static ReplicaEvent create(DataInput input) throws IOException {
-        Replica replica = Replica.create(input);
-        ClusterEventType eventType = 
ClusterEventType.values()[input.readInt()];
-        ReplicaEvent event = new ReplicaEvent(replica, eventType);
-        return event;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
index e6b6445..8d51a99 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
@@ -21,39 +21,31 @@ package org.apache.asterix.common.replication;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.config.IConfigManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
 public class ReplicationStrategyFactory {
 
     private static final Map<String, Class<? extends IReplicationStrategy>> 
BUILT_IN_REPLICATION_STRATEGY =
             new HashMap<>();
 
     static {
-        BUILT_IN_REPLICATION_STRATEGY.put("no_replication", 
NoReplicationStrategy.class);
-        BUILT_IN_REPLICATION_STRATEGY.put("chained_declustering", 
ChainedDeclusteringReplicationStrategy.class);
-        BUILT_IN_REPLICATION_STRATEGY.put("metadata_only", 
MetadataOnlyReplicationStrategy.class);
+        BUILT_IN_REPLICATION_STRATEGY.put("none", NoReplicationStrategy.class);
+        BUILT_IN_REPLICATION_STRATEGY.put("all", 
AllDatasetsReplicationStrategy.class);
+        BUILT_IN_REPLICATION_STRATEGY.put("metadata", 
MetadataOnlyReplicationStrategy.class);
     }
 
     private ReplicationStrategyFactory() {
         throw new AssertionError();
     }
 
-    public static IReplicationStrategy create(String name, 
ReplicationProperties repProp, IConfigManager ncConfig)
-            throws HyracksDataException {
+    public static IReplicationStrategy create(String name) {
         String strategyName = name.toLowerCase();
         if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) {
-            throw new 
RuntimeDataException(ErrorCode.UNSUPPORTED_REPLICATION_STRATEGY, String.format(
-                    "%s. Available strategies: %s", strategyName, 
BUILT_IN_REPLICATION_STRATEGY.keySet().toString()));
+            throw new IllegalStateException("Couldn't find strategy with name: 
" + name);
         }
         Class<? extends IReplicationStrategy> clazz = 
BUILT_IN_REPLICATION_STRATEGY.get(strategyName);
         try {
-            return clazz.newInstance().from(repProp, ncConfig);
+            return clazz.newInstance();
         } catch (InstantiationException | IllegalAccessException e) {
-            throw new RuntimeDataException(ErrorCode.INSTANTIATION_ERROR, e, 
clazz.getName());
+            throw new IllegalStateException("Couldn't instantiated replication 
strategy: " + name, e);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 8cc11a8..c7de7a3 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -91,6 +91,10 @@ public class ResourceReference {
         ref.root = tokens[--offset];
     }
 
+    public int getPartitionNum() {
+        return 
Integer.parseInt(partition.substring(StorageConstants.PARTITION_DIR_PREFIX.length()));
+    }
+
     protected static void parseLegacyPath(ResourceReference ref, String path) {
         // old format: 
root/partition/dataverse/datasetName_idx_IndexName/fileName
         final String[] tokens = path.split(File.separator);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 4090b65..3274849 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -20,7 +20,6 @@ package org.apache.asterix.common.transactions;
 
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface ILogRecord {
@@ -133,8 +132,6 @@ public interface ILogRecord {
 
     public void readRemoteLog(ByteBuffer buffer);
 
-    public void setReplicationThread(IReplicationThread replicationThread);
-
     public void setLogSource(byte logSource);
 
     public byte getLogSource();
@@ -150,7 +147,7 @@ public interface ILogRecord {
     public void setReplicated(boolean replicated);
 
     /**
-     * @return a flag indicating whether the log record should be sent to 
remote replicas
+     * @return a flag indicating whether the log was replicated
      */
     public boolean isReplicated();
 
@@ -169,4 +166,18 @@ public interface ILogRecord {
     public void logAppended(long lsn);
 
     public long getPreviousMarkerLSN();
+
+    /**
+     * Sets flag indicating if this log should be replicated or not
+     *
+     * @param replicate {@code true} if this log should be replicated, {@code false} otherwise
+     */
+    void setReplicate(boolean replicate);
+
+    /**
+     * Gets a flag indicating if this log should be replicated or not
+     *
+     * @return the flag
+     */
+    boolean isReplicate();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java
new file mode 100644
index 0000000..8e874a2
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+public interface ILogRequester {
+
+    /**
+     * Sends a notification to the log requester that {@code logRecord} has 
been flushed.
+     *
+     * @param logRecord The log that has been flushed.
+     */
+    void notifyFlushed(ILogRecord logRecord);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index dea7a67..bfe7963 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -119,4 +119,15 @@ public interface IRecoveryManager {
      * @throws ACIDException
      */
     void startLocalRecovery(Set<Integer> partitions) throws IOException, 
ACIDException;
+
+    /**
+     * Replay the committed transactions' logs belonging to {@code partitions}. 
If {@code flush} is true,
+     * all datasets are flushed after the logs are replayed.
+     *
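+     * @param partitions the partitions whose transaction logs are replayed
+     * @param flush if true, all datasets are flushed after the logs are replayed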
+     * @throws HyracksDataException
+     */
+    void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) 
throws HyracksDataException;
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 68afb2a..6b38aa1 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.CRC32;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
-import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -94,18 +93,21 @@ public class LogRecord implements ILogRecord {
     private final CRC32 checksumGen;
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
-    private IReplicationThread replicationThread;
+
     /**
      * The fields (numOfFlushedIndexes and nodeId) are used for remote flush 
logs only
      * to indicate the source of the log and how many indexes were flushed 
using its LSN.
      */
     private int numOfFlushedIndexes;
     private String nodeId;
-    private boolean replicated = false;
+    private final AtomicBoolean replicated;
+    private boolean replicate = false;
+    private ILogRequester requester;
 
     public LogRecord(ILogMarkerCallback callback) {
         this.callback = callback;
         isFlushed = new AtomicBoolean(false);
+        replicated = new AtomicBoolean(false);
         readPKValue = new PrimaryKeyTupleReference();
         readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
         readOldValue = SimpleTupleWriter.INSTANCE.createTupleReference();
@@ -638,15 +640,6 @@ public class LogRecord implements ILogRecord {
         this.nodeId = nodeId;
     }
 
-    public IReplicationThread getReplicationThread() {
-        return replicationThread;
-    }
-
-    @Override
-    public void setReplicationThread(IReplicationThread replicationThread) {
-        this.replicationThread = replicationThread;
-    }
-
     @Override
     public void setLogSource(byte logSource) {
         this.logSource = logSource;
@@ -684,13 +677,13 @@ public class LogRecord implements ILogRecord {
     }
 
     @Override
-    public void setReplicated(boolean replicate) {
-        this.replicated = replicate;
+    public void setReplicated(boolean replicated) {
+        this.replicated.set(replicated);
     }
 
     @Override
     public boolean isReplicated() {
-        return replicated;
+        return replicated.get();
     }
 
     @Override
@@ -732,4 +725,22 @@ public class LogRecord implements ILogRecord {
     public ByteBuffer getMarker() {
         return marker;
     }
+
+    @Override
+    public void setReplicate(boolean replicate) {
+        this.replicate = replicate;
+    }
+
+    @Override
+    public boolean isReplicate() {
+        return replicate;
+    }
+
+    public ILogRequester getRequester() {
+        return requester;
+    }
+
+    public void setRequester(ILogRequester requester) {
+        this.requester = requester;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/pom.xml 
b/asterixdb/asterix-replication/pom.xml
index 7072e48..bb983ad 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -51,10 +51,6 @@
       <artifactId>hyracks-util</artifactId>
     </dependency>
     <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-lsm-common</artifactId>
     </dependency>
@@ -72,14 +68,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-control-nc</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-control-common</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
index 001d41f..807ae5f 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.replication.api;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IReplicaTask extends IReplicationMessage {
@@ -31,5 +30,5 @@ public interface IReplicaTask extends IReplicationMessage {
      * @param worker
      * @throws HyracksDataException
      */
-    void perform(INcApplicationContext appCtx, IReplicationThread worker) 
throws HyracksDataException;
+    void perform(INcApplicationContext appCtx, IReplicationWorker worker) 
throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
index 2e1cb8a..bc6e87b 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
@@ -20,7 +20,7 @@ package org.apache.asterix.replication.api;
 
 import java.io.OutputStream;
 
-import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IReplicationMessage {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
new file mode 100644
index 0000000..8840c3f
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.transactions.LogRecord;
+
+public interface IReplicationWorker extends Runnable {
+
+    /**
+     * @return The replication socket channel.
+     */
+    SocketChannel getChannel();
+
+    /**
+     * Gets a reusable buffer that can be used to send data
+     *
+     * @return the reusable buffer
+     */
+    ByteBuffer getReusableBuffer();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
new file mode 100644
index 0000000..f752745
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP;
+import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
+import static 
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.asterix.replication.sync.ReplicaSynchronizer;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Represents one replica of a partition: its identifier, its synchronization status and the
+ * socket channel opened towards the replica's location. All changes to the status and to the
+ * channel go through synchronized methods of this object.
+ */
+@ThreadSafe
+public class PartitionReplica implements IPartitionReplica {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
+    private final INcApplicationContext appCtx;
+    private final ReplicaIdentifier id;
+    private ByteBuffer reusableBuf;
+    private PartitionReplicaStatus status = DISCONNECTED;
+    private SocketChannel sc;
+
+    /**
+     * Creates a replica that starts in the {@code DISCONNECTED} status with no open channel.
+     *
+     * @param id     the identifier of this replica
+     * @param appCtx the application context used to obtain the thread executor
+     */
+    public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
+        this.id = id;
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    public synchronized PartitionReplicaStatus getStatus() {
+        return status;
+    }
+
+    @Override
+    public ReplicaIdentifier getIdentifier() {
+        return id;
+    }
+
+    /**
+     * Marks this replica as {@code DISCONNECTED}. The failure itself is not inspected here.
+     *
+     * @param failure the failure that was observed
+     */
+    @Override
+    public synchronized void notifyFailure(Exception failure) {
+        setStatus(DISCONNECTED);
+    }
+
+    /**
+     * Starts synchronizing this replica unless it is already {@code IN_SYNC} or {@code CATCHING_UP}.
+     * The status is set to {@code CATCHING_UP} and a task is handed to the application's thread
+     * executor. That task runs a {@link ReplicaSynchronizer}; on success the status becomes
+     * {@code IN_SYNC}, on any exception the error is logged and {@link #notifyFailure(Exception)}
+     * is invoked. The channel is closed in both cases.
+     */
+    public synchronized void sync() {
+        if (status == IN_SYNC || status == CATCHING_UP) {
+            return;
+        }
+        setStatus(CATCHING_UP);
+        appCtx.getThreadExecutor().execute(() -> {
+            try {
+                new ReplicaSynchronizer(appCtx, this).sync();
+                setStatus(IN_SYNC);
+            } catch (Exception e) {
+                LOGGER.error(() -> "Failed to sync replica " + this, e);
+                notifyFailure(e);
+            } finally {
+                close();
+            }
+        });
+    }
+
+    /**
+     * Returns the channel to this replica, opening and connecting a new blocking channel to
+     * {@code id.getLocation()} when there is no open, connected one.
+     *
+     * @return an open, connected channel
+     * @throws ReplicationException if the channel cannot be opened or connected
+     */
+    public synchronized SocketChannel getChannel() {
+        if (sc != null && sc.isOpen() && sc.isConnected()) {
+            return sc;
+        }
+        try {
+            final SocketChannel fresh = SocketChannel.open();
+            boolean connected = false;
+            try {
+                fresh.configureBlocking(true);
+                fresh.connect(id.getLocation());
+                connected = true;
+            } finally {
+                // only publish the channel once it is usable; otherwise release it right away so a
+                // failed attempt does not leave an open socket behind
+                if (!connected) {
+                    closeQuietly(fresh);
+                }
+            }
+            sc = fresh;
+            return sc;
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * Sends a goodbye on the channel (if it is open) and closes it. The channel is closed and the
+     * reference dropped even when sending the goodbye fails; I/O failures are logged, not thrown.
+     */
+    public synchronized void close() {
+        if (sc == null) {
+            return;
+        }
+        try {
+            if (sc.isOpen()) {
+                try {
+                    ReplicationProtocol.sendGoodbye(sc);
+                } finally {
+                    // the goodbye is best effort; the socket must be released regardless
+                    sc.close();
+                }
+            }
+        } catch (IOException e) {
+            LOGGER.warn("Failed to close channel", e);
+        } finally {
+            sc = null;
+        }
+    }
+
+    /**
+     * Returns the buffer kept by this replica, allocating it with the initial size on first use.
+     *
+     * @return the reusable buffer
+     */
+    public synchronized ByteBuffer getReusableBuffer() {
+        if (reusableBuf == null) {
+            reusableBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        }
+        return reusableBuf;
+    }
+
+    /** Builds the JSON form of this replica: its id under "id" and its status name under "state". */
+    private JsonNode asJson() {
+        ObjectNode json = OBJECT_MAPPER.createObjectNode();
+        json.put("id", id.toString());
+        json.put("state", status.name());
+        return json;
+    }
+
+    /** Two replicas are equal when they are of the same class and have equal identifiers. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionReplica that = (PartitionReplica) o;
+        return id.equals(that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return JSONUtil.convertNode(asJson());
+        } catch (JsonProcessingException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /** Updates the status and logs the transition; does nothing when the status is unchanged. */
+    private synchronized void setStatus(PartitionReplicaStatus status) {
+        if (this.status == status) {
+            return;
+        }
+        LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
+        this.status = status;
+    }
+
+    /** Closes a channel that never became usable, logging (not throwing) any close failure. */
+    private static void closeQuietly(SocketChannel channel) {
+        try {
+            channel.close();
+        } catch (IOException e) {
+            LOGGER.warn("Failed to close channel", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
new file mode 100644
index 0000000..a092322
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A replication destination identified by its socket address. It keeps the set of partition
+ * replicas registered for that address and a lazily opened channel used for log replication.
+ * Two destinations are equal when their locations are equal.
+ */
+public class ReplicationDestination implements IReplicationDestination {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final Set<IPartitionReplica> replicas = new HashSet<>();
+    private final InetSocketAddress location;
+    private SocketChannel logRepChannel;
+
+    private ReplicationDestination(InetSocketAddress location) {
+        this.location = location;
+    }
+
+    /**
+     * Creates a destination for the given address with no replicas and no open channel.
+     *
+     * @param location the address of the destination
+     * @return the new destination
+     */
+    public static ReplicationDestination at(InetSocketAddress location) {
+        return new ReplicationDestination(location);
+    }
+
+    @Override
+    public synchronized void add(IPartitionReplica replica) {
+        replicas.add(replica);
+    }
+
+    @Override
+    public synchronized void remove(IPartitionReplica replica) {
+        replicas.remove(replica);
+    }
+
+    /**
+     * Forwards the failure to every registered replica and then closes the log replication channel.
+     *
+     * @param failure the failure that was observed
+     */
+    @Override
+    public synchronized void notifyFailure(Exception failure) {
+        replicas.forEach(replica -> replica.notifyFailure(failure));
+        closeLogReplicationChannel();
+    }
+
+    /**
+     * Returns a snapshot of the registered replicas. The copy is taken under the same monitor that
+     * guards {@link #add} and {@link #remove}, so it cannot observe the set mid-modification.
+     *
+     * @return a new set holding the currently registered replicas
+     */
+    @Override
+    public synchronized Set<IPartitionReplica> getReplicas() {
+        return new HashSet<>(replicas);
+    }
+
+    /**
+     * Looks up a registered replica of the given partition whose status is {@code IN_SYNC}.
+     *
+     * @param partition the partition number
+     * @return any matching replica, or an empty optional when there is none
+     */
+    public synchronized Optional<IPartitionReplica> getPartitionReplica(int partition) {
+        return replicas.stream().filter(replica -> replica.getIdentifier().getPartition() == partition
+                && replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny();
+    }
+
+    /**
+     * Returns the log replication channel, opening and connecting a new blocking channel to this
+     * destination's location when there is no open, connected one.
+     *
+     * @return an open, connected channel
+     * @throws ReplicationException if the channel cannot be opened or connected
+     */
+    public synchronized SocketChannel getLogReplicationChannel() {
+        if (logRepChannel != null && logRepChannel.isOpen() && logRepChannel.isConnected()) {
+            return logRepChannel;
+        }
+        try {
+            final SocketChannel fresh = SocketChannel.open();
+            boolean connected = false;
+            try {
+                fresh.configureBlocking(true);
+                fresh.connect(location);
+                connected = true;
+            } finally {
+                // only publish the channel once it is usable; otherwise release it right away so a
+                // failed attempt does not leave an open socket behind
+                if (!connected) {
+                    closeQuietly(fresh);
+                }
+            }
+            logRepChannel = fresh;
+            return logRepChannel;
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * Sends a goodbye on the log replication channel (if it is open) and closes it. The channel is
+     * closed and the reference dropped even when sending the goodbye fails; I/O failures are
+     * logged, not thrown.
+     */
+    private synchronized void closeLogReplicationChannel() {
+        if (logRepChannel == null) {
+            return;
+        }
+        try {
+            if (logRepChannel.isOpen()) {
+                try {
+                    ReplicationProtocol.sendGoodbye(logRepChannel);
+                } finally {
+                    // the goodbye is best effort; the socket must be released regardless
+                    logRepChannel.close();
+                }
+            }
+        } catch (IOException e) {
+            LOGGER.warn("Exception while closing socket", e);
+        } finally {
+            logRepChannel = null;
+        }
+    }
+
+    @Override
+    public InetSocketAddress getLocation() {
+        return location;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ReplicationDestination that = (ReplicationDestination) o;
+        return Objects.equals(location, that.location);
+    }
+
+    @Override
+    public String toString() {
+        return location.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(location);
+    }
+
+    /** Closes a channel that never became usable, logging (not throwing) any close failure. */
+    private static void closeQuietly(SocketChannel channel) {
+        try {
+            channel.close();
+        } catch (IOException e) {
+            LOGGER.warn("Exception while closing socket", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
deleted file mode 100644
index f5b2378..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.functions;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashSet;
-import java.util.Set;
-
-public class ReplicaFilesRequest {
-    private final Set<Integer> partitionIds;
-    private final Set<String> existingFiles;
-
-    public ReplicaFilesRequest(Set<Integer> partitionIds, Set<String> 
existingFiles) {
-        this.partitionIds = partitionIds;
-        this.existingFiles = existingFiles;
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        dos.writeInt(partitionIds.size());
-        for (Integer partitionId : partitionIds) {
-            dos.writeInt(partitionId);
-        }
-        dos.writeInt(existingFiles.size());
-        for (String fileName : existingFiles) {
-            dos.writeUTF(fileName);
-        }
-    }
-
-    public static ReplicaFilesRequest create(DataInput input) throws 
IOException {
-        int size = input.readInt();
-        Set<Integer> partitionIds = new HashSet<>(size);
-        for (int i = 0; i < size; i++) {
-            partitionIds.add(input.readInt());
-        }
-        int filesCount = input.readInt();
-        Set<String> existingFiles = new HashSet<>(filesCount);
-        for (int i = 0; i < filesCount; i++) {
-            existingFiles.add(input.readUTF());
-        }
-        return new ReplicaFilesRequest(partitionIds, existingFiles);
-    }
-
-    public Set<Integer> getPartitionIds() {
-        return partitionIds;
-    }
-
-    public Set<String> getExistingFiles() {
-        return existingFiles;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaIndexFlushRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaIndexFlushRequest.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaIndexFlushRequest.java
deleted file mode 100644
index a3c269d..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaIndexFlushRequest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.functions;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashSet;
-import java.util.Set;
-
-public class ReplicaIndexFlushRequest {
-    Set<Long> laggingRescouresIds;
-
-    public ReplicaIndexFlushRequest(Set<Long> laggingRescouresIds) {
-        this.laggingRescouresIds = laggingRescouresIds;
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        dos.writeInt(laggingRescouresIds.size());
-        for (Long resourceId : laggingRescouresIds) {
-            dos.writeLong(resourceId);
-        }
-    }
-
-    public static ReplicaIndexFlushRequest create(DataInput input) throws 
IOException {
-        int numOfResources = input.readInt();
-        Set<Long> laggingRescouresIds = new HashSet<Long>(numOfResources);
-        for (int i = 0; i < numOfResources; i++) {
-            laggingRescouresIds.add(input.readLong());
-        }
-        return new ReplicaIndexFlushRequest(laggingRescouresIds);
-    }
-
-    public Set<Long> getLaggingRescouresIds() {
-        return laggingRescouresIds;
-    }
-
-    public void setLaggingRescouresIds(Set<Long> laggingRescouresIds) {
-        this.laggingRescouresIds = laggingRescouresIds;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaLogsRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaLogsRequest.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaLogsRequest.java
deleted file mode 100644
index 90df1e7..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaLogsRequest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.functions;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashSet;
-import java.util.Set;
-
-public class ReplicaLogsRequest {
-    Set<String> replicaIds;
-    long fromLSN;
-
-    public ReplicaLogsRequest(Set<String> replicaIds, long fromLSN) {
-        this.replicaIds = replicaIds;
-        this.fromLSN = fromLSN;
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        dos.writeInt(replicaIds.size());
-        for (String replicaId : replicaIds) {
-            dos.writeUTF(replicaId);
-        }
-        dos.writeLong(fromLSN);
-    }
-
-    public static ReplicaLogsRequest create(DataInput input) throws 
IOException {
-        int size = input.readInt();
-        Set<String> replicaIds = new HashSet<String>(size);
-        for (int i = 0; i < size; i++) {
-            replicaIds.add(input.readUTF());
-        }
-        long fromLSN = input.readLong();
-        return new ReplicaLogsRequest(replicaIds, fromLSN);
-    }
-
-    public Set<String> getReplicaIds() {
-        return replicaIds;
-    }
-
-    public void setReplicaIds(Set<String> replicaIds) {
-        this.replicaIds = replicaIds;
-    }
-
-    public long getFromLSN() {
-        return fromLSN;
-    }
-
-    public void setFromLSN(long fromLSN) {
-        this.fromLSN = fromLSN;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
deleted file mode 100644
index 8094548..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.functions;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.api.IReplicationMessage;
-import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
-import org.apache.asterix.replication.messaging.DeleteFileTask;
-import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
-import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
-import org.apache.asterix.replication.messaging.ReplicateFileTask;
-import org.apache.asterix.replication.storage.LSMComponentProperties;
-import org.apache.asterix.replication.storage.LSMIndexFileProperties;
-import org.apache.asterix.replication.storage.PartitionReplica;
-import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
-
-public class ReplicationProtocol {
-
-    /**
-     * All replication messages start with ReplicationRequestType (4 bytes), 
then the length of the request in bytes
-     */
-    public static final String JOB_REPLICATION_ACK = "$";
-
-    public static final  int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
-    private static final  int REPLICATION_REQUEST_HEADER_SIZE = 
REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
-
-    /*
-     * ReplicationRequestType:
-     * REPLICATE_LOG: txn log replication
-     * REPLICATE_FILE: replicate a file(s)
-     * DELETE_FILE: delete a file(s)
-     * GET_REPLICA_FILES: used during remote recovery to request lost LSM 
Components
-     * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log 
manager LSN
-     * GOODBYE: used to notify replicas that the replication request has been 
completed
-     * REPLICA_EVENT: used to notify replicas about a remote replica 
split/merge.
-     * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM 
Component before its physical files are sent
-     * ACK: used to notify the requesting replica that the request has been 
completed successfully
-     * FLUSH_INDEX: request remote replica to flush an LSM component
-     */
-    public enum ReplicationRequestType {
-        REPLICATE_LOG,
-        REPLICATE_FILE,
-        DELETE_FILE,
-        GET_REPLICA_FILES,
-        GET_REPLICA_MAX_LSN,
-        GOODBYE,
-        REPLICA_EVENT,
-        LSM_COMPONENT_PROPERTIES,
-        ACK,
-        FLUSH_INDEX,
-        PARTITION_RESOURCES_REQUEST,
-        PARTITION_RESOURCES_RESPONSE,
-        REPLICATE_RESOURCE_FILE,
-        DELETE_RESOURCE_FILE,
-        CHECKPOINT_PARTITION
-    }
-
-    public static ByteBuffer readRequest(SocketChannel socketChannel, 
ByteBuffer dataBuffer) throws IOException {
-        //read request size
-        NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
-        int requestSize = dataBuffer.getInt();
-
-        if (dataBuffer.capacity() < requestSize) {
-            dataBuffer = ByteBuffer.allocate(requestSize);
-        }
-
-        //read request
-        NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize);
-
-        return dataBuffer;
-    }
-
-    public static ByteBuffer 
writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, 
ByteBuffer buffer)
-            throws IOException {
-        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
-        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
-            lsmCompProp.serialize(oos);
-            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-            if (buffer.capacity() < requestSize) {
-                buffer = ByteBuffer.allocate(requestSize);
-            } else {
-                buffer.clear();
-            }
-            
buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
-            buffer.putInt(oos.size());
-            buffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
-            buffer.flip();
-            return buffer;
-        }
-    }
-
-    public static ReplicationRequestType getRequestType(SocketChannel 
socketChannel, ByteBuffer byteBuffer)
-            throws IOException {
-        //read replication request type
-        NetworkingUtil.readBytes(socketChannel, byteBuffer, 
REPLICATION_REQUEST_TYPE_SIZE);
-
-        ReplicationRequestType requestType = 
ReplicationProtocol.ReplicationRequestType.values()[byteBuffer.getInt()];
-        return requestType;
-    }
-
-    public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer 
buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return LSMComponentProperties.create(dis);
-    }
-
-    public static ByteBuffer getGoodbyeBuffer() {
-        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
-        bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
-        bb.flip();
-        return bb;
-    }
-
-    public static ByteBuffer getAckBuffer() {
-        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
-        bb.putInt(ReplicationRequestType.ACK.ordinal());
-        bb.flip();
-        return bb;
-    }
-
-    public static ByteBuffer writeFileReplicationRequest(ByteBuffer 
requestBuffer, LSMIndexFileProperties afp,
-            ReplicationRequestType requestType) throws IOException {
-        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
-        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
-            afp.serialize(oos);
-            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-            if (requestBuffer.capacity() < requestSize) {
-                requestBuffer = ByteBuffer.allocate(requestSize);
-            } else {
-                requestBuffer.clear();
-            }
-            requestBuffer.putInt(requestType.ordinal());
-            requestBuffer.putInt(oos.size());
-            requestBuffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
-            requestBuffer.flip();
-            return requestBuffer;
-        }
-    }
-
-    public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer 
buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return LSMIndexFileProperties.create(dis);
-    }
-
-    public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) 
throws IOException {
-        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
-        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
-            event.serialize(oos);
-            ByteBuffer buffer = 
ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
-            buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
-            buffer.putInt(oos.size());
-            buffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
-            buffer.flip();
-            return buffer;
-        }
-    }
-
-    public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) 
throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-
-        return ReplicaEvent.create(dis);
-    }
-
-    public static ByteBuffer writeGetReplicaFilesRequest(ByteBuffer buffer, 
ReplicaFilesRequest request)
-            throws IOException {
-        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
-        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
-            request.serialize(oos);
-
-            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-            if (buffer.capacity() < requestSize) {
-                buffer = ByteBuffer.allocate(requestSize);
-            } else {
-                buffer.clear();
-            }
-            buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
-            buffer.putInt(oos.size());
-            buffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
-            buffer.flip();
-            return buffer;
-        }
-    }
-
-    public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer 
buffer, ReplicaIndexFlushRequest request)
-            throws IOException {
-        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
-        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
-            request.serialize(oos);
-            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-            if (buffer.capacity() < requestSize) {
-                buffer = ByteBuffer.allocate(requestSize);
-            } else {
-                buffer.clear();
-            }
-            buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
-            buffer.putInt(oos.size());
-            buffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
-            buffer.flip();
-            return buffer;
-        }
-    }
-
-    public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer 
buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaFilesRequest.create(dis);
-    }
-
-    public static ReplicaIndexFlushRequest 
readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaIndexFlushRequest.create(dis);
-    }
-
-    public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) {
-        requestBuffer.clear();
-        
requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal());
-        requestBuffer.flip();
-    }
-
-    public static int getJobIdFromLogAckMessage(String msg) {
-        return 
Integer.parseInt(msg.substring((msg.indexOf(JOB_REPLICATION_ACK) + 1)));
-    }
-
-    public static String getNodeIdFromLogAckMessage(String msg) {
-        return msg.substring(0, msg.indexOf(JOB_REPLICATION_ACK));
-    }
-
-    /**
-     * Sends a goodbye request to a remote replica indicating the end of a 
replication request.
-     *
-     * @param socketChannel
-     *            the remote replica socket.
-     * @throws IOException
-     */
-    public static void sendGoodbye(SocketChannel socketChannel) throws 
IOException {
-        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
-        NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
-    }
-
-    public static void sendAck(SocketChannel socketChannel) throws IOException 
{
-        ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
-        NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
-    }
-
-    public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
-        try {
-            buf.clear();
-            buf.putInt(ReplicationRequestType.ACK.ordinal());
-            buf.flip();
-            NetworkingUtil.transferBufferToChannel(socketChannel, buf);
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    public static void waitForAck(PartitionReplica replica) throws IOException 
{
-        final SocketChannel channel = replica.getChannel();
-        final ByteBuffer buf = replica.gerReusableBuffer();
-        ReplicationRequestType responseFunction = 
ReplicationProtocol.getRequestType(channel, buf);
-        if (responseFunction != ReplicationRequestType.ACK) {
-            throw new IllegalStateException("Unexpected response while waiting 
for ack.");
-        }
-    }
-
-    public static void sendTo(PartitionReplica replica, IReplicationMessage 
task) {
-        final SocketChannel channel = replica.getChannel();
-        final ByteBuffer buf = replica.gerReusableBuffer();
-        sendTo(channel, task, buf);
-    }
-
-    public static void sendTo(SocketChannel channel, IReplicationMessage task, 
ByteBuffer buf) {
-        ExtendedByteArrayOutputStream outputStream = new 
ExtendedByteArrayOutputStream();
-        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
-            task.serialize(oos);
-            final int requestSize = REPLICATION_REQUEST_HEADER_SIZE + 
oos.size();
-            final ByteBuffer requestBuffer = ensureSize(buf, requestSize);
-            requestBuffer.putInt(task.getMessageType().ordinal());
-            requestBuffer.putInt(oos.size());
-            requestBuffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
-            requestBuffer.flip();
-            NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    public static IReplicationMessage read(SocketChannel socketChannel, 
ByteBuffer buffer) throws IOException {
-        final ReplicationRequestType type = getRequestType(socketChannel, 
buffer);
-        return readMessage(type, socketChannel, buffer);
-    }
-
-    public static IReplicationMessage readMessage(ReplicationRequestType type, 
SocketChannel socketChannel,
-            ByteBuffer buffer) {
-        try {
-            ReplicationProtocol.readRequest(socketChannel, buffer);
-            final ByteArrayInputStream input =
-                    new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
-            try (DataInputStream dis = new DataInputStream(input)) {
-                switch (type) {
-                    case PARTITION_RESOURCES_REQUEST:
-                        return PartitionResourcesListTask.create(dis);
-                    case PARTITION_RESOURCES_RESPONSE:
-                        return PartitionResourcesListResponse.create(dis);
-                    case REPLICATE_RESOURCE_FILE:
-                        return ReplicateFileTask.create(dis);
-                    case DELETE_RESOURCE_FILE:
-                        return DeleteFileTask.create(dis);
-                    case CHECKPOINT_PARTITION:
-                        return CheckpointPartitionIndexesTask.create(dis);
-                    default:
-                        throw new IllegalStateException("Unrecognized 
replication message");
-                }
-            }
-        } catch (IOException e) {
-            throw new ReplicationException(e);
-        }
-    }
-
-    private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
-        if (buffer.capacity() < size) {
-            return ByteBuffer.allocate(size);
-        }
-        buffer.clear();
-        return buffer;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
deleted file mode 100644
index 0444952..0000000
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.logging;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class RemoteLogMapping {
-
-    private String remoteNodeID;
-    private long remoteLSN;
-    private long localLSN;
-    public AtomicInteger numOfFlushedIndexes = new AtomicInteger();
-
-    public void setRemoteNodeID(String remoteNodeID) {
-        this.remoteNodeID = remoteNodeID;
-    }
-
-    public long getRemoteLSN() {
-        return remoteLSN;
-    }
-
-    public void setRemoteLSN(long remoteLSN) {
-        this.remoteLSN = remoteLSN;
-    }
-
-    public long getLocalLSN() {
-        return localLSN;
-    }
-
-    public void setLocalLSN(long localLSN) {
-        this.localLSN = localLSN;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Remote Node: " + remoteNodeID);
-        sb.append(" Remote LSN: " + remoteLSN);
-        sb.append(" Local LSN: " + localLSN);
-        return sb.toString();
-    }
-
-    public String getNodeUniqueLSN() {
-        return TxnLogUtil.getNodeUniqueLSN(remoteNodeID, remoteLSN);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java
new file mode 100644
index 0000000..0b496b8
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.transactions.LogRecord;
+
+/**
+ * A {@link LogRecord} extended with two extra pieces of state: a master LSN and the
+ * {@link IReplicationWorker} associated with the record. Both are plain mutable properties;
+ * this class adds no behavior of its own.
+ */
+public class RemoteLogRecord extends LogRecord {
+
+    private IReplicationWorker replicationWorker;
+    private long masterLsn;
+
+    /**
+     * @return the replication worker previously attached with
+     *         {@link #setReplicationWorker(IReplicationWorker)}, or {@code null} if none was set
+     */
+    public IReplicationWorker getReplicationWorker() {
+        return this.replicationWorker;
+    }
+
+    /**
+     * @param replicationWorker
+     *            the replication worker to attach to this record
+     */
+    public void setReplicationWorker(IReplicationWorker replicationWorker) {
+        this.replicationWorker = replicationWorker;
+    }
+
+    /**
+     * @return the master LSN stored in this record ({@code 0} until one is set)
+     */
+    public long getMasterLsn() {
+        return this.masterLsn;
+    }
+
+    /**
+     * @param masterLsn
+     *            the master LSN to store in this record
+     */
+    public void setMasterLsn(long masterLsn) {
+        this.masterLsn = masterLsn;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
new file mode 100644
index 0000000..d63496f
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Background task that takes {@link RemoteLogRecord}s from a shared queue and reacts to each one:
+ * <ul>
+ * <li>{@code JOB_COMMIT} / {@code ABORT}: writes a textual ACK line to the socket of the record's
+ * replication worker;</li>
+ * <li>{@code FLUSH}: records the flush on the checkpoint manager of every matching replica index.</li>
+ * </ul>
+ * Any other log type raises an {@link IllegalStateException}, which is not caught in {@link #run()} and
+ * therefore terminates the task.
+ */
+class RemoteLogsNotifier implements Runnable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final PersistentLocalResourceRepository localResourceRep;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+    private final LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ;
+    private final INcApplicationContext appCtx;
+
+    /**
+     * @param appCtx
+     *            application context used to look up the node id, the replica manager, the local resource
+     *            repository and the index checkpoint manager provider
+     * @param remoteLogsQ
+     *            the queue this task takes records from
+     */
+    public RemoteLogsNotifier(INcApplicationContext appCtx, LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ) {
+        this.appCtx = appCtx;
+        this.remoteLogsQ = remoteLogsQ;
+        localResourceRep = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
+    }
+
+    /**
+     * Renames the current thread to {@code <nodeId>RemoteLogsNotifier} and then processes queued records
+     * until the thread is interrupted. An {@link IOException} while handling a record is logged and the
+     * loop continues with the next record.
+     */
+    @Override
+    public void run() {
+        final String nodeId = appCtx.getServiceContext().getNodeId();
+        Thread.currentThread().setName(nodeId + RemoteLogsNotifier.class.getSimpleName());
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                final RemoteLogRecord logRecord = remoteLogsQ.take();
+                switch (logRecord.getLogType()) {
+                    case LogType.JOB_COMMIT:
+                    case LogType.ABORT:
+                        // send ACK to requester: "<nodeId><LOG_REPLICATION_ACK><txnId><line separator>"
+                        final String ack = nodeId + ReplicationProtocol.LOG_REPLICATION_ACK + logRecord.getTxnId()
+                                + System.lineSeparator();
+                        // Encode with an explicit charset so the bytes on the wire do not depend on this
+                        // JVM's platform-default encoding (fully qualified to avoid touching the imports).
+                        logRecord.getReplicationWorker().getChannel().socket().getOutputStream()
+                                .write(ack.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                        break;
+                    case LogType.FLUSH:
+                        checkpointReplicaIndexes(logRecord, logRecord.getDatasetId());
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType());
+                }
+            } catch (InterruptedException e) {
+                // restore the flag so the loop condition observes the interruption and exits
+                Thread.currentThread().interrupt();
+            } catch (IOException e) {
+                LOGGER.error("Failed to process replicated log", e);
+            }
+        }
+    }
+
+    /**
+     * Calls {@code masterFlush(masterLsn, lsn)} on the checkpoint manager of every local resource that
+     * belongs to {@code datasetId} and whose partition is not among the partitions reported by this node's
+     * replica manager.
+     *
+     * @param flushLog
+     *            the flush record supplying the master LSN and the LSN passed to {@code masterFlush}
+     * @param datasetId
+     *            the dataset whose replica indexes are checkpointed
+     * @throws HyracksDataException
+     *             propagated from the resource lookup or the checkpoint calls
+     */
+    private void checkpointReplicaIndexes(RemoteLogRecord flushLog, int datasetId) throws HyracksDataException {
+        final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions();
+        final Predicate<LocalResource> replicaIndexesPredicate = lr -> {
+            DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
+            return dls.getDatasetId() == datasetId && !masterPartitions.contains(dls.getPartition());
+        };
+        final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
+        final List<DatasetResourceReference> replicaIndexesRef =
+                resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+        for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
+            final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(replicaIndexRef);
+            // the update is performed while holding the checkpoint manager's own monitor
+            synchronized (indexCheckpointManager) {
+                indexCheckpointManager.masterFlush(flushLog.getMasterLsn(), flushLog.getLSN());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
new file mode 100644
index 0000000..c054e6c
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRequester;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Feeds batches of log records received from a remote node into the local {@link ILogManager}.
+ * Job-termination and flush records are logged with this object as their requester; when
+ * {@link #notifyFlushed(ILogRecord)} is called for such a record it is placed on a queue that is consumed by
+ * a {@link RemoteLogsNotifier} started from the constructor.
+ */
+public class RemoteLogsProcessor implements ILogRequester {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ;
+    private final ILogManager logManager;
+
+    /**
+     * Creates the processor and submits a {@link RemoteLogsNotifier} sharing its queue to the
+     * application's thread executor.
+     *
+     * @param appCtx
+     *            application context providing the log manager and the thread executor
+     */
+    public RemoteLogsProcessor(INcApplicationContext appCtx) {
+        this.remoteLogsQ = new LinkedBlockingQueue<>();
+        this.logManager = appCtx.getTransactionSubsystem().getLogManager();
+        appCtx.getThreadExecutor().execute(new RemoteLogsNotifier(appCtx, this.remoteLogsQ));
+    }
+
+    /**
+     * Reads records from {@code logsBatch} until it has no bytes remaining and logs each one locally.
+     *
+     * @param logsBatch
+     *            buffer holding the batch; every record is preceded by an int that is read and discarded
+     * @param reusableLog
+     *            scratch record that each entry of the batch is deserialized into
+     * @param worker
+     *            replication worker attached to job-termination records
+     */
+    public void process(ByteBuffer logsBatch, RemoteLogRecord reusableLog, IReplicationWorker worker) {
+        while (logsBatch.hasRemaining()) {
+            // get rid of log size
+            logsBatch.getInt();
+            reusableLog.readRemoteLog(logsBatch);
+            reusableLog.setLogSource(LogSource.REMOTE);
+            switch (reusableLog.getLogType()) {
+                case LogType.UPDATE:
+                case LogType.ENTITY_COMMIT:
+                    logManager.log(reusableLog);
+                    break;
+                case LogType.JOB_COMMIT:
+                case LogType.ABORT:
+                    logJobTermination(reusableLog, worker);
+                    break;
+                case LogType.FLUSH:
+                    logFlush(reusableLog);
+                    break;
+                default:
+                    LOGGER.error(() -> "Unsupported LogType: " + reusableLog.getLogType());
+            }
+        }
+    }
+
+    /**
+     * Enqueues the flushed record for the notifier.
+     *
+     * @param logRecord
+     *            the flushed record; it is cast to {@link RemoteLogRecord}
+     */
+    @Override
+    public void notifyFlushed(ILogRecord logRecord) {
+        final RemoteLogRecord flushed = (RemoteLogRecord) logRecord;
+        remoteLogsQ.add(flushed);
+    }
+
+    /**
+     * Logs a fresh job-termination record built from {@code source}, attaching {@code worker} to it.
+     */
+    private void logJobTermination(RemoteLogRecord source, IReplicationWorker worker) {
+        // a new record is created rather than logging the scratch record itself
+        // (presumably because the scratch record is overwritten by the next entry of the batch)
+        final RemoteLogRecord terminationLog = new RemoteLogRecord();
+        TransactionUtil.formJobTerminateLogRecord(terminationLog, source.getTxnId(),
+                source.getLogType() == LogType.JOB_COMMIT);
+        terminationLog.setRequester(this);
+        terminationLog.setReplicationWorker(worker);
+        terminationLog.setLogSource(LogSource.REMOTE);
+        logManager.log(terminationLog);
+    }
+
+    /**
+     * Logs a fresh flush record built from {@code source}, remembering the source record's LSN as the
+     * master LSN.
+     */
+    private void logFlush(RemoteLogRecord source) {
+        final RemoteLogRecord flushRecord = new RemoteLogRecord();
+        TransactionUtil.formFlushLogRecord(flushRecord, source.getDatasetId(), null, source.getNodeId(),
+                source.getNumOfFlushedIndexes());
+        flushRecord.setRequester(this);
+        flushRecord.setLogSource(LogSource.REMOTE);
+        flushRecord.setMasterLsn(source.getLSN());
+        logManager.log(flushRecord);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 283f69f..a94f073 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -22,9 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.management.ReplicationManager;
+import org.apache.asterix.replication.management.LogReplicationManager;
 
 public class ReplicationLogBuffer {
+
     private final int logBufferSize;
     private final AtomicBoolean full;
     private int appendOffset;
@@ -32,10 +33,10 @@ public class ReplicationLogBuffer {
     private final ByteBuffer appendBuffer;
     private final ByteBuffer replicationBuffer;
     private boolean stop;
-    private ReplicationManager replicationManager;
+    private final LogReplicationManager replicationManager;
     private final int batchSize;
 
-    public ReplicationLogBuffer(ReplicationManager replicationManager, int 
logBufferSize, int batchSize) {
+    public ReplicationLogBuffer(LogReplicationManager replicationManager, int 
logBufferSize, int batchSize) {
         this.replicationManager = replicationManager;
         this.logBufferSize = logBufferSize;
         this.batchSize = batchSize;
@@ -116,7 +117,7 @@ public class ReplicationLogBuffer {
     private void transferBuffer(ByteBuffer buffer) {
         if (buffer.remaining() <= batchSize) {
             //the current batch can be sent as it is
-            replicationManager.replicateTxnLogBatch(buffer);
+            replicationManager.transferBatch(buffer);
             return;
         }
         /**
@@ -141,7 +142,7 @@ public class ReplicationLogBuffer {
                 //return to the beginning of the batch position
                 buffer.reset();
             }
-            replicationManager.replicateTxnLogBatch(buffer);
+            replicationManager.transferBatch(buffer);
             //return the original limit to check the new remaining size
             buffer.limit(totalTransferLimit);
         }
@@ -159,7 +160,7 @@ public class ReplicationLogBuffer {
         return logBufferSize;
     }
 
-    public ReplicationManager getReplicationManager() {
+    public LogReplicationManager getReplicationManager() {
         return replicationManager;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java
new file mode 100644
index 0000000..1b0ac23
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+import java.util.Set;
+
+import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.transactions.ILogRecord;
+
+/**
+ * Tracks which replication destinations still have to acknowledge a log record. Once the pending set
+ * becomes empty the record is flagged as replicated and every thread waiting on the record's monitor is
+ * woken up.
+ */
+public class TxnAck {
+
+    private final ILogRecord logRecord;
+    private final Set<IReplicationDestination> pendingAcks;
+
+    /**
+     * @param logRecord
+     *            the record to flag and notify on once all acks have arrived
+     * @param pendingAcks
+     *            the destinations whose acks are awaited; the set is kept by reference and entries are
+     *            removed from it by {@link #ack(IReplicationDestination)}
+     */
+    public TxnAck(ILogRecord logRecord, Set<IReplicationDestination> pendingAcks) {
+        this.pendingAcks = pendingAcks;
+        this.logRecord = logRecord;
+    }
+
+    /**
+     * Registers the ack of {@code dest}. If no acks remain pending afterwards, marks the log record as
+     * replicated and calls {@code notifyAll()} on it while holding its monitor.
+     *
+     * @param dest
+     *            the destination that acknowledged
+     */
+    public synchronized void ack(IReplicationDestination dest) {
+        pendingAcks.remove(dest);
+        if (!allAcked()) {
+            return;
+        }
+        synchronized (logRecord) {
+            logRecord.setReplicated(true);
+            logRecord.notifyAll();
+        }
+    }
+
+    /**
+     * @return {@code true} if no destination is still pending
+     */
+    public synchronized boolean allAcked() {
+        final boolean nonePending = pendingAcks.isEmpty();
+        return nonePending;
+    }
+}

Reply via email to