[ASTERIXDB-2339] Add a new inverted index merge cursor

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Implement a new inverted index merge cursor which uses two priority queues,
one for tokens and one for keys. For each token, we merge its inverted
lists using the key queue. After that, we fetch the next token and merge
its lists in the same way. This greatly reduces unnecessary token comparisons.
- Along with this change, create a fast path for the inverted index bulkloader.
Based on how the token+key pair is created, there is no need to copy the
bulkloaded tuple and check whether it's a new token during merge.

Change-Id: I57d039cd7e08033884529a204bff9acffd96d9bb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2519
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ian Maxon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3036c980
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3036c980
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3036c980

Branch: refs/heads/master
Commit: 3036c98099f6c912af29bced9b2bf71bdfa9026e
Parents: 7dc566b
Author: luochen01 <[email protected]>
Authored: Sat Mar 24 20:04:02 2018 -0700
Committer: Luo Chen <[email protected]>
Committed: Thu Mar 29 19:18:01 2018 -0700

----------------------------------------------------------------------
 ...IndexCreationTupleProcessorNodePushable.java |  10 +-
 .../LSMSecondaryUpsertOperatorNodePushable.java |  30 +-
 .../dataflow/common/utils/TupleUtils.java       |  26 ++
 .../am/lsm/btree/impls/ExternalBTree.java       |   4 +-
 .../lsm/btree/impls/ExternalBTreeWithBuddy.java |  10 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java    |   8 +-
 ...AbstractLSMWithBloomFilterDiskComponent.java |   9 +-
 .../am/lsm/common/api/ILSMDiskComponent.java    |   8 +-
 .../common/impls/AbstractLSMDiskComponent.java  |  21 +-
 .../am/lsm/common/impls/EmptyComponent.java     |   7 +-
 .../impls/LSMIndexDiskComponentBulkLoader.java  |   7 +-
 .../lsm/common/impls/LSMIndexSearchCursor.java  |   4 +-
 .../invertedindex/impls/LSMInvertedIndex.java   |  14 +-
 .../impls/LSMInvertedIndexDiskComponent.java    |  15 +-
 .../impls/LSMInvertedIndexMergeCursor.java      | 369 +++++++++++++++++++
 .../ondisk/OnDiskInvertedIndex.java             | 194 ++++++----
 .../OnDiskInvertedIndexRangeSearchCursor.java   |  16 +-
 .../invertedindex/tuples/TokenKeyPairTuple.java |  95 +++++
 .../storage/am/lsm/rtree/impls/LSMRTree.java    |  12 +-
 .../impls/LSMRTreeWithAntiMatterTuples.java     |   6 +-
 .../common/AbstractInvertedIndexTest.java       |   7 +
 .../util/LSMInvertedIndexTestUtils.java         |  26 +-
 22 files changed, 733 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
index 9376d1b..ac7fc89 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
@@ -276,7 +276,7 @@ public class 
LSMSecondaryIndexCreationTupleProcessorNodePushable extends Abstrac
 
     private boolean equalPrimaryKeys(ITupleReference tuple1, ITupleReference 
tuple2) {
         for (int i = numTagFields + numSecondaryKeys; i < numTagFields + 
numPrimaryKeys + numSecondaryKeys; i++) {
-            if (!equalField(tuple1, tuple2, i)) {
+            if (!TupleUtils.equalFields(tuple1, tuple2, i)) {
                 return false;
             }
         }
@@ -285,16 +285,10 @@ public class 
LSMSecondaryIndexCreationTupleProcessorNodePushable extends Abstrac
 
     private boolean equalSecondaryKeys(ITupleReference tuple1, ITupleReference 
tuple2) {
         for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
-            if (!equalField(tuple1, tuple2, i)) {
+            if (!TupleUtils.equalFields(tuple1, tuple2, i)) {
                 return false;
             }
         }
         return true;
     }
-
-    private boolean equalField(ITupleReference tuple1, ITupleReference tuple2, 
int fIdx) {
-        return 
LSMSecondaryUpsertOperatorNodePushable.equals(tuple1.getFieldData(fIdx), 
tuple1.getFieldStart(fIdx),
-                tuple1.getFieldLength(fIdx), tuple2.getFieldData(fIdx), 
tuple2.getFieldStart(fIdx),
-                tuple2.getFieldLength(fIdx));
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index a22e5e7..b928131 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -55,7 +56,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDel
 public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     private final PermutingFrameTupleReference prevValueTuple = new 
PermutingFrameTupleReference();
-    private int numberOfFields;
+    private final int numberOfFields;
     private AbstractIndexModificationOperationCallback abstractModCallback;
 
     public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
@@ -74,31 +75,6 @@ public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdate
         abstractModCallback = (AbstractIndexModificationOperationCallback) 
modCallback;
     }
 
-    public static boolean equals(byte[] a, int aOffset, int aLength, byte[] b, 
int bOffset, int bLength) {
-        if (aLength != bLength) {
-            return false;
-        }
-        for (int i = 0; i < aLength; i++) {
-            if (a[aOffset + i] != b[bOffset + i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public static boolean equalTuples(PermutingFrameTupleReference t1, 
PermutingFrameTupleReference t2, int numOfFields)
-            throws HyracksDataException {
-        byte[] t1Data = t1.getFieldData(0);
-        byte[] t2Data = t2.getFieldData(0);
-        for (int i = 0; i < numOfFields; i++) {
-            if (!equals(t1Data, t1.getFieldStart(i), t1.getFieldLength(i), 
t2Data, t2.getFieldStart(i),
-                    t2.getFieldLength(i))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
@@ -117,7 +93,7 @@ public class LSMSecondaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdate
                 }
                 // At least, one is not null
                 // If they are equal, then we skip
-                if (equalTuples(tuple, prevValueTuple, numberOfFields)) {
+                if (TupleUtils.equalTuples(tuple, prevValueTuple, 
numberOfFields)) {
                     continue;
                 }
                 if (!isOldValueMissing) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
index 08ed922..49b5309 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
@@ -164,4 +164,30 @@ public class TupleUtils {
             tupleBuilder.addField(tuple.getFieldData(i), 
tuple.getFieldStart(i), tuple.getFieldLength(i));
         }
     }
+
+    public static boolean equalTuples(ITupleReference tuple1, ITupleReference 
tuple2, int numCmpFields) {
+        for (int i = 0; i < numCmpFields; i++) {
+            if (!equalFields(tuple1, tuple2, i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean equalFields(ITupleReference tuple1, ITupleReference 
tuple2, int fIdx) {
+        return equalFields(tuple1.getFieldData(fIdx), 
tuple1.getFieldStart(fIdx), tuple1.getFieldLength(fIdx),
+                tuple2.getFieldData(fIdx), tuple2.getFieldStart(fIdx), 
tuple2.getFieldLength(fIdx));
+    }
+
+    public static boolean equalFields(byte[] a, int aOffset, int aLength, 
byte[] b, int bOffset, int bLength) {
+        if (aLength != bLength) {
+            return false;
+        }
+        for (int i = 0; i < aLength; i++) {
+            if (a[aOffset + i] != b[bOffset + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index c0f7571..4c2fc3b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -431,8 +431,8 @@ public class ExternalBTree extends LSMBTree implements 
ITwoPCIndex {
                 component = createBulkLoadTarget();
             }
 
-            componentBulkLoader =
-                    component.createBulkLoader(fillFactor, verifyInput, 
numElementsHint, false, true, true);
+            componentBulkLoader = 
component.createBulkLoader(LSMIOOperationType.LOAD, fillFactor, verifyInput,
+                    numElementsHint, false, true, true);
         }
 
         // It is expected that the mode was set to insert operation before

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 1ba55f7..62fd850 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -331,7 +331,8 @@ public class ExternalBTreeWithBuddy extends 
AbstractLSMIndex implements ITreeInd
                 numElements += ((AbstractLSMWithBloomFilterDiskComponent) 
mergeOp.getMergingComponents().get(i))
                         .getBloomFilter().getNumElements();
             }
-            componentBulkLoader = mergedComponent.createBulkLoader(1.0f, 
false, numElements, false, false, false);
+            componentBulkLoader = 
mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, 
numElements,
+                    false, false, false);
             try {
                 while (buddyBtreeCursor.hasNext()) {
                     buddyBtreeCursor.next();
@@ -342,7 +343,8 @@ public class ExternalBTreeWithBuddy extends 
AbstractLSMIndex implements ITreeInd
                 buddyBtreeCursor.close();
             }
         } else {
-            componentBulkLoader = mergedComponent.createBulkLoader(1.0f, 
false, 0L, false, false, false);
+            componentBulkLoader =
+                    mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 
1.0f, false, 0L, false, false, false);
         }
 
         try {
@@ -510,8 +512,8 @@ public class ExternalBTreeWithBuddy extends 
AbstractLSMIndex implements ITreeInd
                 component = createBulkLoadTarget();
             }
 
-            componentBulkLoader =
-                    component.createBulkLoader(fillFactor, verifyInput, 
numElementsHint, false, true, false);
+            componentBulkLoader = 
component.createBulkLoader(LSMIOOperationType.LOAD, fillFactor, verifyInput,
+                    numElementsHint, false, true, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 41a11e6..f88947e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -48,6 +48,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -277,7 +278,8 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
             }
             component = createDiskComponent(componentFactory, 
flushOp.getTarget(), null, flushOp.getBloomFilterTarget(),
                     true);
-            componentBulkLoader = component.createBulkLoader(1.0f, false, 
numElements, false, false, false);
+            componentBulkLoader =
+                    component.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, 
false, numElements, false, false, false);
             IIndexCursor scanCursor = accessor.createSearchCursor(false);
             accessor.search(scanCursor, nullPred);
             try {
@@ -336,8 +338,8 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
                     long numElements = getNumberOfElements(mergedComponents);
                     mergedComponent = createDiskComponent(componentFactory, 
mergeOp.getTarget(), null,
                             mergeOp.getBloomFilterTarget(), true);
-                    componentBulkLoader =
-                            mergedComponent.createBulkLoader(1.0f, false, 
numElements, false, false, false);
+                    componentBulkLoader = 
mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false,
+                            numElements, false, false, false);
                     while (cursor.hasNext()) {
                         cursor.next();
                         ITupleReference frameTuple = cursor.getTuple();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index 107190d..c98fa69 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -23,6 +23,7 @@ import 
org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import 
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.BloomFilterBulkLoader;
@@ -92,10 +93,10 @@ public abstract class 
AbstractLSMWithBloomFilterDiskComponent extends AbstractLS
     }
 
     @Override
-    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float 
fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, boolean 
withFilter, boolean cleanupEmptyComponent)
-            throws HyracksDataException {
-        ChainedLSMDiskComponentBulkLoader chainedBulkLoader = 
super.createBulkLoader(fillFactor, verifyInput,
+    public ChainedLSMDiskComponentBulkLoader 
createBulkLoader(LSMIOOperationType opType, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean 
checkIfEmptyIndex, boolean withFilter,
+            boolean cleanupEmptyComponent) throws HyracksDataException {
+        ChainedLSMDiskComponentBulkLoader chainedBulkLoader = 
super.createBulkLoader(opType, fillFactor, verifyInput,
                 numElementsHint, checkIfEmptyIndex, withFilter, 
cleanupEmptyComponent);
         if (numElementsHint > 0) {
             
chainedBulkLoader.addBulkLoader(createBloomFilterBulkLoader(numElementsHint));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index bd2bb45..1a0305f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.ChainedLSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
@@ -50,6 +51,7 @@ public interface ILSMDiskComponent extends ILSMComponent {
     /**
      * @return LsmIndex of the component
      */
+    @Override
     AbstractLSMIndex getLsmIndex();
 
     /**
@@ -142,6 +144,7 @@ public interface ILSMDiskComponent extends ILSMComponent {
      * Creates a bulkloader pipeline which includes all chained operations, 
bulkloading individual elements of the
      * component: indexes, LSM filters, Bloom filters, buddy indexes, etc.
      *
+     * @param opType
      * @param fillFactor
      * @param verifyInput
      * @param numElementsHint
@@ -151,6 +154,7 @@ public interface ILSMDiskComponent extends ILSMComponent {
      * @return
      * @throws HyracksDataException
      */
-    ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, 
boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex, boolean withFilter, boolean 
cleanupEmptyComponent) throws HyracksDataException;
+    ChainedLSMDiskComponentBulkLoader createBulkLoader(LSMIOOperationType 
opType, float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex, boolean 
withFilter, boolean cleanupEmptyComponent)
+            throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 26c7b0d..633de6b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -23,6 +23,7 @@ import 
org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -201,17 +202,27 @@ public abstract class AbstractLSMDiskComponent extends 
AbstractLSMComponent impl
                 getIndex().createBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex));
     }
 
+    /**
+     * Allows sub-class extend this method to use specialized bulkloader for 
merge
+     */
+    protected IChainedComponentBulkLoader createMergeIndexBulkLoader(float 
fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex) throws 
HyracksDataException {
+        return this.createIndexBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex);
+    }
+
     @Override
-    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float 
fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, boolean 
withFilter, boolean cleanupEmptyComponent)
-            throws HyracksDataException {
+    public ChainedLSMDiskComponentBulkLoader 
createBulkLoader(LSMIOOperationType opType, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean 
checkIfEmptyIndex, boolean withFilter,
+            boolean cleanupEmptyComponent) throws HyracksDataException {
         ChainedLSMDiskComponentBulkLoader chainedBulkLoader =
                 new ChainedLSMDiskComponentBulkLoader(this, 
cleanupEmptyComponent);
         if (withFilter && getLsmIndex().getFilterFields() != null) {
             chainedBulkLoader.addBulkLoader(createFilterBulkLoader());
         }
-        chainedBulkLoader
-                .addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex));
+        IChainedComponentBulkLoader indexBulkloader = opType == 
LSMIOOperationType.MERGE
+                ? createMergeIndexBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex)
+                : createIndexBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex);
+        chainedBulkLoader.addBulkLoader(indexBulkloader);
         return chainedBulkLoader;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index e3ca9f1..466ef24 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndex;
 
@@ -144,9 +145,9 @@ public class EmptyComponent implements ILSMDiskComponent {
     }
 
     @Override
-    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float 
fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, boolean 
withFilter, boolean cleanupEmptyComponent)
-            throws HyracksDataException {
+    public ChainedLSMDiskComponentBulkLoader 
createBulkLoader(LSMIOOperationType opType, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean 
checkIfEmptyIndex, boolean withFilter,
+            boolean cleanupEmptyComponent) throws HyracksDataException {
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 5e105a4..2ef6169 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -22,13 +22,14 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
 public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
     private final AbstractLSMIndex lsmIndex;
     private final ILSMDiskComponentBulkLoader componentBulkLoader;
-    private ILSMIndexOperationContext opCtx;
+    private final ILSMIndexOperationContext opCtx;
 
     public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, 
ILSMIndexOperationContext opCtx, float fillFactor,
             boolean verifyInput, long numElementsHint) throws 
HyracksDataException {
@@ -37,8 +38,8 @@ public class LSMIndexDiskComponentBulkLoader implements 
IIndexBulkLoader {
         // Note that by using a flush target file name, we state that the
         // new bulk loaded component is "newer" than any other merged 
component.
         opCtx.setNewComponent(lsmIndex.createBulkLoadTarget());
-        this.componentBulkLoader =
-                opCtx.getNewComponent().createBulkLoader(fillFactor, 
verifyInput, numElementsHint, false, true, true);
+        this.componentBulkLoader = 
opCtx.getNewComponent().createBulkLoader(LSMIOOperationType.LOAD, fillFactor,
+                verifyInput, numElementsHint, false, true, true);
     }
 
     public ILSMDiskComponent getComponent() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index fb1fa63..12caec4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -246,7 +246,7 @@ public abstract class LSMIndexSearchCursor extends 
EnforcedIndexCursor implement
         }
     }
 
-    public class PriorityQueueElement {
+    public static class PriorityQueueElement {
         private ITupleReference tuple;
         private final int cursorIndex;
 
@@ -268,7 +268,7 @@ public abstract class LSMIndexSearchCursor extends 
EnforcedIndexCursor implement
         }
     }
 
-    public class PriorityQueueComparator implements 
Comparator<PriorityQueueElement> {
+    public static class PriorityQueueComparator implements 
Comparator<PriorityQueueElement> {
 
         protected MultiComparator cmp;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index e46c24a..0fae1ac 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -45,6 +45,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -59,7 +60,6 @@ import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndexAccessor;
@@ -296,7 +296,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
         }
 
         ILSMDiskComponentBulkLoader componentBulkLoader =
-                component.createBulkLoader(1.0f, false, numBTreeTuples, false, 
false, false);
+                component.createBulkLoader(LSMIOOperationType.FLUSH, 1.0f, 
false, numBTreeTuples, false, false, false);
 
         // Create a scan cursor on the deleted keys BTree underlying the 
in-memory inverted index.
         IIndexCursor deletedKeysScanCursor = 
deletedKeysBTreeAccessor.createSearchCursor(false);
@@ -349,7 +349,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
         LSMInvertedIndexMergeOperation mergeOp = 
(LSMInvertedIndexMergeOperation) operation;
         RangePredicate mergePred = new RangePredicate(null, null, true, true, 
null, null);
         IIndexCursor cursor = mergeOp.getCursor();
-        ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) 
cursor).getOpCtx();
+        ILSMIndexOperationContext opCtx = ((LSMInvertedIndexMergeCursor) 
cursor).getOpCtx();
         // Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
         // Create an inverted index instance.
         ILSMDiskComponent component = createDiskComponent(componentFactory, 
mergeOp.getTarget(),
@@ -368,13 +368,15 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
                     numElements += ((LSMInvertedIndexDiskComponent) 
mergeOp.getMergingComponents().get(i))
                             .getBloomFilter().getNumElements();
                 }
-                componentBulkLoader = component.createBulkLoader(1.0f, false, 
numElements, false, false, false);
+                componentBulkLoader = 
component.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, numElements,
+                        false, false, false);
                 loadDeleteTuples(opCtx, btreeCursor, mergePred, 
componentBulkLoader);
             } finally {
                 btreeCursor.destroy();
             }
         } else {
-            componentBulkLoader = component.createBulkLoader(1.0f, false, 0L, 
false, false, false);
+            componentBulkLoader =
+                    component.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, 
false, 0L, false, false, false);
         }
         search(opCtx, cursor, mergePred);
         try {
@@ -495,7 +497,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
     protected ILSMIOOperation 
createMergeOperation(AbstractLSMIndexOperationContext opCtx,
             LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback 
callback) throws HyracksDataException {
         ILSMIndexAccessor accessor = new 
LSMInvertedIndexAccessor(getHarness(), opCtx);
-        IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx);
+        IIndexCursor cursor = new LSMInvertedIndexMergeCursor(opCtx);
         return new LSMInvertedIndexMergeOperation(accessor, cursor, 
mergeFileRefs.getInsertIndexFileReference(),
                 mergeFileRefs.getDeleteIndexFileReference(), 
mergeFileRefs.getBloomFilterFileReference(), callback,
                 fileManager.getBaseDir().getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 279a518..b030e83 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -25,11 +25,14 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import 
org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBuddyDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexWithBuddyBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 public class LSMInvertedIndexDiskComponent extends 
AbstractLSMWithBuddyDiskComponent {
@@ -109,4 +112,14 @@ public class LSMInvertedIndexDiskComponent extends 
AbstractLSMWithBuddyDiskCompo
         // Flush deleted keys BTree.
         ComponentUtils.markAsValid(getBuddyIndex(), persist);
     }
+
+    @Override
+    protected IChainedComponentBulkLoader createMergeIndexBulkLoader(float 
fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex) throws 
HyracksDataException {
+        IIndexBulkLoader indexBulkLoader =
+                invIndex.createMergeBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex);
+        IIndexBulkLoader buddyBulkLoader =
+                getBuddyIndex().createBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex);
+        return new IndexWithBuddyBulkLoader(indexBulkLoader, buddyBulkLoader);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
new file mode 100644
index 0000000..c80455d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor.PriorityQueueComparator;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor.PriorityQueueElement;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexRangeSearchCursor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
+import org.apache.hyracks.storage.common.EnforcedIndexCursor;
+import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * This cursor is specially designed and optimized for merging inverted indexes.
+ * For simplicity, it assumes that all components are disk components and that the cursor is not reused.
+ *
+ */
+public class LSMInvertedIndexMergeCursor extends EnforcedIndexCursor 
implements ILSMIndexCursor {
+    protected final LSMInvertedIndexOpContext opCtx;
+    protected PriorityQueueElement outputTokenElement;
+    protected OnDiskInvertedIndexRangeSearchCursor[] rangeCursors;
+    protected PriorityQueueElement[] tokenQueueElements;
+    protected PriorityQueue<PriorityQueueElement> tokenQueue;
+    protected PriorityQueueComparator tokenQueueCmp;
+
+    protected PriorityQueueElement outputKeyElement;
+    protected PriorityQueueElement[] keyQueueElements;
+    protected PriorityQueue<PriorityQueueElement> keyQueue;
+    protected PriorityQueueComparator keyQueueCmp;
+
+    protected boolean needPushElementIntoKeyQueue;
+
+    protected ILSMHarness lsmHarness;
+
+    protected MultiComparator tokenCmp;
+    protected MultiComparator keyCmp;
+
+    protected List<ILSMComponent> operationalComponents;
+
+    // Assuming the cursor for all deleted-keys indexes are of the same type.
+    protected IIndexCursor[] deletedKeysBTreeCursors;
+    protected BloomFilter[] bloomFilters;
+    protected final long[] hashes = BloomFilter.createHashArray();
+    protected ArrayList<IIndexAccessor> deletedKeysBTreeAccessors;
+    protected RangePredicate deletedKeyBTreeSearchPred;
+
+    protected final TokenKeyPairTuple outputTuple;
+
+    public LSMInvertedIndexMergeCursor(ILSMIndexOperationContext opCtx) {
+        this.opCtx = (LSMInvertedIndexOpContext) opCtx;
+        outputTokenElement = null;
+        outputKeyElement = null;
+        needPushElementIntoKeyQueue = false;
+
+        IInvertedIndex invertedIndex = (IInvertedIndex) this.opCtx.getIndex();
+        this.outputTuple = new 
TokenKeyPairTuple(invertedIndex.getTokenTypeTraits().length,
+                invertedIndex.getInvListTypeTraits().length);
+
+        this.tokenCmp = 
MultiComparator.create(invertedIndex.getTokenCmpFactories());
+        this.keyCmp = 
MultiComparator.create(invertedIndex.getInvListCmpFactories());
+        this.tokenQueueCmp = new PriorityQueueComparator(tokenCmp);
+        this.keyQueueCmp = new PriorityQueueComparator(keyCmp);
+    }
+
+    public LSMInvertedIndexOpContext getOpCtx() {
+        return opCtx;
+    }
+
+    @Override
+    public void doOpen(ICursorInitialState initState, ISearchPredicate 
searchPred) throws HyracksDataException {
+        LSMInvertedIndexRangeSearchCursorInitialState lsmInitState =
+                (LSMInvertedIndexRangeSearchCursorInitialState) initState;
+        int numComponents = lsmInitState.getNumComponents();
+        rangeCursors = new OnDiskInvertedIndexRangeSearchCursor[numComponents];
+        for (int i = 0; i < numComponents; i++) {
+            IInvertedIndexAccessor invIndexAccessor = (IInvertedIndexAccessor) 
lsmInitState.getIndexAccessors().get(i);
+            rangeCursors[i] = (OnDiskInvertedIndexRangeSearchCursor) 
invIndexAccessor.createRangeSearchCursor();
+            invIndexAccessor.rangeSearch(rangeCursors[i], 
lsmInitState.getSearchPredicate());
+        }
+        lsmHarness = lsmInitState.getLSMHarness();
+        operationalComponents = lsmInitState.getOperationalComponents();
+        deletedKeysBTreeAccessors = 
lsmInitState.getDeletedKeysBTreeAccessors();
+        bloomFilters = new BloomFilter[deletedKeysBTreeAccessors.size()];
+        if (!deletedKeysBTreeAccessors.isEmpty()) {
+            deletedKeysBTreeCursors = new 
IIndexCursor[deletedKeysBTreeAccessors.size()];
+            for (int i = 0; i < operationalComponents.size(); i++) {
+                ILSMComponent component = operationalComponents.get(i);
+                deletedKeysBTreeCursors[i] = 
deletedKeysBTreeAccessors.get(i).createSearchCursor(false);
+                if (component.getType() == LSMComponentType.MEMORY) {
+                    // No need for a bloom filter for the in-memory BTree.
+                    bloomFilters[i] = null;
+                } else {
+                    bloomFilters[i] = ((LSMInvertedIndexDiskComponent) 
component).getBloomFilter();
+                }
+            }
+        }
+        deletedKeyBTreeSearchPred = new RangePredicate(null, null, true, true, 
keyCmp, keyCmp);
+        initPriorityQueues();
+    }
+
+    private void initPriorityQueues() throws HyracksDataException {
+        int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
+        tokenQueue = new PriorityQueue<>(pqInitSize, tokenQueueCmp);
+        keyQueue = new PriorityQueue<>(pqInitSize, keyQueueCmp);
+        tokenQueueElements = new PriorityQueueElement[pqInitSize];
+        keyQueueElements = new PriorityQueueElement[pqInitSize];
+        for (int i = 0; i < pqInitSize; i++) {
+            tokenQueueElements[i] = new PriorityQueueElement(i);
+            keyQueueElements[i] = new PriorityQueueElement(i);
+        }
+        for (int i = 0; i < rangeCursors.length; i++) {
+            if (rangeCursors[i].hasNext()) {
+                rangeCursors[i].next();
+                tokenQueueElements[i].reset(rangeCursors[i].getTuple());
+                tokenQueue.offer(tokenQueueElements[i]);
+            } else {
+                rangeCursors[i].close();
+            }
+        }
+        searchNextToken();
+    }
+
+    private void searchNextToken() throws HyracksDataException {
+        if (tokenQueue.isEmpty()) {
+            return;
+        }
+        if (!keyQueue.isEmpty()) {
+            throw new IllegalStateException("Illegal call of initializing key 
queue");
+        }
+        outputTokenElement = tokenQueue.poll();
+        initPushIntoKeyQueue(outputTokenElement);
+        ITupleReference tokenTuple = getTokenTuple(outputTokenElement);
+        outputTuple.setTokenTuple(tokenTuple);
+        // pop all same tokens
+        while (!tokenQueue.isEmpty()) {
+            PriorityQueueElement tokenElement = tokenQueue.peek();
+            if (TupleUtils.equalTuples(tokenTuple, 
getTokenTuple(tokenElement), tokenCmp.getKeyFieldCount())) {
+                initPushIntoKeyQueue(tokenElement);
+                tokenQueue.poll();
+            } else {
+                break;
+            }
+        }
+    }
+
+    private ITupleReference getKeyTuple(PriorityQueueElement tokenElement) {
+        return ((TokenKeyPairTuple) tokenElement.getTuple()).getKeyTuple();
+    }
+
+    private ITupleReference getTokenTuple(PriorityQueueElement tokenElement) {
+        return ((TokenKeyPairTuple) tokenElement.getTuple()).getTokenTuple();
+    }
+
+    private void initPushIntoKeyQueue(PriorityQueueElement tokenElement) {
+        PriorityQueueElement keyElement = 
keyQueueElements[tokenElement.getCursorIndex()];
+        keyElement.reset(getKeyTuple(tokenElement));
+        keyQueue.add(keyElement);
+    }
+
+    private void pushIntoKeyQueueAndReplace(PriorityQueueElement keyElement) 
throws HyracksDataException {
+        int cursorIndex = keyElement.getCursorIndex();
+        if (rangeCursors[cursorIndex].hasNext()) {
+            rangeCursors[cursorIndex].next();
+            TokenKeyPairTuple tuple = (TokenKeyPairTuple) 
rangeCursors[cursorIndex].getTuple();
+            if (tuple.isNewToken()) {
+                // if this element is a new token, then the current inverted list has been exhausted
+                PriorityQueueElement tokenElement = 
tokenQueueElements[cursorIndex];
+                tokenElement.reset(tuple);
+                tokenQueue.offer(tokenElement);
+            } else {
+                keyElement.reset(tuple.getKeyTuple());
+                keyQueue.offer(keyElement);
+            }
+        } else {
+            rangeCursors[cursorIndex].close();
+        }
+    }
+
+    @Override
+    public boolean doHasNext() throws HyracksDataException {
+        checkPriorityQueue();
+        return !keyQueue.isEmpty();
+    }
+
+    @Override
+    public void doNext() throws HyracksDataException {
+        outputKeyElement = keyQueue.poll();
+        outputTuple.setKeyTuple(outputKeyElement.getTuple());
+        needPushElementIntoKeyQueue = true;
+    }
+
+    @Override
+    public ITupleReference doGetTuple() {
+        return outputTuple;
+    }
+
+    protected void checkPriorityQueue() throws HyracksDataException {
+        checkKeyQueue();
+        if (keyQueue.isEmpty()) {
+            // if key queue is empty, we search the next token and check again
+            searchNextToken();
+            checkKeyQueue();
+        }
+    }
+
+    /**
+     * Advances the key queue until its head is the next key to be returned
+     * for the current token, or until the queue has been drained.
+     * <p>
+     * Two kinds of entries are skipped: keys that compare equal to the
+     * previously consumed key held in {@code outputKeyElement}, and keys for
+     * which {@code isDeleted} returns true. The element referenced by
+     * {@code outputKeyElement} is always kept OUT of {@code keyQueue}; its
+     * cursor is only advanced (and the element re-offered) once
+     * {@code needPushElementIntoKeyQueue} is honoured, because advancing the
+     * cursor may modify the tuple the element points to.
+     *
+     * @throws HyracksDataException if advancing a range cursor or probing a
+     *             deleted-keys BTree fails
+     */
+    protected void checkKeyQueue() throws HyracksDataException {
+        while (!keyQueue.isEmpty() || needPushElementIntoKeyQueue) {
+            if (!keyQueue.isEmpty()) {
+                PriorityQueueElement checkElement = keyQueue.peek();
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputKeyElement == null) {
+                    if (isDeleted(checkElement)) {
+                        // If the key has been deleted then pop it and set needPush to true.
+                        // We cannot push immediately because the tuple may be
+                        // modified if hasNext() is called.
+                        // The element must really be removed from the queue here:
+                        // leaving it enqueued would make the next iteration compare
+                        // it with itself and advance its cursor twice.
+                        outputKeyElement = keyQueue.poll();
+                        needPushElementIntoKeyQueue = true;
+                    } else {
+                        // we have found the next record
+                        return;
+                    }
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (keyCmp.compare(outputKeyElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
+
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = keyQueue.poll();
+                        pushIntoKeyQueueAndReplace(e);
+                    } else {
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPushElementIntoKeyQueue) {
+                            pushIntoKeyQueueAndReplace(outputKeyElement);
+                            needPushElementIntoKeyQueue = false;
+                        }
+                        outputKeyElement = null;
+                    }
+                }
+            } else {
+                // the priority queue is empty and needPush
+                // NOSONAR: outputKeyElement is not null when needPushElementIntoKeyQueue = true
+                pushIntoKeyQueueAndReplace(outputKeyElement);
+                needPushElementIntoKeyQueue = false;
+                outputKeyElement = null;
+            }
+        }
+    }
+
+    /**
+     * Checks whether the key held by {@code keyElement} is present in the
+     * deleted-keys BTree of any component that precedes the element's own
+     * component (indexes {@code 0 .. cursorIndex - 1}; presumably the newer
+     * components -- confirm against the component ordering).
+     * <p>
+     * A component's BTree is only probed when its bloom filter is absent or
+     * reports a possible match. The probe is a point search on the key.
+     *
+     * @param keyElement the key-queue element whose key tuple is tested
+     * @return {@code true} if one of the probed deleted-keys BTrees contains
+     *         the key, {@code false} otherwise
+     * @throws HyracksDataException if a BTree search fails
+     */
+    protected boolean isDeleted(PriorityQueueElement keyElement) throws HyracksDataException {
+        ITupleReference keyTuple = keyElement.getTuple();
+        int end = keyElement.getCursorIndex();
+        // The predicate must be bound to the key under test; a predicate with
+        // null low/high keys would match every entry of a non-empty BTree.
+        // It is built lazily so that keys rejected by all bloom filters do
+        // not pay for it.
+        boolean predicateBound = false;
+        for (int i = 0; i < end; i++) {
+            if (bloomFilters[i] != null && !bloomFilters[i].contains(keyTuple, hashes)) {
+                continue;
+            }
+            if (!predicateBound) {
+                deletedKeyBTreeSearchPred = new RangePredicate(keyTuple, keyTuple, true, true, keyCmp, keyCmp);
+                predicateBound = true;
+            }
+            deletedKeysBTreeCursors[i].close();
+            deletedKeysBTreeAccessors.get(i).search(deletedKeysBTreeCursors[i], deletedKeyBTreeSearchPred);
+            try {
+                if (deletedKeysBTreeCursors[i].hasNext()) {
+                    return true;
+                }
+            } finally {
+                // always release the cursor so that it can be reused for the next probe
+                deletedKeysBTreeCursors[i].close();
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void doClose() throws HyracksDataException {
+        outputTokenElement = null;
+        outputKeyElement = null;
+        needPushElementIntoKeyQueue = false;
+        try {
+            if (rangeCursors != null) {
+                for (int i = 0; i < rangeCursors.length; i++) {
+                    rangeCursors[i].close();
+                }
+            }
+        } finally {
+            if (lsmHarness != null) {
+                lsmHarness.endSearch(opCtx);
+            }
+        }
+    }
+
+    @Override
+    public void doDestroy() throws HyracksDataException {
+        try {
+            if (tokenQueue != null) {
+                tokenQueue.clear();
+            }
+            if (keyQueue != null) {
+                keyQueue.clear();
+            }
+            if (rangeCursors != null) {
+                for (int i = 0; i < rangeCursors.length; i++) {
+                    if (rangeCursors[i] != null) {
+                        rangeCursors[i].destroy();
+                    }
+                }
+                rangeCursors = null;
+            }
+        } finally {
+            if (lsmHarness != null) {
+                lsmHarness.endSearch(opCtx);
+            }
+        }
+    }
+
+    @Override
+    public ITupleReference getFilterMinTuple() {
+        return null;
+    }
+
+    @Override
+    public ITupleReference getFilterMaxTuple() {
+        return null;
+    }
+
+    @Override
+    public boolean getSearchOperationCallbackProceedResult() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 14ebe46..c3c9c21 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -51,6 +51,7 @@ import 
org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexSearchCursorInitialState;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.search.TOccurrenceSearcher;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -230,30 +231,28 @@ public class OnDiskInvertedIndex implements 
IInPlaceInvertedIndex {
         listCursor.open(initState, null);
     }
 
-    public final class OnDiskInvertedIndexBulkLoader implements 
IIndexBulkLoader {
-        private final ArrayTupleBuilder btreeTupleBuilder;
-        private final ArrayTupleReference btreeTupleReference;
-        private final IIndexBulkLoader btreeBulkloader;
+    public abstract class AbstractOnDiskInvertedIndexBulkLoader implements 
IIndexBulkLoader {
+        protected final ArrayTupleBuilder btreeTupleBuilder;
+        protected final ArrayTupleReference btreeTupleReference;
+        protected final IIndexBulkLoader btreeBulkloader;
 
-        private int currentInvListStartPageId;
-        private int currentInvListStartOffset;
-        private final ArrayTupleBuilder lastTupleBuilder;
-        private final ArrayTupleReference lastTuple;
+        protected int currentInvListStartPageId;
+        protected int currentInvListStartOffset;
+        protected final ArrayTupleBuilder lastTupleBuilder;
+        protected final ArrayTupleReference lastTuple;
 
-        private int currentPageId;
-        private ICachedPage currentPage;
-        private final MultiComparator tokenCmp;
-        private final MultiComparator invListCmp;
+        protected int currentPageId;
+        protected ICachedPage currentPage;
+        protected final MultiComparator invListCmp;
 
-        private final boolean verifyInput;
-        private final MultiComparator allCmp;
+        protected final boolean verifyInput;
+        protected final MultiComparator allCmp;
 
-        private final IFIFOPageQueue queue;
+        protected final IFIFOPageQueue queue;
 
-        public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean 
verifyInput, long numElementsHint,
+        public AbstractOnDiskInvertedIndexBulkLoader(float btreeFillFactor, 
boolean verifyInput, long numElementsHint,
                 boolean checkIfEmptyIndex, int startPageId) throws 
HyracksDataException {
             this.verifyInput = verifyInput;
-            this.tokenCmp = 
MultiComparator.create(btree.getComparatorFactories());
             this.invListCmp = MultiComparator.create(invListCmpFactories);
             if (verifyInput) {
                 allCmp = 
MultiComparator.create(btree.getComparatorFactories(), invListCmpFactories);
@@ -272,22 +271,15 @@ public class OnDiskInvertedIndex implements 
IInPlaceInvertedIndex {
             queue = bufferCache.createFIFOQueue();
         }
 
-        public void pinNextPage() throws HyracksDataException {
+        protected void pinNextPage() throws HyracksDataException {
             queue.put(currentPage);
             currentPageId++;
             currentPage = 
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, 
currentPageId));
         }
 
-        private void createAndInsertBTreeTuple() throws HyracksDataException {
+        protected void insertBTreeTuple() throws HyracksDataException {
             // Build tuple.
-            btreeTupleBuilder.reset();
             DataOutput output = btreeTupleBuilder.getDataOutput();
-            // Add key fields.
-            lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), 
lastTupleBuilder.getByteArray());
-            for (int i = 0; i < numTokenFields; i++) {
-                btreeTupleBuilder.addField(lastTuple.getFieldData(i), 
lastTuple.getFieldStart(i),
-                        lastTuple.getFieldLength(i));
-            }
             // Add inverted-list 'pointer' value fields.
             try {
                 output.writeInt(currentInvListStartPageId);
@@ -304,77 +296,59 @@ public class OnDiskInvertedIndex implements 
IInPlaceInvertedIndex {
             // Reset tuple reference and add it into the BTree load.
             btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), 
btreeTupleBuilder.getByteArray());
             btreeBulkloader.add(btreeTupleReference);
+            btreeTupleBuilder.reset();
         }
 
-        /**
-         * Assumptions:
-         * The first btree.getMultiComparator().getKeyFieldCount() fields in 
tuple
-         * are btree keys (e.g., a string token).
-         * The next invListCmp.getKeyFieldCount() fields in tuple are keys of 
the
-         * inverted list (e.g., primary key).
-         * Key fields of inverted list are fixed size.
-         */
-        @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
-            boolean firstElement = lastTupleBuilder.getSize() == 0;
-            boolean startNewList = firstElement;
-            if (!firstElement) {
-                // If the current and the last token don't match, we start a 
new list.
-                lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), 
lastTupleBuilder.getByteArray());
-                startNewList = tokenCmp.compare(tuple, lastTuple) != 0;
-            }
-            if (startNewList) {
-                if (!firstElement) {
-                    // Create entry in btree for last inverted list.
-                    createAndInsertBTreeTuple();
-                }
-                if (!invListBuilder.startNewList(tuple, numTokenFields)) {
-                    pinNextPage();
-                    
invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
-                    if (!invListBuilder.startNewList(tuple, numTokenFields)) {
-                        throw new IllegalStateException("Failed to create 
first inverted list.");
-                    }
-                }
-                currentInvListStartPageId = currentPageId;
-                currentInvListStartOffset = invListBuilder.getPos();
-            } else {
-                if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0) 
{
-                    // Duplicate inverted-list element.
-                    return;
+        protected void startNewList(ITupleReference tokenTuple) throws 
HyracksDataException {
+            if (!invListBuilder.startNewList(tokenTuple, numTokenFields)) {
+                pinNextPage();
+                
invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
+                if (!invListBuilder.startNewList(tokenTuple, numTokenFields)) {
+                    throw new IllegalStateException("Failed to create first 
inverted list.");
                 }
             }
+            currentInvListStartPageId = currentPageId;
+            currentInvListStartOffset = invListBuilder.getPos();
+        }
 
-            // Append to current inverted list.
-            if (!invListBuilder.appendElement(tuple, numTokenFields, 
numInvListKeys)) {
+        protected void appendInvertedList(ITupleReference keyTuple, int 
startField) throws HyracksDataException {
+            if (!invListBuilder.appendElement(keyTuple, startField, 
numInvListKeys)) {
                 pinNextPage();
                 
invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
-                if (!invListBuilder.appendElement(tuple, numTokenFields, 
numInvListKeys)) {
+                if (!invListBuilder.appendElement(keyTuple, startField, 
numInvListKeys)) {
                     throw new IllegalStateException(
                             "Failed to append element to inverted list after 
switching to a new page.");
                 }
             }
+        }
 
-            if (verifyInput && lastTupleBuilder.getSize() != 0) {
-                if (allCmp.compare(tuple, lastTuple) <= 0) {
-                    throw new HyracksDataException(
-                            "Input stream given to OnDiskInvertedIndex bulk 
load is not sorted.");
-                }
+        protected void verifyTuple(ITupleReference tuple) throws 
HyracksDataException {
+            if (lastTupleBuilder.getSize() > 0 && allCmp.compare(tuple, 
lastTuple) <= 0) {
+                throw HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT); // was built but never thrown
+            }
+        }
 
-            // Remember last tuple by creating a copy.
-            // TODO: This portion can be optimized by only copying the token 
when it changes, and using the last appended inverted-list element as a 
reference.
+        protected void saveLastTuple(ITupleReference tuple) throws 
HyracksDataException {
             lastTupleBuilder.reset();
             for (int i = 0; i < tuple.getFieldCount(); i++) {
                 lastTupleBuilder.addField(tuple.getFieldData(i), 
tuple.getFieldStart(i), tuple.getFieldLength(i));
             }
+            lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), 
lastTupleBuilder.getByteArray());
+        }
+
+        protected void copyTokenToBTreeTuple(ITupleReference tokenTuple) 
throws HyracksDataException {
+            for (int i = 0; i < numTokenFields; i++) {
+                btreeTupleBuilder.addField(tokenTuple.getFieldData(i), 
tokenTuple.getFieldStart(i),
+                        tokenTuple.getFieldLength(i));
+            }
         }
 
         @Override
         public void end() throws HyracksDataException {
-            // The last tuple builder is empty if add() was never called.
-            if (lastTupleBuilder.getSize() != 0) {
-                createAndInsertBTreeTuple();
+            if (btreeTupleBuilder.getSize() != 0) {
+                insertBTreeTuple();
             }
+
             btreeBulkloader.end();
 
             if (currentPage != null) {
@@ -392,6 +366,72 @@ public class OnDiskInvertedIndex implements 
IInPlaceInvertedIndex {
         }
     }
 
+    public class OnDiskInvertedIndexMergeBulkLoader extends 
AbstractOnDiskInvertedIndexBulkLoader {
+
+        public OnDiskInvertedIndexMergeBulkLoader(float btreeFillFactor, 
boolean verifyInput, long numElementsHint,
+                boolean checkIfEmptyIndex, int startPageId) throws 
HyracksDataException {
+            super(btreeFillFactor, verifyInput, numElementsHint, 
checkIfEmptyIndex, startPageId);
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws HyracksDataException {
+            TokenKeyPairTuple pairTuple = (TokenKeyPairTuple) tuple;
+            ITupleReference tokenTuple = pairTuple.getTokenTuple();
+            ITupleReference keyTuple = pairTuple.getKeyTuple();
+            boolean startNewList = pairTuple.isNewToken();
+            if (startNewList) {
+                if (btreeTupleBuilder.getSize() > 0) {
+                    insertBTreeTuple();
+                }
+                startNewList(tokenTuple);
+                copyTokenToBTreeTuple(tokenTuple);
+            }
+            appendInvertedList(keyTuple, 0);
+            if (verifyInput) {
+                verifyTuple(tuple);
+                saveLastTuple(tuple);
+            }
+        }
+    }
+
+    public class OnDiskInvertedIndexBulkLoader extends 
AbstractOnDiskInvertedIndexBulkLoader {
+
+        public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean 
verifyInput, long numElementsHint,
+                boolean checkIfEmptyIndex, int startPageId) throws 
HyracksDataException {
+            super(btreeFillFactor, verifyInput, numElementsHint, 
checkIfEmptyIndex, startPageId);
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws HyracksDataException {
+            boolean firstElement = btreeTupleBuilder.getSize() == 0;
+            boolean startNewList = firstElement;
+            if (!firstElement) {
+                // If the current and the last token don't match, we start a 
new list.
+                startNewList = !TupleUtils.equalTuples(tuple, lastTuple, 
numTokenFields);
+            }
+            if (startNewList) {
+                if (!firstElement) {
+                    // Create entry in btree for last inverted list.
+                    insertBTreeTuple();
+                }
+                startNewList(tuple);
+                copyTokenToBTreeTuple(tuple);
+            } else {
+                if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0) 
{
+                    // Duplicate inverted-list element.
+                    return;
+                }
+            }
+            appendInvertedList(tuple, numTokenFields);
+            if (verifyInput) {
+                verifyTuple(tuple);
+            }
+
+            saveLastTuple(tuple);
+        }
+
+    }
+
     @Override
     public IBufferCache getBufferCache() {
         return bufferCache;
@@ -518,6 +558,12 @@ public class OnDiskInvertedIndex implements 
IInPlaceInvertedIndex {
                 rootPageId);
     }
 
+    public IIndexBulkLoader createMergeBulkLoader(float fillFactor, boolean 
verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        return new OnDiskInvertedIndexMergeBulkLoader(fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex,
+                rootPageId);
+    }
+
     @Override
     public void validate() throws HyracksDataException {
         btree.validate();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
index 9d99c9e..11b483e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
@@ -24,9 +24,9 @@ import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
 import org.apache.hyracks.storage.common.EnforcedIndexCursor;
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.IIndexAccessor;
@@ -49,7 +49,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends 
EnforcedIndexCursor {
     private RangePredicate btreePred;
 
     private final PermutingTupleReference tokenTuple;
-    private final ConcatenatingTupleReference concatTuple;
+    private final TokenKeyPairTuple resultTuple;
 
     public OnDiskInvertedIndexRangeSearchCursor(OnDiskInvertedIndex invIndex, 
IIndexOperationContext opCtx)
             throws HyracksDataException {
@@ -64,7 +64,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends 
EnforcedIndexCursor {
         }
         tokenTuple = new PermutingTupleReference(fieldPermutation);
         btreeCursor = btreeAccessor.createSearchCursor(false);
-        concatTuple = new ConcatenatingTupleReference(2);
+        resultTuple = new 
TokenKeyPairTuple(invIndex.getTokenTypeTraits().length, 
btree.getCmpFactories().length);
         invListRangeSearchCursor = 
invIndex.createInvertedListRangeSearchCursor();
         isInvListCursorOpen = false;
     }
@@ -95,10 +95,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends 
EnforcedIndexCursor {
     @Override
     public void doNext() throws HyracksDataException {
         invListRangeSearchCursor.next();
-        if (concatTuple.hasMaxTuples()) {
-            concatTuple.removeLastTuple();
-        }
-        concatTuple.addTuple(invListRangeSearchCursor.getTuple());
+        resultTuple.setKeyTuple(invListRangeSearchCursor.getTuple());
     }
 
     @Override
@@ -123,7 +120,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends 
EnforcedIndexCursor {
 
     @Override
     public ITupleReference doGetTuple() {
-        return concatTuple;
+        return resultTuple;
     }
 
     // Opens an inverted-list-scan cursor for the given tuple.
@@ -135,8 +132,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends 
EnforcedIndexCursor {
                     (OnDiskInvertedIndexOpContext) opCtx);
             invListRangeSearchCursor.prepareLoadPages();
             invListRangeSearchCursor.loadPages();
-            concatTuple.reset();
-            concatTuple.addTuple(tokenTuple);
+            resultTuple.setTokenTuple(tokenTuple);
             isInvListCursorOpen = true;
         } else {
             isInvListCursorOpen = false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java
new file mode 100644
index 0000000..102fe96
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.invertedindex.tuples;
+
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TokenKeyPairTuple implements ITupleReference {
+
+    private ITupleReference tokenTuple;
+    private ITupleReference keyTuple;
+
+    private final int tokenFieldCount;
+    private final int keyFieldCount;
+
+    private boolean newToken;
+
+    public TokenKeyPairTuple(int tokenFieldCount, int keyFieldCount) {
+        this.tokenFieldCount = tokenFieldCount;
+        this.keyFieldCount = keyFieldCount;
+
+    }
+
+    public void setTokenTuple(ITupleReference token) {
+        this.tokenTuple = token;
+        this.keyTuple = null;
+    }
+
+    public void setKeyTuple(ITupleReference key) {
+        newToken = this.keyTuple == null;
+        this.keyTuple = key;
+    }
+
+    public ITupleReference getTokenTuple() {
+        return tokenTuple;
+    }
+
+    public ITupleReference getKeyTuple() {
+        return keyTuple;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return tokenFieldCount + keyFieldCount;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        ITupleReference tuple = getTuple(fIdx);
+        int fieldIndex = getFieldIndex(fIdx);
+        return tuple.getFieldData(fieldIndex);
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        ITupleReference tuple = getTuple(fIdx);
+        int fieldIndex = getFieldIndex(fIdx);
+        return tuple.getFieldStart(fieldIndex);
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        ITupleReference tuple = getTuple(fIdx);
+        int fieldIndex = getFieldIndex(fIdx);
+        return tuple.getFieldLength(fieldIndex);
+    }
+
+    private ITupleReference getTuple(int fIdx) {
+        return fIdx < tokenFieldCount ? tokenTuple : keyTuple;
+    }
+
+    private int getFieldIndex(int fIdx) {
+        return fIdx < tokenFieldCount ? fIdx : fIdx - tokenFieldCount;
+    }
+
+    public boolean isNewToken() {
+        return newToken;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 4510618..fae6e1a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -45,6 +45,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -134,8 +135,8 @@ public class LSMRTree extends AbstractLSMRTree {
                 rTreeTupleSorter.sort();
                 component = createDiskComponent(componentFactory, 
flushOp.getTarget(), flushOp.getBTreeTarget(),
                         flushOp.getBloomFilterTarget(), true);
-                componentBulkLoader =
-                        component.createBulkLoader(1.0f, false, 
numBTreeTuples.longValue(), false, false, false);
+                componentBulkLoader = 
component.createBulkLoader(LSMIOOperationType.FLUSH, 1.0f, false,
+                        numBTreeTuples.longValue(), false, false, false);
                 flushLoadRTree(isEmpty, rTreeTupleSorter, componentBulkLoader);
                 // scan the memory BTree and bulk load delete tuples
                 flushLoadBtree(memBTreeAccessor, componentBulkLoader, 
btreeNullPredicate);
@@ -331,12 +332,13 @@ public class LSMRTree extends AbstractLSMRTree {
                         numElements += ((LSMRTreeDiskComponent) 
mergeOp.getMergingComponents().get(i)).getBloomFilter()
                                 .getNumElements();
                     }
-                    componentBulkLoader =
-                            mergedComponent.createBulkLoader(1.0f, false, 
numElements, false, false, false);
+                    componentBulkLoader = 
mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false,
+                            numElements, false, false, false);
                     mergeLoadBTree(opCtx, rtreeSearchPred, 
componentBulkLoader);
                 } else {
                     //no buddy-btree needed
-                    componentBulkLoader = 
mergedComponent.createBulkLoader(1.0f, false, 0L, false, false, false);
+                    componentBulkLoader = 
mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, 0L,
+                            false, false, false);
                 }
                 //search old rtree components
                 while (cursor.hasNext()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index a3ba4b1..f4e919a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -40,6 +40,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -109,7 +110,8 @@ public class LSMRTreeWithAntiMatterTuples extends 
AbstractLSMRTree {
                 try {
                     memRTreeAccessor.search(rtreeScanCursor, 
rtreeNullPredicate);
                     component = createDiskComponent(componentFactory, 
flushOp.getTarget(), null, null, true);
-                    componentBulkLoader = component.createBulkLoader(1.0f, 
false, 0L, false, false, false);
+                    componentBulkLoader =
+                            
component.createBulkLoader(LSMIOOperationType.FLUSH, 1.0f, false, 0L, false, 
false, false);
                     // Since the LSM-RTree is used as a secondary assumption, 
the
                     // primary key will be the last comparator in the BTree 
comparators
                     rTreeTupleSorter = new 
TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray,
@@ -235,7 +237,7 @@ public class LSMRTreeWithAntiMatterTuples extends 
AbstractLSMRTree {
         ILSMDiskComponent component = createDiskComponent(componentFactory, 
mergeOp.getTarget(), null, null, true);
 
         ILSMDiskComponentBulkLoader componentBulkLoader =
-                component.createBulkLoader(1.0f, false, 0L, false, false, 
false);
+                component.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, 
false, 0L, false, false, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
index a420ba9..da87b27 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.datagen.TupleGenerator;
 import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
+import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifier;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.search.JaccardSearchModifier;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
@@ -84,6 +85,12 @@ public abstract class AbstractInvertedIndexTest {
             LSMInvertedIndexTestUtils.compareActualAndExpectedIndexes(testCtx);
         }
         
LSMInvertedIndexTestUtils.compareActualAndExpectedIndexesRangeSearch(testCtx);
+        if (invIndexType == InvertedIndexType.LSM || invIndexType == 
InvertedIndexType.PARTITIONED_LSM) {
+            LSMInvertedIndex lsmIndex = (LSMInvertedIndex) invIndex;
+            if (!lsmIndex.isMemoryComponentsAllocated() || 
lsmIndex.isCurrentMutableComponentEmpty()) {
+                
LSMInvertedIndexTestUtils.compareActualAndExpectedIndexesMergeSearch(testCtx);
+            }
+        }
     }
 
     /**

Reply via email to