[ASTERIXDB-2231][STO] Separate primary op tracker for each partition

- user model changes: no
- storage format changes: no.
- interface changes: yes.

Details:
- Separate primary index operation tracker for each partition, instead
of having a global one on each NC to achieve better scalability.
- As a coordinated change, separate component id generator for each
partition as well.
- Add partition to transaction context so that transaction operations
can operate on proper op tracker.
- Fixes [ASTERIXDB-2232] to calculate dataset partitions correctly.

Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2263
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2f934e31
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2f934e31
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2f934e31

Branch: refs/heads/master
Commit: 2f934e312e841223aa0e4941e01f23b9e63fdc18
Parents: f94fdcc
Author: luochen01 <cl...@uci.edu>
Authored: Thu Feb 1 09:17:23 2018 -0800
Committer: abdullah alamoudi <bamou...@gmail.com>
Committed: Thu Feb 1 23:35:43 2018 -0800

----------------------------------------------------------------------
 .../asterix/app/nc/NCAppRuntimeContext.java     |  22 +--
 .../test/dataflow/ComponentRollbackTest.java    |  14 +-
 .../dataflow/MultiPartitionLSMIndexTest.java    |   5 +-
 .../TestLsmBtreeIoOpCallbackFactory.java        |   3 +-
 .../TestPrimaryIndexOperationTracker.java       |   4 +-
 ...TestPrimaryIndexOperationTrackerFactory.java |  29 ++-
 .../asterix/test/metadata/MetadataTxnTest.java  |   7 +-
 .../apache/asterix/test/txn/LogManagerTest.java |   2 +-
 .../common/api/IDatasetLifecycleManager.java    |   6 +-
 .../common/api/INcApplicationContext.java       |   2 +-
 .../context/CorrelatedPrefixMergePolicy.java    |  25 +--
 .../asterix/common/context/DatasetInfo.java     |  43 +++--
 .../DatasetLSMComponentIdGeneratorFactory.java  |   9 +-
 .../common/context/DatasetLifecycleManager.java | 192 +++++++++++--------
 .../asterix/common/context/DatasetResource.java |  45 +++--
 .../context/PrimaryIndexOperationTracker.java   |  12 +-
 ...tractLSMIndexIOOperationCallbackFactory.java |  13 +-
 .../LSMBTreeIOOperationCallbackFactory.java     |   3 +-
 ...TreeWithBuddyIOOperationCallbackFactory.java |   3 +-
 ...InvertedIndexIOOperationCallbackFactory.java |   3 +-
 .../LSMRTreeIOOperationCallbackFactory.java     |   3 +-
 .../transactions/ITransactionContext.java       |   8 +-
 .../CorrelatedPrefixMergePolicyTest.java        |  27 +--
 .../apache/asterix/metadata/MetadataNode.java   |   5 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |   2 +-
 .../asterix/metadata/entities/Dataset.java      |   7 +-
 .../job/listener/JobEventListenerFactory.java   |   3 +-
 ...dexModificationOperationCallbackFactory.java |   2 +-
 .../PrimaryIndexOperationTrackerFactory.java    |   9 +-
 ...dexModificationOperationCallbackFactory.java |   2 +-
 .../SecondaryIndexOperationTrackerFactory.java  |   5 +-
 .../UpsertOperationCallbackFactory.java         |   2 +-
 .../resource/DatasetLocalResourceFactory.java   |   3 +-
 .../management/service/logging/LogBuffer.java   |   2 +-
 .../transaction/AbstractTransactionContext.java |   2 +-
 .../transaction/AtomicTransactionContext.java   |   6 +-
 .../EntityLevelTransactionContext.java          |  44 +++--
 .../dataflow/ExternalBTreeLocalResource.java    |   4 +-
 .../ExternalBTreeWithBuddyLocalResource.java    |   4 +-
 .../btree/dataflow/LSMBTreeLocalResource.java   |   4 +-
 .../am/lsm/btree/impls/ExternalBTree.java       |   3 +-
 .../lsm/btree/impls/ExternalBTreeWithBuddy.java |   4 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java    |   2 +-
 .../am/lsm/btree/utils/LSMBTreeUtil.java        |   6 +-
 .../api/ILSMComponentIdGeneratorFactory.java    |   6 +-
 .../api/ILSMIOOperationCallbackFactory.java     |   8 +-
 .../common/api/ILSMOperationTrackerFactory.java |   4 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java   |   4 +-
 .../impls/LSMComponentIdGeneratorFactory.java   |   3 +-
 .../impls/NoOpIOOperationCallbackFactory.java   |   3 +-
 .../impls/NoOpOperationTrackerFactory.java      |   3 +-
 .../ThreadCountingOperationTrackerFactory.java  |   3 +-
 .../dataflow/LSMInvertedIndexLocalResource.java |   6 +-
 .../dataflow/ExternalRTreeLocalResource.java    |   8 +-
 .../rtree/dataflow/LSMRTreeLocalResource.java   |   9 +-
 .../LSMRTreeWithAntiMatterLocalResource.java    |   4 +-
 .../am/lsm/rtree/impls/AbstractLSMRTree.java    |   2 +-
 .../am/lsm/rtree/impls/ExternalRTree.java       |   2 +-
 .../storage/am/lsm/rtree/impls/LSMRTree.java    |   2 +-
 ...MBTreeModificationOperationCallbackTest.java |   2 +-
 .../LSMBTreeSearchOperationCallbackTest.java    |   2 +-
 .../am/lsm/btree/LSMBTreeUpdateInPlaceTest.java |   2 +-
 .../btree/impl/TestLsmBtreeLocalResource.java   |   4 +-
 63 files changed, 408 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index c554cbd..366438a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -109,15 +109,15 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCServiceContext ncServiceContext;
     private final IResourceIdFactory resourceIdFactory;
-    private CompilerProperties compilerProperties;
-    private ExternalProperties externalProperties;
-    private MetadataProperties metadataProperties;
-    private StorageProperties storageProperties;
-    private TransactionProperties txnProperties;
-    private ActiveProperties activeProperties;
-    private BuildProperties buildProperties;
-    private ReplicationProperties replicationProperties;
-    private MessagingProperties messagingProperties;
+    private final CompilerProperties compilerProperties;
+    private final ExternalProperties externalProperties;
+    private final MetadataProperties metadataProperties;
+    private final StorageProperties storageProperties;
+    private final TransactionProperties txnProperties;
+    private final ActiveProperties activeProperties;
+    private final BuildProperties buildProperties;
+    private final ReplicationProperties replicationProperties;
+    private final MessagingProperties messagingProperties;
     private final NodeProperties nodeProperties;
     private ExecutorService threadExecutor;
     private IDatasetMemoryManager datasetMemoryManager;
@@ -373,8 +373,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     }
 
     @Override
-    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
-        return datasetLifecycleManager.getOperationTracker(datasetID);
+    public ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int 
partition) {
+        return datasetLifecycleManager.getOperationTracker(datasetID, 
partition);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a46b029..c6232f5 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -144,7 +144,7 @@ public class ComponentRollbackTest {
             
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             StorageTestUtils.searchAndAssertCount(nc, PARTITION, 
StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, 
PARTITION).refresh();
             ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -153,7 +153,7 @@ public class ComponentRollbackTest {
             // rollback the last disk component
             lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = 
AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, 
PARTITION).refresh();
             ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new 
DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
@@ -201,7 +201,7 @@ public class ComponentRollbackTest {
             
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             StorageTestUtils.searchAndAssertCount(nc, PARTITION, 
StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, 
PARTITION).refresh();
             ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -227,7 +227,7 @@ public class ComponentRollbackTest {
             // rollback the last disk component
             lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = 
AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, 
PARTITION).refresh();
             ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new 
DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
@@ -276,7 +276,7 @@ public class ComponentRollbackTest {
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, 
PARTITION).refresh();
             ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(
@@ -298,7 +298,7 @@ public class ComponentRollbackTest {
             lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = 
AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
 
-            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+            
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, 
PARTITION).refresh();
             ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new 
DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
@@ -702,7 +702,7 @@ public class ComponentRollbackTest {
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                     try {
-                        
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+                        
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, 
PARTITION).refresh();
                         ((AbstractLSMIOOperationCallback) 
lsmBtree.getIOOperationCallback()).updateLastLSN(0);
                         lsmAccessor.deleteComponents(predicate);
                     } catch (HyracksDataException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 367d0b9..62705cc 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -251,7 +251,7 @@ public class MultiPartitionLSMIndexTest {
 
     /**
      * This test update partition 0, schedule flush and modify partition 1
-     * Then ensure that in partition 1, primary and secondary have different 
component ids
+     * Then ensure that in partition 1, primary and secondary have the same 
component ids
      */
     @Test
     public void testAllocateWhileFlushIsScheduled() {
@@ -400,7 +400,8 @@ public class MultiPartitionLSMIndexTest {
             AtomicBoolean arrivedAtSchduleFlush = new AtomicBoolean(false);
             AtomicBoolean finishedSchduleFlush = new AtomicBoolean(false);
             MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
-            addOpTrackerCallback(primaryLsmBtrees[0], new 
ITestOpCallback<Void>() {
+            // keep track of the flush of partition 1 since partitions 0 and 1 
are flushed seperately
+            addOpTrackerCallback(primaryLsmBtrees[1], new 
ITestOpCallback<Void>() {
                 @Override
                 public void before(Void t) {
                     synchronized (arrivedAtSchduleFlush) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 4bfc581..c69ffe5 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
 import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
@@ -52,7 +51,7 @@ public class TestLsmBtreeIoOpCallbackFactory extends 
LSMBTreeIOOperationCallback
     }
 
     @Override
-    public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex 
index) {
+    public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex 
index) throws HyracksDataException {
         completedFlushes = 0;
         completedMerges = 0;
         rollbackFlushes = 0;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index e376ff9..9a528d3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -33,9 +33,9 @@ public class TestPrimaryIndexOperationTracker extends 
PrimaryIndexOperationTrack
 
     private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
 
-    public TestPrimaryIndexOperationTracker(int datasetID, ILogManager 
logManager, DatasetInfo dsInfo,
+    public TestPrimaryIndexOperationTracker(int datasetID, int partition, 
ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
-        super(datasetID, logManager, dsInfo, idGenerator);
+        super(datasetID, partition, logManager, dsInfo, idGenerator);
     }
 
     public void addCallback(ITestOpCallback<Void> callback) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 5d7a7c6..e6b34b8 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -20,19 +20,22 @@ package org.apache.asterix.test.dataflow;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.context.DatasetResource;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.common.IResource;
 
 public class TestPrimaryIndexOperationTrackerFactory extends 
PrimaryIndexOperationTrackerFactory {
 
     private static final long serialVersionUID = 1L;
-    private int datasetId;
+    private final int datasetId;
 
     public TestPrimaryIndexOperationTrackerFactory(int datasetId) {
         super(datasetId);
@@ -40,17 +43,19 @@ public class TestPrimaryIndexOperationTrackerFactory 
extends PrimaryIndexOperati
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, 
IResource resource) {
         try {
             INcApplicationContext appCtx = (INcApplicationContext) 
ctx.getApplicationContext();
             DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) 
appCtx.getDatasetLifecycleManager();
             DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
-            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+            int partition = 
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+            PrimaryIndexOperationTracker opTracker = 
dslcManager.getOperationTracker(datasetId, partition);
             if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
-                Field opTrackerField = 
DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker");
-                opTracker = new TestPrimaryIndexOperationTracker(datasetId,
-                        appCtx.getTransactionSubsystem().getLogManager(), 
dsr.getDatasetInfo(), dsr.getIdGenerator());
-                setFinal(opTrackerField, dsr, opTracker);
+                Field opTrackersField = 
DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+                opTracker = new TestPrimaryIndexOperationTracker(datasetId, 
partition,
+                        appCtx.getTransactionSubsystem().getLogManager(), 
dsr.getDatasetInfo(),
+                        dslcManager.getComponentIdGenerator(datasetId, 
partition));
+                replaceMapEntry(opTrackersField, dsr, partition, opTracker);
             }
             return opTracker;
         } catch (Exception e) {
@@ -65,4 +70,14 @@ public class TestPrimaryIndexOperationTrackerFactory extends 
PrimaryIndexOperati
         modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
         field.set(obj, newValue);
     }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    static void replaceMapEntry(Field field, Object obj, Object key, Object 
value)
+            throws Exception, IllegalAccessException {
+        field.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        Map map = (Map) field.get(obj);
+        map.put(key, value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index a10c234..70e5f6e 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -42,6 +42,7 @@ import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
 import org.junit.Assert;
@@ -203,8 +204,10 @@ public class MetadataTxnTest {
                 ((INcApplicationContext) 
integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
         int maxMetadatasetId = 14;
         for (int i = 1; i <= maxMetadatasetId; i++) {
-            if (datasetLifecycleManager.getIndex(i, i) != null) {
-                final PrimaryIndexOperationTracker opTracker = 
datasetLifecycleManager.getOperationTracker(i);
+            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(i, 
i);
+            if (index != null) {
+                final PrimaryIndexOperationTracker opTracker =
+                        (PrimaryIndexOperationTracker) 
index.getOperationTracker();
                 Assert.assertEquals(0, opTracker.getNumActiveOperations());
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index a1978eb..6a70a29 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -170,7 +170,7 @@ public class LogManagerTest {
         final TransactionOptions options = new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
         final ITransactionManager transactionManager = 
ncAppCtx.getTransactionSubsystem().getTransactionManager();
         final ITransactionContext txnCtx = 
transactionManager.beginTransaction(txnId, options);
-        txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, 
true);
+        txnCtx.register(resourceId, 0, index, NoOpOperationCallback.INSTANCE, 
true);
         return txnCtx;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 41c5ade..4441c6e 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -75,17 +75,19 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
      * creates (if necessary) and returns the primary index operation tracker 
of a dataset.
      *
      * @param datasetId
+     * @param partition
      * @return
      */
-    PrimaryIndexOperationTracker getOperationTracker(int datasetId);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetId, int 
partition);
 
     /**
      * creates (if necessary) and returns the component Id generator of a 
dataset.
      *
      * @param datasetId
+     * @param partition
      * @return
      */
-    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId);
+    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int 
partition);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8a83c7b..fffc170 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -66,7 +66,7 @@ public interface INcApplicationContext extends 
IApplicationContext {
 
     IResourceIdFactory getResourceIdFactory();
 
-    ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
+    ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int 
partition);
 
     void initialize(boolean initialRun) throws IOException, ACIDException, 
AlgebricksException;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index e5fc998..41461ec 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.commons.lang3.tuple.Pair;
@@ -92,10 +91,10 @@ public class CorrelatedPrefixMergePolicy extends 
PrefixMergePolicy {
         ILSMComponent leftComponent = 
immutableComponents.get(mergeableIndexes.getLeft());
         ILSMComponent rightComponent = 
immutableComponents.get(mergeableIndexes.getRight());
         ILSMComponentId targetId = 
LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId());
-        Set<IndexInfo> indexInfos = 
datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
-        int partition = getIndexPartition(index, indexInfos);
-        triggerScheduledMerge(targetId,
-                indexInfos.stream().filter(info -> info.getPartition() == 
partition).collect(Collectors.toSet()));
+        int partition = ((PrimaryIndexOperationTracker) 
index.getOperationTracker()).getPartition();
+        Set<ILSMIndex> indexes =
+                
datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetPartitionOpenIndexes(partition);
+        triggerScheduledMerge(targetId, indexes);
         return true;
     }
 
@@ -107,11 +106,8 @@ public class CorrelatedPrefixMergePolicy extends 
PrefixMergePolicy {
      * @param indexInfos
      * @throws HyracksDataException
      */
-    private void triggerScheduledMerge(ILSMComponentId targetId, 
Set<IndexInfo> indexInfos)
-            throws HyracksDataException {
-        for (IndexInfo info : indexInfos) {
-            ILSMIndex lsmIndex = info.getIndex();
-
+    private void triggerScheduledMerge(ILSMComponentId targetId, 
Set<ILSMIndex> indexes) throws HyracksDataException {
+        for (ILSMIndex lsmIndex : indexes) {
             List<ILSMDiskComponent> immutableComponents = new 
ArrayList<>(lsmIndex.getDiskComponents());
             if (isMergeOngoing(immutableComponents)) {
                 continue;
@@ -132,13 +128,4 @@ public class CorrelatedPrefixMergePolicy extends 
PrefixMergePolicy {
             accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), 
mergableComponents);
         }
     }
-
-    private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
-        for (IndexInfo info : indexInfos) {
-            if (info.getIndex() == index) {
-                return info.getPartition();
-            }
-        }
-        return -1;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 9d63818..44baf77 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -30,6 +30,9 @@ import org.apache.logging.log4j.Logger;
 
 public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private static final Logger LOGGER = LogManager.getLogger();
+    // partition -> index
+    private final Map<Integer, Set<IndexInfo>> partitionIndexes;
+    // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
     private final int datasetID;
     private int numActiveIOOps;
@@ -40,6 +43,7 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
     private boolean durable;
 
     public DatasetInfo(int datasetID) {
+        this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
@@ -69,26 +73,17 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
         notifyAll();
     }
 
-    public synchronized Set<ILSMIndex> getDatasetIndexes() {
-        Set<ILSMIndex> datasetIndexes = new HashSet<>();
-        for (IndexInfo iInfo : getIndexes().values()) {
-            if (iInfo.isOpen()) {
-                datasetIndexes.add(iInfo.getIndex());
-            }
-        }
-
-        return datasetIndexes;
-    }
-
-    public synchronized Set<IndexInfo> getDatsetIndexInfos() {
-        Set<IndexInfo> infos = new HashSet<>();
-        for (IndexInfo iInfo : getIndexes().values()) {
-            if (iInfo.isOpen()) {
-                infos.add(iInfo);
+    public synchronized Set<ILSMIndex> getDatasetPartitionOpenIndexes(int 
partition) {
+        Set<ILSMIndex> indexSet = new HashSet<>();
+        Set<IndexInfo> partitionIndexInfos = 
this.partitionIndexes.get(partition);
+        if (partitionIndexInfos != null) {
+            for (IndexInfo iInfo : partitionIndexInfos) {
+                if (iInfo.isOpen()) {
+                    indexSet.add(iInfo.getIndex());
+                }
             }
         }
-
-        return infos;
+        return indexSet;
     }
 
     @Override
@@ -160,6 +155,18 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
         return indexes;
     }
 
+    public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
+        indexes.put(resourceID, indexInfo);
+        partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition 
-> new HashSet<>()).add(indexInfo);
+    }
+
+    public synchronized void removeIndex(long resourceID) {
+        IndexInfo info = indexes.remove(resourceID);
+        if (info != null) {
+            partitionIndexes.get(info.getPartition()).remove(info);
+        }
+    }
+
     public boolean isRegistered() {
         return isRegistered;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 7b8397c..83e3144 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -21,9 +21,12 @@ package org.apache.asterix.common.context;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.hyracks.storage.common.IResource;
 
 /**
  * This factory implementation is used by AsterixDB layer so that indexes of a 
dataset (/partition)
@@ -41,10 +44,12 @@ public class DatasetLSMComponentIdGeneratorFactory 
implements ILSMComponentIdGen
     }
 
     @Override
-    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext 
serviceCtx) {
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext 
serviceCtx, IResource resource)
+            throws HyracksDataException {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) 
serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
-        return dslcManager.getComponentIdGenerator(datasetId);
+        int partition = 
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        return dslcManager.getComponentIdGenerator(datasetId, partition);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3a70515..1a61b8f 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.context;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -139,7 +140,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
         }
 
-        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+        PrimaryIndexOperationTracker opTracker = 
dsr.getOpTracker(iInfo.getPartition());
         if (iInfo.getReferenceCount() != 0 || (opTracker != null && 
opTracker.getNumActiveOperations() != 0)) {
             if (LOGGER.isErrorEnabled()) {
                 final String logMsg = String.format(
@@ -155,7 +156,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         dsInfo.waitForIO();
         closeIndex(iInfo);
-        dsInfo.getIndexes().remove(resourceID);
+        dsInfo.removeIndex(resourceID);
         if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && 
dsInfo.getIndexes().isEmpty()
                 && !dsInfo.isExternal()) {
             removeDatasetFromCache(dsInfo.getDatasetID());
@@ -203,10 +204,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         List<DatasetResource> datasetsResources = new 
ArrayList<>(datasets.values());
         Collections.sort(datasetsResources);
         for (DatasetResource dsr : datasetsResources) {
-            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
-            if (opTracker != null && opTracker.getNumActiveOperations() == 0
-                    && dsr.getDatasetInfo().getReferenceCount() == 0 && 
dsr.getDatasetInfo().isOpen()
-                    && !dsr.isMetadataDataset()) {
+            if (isCandidateDatasetForEviction(dsr)) {
                 closeDataset(dsr);
                 LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
                 return true;
@@ -215,14 +213,18 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         return false;
     }
 
-    private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) 
throws HyracksDataException {
-        if (iInfo.isOpen()) {
-            ILSMIndexAccessor accessor = 
iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
+    private boolean isCandidateDatasetForEviction(DatasetResource dsr) {
+        for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+            if (opTracker.getNumActiveOperations() != 0) {
+                return false;
+            }
+        }
+        if (dsr.getDatasetInfo().getReferenceCount() != 0 || 
!dsr.getDatasetInfo().isOpen()
+                || dsr.isMetadataDataset()) {
+            return false;
         }
 
-        // Wait for the above flush op.
-        dsInfo.waitForIO();
+        return true;
     }
 
     public DatasetResource getDatasetLifecycle(int did) {
@@ -234,12 +236,9 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             dsr = datasets.get(did);
             if (dsr == null) {
                 DatasetInfo dsInfo = new DatasetInfo(did);
-                ILSMComponentIdGenerator idGenerator = new 
LSMComponentIdGenerator();
-                PrimaryIndexOperationTracker opTracker =
-                        new PrimaryIndexOperationTracker(did, logManager, 
dsInfo, idGenerator);
                 DatasetVirtualBufferCaches vbcs = new 
DatasetVirtualBufferCaches(did, storageProperties,
                         memoryManager.getNumPages(did), numPartitions);
-                dsr = new DatasetResource(dsInfo, opTracker, vbcs, 
idGenerator);
+                dsr = new DatasetResource(dsInfo, vbcs);
                 datasets.put(did, dsr);
             }
             return dsr;
@@ -318,13 +317,33 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public PrimaryIndexOperationTracker getOperationTracker(int datasetId) {
-        return datasets.get(datasetId).getOpTracker();
+    public synchronized PrimaryIndexOperationTracker getOperationTracker(int 
datasetId, int partition) {
+        DatasetResource dataset = datasets.get(datasetId);
+        PrimaryIndexOperationTracker opTracker = 
dataset.getOpTracker(partition);
+        if (opTracker == null) {
+            populateOpTrackerAndIdGenerator(dataset, partition);
+            opTracker = dataset.getOpTracker(partition);
+        }
+        return opTracker;
     }
 
     @Override
-    public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) {
-        return datasets.get(datasetId).getIdGenerator();
+    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int 
datasetId, int partition) {
+        DatasetResource dataset = datasets.get(datasetId);
+        ILSMComponentIdGenerator generator = 
dataset.getComponentIdGenerator(partition);
+        if (generator == null) {
+            populateOpTrackerAndIdGenerator(dataset, partition);
+            generator = dataset.getComponentIdGenerator(partition);
+        }
+        return generator;
+    }
+
+    private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int 
partition) {
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+        PrimaryIndexOperationTracker opTracker = new 
PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
+                logManager, dataset.getDatasetInfo(), idGenerator);
+        dataset.setPrimaryIndexOperationTracker(partition, opTracker);
+        dataset.setIdGenerator(partition, idGenerator);
     }
 
     private void validateDatasetLifecycleManagerState() throws 
HyracksDataException {
@@ -357,31 +376,40 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long 
targetLSN) throws HyracksDataException {
         //schedule flush for datasets with min LSN (Log Serial Number) < 
targetLSN
         for (DatasetResource dsr : datasets.values()) {
-            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
-            synchronized (opTracker) {
-                for (IndexInfo iInfo : dsr.getIndexes().values()) {
-                    AbstractLSMIOOperationCallback ioCallback =
-                            (AbstractLSMIOOperationCallback) 
iInfo.getIndex().getIOOperationCallback();
-                    if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || 
ioCallback.hasPendingFlush()
-                            || opTracker.isFlushLogCreated() || 
opTracker.isFlushOnExit())) {
-                        long firstLSN = ioCallback.getFirstLSN();
-                        if (firstLSN < targetLSN) {
-                            LOGGER.info("Checkpoint flush dataset {}", 
dsr.getDatasetID());
-                            opTracker.setFlushOnExit(true);
-                            if (opTracker.getNumActiveOperations() == 0) {
-                                // No Modify operations currently, we need to 
trigger the flush and we can do so safely
-                                opTracker.flushIfRequested();
-                            }
-                            break;
-                        }
+            for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) 
{
+                // check all partitions
+                synchronized (opTracker) {
+                    scheduleAsyncFlushForLaggingDatasetPartition(dsr, 
opTracker, targetLSN);
+                }
+            }
+        }
+    }
+
+    private void scheduleAsyncFlushForLaggingDatasetPartition(DatasetResource 
dsr,
+            PrimaryIndexOperationTracker opTracker, long targetLSN) throws 
HyracksDataException {
+        int partition = opTracker.getPartition();
+        for (ILSMIndex lsmIndex : 
dsr.getDatasetInfo().getDatasetPartitionOpenIndexes(partition)) {
+            AbstractLSMIOOperationCallback ioCallback =
+                    (AbstractLSMIOOperationCallback) 
lsmIndex.getIOOperationCallback();
+            if (!(lsmIndex.isCurrentMutableComponentEmpty() || 
ioCallback.hasPendingFlush()
+                    || opTracker.isFlushLogCreated() || 
opTracker.isFlushOnExit())) {
+                long firstLSN = ioCallback.getFirstLSN();
+                if (firstLSN < targetLSN) {
+                    LOGGER.info("Checkpoint flush dataset {} partition {}", 
dsr.getDatasetID(), partition);
+                    opTracker.setFlushOnExit(true);
+                    if (opTracker.getNumActiveOperations() == 0) {
+                        // No Modify operations currently, we need to trigger 
the flush and we can do so safely
+                        opTracker.flushIfRequested();
                     }
+                    break;
                 }
             }
         }
     }
 
     /*
-     * This method can only be called asynchronously safely if we're sure no 
modify operation will take place until the flush is scheduled
+     * This method can only be called asynchronously safely if we're sure no 
modify operation
+     * will take place until the flush is scheduled
      */
     private void flushDatasetOpenIndexes(DatasetResource dsr, boolean 
asyncFlush) throws HyracksDataException {
         DatasetInfo dsInfo = dsr.getDatasetInfo();
@@ -389,53 +417,61 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             // no memory components for external dataset
             return;
         }
-        PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker();
-        if (primaryOpTracker.getNumActiveOperations() > 0) {
-            throw new IllegalStateException(
-                    "flushDatasetOpenIndexes is called on a dataset with 
currently active operations");
-        }
-
-        ILSMComponentIdGenerator idGenerator = 
getComponentIdGenerator(dsInfo.getDatasetID());
-        idGenerator.refresh();
+        for (PrimaryIndexOperationTracker primaryOpTracker : 
dsr.getOpTrackers()) {
+            // flush each partition one by one
+            if (primaryOpTracker.getNumActiveOperations() > 0) {
+                throw new IllegalStateException(
+                        "flushDatasetOpenIndexes is called on a dataset with 
currently active operations");
+            }
+            int partition = primaryOpTracker.getPartition();
+            Collection<ILSMIndex> indexes = 
dsInfo.getDatasetPartitionOpenIndexes(partition);
+            ILSMComponentIdGenerator idGenerator = 
getComponentIdGenerator(dsInfo.getDatasetID(), partition);
+            idGenerator.refresh();
+
+            if (dsInfo.isDurable()) {
+                synchronized (logRecord) {
+                    TransactionUtil.formFlushLogRecord(logRecord, 
dsInfo.getDatasetID(), null);
+                    try {
+                        logManager.log(logRecord);
+                    } catch (ACIDException e) {
+                        throw new HyracksDataException("could not write flush 
log while closing dataset", e);
+                    }
 
-        if (dsInfo.isDurable()) {
-            synchronized (logRecord) {
-                TransactionUtil.formFlushLogRecord(logRecord, 
dsInfo.getDatasetID(), null);
-                try {
-                    logManager.log(logRecord);
-                } catch (ACIDException e) {
-                    throw new HyracksDataException("could not write flush log 
while closing dataset", e);
+                    try {
+                        //notification will come from LogBuffer class 
(notifyFlushTerminator)
+                        logRecord.wait();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw HyracksDataException.create(e);
+                    }
                 }
+            }
+            for (ILSMIndex index : indexes) {
+                //update resource lsn
+                AbstractLSMIOOperationCallback ioOpCallback =
+                        (AbstractLSMIOOperationCallback) 
index.getIOOperationCallback();
+                ioOpCallback.updateLastLSN(logRecord.getLSN());
+            }
 
-                try {
-                    //notification will come from LogPage class 
(notifyFlushTerminator)
-                    logRecord.wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+            if (asyncFlush) {
+                for (ILSMIndex index : indexes) {
+                    ILSMIndexAccessor accessor = 
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                    accessor.scheduleFlush(index.getIOOperationCallback());
+                }
+            } else {
+                for (ILSMIndex index : indexes) {
+                    // TODO: This is not efficient since we flush the indexes 
sequentially.
+                    // Think of a way to allow submitting the flush requests 
concurrently.
+                    // We don't do them concurrently because this may lead to 
a deadlock scenario
+                    // between the DatasetLifeCycleManager and the 
PrimaryIndexOperationTracker.
+                    ILSMIndexAccessor accessor = 
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                    accessor.scheduleFlush(index.getIOOperationCallback());
+                    // Wait for the above flush op.
+                    dsInfo.waitForIO();
                 }
             }
         }
 
-        for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-            //update resource lsn
-            AbstractLSMIOOperationCallback ioOpCallback =
-                    (AbstractLSMIOOperationCallback) 
iInfo.getIndex().getIOOperationCallback();
-            ioOpCallback.updateLastLSN(logRecord.getLSN());
-        }
-
-        if (asyncFlush) {
-            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-                ILSMIndexAccessor accessor = 
iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-                
accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
-            }
-        } else {
-            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-                // TODO: This is not efficient since we flush the indexes 
sequentially.
-                // Think of a way to allow submitting the flush requests 
concurrently. We don't do them concurrently because this
-                // may lead to a deadlock scenario between the 
DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
-                flushAndWaitForIO(dsInfo, iInfo);
-            }
-        }
     }
 
     private void closeDataset(DatasetResource dsr) throws HyracksDataException 
{

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index c02de7e..8dcae23 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.context;
 
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
@@ -41,17 +43,16 @@ import org.apache.hyracks.storage.common.LocalResource;
  */
 public class DatasetResource implements Comparable<DatasetResource> {
     private final DatasetInfo datasetInfo;
-    private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
     private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
-    private final ILSMComponentIdGenerator datasetComponentIdGenerator;
 
-    public DatasetResource(DatasetInfo datasetInfo, 
PrimaryIndexOperationTracker datasetPrimaryOpTracker,
-            DatasetVirtualBufferCaches datasetVirtualBufferCaches,
-            ILSMComponentIdGenerator datasetComponentIdGenerator) {
+    private final Map<Integer, PrimaryIndexOperationTracker> 
datasetPrimaryOpTrackers;
+    private final Map<Integer, ILSMComponentIdGenerator> 
datasetComponentIdGenerators;
+
+    public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches 
datasetVirtualBufferCaches) {
         this.datasetInfo = datasetInfo;
-        this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
         this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
-        this.datasetComponentIdGenerator = datasetComponentIdGenerator;
+        this.datasetPrimaryOpTrackers = new HashMap<>();
+        this.datasetComponentIdGenerators = new HashMap<>();
     }
 
     public boolean isRegistered() {
@@ -108,7 +109,8 @@ public class DatasetResource implements 
Comparable<DatasetResource> {
         if (index == null) {
             throw new HyracksDataException("Attempt to register a null index");
         }
-        datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, 
datasetInfo.getDatasetID(), resource,
+
+        datasetInfo.addIndex(resourceID, new IndexInfo(index, 
datasetInfo.getDatasetID(), resource,
                 ((DatasetLocalResource) 
resource.getResource()).getPartition()));
     }
 
@@ -116,12 +118,31 @@ public class DatasetResource implements 
Comparable<DatasetResource> {
         return datasetInfo;
     }
 
-    public PrimaryIndexOperationTracker getOpTracker() {
-        return datasetPrimaryOpTracker;
+    public PrimaryIndexOperationTracker getOpTracker(int partition) {
+        return datasetPrimaryOpTrackers.get(partition);
+    }
+
+    public Collection<PrimaryIndexOperationTracker> getOpTrackers() {
+        return datasetPrimaryOpTrackers.values();
+    }
+
+    public ILSMComponentIdGenerator getComponentIdGenerator(int partition) {
+        return datasetComponentIdGenerators.get(partition);
     }
 
-    public ILSMComponentIdGenerator getIdGenerator() {
-        return datasetComponentIdGenerator;
+    public void setPrimaryIndexOperationTracker(int partition, 
PrimaryIndexOperationTracker opTracker) {
+        if (datasetPrimaryOpTrackers.containsKey(partition)) {
+            throw new IllegalStateException(
+                    "PrimaryIndexOperationTracker has already been set for 
partition " + partition);
+        }
+        datasetPrimaryOpTrackers.put(partition, opTracker);
+    }
+
+    public void setIdGenerator(int partition, ILSMComponentIdGenerator 
idGenerator) {
+        if (datasetComponentIdGenerators.containsKey(partition)) {
+            throw new IllegalStateException("LSMComponentIdGenerator has 
already been set for partition " + partition);
+        }
+        datasetComponentIdGenerators.put(partition, idGenerator);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 5170310..1a76b66 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -43,6 +43,7 @@ import 
org.apache.hyracks.storage.common.ISearchOperationCallback;
 
 public class PrimaryIndexOperationTracker extends BaseOperationTracker {
 
+    private final int partition;
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
@@ -50,9 +51,10 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker {
     private boolean flushOnExit = false;
     private boolean flushLogCreated = false;
 
-    public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, 
DatasetInfo dsInfo,
+    public PrimaryIndexOperationTracker(int datasetID, int partition, 
ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
         super(datasetID, dsInfo);
+        this.partition = partition;
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
@@ -100,7 +102,7 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker {
         // or if there is a flush scheduled by the checkpoint (flushOnExit), 
then schedule it
 
         boolean needsFlush = false;
-        Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes();
+        Set<ILSMIndex> indexes = 
dsInfo.getDatasetPartitionOpenIndexes(partition);
 
         if (!flushOnExit) {
             for (ILSMIndex lsmIndex : indexes) {
@@ -146,7 +148,7 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker {
     //This method is called sequentially by LogPage.notifyFlushTerminator in 
the sequence flushes were scheduled.
     public synchronized void triggerScheduleFlush(LogRecord logRecord) throws 
HyracksDataException {
         idGenerator.refresh();
-        for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
+        for (ILSMIndex lsmIndex : 
dsInfo.getDatasetPartitionOpenIndexes(partition)) {
             //get resource
             ILSMIndexAccessor accessor = 
lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             //update resource lsn
@@ -199,4 +201,8 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker {
         return flushLogCreated;
     }
 
+    public int getPartition() {
+        return partition;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index ed56ab1..5b9883c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -24,11 +24,13 @@ import java.io.ObjectStreamException;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.common.IResource;
 
 public abstract class AbstractLSMIndexIOOperationCallbackFactory implements 
ILSMIOOperationCallbackFactory {
 
@@ -38,17 +40,20 @@ public abstract class 
AbstractLSMIndexIOOperationCallbackFactory implements ILSM
 
     protected transient INCServiceContext ncCtx;
 
+    protected transient IResource resource;
+
     public 
AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory 
idGeneratorFactory) {
         this.idGeneratorFactory = idGeneratorFactory;
     }
 
     @Override
-    public void initialize(INCServiceContext ncCtx) {
+    public void initialize(INCServiceContext ncCtx, IResource resource) {
         this.ncCtx = ncCtx;
+        this.resource = resource;
     }
 
-    protected ILSMComponentIdGenerator getComponentIdGenerator() {
-        return idGeneratorFactory.getComponentIdGenerator(ncCtx);
+    protected ILSMComponentIdGenerator getComponentIdGenerator() throws 
HyracksDataException {
+        return idGeneratorFactory.getComponentIdGenerator(ncCtx, resource);
     }
 
     protected IIndexCheckpointManagerProvider 
getIndexCheckpointManagerProvider() {
@@ -60,7 +65,7 @@ public abstract class 
AbstractLSMIndexIOOperationCallbackFactory implements ILSM
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ILSMComponentIdGenerator 
getComponentIdGenerator(INCServiceContext serviceCtx) {
+            public ILSMComponentIdGenerator 
getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) {
                 // used for backward compatibility
                 // if idGeneratorFactory is not set for legacy lsm indexes, we 
return a default
                 // component id generator which always generates the missing 
component id.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 95245cb..97badb2 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@ public class LSMBTreeIOOperationCallbackFactory extends 
AbstractLSMIndexIOOperat
     }
 
     @Override
-    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws 
HyracksDataException {
         return new LSMBTreeIOOperationCallback(index, 
getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 6c75ed6..9b32345 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -31,7 +32,7 @@ public class LSMBTreeWithBuddyIOOperationCallbackFactory 
extends AbstractLSMInde
     }
 
     @Override
-    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws 
HyracksDataException {
         return new LSMBTreeWithBuddyIOOperationCallback(index, 
getComponentIdGenerator(),
                 getIndexCheckpointManagerProvider());
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index fb73d19..766ef95 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@ public class LSMInvertedIndexIOOperationCallbackFactory 
extends AbstractLSMIndex
     }
 
     @Override
-    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws 
HyracksDataException {
         return new LSMInvertedIndexIOOperationCallback(index, 
getComponentIdGenerator(),
                 getIndexCheckpointManagerProvider());
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 94be0bb..3a0afa8 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@ public class LSMRTreeIOOperationCallbackFactory extends 
AbstractLSMIndexIOOperat
     }
 
     @Override
-    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+    public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws 
HyracksDataException {
         return new LSMRTreeIOOperationCallback(index, 
getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index c4a2d03..a3d5bc5 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -38,11 +38,13 @@ public interface ITransactionContext {
      * transaction.
      *
      * @param resourceId
+     * @param partition
      * @param index
      * @param callback
      * @param primaryIndex
      */
-    void register(long resourceId, ILSMIndex index, 
IModificationOperationCallback callback, boolean primaryIndex);
+    void register(long resourceId, int partition, ILSMIndex index, 
IModificationOperationCallback callback,
+            boolean primaryIndex);
 
     /**
      * Gets the unique transaction id.
@@ -135,8 +137,10 @@ public interface ITransactionContext {
      * Called to notify the transaction that an entity commit
      * log belonging to this transaction has been flushed to
      * disk.
+     *
+     * @param partition
      */
-    void notifyEntityCommitted();
+    void notifyEntityCommitted(int partition);
 
     /**
      * Called after an operation is performed on index

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 3aa7b17..f9f742a 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -22,15 +22,14 @@ package org.apache.asterix.test.context;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -60,6 +59,8 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase 
{
 
     private final int DATASET_ID = 1;
 
+    private long nextResourceId = 0;
+
     @Test
     public void testBasic() {
         try {
@@ -183,19 +184,15 @@ public class CorrelatedPrefixMergePolicyTest extends 
TestCase {
         }
     }
 
-    private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+    private ILSMMergePolicy mockMergePolicy(IndexInfo... indexInfos) {
         Map<String, String> properties = new HashMap<>();
         properties.put("max-tolerance-component-count", 
String.valueOf(MAX_COMPONENT_COUNT));
         properties.put("max-mergable-component-size", 
String.valueOf(MAX_COMPONENT_SIZE));
 
-        Set<IndexInfo> indexInfos = new HashSet<>();
-        for (IndexInfo info : indexes) {
-            indexInfos.add(info);
+        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+        for (IndexInfo index : indexInfos) {
+            dsInfo.addIndex(index.getResourceId(), index);
         }
-
-        DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class);
-        Mockito.when(dsInfo.getDatsetIndexInfos()).thenReturn(indexInfos);
-
         IDatasetLifecycleManager manager = 
Mockito.mock(IDatasetLifecycleManager.class);
         Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo);
 
@@ -238,8 +235,16 @@ public class CorrelatedPrefixMergePolicyTest extends 
TestCase {
 
         
Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
         Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+        if (isPrimary) {
+            PrimaryIndexOperationTracker opTracker = 
Mockito.mock(PrimaryIndexOperationTracker.class);
+            Mockito.when(opTracker.getPartition()).thenReturn(partition);
+            Mockito.when(index.getOperationTracker()).thenReturn(opTracker);
+        }
         final LocalResource localResource = Mockito.mock(LocalResource.class);
-        return new IndexInfo(index, DATASET_ID, localResource, partition);
+        Mockito.when(localResource.getId()).thenReturn(nextResourceId++);
+        IndexInfo indexInfo = new IndexInfo(index, DATASET_ID, localResource, 
partition);
+        indexInfo.setOpen(true);
+        return indexInfo;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 6634e51..e8f2595 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -42,6 +42,7 @@ import 
org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
@@ -474,7 +475,9 @@ public class MetadataNode implements IMetadataNode {
             IIndexAccessParameters iap = new 
IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
             txnCtx.setWriteTxn(true);
-            txnCtx.register(metadataIndex.getResourceId(), lsmIndex, 
modCallback, metadataIndex.isPrimaryIndex());
+            txnCtx.register(metadataIndex.getResourceId(),
+                    
StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, 
modCallback,
+                    metadataIndex.isPrimaryIndex());
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, 
transactionSubsystem.getLogManager());
             switch (op) {
                 case INSERT:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 9753bcf..9ebd21b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -63,7 +63,6 @@ import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
 import 
org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
@@ -476,4 +475,5 @@ public class MetadataBootstrap {
     public static void setNewUniverse(boolean isNewUniverse) {
         MetadataBootstrap.isNewUniverse = isNewUniverse;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index ea2d715..8cd7053 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,6 +42,7 @@ import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
@@ -818,6 +819,10 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
     protected int[] getDatasetPartitions(MetadataProvider metadataProvider) 
throws AlgebricksException {
         FileSplit[] splitsForDataset =
                 
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, 
getDatasetName());
-        return IntStream.range(0, splitsForDataset.length).toArray();
+        int[] partitions = new int[splitsForDataset.length];
+        for (int i = 0; i < partitions.length; i++) {
+            partitions[i] = 
StoragePathUtil.getPartitionNumFromRelativePath(splitsForDataset[i].getPath());
+        }
+        return partitions;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 7913d48..6faffc7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,13 +18,12 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
-import static 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
-
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index d057c50..10c6b8f 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -73,7 +73,7 @@ public class PrimaryIndexModificationOperationCallbackFactory 
extends AbstractOp
             IModificationOperationCallback modCallback = new 
PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp, operatorNodePushable);
-            txnCtx.register(resource.getId(), index, modCallback, true);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, 
modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index f40140a..eef1cb0 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -21,9 +21,12 @@ package 
org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResource;
 
 public class PrimaryIndexOperationTrackerFactory implements 
ILSMOperationTrackerFactory {
 
@@ -36,10 +39,12 @@ public class PrimaryIndexOperationTrackerFactory implements 
ILSMOperationTracker
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, 
IResource resource)
+            throws HyracksDataException {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) 
ctx.getApplicationContext()).getDatasetLifecycleManager();
-        return dslcManager.getOperationTracker(datasetId);
+        int partition = 
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        return dslcManager.getOperationTracker(datasetId, partition);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 26e1b22..a927da0 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -69,7 +69,7 @@ public class 
SecondaryIndexModificationOperationCallbackFactory extends Abstract
             IModificationOperationCallback modCallback = new 
SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp);
-            txnCtx.register(resource.getId(), index, modCallback, false);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, 
modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);

Reply via email to