http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
index 0f2ea50..1c57d1b 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -21,6 +21,8 @@ package org.apache.asterix.test.ioopcallbacks;
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -45,8 +47,8 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
-        LSMBTreeIOOperationCallback callback =
-                new LSMBTreeIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator());
+        LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
 
         //request to flush first component
         callback.updateLastLSN(1);
@@ -57,13 +59,14 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         callback.beforeOperation(LSMIOOperationType.FLUSH);
 
         Assert.assertEquals(1, callback.getComponentLSN(null));
-        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
mockDiskComponent());
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+        final ILSMDiskComponent diskComponent1 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
diskComponent1);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
 
         Assert.assertEquals(2, callback.getComponentLSN(null));
-
-        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
mockDiskComponent());
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+        final ILSMDiskComponent diskComponent2 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
diskComponent2);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
     }
 
     @Test
@@ -72,8 +75,8 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
 
-        LSMBTreeIOOperationCallback callback =
-                new LSMBTreeIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator());
+        LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
 
         //request to flush first component
         callback.updateLastLSN(1);
@@ -90,11 +93,13 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         //the scheduleFlush request would fail this time
 
         Assert.assertEquals(1, callback.getComponentLSN(null));
-        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
mockDiskComponent());
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
-
+        final ILSMDiskComponent diskComponent1 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
diskComponent1);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
+        final ILSMDiskComponent diskComponent2 = mockDiskComponent();
         Assert.assertEquals(2, callback.getComponentLSN(null));
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
diskComponent2);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
     }
 
     @Test
@@ -103,9 +108,8 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
 
-        LSMBTreeIOOperationCallback callback =
-                new LSMBTreeIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator());
-
+        LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
         //request to flush first component
         callback.updateLastLSN(1);
         callback.beforeOperation(LSMIOOperationType.FLUSH);
@@ -144,7 +148,8 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
 
-        LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, 
mockIndexCheckpointManagerProvider());
 
         ILSMComponentId initialId = idGenerator.getId();
         // simulate a partition is flushed before allocated
@@ -162,7 +167,8 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
-        LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, 
mockIndexCheckpointManagerProvider());
 
         ILSMComponentId id = idGenerator.getId();
         callback.allocated(mockComponent);
@@ -178,8 +184,9 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
             callback.beforeOperation(LSMIOOperationType.FLUSH);
             callback.recycled(mockComponent, true);
 
-            callback.afterOperation(LSMIOOperationType.FLUSH, null, 
mockDiskComponent());
-            callback.afterFinalize(LSMIOOperationType.FLUSH, 
mockDiskComponent());
+            final ILSMDiskComponent diskComponent = mockDiskComponent();
+            callback.afterOperation(LSMIOOperationType.FLUSH, null, 
diskComponent);
+            callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
             checkMemoryComponent(expectedId, mockComponent);
         }
     }
@@ -191,7 +198,8 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
-        LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, 
mockIndexCheckpointManagerProvider());
 
         ILSMComponentId id = idGenerator.getId();
         callback.allocated(mockComponent);
@@ -216,7 +224,8 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, 
mockIndexCheckpointManagerProvider());
 
         ILSMComponentId id = idGenerator.getId();
         callback.allocated(mockComponent);
@@ -230,7 +239,9 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
 
         callback.updateLastLSN(0);
         callback.beforeOperation(LSMIOOperationType.FLUSH);
-        callback.afterFinalize(LSMIOOperationType.FLUSH, 
Mockito.mock(ILSMDiskComponent.class));
+        final ILSMDiskComponent diskComponent = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
 
         // another flush is to be scheduled before the component is recycled
         idGenerator.refresh();
@@ -243,7 +254,9 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         // schedule the next flush
         callback.updateLastLSN(0);
         callback.beforeOperation(LSMIOOperationType.FLUSH);
-        callback.afterFinalize(LSMIOOperationType.FLUSH, 
Mockito.mock(ILSMDiskComponent.class));
+        final ILSMDiskComponent diskComponent2 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, 
diskComponent2);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
         callback.recycled(mockComponent, true);
         checkMemoryComponent(nextId, mockComponent);
     }
@@ -263,5 +276,14 @@ public abstract class AbstractLSMIOOperationCallbackTest 
extends TestCase {
         return component;
     }
 
-    protected abstract AbstractLSMIOOperationCallback getIoCallback();
+    protected IIndexCheckpointManagerProvider 
mockIndexCheckpointManagerProvider() throws HyracksDataException {
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                Mockito.mock(IIndexCheckpointManagerProvider.class);
+        IIndexCheckpointManager indexCheckpointManager = 
Mockito.mock(IIndexCheckpointManager.class);
+        
Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), 
Mockito.anyLong());
+        
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
+        return indexCheckpointManagerProvider;
+    }
+
+    protected abstract AbstractLSMIOOperationCallback getIoCallback() throws 
HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index c22e2e3..a4bc399 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks;
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@ import org.mockito.Mockito;
 public class LSMBTreeIOOperationCallbackTest extends 
AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws 
HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMBTreeIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator());
+        return new LSMBTreeIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index 356c80a..5f37c78 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks;
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@ import org.mockito.Mockito;
 public class LSMBTreeWithBuddyIOOperationCallbackTest extends 
AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws 
HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator());
+        return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index ac4595e..343bc59 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks;
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@ import org.mockito.Mockito;
 public class LSMInvertedIndexIOOperationCallbackTest extends 
AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws 
HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMInvertedIndexIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator());
+        return new LSMInvertedIndexIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index 0131e3f..10d95d8 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -21,6 +21,7 @@ package org.apache.asterix.test.ioopcallbacks;
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@ import org.mockito.Mockito;
 public class LSMRTreeIOOperationCallbackTest extends 
AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws 
HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMRTreeIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator());
+        return new LSMRTreeIOOperationCallback(mockIndex, new 
LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
 
b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
index 814e109..1434629 100644
--- 
a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
+++ 
b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
@@ -19,43 +19,18 @@
 package org.apache.asterix.installer.test;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Logger;
 
-import org.apache.asterix.event.error.VerificationUtil;
-import org.apache.asterix.event.model.AsterixInstance;
 import org.apache.asterix.event.model.AsterixInstance.State;
-import org.apache.asterix.event.model.AsterixRuntimeState;
-import org.apache.asterix.event.service.ServiceProvider;
-import org.apache.asterix.installer.command.CommandHandler;
-import org.apache.asterix.test.common.TestExecutor;
-import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.junit.runners.Parameterized.Parameters;
-
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 
 public class AsterixLifecycleIT {
 
-    private static final int NUM_NC = 2;
-    private static final CommandHandler cmdHandler = new CommandHandler();
-    private static final String PATH_BASE = 
"src/test/resources/integrationts/lifecycle";
     private static final String PATH_ACTUAL = "target" + File.separator + 
"ittest" + File.separator;
-    private static final Logger LOGGER = 
Logger.getLogger(AsterixLifecycleIT.class.getName());
-    private static List<TestCaseContext> testCaseCollection;
-    private final TestExecutor testExecutor = new TestExecutor();
 
     @BeforeClass
     public static void setUp() throws Exception {
         
AsterixInstallerIntegrationUtil.init(AsterixInstallerIntegrationUtil.LOCAL_CLUSTER_PATH);
-        TestCaseContext.Builder b = new TestCaseContext.Builder();
-        testCaseCollection = b.build(new File(PATH_BASE));
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
     }
@@ -70,88 +45,8 @@ public class AsterixLifecycleIT {
         }
     }
 
-    @Parameters
-    public static Collection<Object[]> tests() throws Exception {
-        Collection<Object[]> testArgs = new ArrayList<>();
-        return testArgs;
-    }
-
     public static void restartInstance() throws Exception {
         
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
         
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
     }
-
-    @Test
-    public void test_1_StopActiveInstance() throws Exception {
-        try {
-            LOGGER.info("Starting test: test_1_StopActiveInstance");
-            
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
-            String command = "stop -n " + 
AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
-            cmdHandler.processCommand(command.split(" "));
-            Thread.sleep(4000);
-            AsterixInstance instance = 
ServiceProvider.INSTANCE.getLookupService()
-                    
.getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
-            AsterixRuntimeState state = 
VerificationUtil.getAsterixRuntimeState(instance);
-            assert (state.getFailedNCs().size() == NUM_NC && 
!state.isCcRunning());
-            LOGGER.info("PASSED: test_1_StopActiveInstance");
-        } catch (Exception e) {
-            throw new Exception("Test configure installer " + "\" FAILED!", e);
-        }
-    }
-
-    @Test
-    public void test_2_StartActiveInstance() throws Exception {
-        try {
-            LOGGER.info("Starting test: test_2_StartActiveInstance");
-            
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
-            String command = "start -n " + 
AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
-            cmdHandler.processCommand(command.split(" "));
-            AsterixInstance instance = 
ServiceProvider.INSTANCE.getLookupService()
-                    
.getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
-            AsterixRuntimeState state = 
VerificationUtil.getAsterixRuntimeState(instance);
-            assert (state.getFailedNCs().size() == 0 && state.isCcRunning());
-            LOGGER.info("PASSED: test_2_StartActiveInstance");
-        } catch (Exception e) {
-            throw new Exception("Test configure installer " + "\" FAILED!", e);
-        }
-    }
-
-    @Test
-    public void test_3_DeleteActiveInstance() throws Exception {
-        try {
-            LOGGER.info("Starting test: test_3_DeleteActiveInstance");
-            
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
-            String command = "delete -n " + 
AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
-            cmdHandler.processCommand(command.split(" "));
-            AsterixInstance instance = 
ServiceProvider.INSTANCE.getLookupService()
-                    
.getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
-            assert (instance == null);
-            LOGGER.info("PASSED: test_3_DeleteActiveInstance");
-        } catch (Exception e) {
-            throw new Exception("Test delete active instance " + "\" FAILED!", 
e);
-        } finally {
-            // recreate instance
-            AsterixInstallerIntegrationUtil.createInstance();
-        }
-    }
-
-    @Test
-    public void test() throws Exception {
-        for (TestCaseContext testCaseCtx : testCaseCollection) {
-            testExecutor.executeTest(PATH_ACTUAL, testCaseCtx, null, false);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        try {
-            setUp();
-            new AsterixLifecycleIT().test();
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.info("TEST CASE(S) FAILED");
-        } finally {
-            tearDown();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
index 3b2aff7..0444952 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
@@ -27,10 +27,6 @@ public class RemoteLogMapping {
     private long localLSN;
     public AtomicInteger numOfFlushedIndexes = new AtomicInteger();
 
-    public String getRemoteNodeID() {
-        return remoteNodeID;
-    }
-
     public void setRemoteNodeID(String remoteNodeID) {
         this.remoteNodeID = remoteNodeID;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 3143284..dfc23d3 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -27,9 +27,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -39,13 +36,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -55,6 +56,9 @@ import 
org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
@@ -72,9 +76,13 @@ import 
org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.transaction.management.service.logging.LogBuffer;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
@@ -89,7 +97,6 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
     private final String localNodeID;
     private final ILogManager logManager;
     private final ReplicaResourcesManager replicaResourcesManager;
-    private SocketChannel socketChannel = null;
     private ServerSocketChannel serverSocketChannel = null;
     private final IReplicationManager replicationManager;
     private final ReplicationProperties replicationProperties;
@@ -98,6 +105,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
     private final LinkedBlockingQueue<LSMComponentLSNSyncTask> 
lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
     private final LinkedBlockingQueue<LogRecord> 
pendingNotificationRemoteLogsQ;
     private final Map<String, LSMComponentProperties> 
lsmComponentId2PropertiesMap;
+    private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping;
     private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
     private final LSMComponentsSyncService lsmComponentLSNMappingService;
     private final Set<Integer> nodeHostedPartitions;
@@ -105,6 +113,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
     private final Object flushLogslock = new Object();
     private final IDatasetLifecycleManager dsLifecycleManager;
     private final PersistentLocalResourceRepository localResourceRep;
+    private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
     public ReplicationChannel(String nodeId, ReplicationProperties 
replicationProperties, ILogManager logManager,
             IReplicaResourcesManager replicaResoucesManager, 
IReplicationManager replicationManager,
@@ -122,6 +131,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
         lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
         replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
+        localLsn2RemoteMapping = new ConcurrentHashMap<>();
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = 
Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
@@ -136,6 +146,8 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         }
         nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
         nodeHostedPartitions.addAll(clientsPartitions);
+        this.indexCheckpointManagerProvider =
+                ((INcApplicationContext) 
ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
     }
 
     @Override
@@ -156,7 +168,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
 
             //start accepting replication requests
             while (true) {
-                socketChannel = serverSocketChannel.accept();
+                SocketChannel socketChannel = serverSocketChannel.accept();
                 socketChannel.configureBlocking(true);
                 //start a new thread to handle the request
                 replicationThreads.execute(new 
ReplicationThread(socketChannel));
@@ -349,16 +361,19 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                 }
                 if (afp.isLSMComponentFile()) {
                     String componentId = 
LSMComponentProperties.getLSMComponentID(afp.getFilePath());
-                    if (afp.getLSNByteOffset() > 
AbstractLSMIOOperationCallback.INVALID) {
-                        LSMComponentLSNSyncTask syncTask = new 
LSMComponentLSNSyncTask(componentId,
-                                destFile.getAbsolutePath(), 
afp.getLSNByteOffset());
+                    final LSMComponentProperties lsmComponentProperties = 
lsmComponentId2PropertiesMap.get(componentId);
+                    // merge operations do not generate flush logs
+                    if (afp.requiresAck() && 
lsmComponentProperties.getOpType() == LSMOperationType.FLUSH) {
+                        LSMComponentLSNSyncTask syncTask =
+                                new LSMComponentLSNSyncTask(componentId, 
destFile.getAbsolutePath());
                         
lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
                     } else {
                         updateLSMComponentRemainingFiles(componentId);
                     }
                 } else {
                     //index metadata file
-                    
replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, 
logManager.getAppendLSN());
+                    final ResourceReference indexRef = 
ResourceReference.of(destFile.getAbsolutePath());
+                    
indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
                 }
             }
         }
@@ -402,8 +417,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                         try (RandomAccessFile fromFile = new 
RandomAccessFile(filePath, "r");
                                 FileChannel fileChannel = 
fromFile.getChannel();) {
                             long fileSize = fileChannel.size();
-                            fileProperties.initialize(filePath, fileSize, 
partition.getNodeId(), false,
-                                    AbstractLSMIOOperationCallback.INVALID, 
false);
+                            fileProperties.initialize(filePath, fileSize, 
partition.getNodeId(), false, false);
                             outBuffer = 
ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
                                     ReplicationRequestType.REPLICATE_FILE);
 
@@ -462,7 +476,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                 switch (remoteLog.getLogType()) {
                     case LogType.UPDATE:
                     case LogType.ENTITY_COMMIT:
-                        //if the log partition belongs to a partitions hosted 
on this node, replicate it
+                        //if the log partition belongs to a partitions hosted 
on this node, replicated it
                         if 
(nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
                             logManager.log(remoteLog);
                         }
@@ -479,13 +493,21 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                     case LogType.FLUSH:
                         //store mapping information for flush logs to use them 
in incoming LSM components.
                         RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                        LogRecord flushLog = new LogRecord();
+                        TransactionUtil.formFlushLogRecord(flushLog, 
remoteLog.getDatasetId(), null,
+                                remoteLog.getNodeId(), 
remoteLog.getNumOfFlushedIndexes());
+                        flushLog.setReplicationThread(this);
+                        flushLog.setLogSource(LogSource.REMOTE);
                         flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
                         flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                        logManager.log(remoteLog);
-                        //the log LSN value is updated by logManager.log(.) to 
a local value
-                        flushLogMap.setLocalLSN(remoteLog.getLSN());
-                        
flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                        
replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                        synchronized (localLsn2RemoteMapping) {
+                            logManager.log(flushLog);
+                            //the log LSN value is updated by 
logManager.log(.) to a local value
+                            flushLogMap.setLocalLSN(flushLog.getLSN());
+                            
flushLogMap.numOfFlushedIndexes.set(flushLog.getNumOfFlushedIndexes());
+                            
replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                            localLsn2RemoteMapping.put(flushLog.getLSN(), 
flushLogMap);
+                        }
                         synchronized (flushLogslock) {
                             flushLogslock.notify();
                         }
@@ -497,18 +519,55 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         }
 
         /**
-         * this method is called sequentially by LogPage 
(notifyReplicationTerminator)
-         * for JOB_COMMIT and JOB_ABORT log types.
+         * this method is called sequentially by {@link 
LogBuffer#notifyReplicationTermination()}
+         * for JOB_COMMIT, JOB_ABORT, and FLUSH log types.
          */
         @Override
         public void notifyLogReplicationRequester(LogRecord logRecord) {
-            pendingNotificationRemoteLogsQ.offer(logRecord);
+            switch (logRecord.getLogType()) {
+                case LogType.JOB_COMMIT:
+                case LogType.ABORT:
+                    pendingNotificationRemoteLogsQ.offer(logRecord);
+                    break;
+                case LogType.FLUSH:
+                    final RemoteLogMapping remoteLogMapping;
+                    synchronized (localLsn2RemoteMapping) {
+                        remoteLogMapping = 
localLsn2RemoteMapping.remove(logRecord.getLSN());
+                    }
+                    checkpointReplicaIndexes(remoteLogMapping, 
logRecord.getDatasetId());
+                    break;
+                default:
+                    throw new IllegalStateException("Unexpected log type: " + 
logRecord.getLogType());
+            }
         }
 
         @Override
         public SocketChannel getReplicationClientSocket() {
             return socketChannel;
         }
+
+        private void checkpointReplicaIndexes(RemoteLogMapping 
remoteLogMapping, int datasetId) {
+            try {
+                Predicate<LocalResource> replicaIndexesPredicate = lr -> {
+                    DatasetLocalResource dls = (DatasetLocalResource) 
lr.getResource();
+                    return dls.getDatasetId() == datasetId && 
!localResourceRep.getActivePartitions()
+                            .contains(dls.getPartition());
+                };
+                final Map<Long, LocalResource> resources = 
localResourceRep.getResources(replicaIndexesPredicate);
+                final List<DatasetResourceReference> replicaIndexesRef =
+                        
resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+                for (DatasetResourceReference replicaIndexRef : 
replicaIndexesRef) {
+                    final IIndexCheckpointManager indexCheckpointManager =
+                            
indexCheckpointManagerProvider.get(replicaIndexRef);
+                    synchronized (indexCheckpointManager) {
+                        indexCheckpointManager
+                                .masterFlush(remoteLogMapping.getRemoteLSN(), 
remoteLogMapping.getLocalLSN());
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failed to checkpoint replica 
indexes", e);
+            }
+        }
     }
 
     /**
@@ -541,7 +600,6 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
      * the received LSM components to a local LSN.
      */
     private class LSMComponentsSyncService extends Thread {
-        private static final int BULKLOAD_LSN = 0;
 
         @Override
         public void run() {
@@ -560,90 +618,22 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                         LOGGER.log(Level.SEVERE, "Unexpected exception during 
LSN synchronization", e);
                     }
                 }
-
             }
         }
 
         private void syncLSMComponentFlushLSN(LSMComponentProperties 
lsmCompProp, LSMComponentLSNSyncTask syncTask)
                 throws InterruptedException, IOException {
-            long remoteLSN = lsmCompProp.getOriginalLSN();
-            //LSN=0 (bulkload) does not need to be updated and there is no 
flush log corresponding to it
-            if (remoteLSN == BULKLOAD_LSN) {
-                //since this is the first LSM component of this index,
-                //then set the mapping in the LSN_MAP to the current log LSN 
because
-                //no other log could've been received for this index since 
bulkload replication is synchronous.
-                lsmCompProp.setReplicaLSN(logManager.getAppendLSN());
-                return;
-            }
-
-            //path to the LSM component file
-            Path path = Paths.get(syncTask.getComponentFilePath());
-            if (lsmCompProp.getReplicaLSN() == null) {
-                if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
-                    //need to look up LSN mapping from memory
-                    RemoteLogMapping remoteLogMap = 
replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                    //wait until flush log arrives, and verify the LSM 
component file still exists
-                    //The component file could be deleted if its NC fails.
-                    while (remoteLogMap == null && Files.exists(path)) {
-                        synchronized (flushLogslock) {
-                            flushLogslock.wait();
-                        }
-                        remoteLogMap = 
replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                    }
-
-                    /**
-                     * file has been deleted due to its remote primary replica 
failure
-                     * before its LSN could've been synchronized.
-                     */
-                    if (remoteLogMap == null) {
-                        return;
-                    }
-                    lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
-                } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
-                    //need to load the LSN mapping from disk
-                    Map<Long, Long> lsmMap = replicaResourcesManager
-                            
.getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager));
-                    Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN());
-                    if (mappingLSN == null) {
-                        /**
-                         * this shouldn't happen unless this node just 
recovered and
-                         * the first component it received is a merged 
component due
-                         * to an on-going merge operation while recovery on 
the remote
-                         * replica. In this case, we use the current append 
LSN since
-                         * no new records exist for this index, otherwise they 
would've
-                         * been flushed. This could be prevented by waiting 
for any IO
-                         * to finish on the remote replica during recovery.
-                         */
-                        mappingLSN = logManager.getAppendLSN();
-                    }
-                    lsmCompProp.setReplicaLSN(mappingLSN);
+            final String componentFilePath = syncTask.getComponentFilePath();
+            final ResourceReference indexRef = 
ResourceReference.of(componentFilePath);
+            final IIndexCheckpointManager indexCheckpointManager = 
indexCheckpointManagerProvider.get(indexRef);
+            synchronized (indexCheckpointManager) {
+                long masterLsn = lsmCompProp.getOriginalLSN();
+                // wait until the lsn mapping is flushed to disk
+                while (!indexCheckpointManager.isFlushed(masterLsn)) {
+                    indexCheckpointManager.wait();
                 }
-            }
-
-            if (Files.notExists(path)) {
-                /**
-                 * This could happen when a merged component arrives and 
deletes
-                 * the flushed component (which we are trying to update) before
-                 * its flush log arrives since logs and components are received
-                 * on different threads.
-                 */
-                return;
-            }
-
-            File destFile = new File(syncTask.getComponentFilePath());
-            //prepare local LSN buffer
-            ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
-            metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
-            metadataBuffer.flip();
-
-            //replace the remote LSN value by the local one
-            try (RandomAccessFile fileOutputStream = new 
RandomAccessFile(destFile, "rw");
-                    FileChannel fileChannel = fileOutputStream.getChannel()) {
-                long lsnStartOffset = syncTask.getLSNByteOffset();
-                while (metadataBuffer.hasRemaining()) {
-                    lsnStartOffset += fileChannel.write(metadataBuffer, 
lsnStartOffset);
-                }
-                fileChannel.force(true);
+                indexCheckpointManager
+                        
.replicated(AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()),
 masterLsn);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 48c7083..5cf7eab 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -54,7 +54,6 @@ import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -63,6 +62,8 @@ import 
org.apache.asterix.common.replication.Replica.ReplicaState;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -84,7 +85,6 @@ import org.apache.hyracks.api.replication.IReplicationJob;
 import 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.StorageUtil.StorageUnit;
@@ -136,6 +136,7 @@ public class ReplicationManager implements 
IReplicationManager {
     private final ByteBuffer txnLogsBatchSizeBuffer = 
ByteBuffer.allocate(Integer.BYTES);
     private final IReplicationStrategy replicationStrategy;
     private final PersistentLocalResourceRepository localResourceRepo;
+    private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
     //TODO this class needs to be refactored by moving its private classes to 
separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -148,6 +149,8 @@ public class ReplicationManager implements 
IReplicationManager {
         this.replicaResourcesManager = (ReplicaResourcesManager) 
remoteResoucesManager;
         this.asterixAppRuntimeContextProvider = 
asterixAppRuntimeContextProvider;
         this.logManager = logManager;
+        this.indexCheckpointManagerProvider =
+                
asterixAppRuntimeContextProvider.getAppContext().getIndexCheckpointManagerProvider();
         localResourceRepo =
                 (PersistentLocalResourceRepository) 
asterixAppRuntimeContextProvider.getLocalResourceRepository();
         this.hostIPAddressFirstOctet = 
replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
@@ -284,7 +287,6 @@ public class ReplicationManager implements 
IReplicationManager {
             if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
                 return;
             }
-
             int jobPartitionId = indexFileRef.getPartitionId();
 
             ByteBuffer responseBuffer = null;
@@ -327,25 +329,10 @@ public class ReplicationManager implements 
IReplicationManager {
                                 FileChannel fileChannel = 
fromFile.getChannel();) {
 
                             long fileSize = fileChannel.size();
-
-                            if (LSMComponentJob != null) {
-                                /**
-                                 * since this is LSM_COMPONENT REPLICATE job, 
the job will contain
-                                 * only the component being replicated.
-                                 */
-                                ILSMDiskComponent diskComponent = 
LSMComponentJob.getLSMIndexOperationContext()
-                                        .getComponentsToBeReplicated().get(0);
-                                long lsnOffset = 
LSMIndexUtil.getComponentFileLSNOffset(LSMComponentJob.getLSMIndex(),
-                                        diskComponent, filePath);
-                                asterixFileProperties.initialize(filePath, 
fileSize, nodeId, isLSMComponentFile,
-                                        lsnOffset, remainingFiles == 0);
-                            } else {
-                                asterixFileProperties.initialize(filePath, 
fileSize, nodeId, isLSMComponentFile, -1L,
-                                        remainingFiles == 0);
-                            }
+                            asterixFileProperties.initialize(filePath, 
fileSize, nodeId, isLSMComponentFile,
+                                    remainingFiles == 0);
                             requestBuffer = 
ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
                                     asterixFileProperties, 
ReplicationRequestType.REPLICATE_FILE);
-
                             Iterator<Map.Entry<String, SocketChannel>> 
iterator = replicasSockets.entrySet().iterator();
                             while (iterator.hasNext()) {
                                 Map.Entry<String, SocketChannel> entry = 
iterator.next();
@@ -378,8 +365,7 @@ public class ReplicationManager implements 
IReplicationManager {
                 } else if (job.getOperation() == ReplicationOperation.DELETE) {
                     for (String filePath : job.getJobFiles()) {
                         remainingFiles--;
-                        asterixFileProperties.initialize(filePath, -1, nodeId, 
isLSMComponentFile, -1L,
-                                remainingFiles == 0);
+                        asterixFileProperties.initialize(filePath, -1, nodeId, 
isLSMComponentFile, remainingFiles == 0);
                         
ReplicationProtocol.writeFileReplicationRequest(requestBuffer, 
asterixFileProperties,
                                 ReplicationRequestType.DELETE_FILE);
 
@@ -1026,10 +1012,10 @@ public class ReplicationManager implements 
IReplicationManager {
         ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         for (String replicaId : replicaIds) {
             //1. identify replica indexes with LSN less than 
nonSharpCheckpointTargetLSN.
-            Map<Long, String> laggingIndexes =
+            Map<Long, DatasetResourceReference> laggingIndexes =
                     
replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, 
nonSharpCheckpointTargetLSN);
 
-            if (laggingIndexes.size() > 0) {
+            if (!laggingIndexes.isEmpty()) {
                 //2. send request to remote replicas that have lagging indexes.
                 ReplicaIndexFlushRequest laggingIndexesResponse = null;
                 try (SocketChannel socketChannel = 
getReplicaSocket(replicaId)) {
@@ -1052,16 +1038,14 @@ public class ReplicationManager implements 
IReplicationManager {
                 }
 
                 /*
-                 * 4. update the LSN_MAP for indexes that were not flushed
+                 * 4. update checkpoints for indexes that were not flushed
                  * to the current append LSN to indicate no operations happened
                  * since the checkpoint start.
                  */
                 if (laggingIndexesResponse != null) {
                     for (Long resouceId : 
laggingIndexesResponse.getLaggingRescouresIds()) {
-                        String indexPath = laggingIndexes.get(resouceId);
-                        Map<Long, Long> indexLSNMap = 
replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
-                        
indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
-                        
replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
+                        final DatasetResourceReference 
datasetResourceReference = laggingIndexes.get(resouceId);
+                        
indexCheckpointManagerProvider.get(datasetResourceReference).advanceLowWatermark(startLSN);
                     }
                 }
             }
@@ -1141,12 +1125,11 @@ public class ReplicationManager implements 
IReplicationManager {
                     fileChannel.force(true);
                 }
 
-                //we need to create LSN map for .metadata files that belong to 
remote replicas
+                //we need to create initial map for .metadata files that 
belong to remote replicas
                 if (!fileProperties.isLSMComponentFile() && 
!fileProperties.getNodeId().equals(nodeId)) {
-                    //replica index
-                    
replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, 
logManager.getAppendLSN());
+                    final ResourceReference indexRef = 
ResourceReference.of(destFile.getAbsolutePath());
+                    
indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
                 }
-
                 responseFunction = 
ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
index f11adc2..08c0ec7 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
@@ -19,33 +19,21 @@
 package org.apache.asterix.replication.storage;
 
 public class LSMComponentLSNSyncTask {
+
     private String componentFilePath;
     private String componentId;
-    private long LSNByteOffset;
 
-    public LSMComponentLSNSyncTask(String componentId, String 
componentFilePath, long LSNByteOffset) {
+    public LSMComponentLSNSyncTask(String componentId, String 
componentFilePath) {
         this.componentId = componentId;
         this.componentFilePath = componentFilePath;
-        this.LSNByteOffset = LSNByteOffset;
     }
 
     public String getComponentFilePath() {
         return componentFilePath;
     }
 
-    public void setComponentFilePath(String componentFilePath) {
-        this.componentFilePath = componentFilePath;
-    }
-
     public String getComponentId() {
         return componentId;
     }
 
-    public void setComponentId(String componentId) {
-        this.componentId = componentId;
-    }
-
-    public long getLSNByteOffset() {
-        return LSNByteOffset;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 7ca6f2f..bf987d0 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -52,9 +52,10 @@ public class LSMComponentProperties {
         this.nodeId = nodeId;
         componentId = LSMComponentProperties.getLSMComponentID((String) 
job.getJobFiles().toArray()[0]);
         numberOfFiles = new AtomicInteger(job.getJobFiles().size());
-        originalLSN = 
LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
-                job.getLSMIndexOperationContext());
         opType = job.getLSMOpType();
+        originalLSN = opType == LSMOperationType.FLUSH ?
+                LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) 
job.getLSMIndex(),
+                        job.getLSMIndexOperationContext()) : 0;
     }
 
     public LSMComponentProperties() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index f2747fe..2ebf2cb 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -31,27 +31,25 @@ public class LSMIndexFileProperties {
     private boolean lsmComponentFile;
     private String filePath;
     private boolean requiresAck = false;
-    private long LSNByteOffset;
 
     public LSMIndexFileProperties() {
     }
 
     public LSMIndexFileProperties(String filePath, long fileSize, String 
nodeId, boolean lsmComponentFile,
-            long LSNByteOffset, boolean requiresAck) {
-        initialize(filePath, fileSize, nodeId, lsmComponentFile, 
LSNByteOffset, requiresAck);
+            boolean requiresAck) {
+        initialize(filePath, fileSize, nodeId, lsmComponentFile, requiresAck);
     }
 
     public LSMIndexFileProperties(LSMComponentProperties 
lsmComponentProperties) {
-        initialize(lsmComponentProperties.getComponentId(), -1, 
lsmComponentProperties.getNodeId(), false, -1L, false);
+        initialize(lsmComponentProperties.getComponentId(), -1, 
lsmComponentProperties.getNodeId(), false, false);
     }
 
-    public void initialize(String filePath, long fileSize, String nodeId, 
boolean lsmComponentFile, long LSNByteOffset,
+    public void initialize(String filePath, long fileSize, String nodeId, 
boolean lsmComponentFile,
             boolean requiresAck) {
         this.filePath = filePath;
         this.fileSize = fileSize;
         this.nodeId = nodeId;
         this.lsmComponentFile = lsmComponentFile;
-        this.LSNByteOffset = LSNByteOffset;
         this.requiresAck = requiresAck;
     }
 
@@ -61,7 +59,6 @@ public class LSMIndexFileProperties {
         dos.writeUTF(filePath);
         dos.writeLong(fileSize);
         dos.writeBoolean(lsmComponentFile);
-        dos.writeLong(LSNByteOffset);
         dos.writeBoolean(requiresAck);
     }
 
@@ -70,10 +67,9 @@ public class LSMIndexFileProperties {
         String filePath = input.readUTF();
         long fileSize = input.readLong();
         boolean lsmComponentFile = input.readBoolean();
-        long LSNByteOffset = input.readLong();
         boolean requiresAck = input.readBoolean();
-        LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, 
fileSize, nodeId, lsmComponentFile,
-                LSNByteOffset, requiresAck);
+        LSMIndexFileProperties fileProp =
+                new LSMIndexFileProperties(filePath, fileSize, nodeId, 
lsmComponentFile, requiresAck);
         return fileProp;
     }
 
@@ -108,11 +104,6 @@ public class LSMIndexFileProperties {
         sb.append("File Size: " + fileSize + "  ");
         sb.append("Node ID: " + nodeId + "  ");
         sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
-        sb.append("LSN Byte Offset: " + LSNByteOffset);
         return sb.toString();
     }
-
-    public long getLSNByteOffset() {
-        return LSNByteOffset;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 7eea4a4..2ff74a8 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -19,27 +19,26 @@
 package org.apache.asterix.replication.storage;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -50,16 +49,15 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 
 public class ReplicaResourcesManager implements IReplicaResourcesManager {
-    private static final Logger LOGGER = 
Logger.getLogger(ReplicaResourcesManager.class.getName());
-    public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
-    private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
-    public static final long REPLICA_INDEX_CREATION_LSN = -1;
+    public static final String LSM_COMPONENT_MASK_SUFFIX = "_mask";
     private final PersistentLocalResourceRepository localRepository;
     private final Map<String, ClusterPartition[]> nodePartitions;
+    private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
-    public ReplicaResourcesManager(ILocalResourceRepository localRepository,
-            MetadataProperties metadataProperties) {
+    public ReplicaResourcesManager(ILocalResourceRepository localRepository, 
MetadataProperties metadataProperties,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.localRepository = (PersistentLocalResourceRepository) 
localRepository;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         nodePartitions = metadataProperties.getNodePartitions();
     }
 
@@ -86,12 +84,6 @@ public class ReplicaResourcesManager implements 
IReplicaResourcesManager {
         return indexPath.toString();
     }
 
-    public void initializeReplicaIndexLSNMap(String indexPath, long 
currentLSN) throws IOException {
-        HashMap<Long, Long> lsnMap = new HashMap<Long, Long>();
-        lsnMap.put(REPLICA_INDEX_CREATION_LSN, currentLSN);
-        updateReplicaIndexLSNMap(indexPath, lsnMap);
-    }
-
     public void createRemoteLSMComponentMask(LSMComponentProperties 
lsmComponentProperties) throws IOException {
         String maskPath = lsmComponentProperties.getMaskPath(this);
         Path path = Paths.get(maskPath);
@@ -106,13 +98,6 @@ public class ReplicaResourcesManager implements 
IReplicaResourcesManager {
         String maskPath = lsmComponentProperties.getMaskPath(this);
         Path path = Paths.get(maskPath);
         Files.deleteIfExists(path);
-
-        //add component LSN to the index LSNs map
-        Map<Long, Long> lsnMap = 
getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
-        lsnMap.put(lsmComponentProperties.getOriginalLSN(), 
lsmComponentProperties.getReplicaLSN());
-
-        //update map on disk
-        
updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), 
lsnMap);
     }
 
     public Set<File> getReplicaIndexes(String replicaId) throws 
HyracksDataException {
@@ -128,58 +113,37 @@ public class ReplicaResourcesManager implements 
IReplicaResourcesManager {
     public long getPartitionsMinLSN(Set<Integer> partitions) throws 
HyracksDataException {
         long minRemoteLSN = Long.MAX_VALUE;
         for (Integer partition : partitions) {
-            //for every index in replica
-            Set<File> remoteIndexes = 
localRepository.getPartitionIndexes(partition);
-            for (File indexFolder : remoteIndexes) {
-                //read LSN map
-                try {
-                    //get max LSN per index
-                    long remoteIndexMaxLSN = 
getReplicaIndexMaxLSN(indexFolder);
-
-                    //get min of all maximums
-                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
-                } catch (IOException e) {
-                    LOGGER.log(Level.INFO,
-                            indexFolder.getAbsolutePath() + " Couldn't read 
LSN map for index " + indexFolder);
-                    continue;
-                }
+            final List<DatasetResourceReference> partitionResources = 
localRepository.getResources(resource -> {
+                DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
+                return dsResource.getPartition() == partition;
+            
}).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            for (DatasetResourceReference indexRef : partitionResources) {
+                long remoteIndexMaxLSN = 
indexCheckpointManagerProvider.get(indexRef).getLowWatermark();
+                minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
             }
         }
         return minRemoteLSN;
     }
 
-    public Map<Long, String> getLaggingReplicaIndexesId2PathMap(String 
replicaId, long targetLSN) throws IOException {
-        Map<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
-        try {
-            //for every index in replica
-            Set<File> remoteIndexes = getReplicaIndexes(replicaId);
-            for (File indexFolder : remoteIndexes) {
-                if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
-                    File localResource = new File(
-                            indexFolder + File.separator + 
StorageConstants.METADATA_FILE_NAME);
-                    LocalResource resource = 
PersistentLocalResourceRepository.readLocalResource(localResource);
-                    laggingReplicaIndexes.put(resource.getId(), 
indexFolder.getAbsolutePath());
+    public Map<Long, DatasetResourceReference> 
getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
+            throws HyracksDataException {
+        Map<Long, DatasetResourceReference> laggingReplicaIndexes = new 
HashMap<>();
+        final List<Integer> replicaPartitions =
+                
Arrays.stream(nodePartitions.get(replicaId)).map(ClusterPartition::getPartitionId)
+                        .collect(Collectors.toList());
+        for (int patition : replicaPartitions) {
+            final Map<Long, LocalResource> partitionResources = 
localRepository.getPartitionResources(patition);
+            final List<DatasetResourceReference> indexesRefs =
+                    
partitionResources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            for (DatasetResourceReference ref : indexesRefs) {
+                if (indexCheckpointManagerProvider.get(ref).getLowWatermark() 
< targetLSN) {
+                    laggingReplicaIndexes.put(ref.getResourceId(), ref);
                 }
             }
-        } catch (HyracksDataException e) {
-            e.printStackTrace();
         }
-
         return laggingReplicaIndexes;
     }
 
-    private long getReplicaIndexMaxLSN(File indexFolder) throws IOException {
-        long remoteIndexMaxLSN = 0;
-        //get max LSN per index
-        Map<Long, Long> lsnMap = 
getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
-        if (lsnMap != null) {
-            for (Long lsn : lsnMap.values()) {
-                remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
-            }
-        }
-        return remoteIndexMaxLSN;
-    }
-
     public void cleanInvalidLSMComponents(String replicaId) {
         //for every index in replica
         Set<File> remoteIndexes = null;
@@ -214,28 +178,6 @@ public class ReplicaResourcesManager implements 
IReplicaResourcesManager {
         }
     }
 
-    @SuppressWarnings({ "unchecked" })
-    public synchronized Map<Long, Long> getReplicaIndexLSNMap(String 
indexPath) throws IOException {
-        try (FileInputStream fis = new FileInputStream(indexPath + 
File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
-            Map<Long, Long> lsnMap = null;
-            try {
-                lsnMap = (Map<Long, Long>) oisFromFis.readObject();
-            } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-            }
-            return lsnMap;
-        }
-    }
-
-    public synchronized void updateReplicaIndexLSNMap(String indexPath, 
Map<Long, Long> lsnMap) throws IOException {
-        try (FileOutputStream fos = new FileOutputStream(indexPath + 
File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
-            oosToFos.writeObject(lsnMap);
-            oosToFos.flush();
-        }
-    }
-
     /**
      * @param partition
      * @return Absolute paths to all partition files

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index e87a39b..ba43fb7 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -29,6 +29,7 @@ import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,8 +39,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.MetadataProperties;
@@ -48,10 +47,12 @@ import 
org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -72,7 +73,29 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public static final Predicate<Path> INDEX_COMPONENTS = path -> 
!path.endsWith(StorageConstants.METADATA_FILE_NAME);
     // Private constants
     private static final int MAX_CACHED_RESOURCES = 1000;
-    private static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6;
+    private static final IOFileFilter METADATA_FILES_FILTER = new 
IOFileFilter() {
+        @Override
+        public boolean accept(File file) {
+            return file.getName().equals(StorageConstants.METADATA_FILE_NAME);
+        }
+
+        @Override
+        public boolean accept(File dir, String name) {
+            return false;
+        }
+    };
+
+    private static final IOFileFilter ALL_DIR_FILTER = new IOFileFilter() {
+        @Override
+        public boolean accept(File file) {
+            return true;
+        }
+
+        @Override
+        public boolean accept(File dir, String name) {
+            return true;
+        }
+    };
 
     // Finals
     private final IIOManager ioManager;
@@ -85,15 +108,17 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     private IReplicationManager replicationManager;
     private Set<Integer> nodeInactivePartitions;
     private final Path[] storageRoots;
+    private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
-    public PersistentLocalResourceRepository(IIOManager ioManager, String 
nodeId,
-            MetadataProperties metadataProperties) {
+    public PersistentLocalResourceRepository(IIOManager ioManager, String 
nodeId, MetadataProperties metadataProperties,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.ioManager = ioManager;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         storageRoots = new Path[ioManager.getIODevices().size()];
         final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
         for (int i = 0; i < ioDevices.size(); i++) {
-            storageRoots[i] =
-                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), 
StorageConstants.STORAGE_ROOT_DIR_NAME);
+            storageRoots[i] = 
Paths.get(ioDevices.get(i).getMount().getAbsolutePath(),
+                    StorageConstants.STORAGE_ROOT_DIR_NAME);
         }
         createStorageRoots();
         resourceCache = 
CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -119,7 +144,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     @Override
-    public LocalResource get(String relativePath) throws HyracksDataException {
+    public synchronized LocalResource get(String relativePath) throws 
HyracksDataException {
         LocalResource resource = resourceCache.getIfPresent(relativePath);
         if (resource == null) {
             FileReference resourceFile = getLocalResourceFileByName(ioManager, 
relativePath);
@@ -153,7 +178,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
 
         resourceCache.put(resource.getPath(), resource);
-
+        
indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(0);
         //if replication enabled, send resource metadata info to remote nodes
         if (isReplicationEnabled) {
             createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
@@ -164,18 +189,16 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public synchronized void delete(String relativePath) throws 
HyracksDataException {
         FileReference resourceFile = getLocalResourceFileByName(ioManager, 
relativePath);
         if (resourceFile.getFile().exists()) {
-            try {
-                // Invalidate before deleting the file just in case file 
deletion throws some exception.
-                // Since it's just a cache invalidation, it should not affect 
correctness.
-                resourceCache.invalidate(relativePath);
-                IoUtil.delete(resourceFile);
-            } finally {
-                // Regardless of successfully deleted or not, the operation 
should be replicated.
-                //if replication enabled, delete resource from remote replicas
-                if (isReplicationEnabled) {
-                    createReplicationJob(ReplicationOperation.DELETE, 
resourceFile);
-                }
+            if (isReplicationEnabled) {
+                createReplicationJob(ReplicationOperation.DELETE, 
resourceFile);
             }
+            // delete all checkpoints
+            final LocalResource localResource = 
readLocalResource(resourceFile.getFile());
+            
indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
+            // Invalidate before deleting the file just in case file deletion 
throws some exception.
+            // Since it's just a cache invalidation, it should not affect 
correctness.
+            resourceCache.invalidate(relativePath);
+            IoUtil.delete(resourceFile);
         } else {
             throw HyracksDataException
                     
.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, 
relativePath);
@@ -188,13 +211,13 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return ioManager.resolve(fileName);
     }
 
-    public Map<Long, LocalResource> getResources(Predicate<LocalResource> 
filter) throws HyracksDataException {
+    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter)
+            throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
         for (Path root : storageRoots) {
-            try (Stream<Path> stream = Files.find(root, 
RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT,
-                    (path, attr) -> 
path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) {
-                final List<File> resourceMetadataFiles = 
stream.map(Path::toFile).collect(Collectors.toList());
-                for (File file : resourceMetadataFiles) {
+            final Collection<File> files = FileUtils.listFiles(root.toFile(), 
METADATA_FILES_FILTER, ALL_DIR_FILTER);
+            try {
+                for (File file : files) {
                     final LocalResource localResource = 
PersistentLocalResourceRepository.readLocalResource(file);
                     if (filter.test(localResource)) {
                         resourcesMap.put(localResource.getId(), localResource);
@@ -321,6 +344,13 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return indexes;
     }
 
+    public Map<Long, LocalResource> getPartitionResources(int partition) 
throws HyracksDataException {
+        return getResources(resource -> {
+            DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
+            return dsResource.getPartition() == partition;
+        });
+    }
+
     /**
      * Given any index file, an absolute {@link FileReference} is returned 
which points to where the index of
      * {@code indexFile} is located.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 43024b6..33c6260 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.transaction.management.resource;
 
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -28,16 +29,19 @@ public class PersistentLocalResourceRepositoryFactory 
implements ILocalResourceR
     private final IIOManager ioManager;
     private final String nodeId;
     private final MetadataProperties metadataProperties;
+    private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
     public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, 
String nodeId,
-            MetadataProperties metadataProperties) {
+            MetadataProperties metadataProperties, 
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.ioManager = ioManager;
         this.nodeId = nodeId;
         this.metadataProperties = metadataProperties;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
     }
 
     @Override
     public ILocalResourceRepository createRepository() throws 
HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager, nodeId, 
metadataProperties);
+        return new PersistentLocalResourceRepository(ioManager, nodeId, 
metadataProperties,
+                indexCheckpointManagerProvider);
     }
 }

Reply via email to