Repository: asterixdb
Updated Branches:
  refs/heads/master 86cbec537 -> a7fa05bb0


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
new file mode 100644
index 0000000..f09bcd2
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import 
org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+
+/**
+ * This operator node is used to bulk load incoming tuples (scanned from the 
primary index)
+ * into multiple disk components of the secondary index.
+ * Incoming tuple format:
+ * [component pos, anti-matter flag, secondary keys, primary keys, filter 
values]
+ */
+public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryIndexCreationNodePushable {
+    // with tag fields
+
+    private final PermutingTupleReference sourceTuple;
+    private final PermutingTupleReference deletedKeyTuple;
+
+    private final IIndexDataflowHelper primaryIndexHelper;
+    private final IIndexDataflowHelper secondaryIndexHelper;
+
+    private ILSMIndex primaryIndex;
+    private ILSMIndex secondaryIndex;
+
+    private ILSMDiskComponent component;
+    private ILSMDiskComponentBulkLoader componentBulkLoader;
+    private int currentComponentPos = -1;
+
+    private ILSMDiskComponent[] diskComponents;
+
+    public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int 
partition, RecordDescriptor inputRecDesc,
+            IIndexDataflowHelperFactory primaryIndexHelperFactory,
+            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int 
numTagFields, int numSecondaryKeys,
+            int numPrimaryKeys, boolean hasBuddyBTree) throws 
HyracksDataException {
+        super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, 
numPrimaryKeys, hasBuddyBTree);
+
+        this.primaryIndexHelper =
+                
primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
+        this.secondaryIndexHelper =
+                
secondaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
+
+        int[] sourcePermutation = new int[inputRecDesc.getFieldCount() - 
numTagFields];
+        for (int i = 0; i < sourcePermutation.length; i++) {
+            sourcePermutation[i] = i + numTagFields;
+        }
+        sourceTuple = new PermutingTupleReference(sourcePermutation);
+
+        int[] deletedKeyPermutation = new int[inputRecDesc.getFieldCount() - 
numTagFields - numSecondaryKeys];
+        for (int i = 0; i < deletedKeyPermutation.length; i++) {
+            deletedKeyPermutation[i] = i + numTagFields + numSecondaryKeys;
+        }
+        deletedKeyTuple = new PermutingTupleReference(deletedKeyPermutation);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        primaryIndexHelper.open();
+        primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
+        diskComponents = new 
ILSMDiskComponent[primaryIndex.getImmutableComponents().size()];
+        secondaryIndexHelper.open();
+        secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance();
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
+        try {
+            endCurrentComponent();
+        } catch (HyracksDataException e) {
+            closeException = e;
+        }
+
+        activateComponents();
+
+        try {
+            if (primaryIndexHelper != null) {
+                primaryIndexHelper.close();
+            }
+        } catch (HyracksDataException e) {
+            if (closeException == null) {
+                closeException = e;
+            } else {
+                closeException.addSuppressed(e);
+            }
+        }
+
+        try {
+            if (secondaryIndexHelper != null) {
+                secondaryIndexHelper.close();
+            }
+        } catch (HyracksDataException e) {
+            if (closeException == null) {
+                closeException = e;
+            } else {
+                closeException.addSuppressed(e);
+            }
+        }
+
+        try {
+            // will definitely be called regardless of exceptions
+            writer.close();
+        } catch (HyracksDataException th) {
+            if (closeException == null) {
+                closeException = th;
+            } else {
+                closeException.addSuppressed(th);
+            }
+        }
+
+        if (closeException != null) {
+            throw closeException;
+        }
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            try {
+                // if both previous value and new value are null, then we skip
+                tuple.reset(accessor, i);
+                int componentPos = getComponentPos(tuple);
+                if (componentPos != currentComponentPos) {
+                    loadNewComponent(componentPos);
+                    currentComponentPos = componentPos;
+                }
+
+                if (isAntiMatterTuple(tuple)) {
+                    addAntiMatterTuple(tuple);
+                } else {
+                    addMatterTuple(tuple);
+                }
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    private void endCurrentComponent() throws HyracksDataException {
+        if (component != null) {
+            // set disk component id
+
+            componentBulkLoader.end();
+            diskComponents[currentComponentPos] = component;
+
+            componentBulkLoader = null;
+            component = null;
+        }
+    }
+
+    private void loadNewComponent(int componentPos) throws 
HyracksDataException {
+        endCurrentComponent();
+
+        component = secondaryIndex.createBulkLoadTarget();
+        componentBulkLoader = 
secondaryIndex.createComponentBulkLoader(component, 1.0f, false,
+                getNumDeletedTuples(componentPos), false, true, true);
+
+    }
+
+    private void addAntiMatterTuple(ITupleReference tuple) throws 
HyracksDataException {
+        if (hasBuddyBTree) {
+            deletedKeyTuple.reset(tuple);
+            componentBulkLoader.delete(deletedKeyTuple);
+        } else {
+            sourceTuple.reset(tuple);
+            componentBulkLoader.delete(sourceTuple);
+        }
+    }
+
+    private void addMatterTuple(ITupleReference tuple) throws 
HyracksDataException {
+        sourceTuple.reset(tuple);
+        componentBulkLoader.add(sourceTuple);
+
+    }
+
+    private void activateComponents() throws HyracksDataException {
+        List<ILSMDiskComponent> primaryComponents = 
primaryIndex.getImmutableComponents();
+        for (int i = diskComponents.length - 1; i >= 0; i--) {
+            // start from the oldest component to the newest component
+            if (diskComponents[i] != null && 
diskComponents[i].getComponentSize() > 0) {
+                
secondaryIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, 
null, diskComponents[i]);
+
+                // setting component id has to be place between afterOperation 
and addBulkLoadedComponent,
+                // since afterOperation would set a flush component id (but 
it's not invalid)
+                // and addBulkLoadedComponent would finalize the component
+                ILSMDiskComponentId primaryComponentId = 
primaryComponents.get(i).getComponentId();
+                //set component id
+                
diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
+                        
LongPointable.FACTORY.createPointable(primaryComponentId.getMinId()));
+                
diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
+                        
LongPointable.FACTORY.createPointable(primaryComponentId.getMaxId()));
+
+                ((AbstractLSMIndex) 
secondaryIndex).getLsmHarness().addBulkLoadedComponent(diskComponents[i]);
+
+            }
+        }
+    }
+
+    private int getNumDeletedTuples(int componentPos) {
+        DeletedTupleCounter counter = (DeletedTupleCounter) 
ctx.getStateObject(partition);
+        return counter.get(componentPos);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
new file mode 100644
index 0000000..804b700
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+
+public class LSMSecondaryIndexBulkLoadOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IIndexDataflowHelperFactory primaryIndexHelperFactory;
+    private final IIndexDataflowHelperFactory secondaryIndexHelperFactory;
+
+    private final int numTagFields;
+    private final int numSecondaryKeys;
+    private final int numPrimaryKeys;
+
+    private final boolean hasBuddyBTree;
+
+    public 
LSMSecondaryIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
+            IIndexDataflowHelperFactory primaryIndexHelperFactory,
+            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int 
numTagFields, int numSecondaryKeys,
+            int numPrimaryKeys, boolean hasBuddyBTree) {
+        super(spec, 1, 1);
+        this.outRecDescs[0] = outRecDesc;
+        this.primaryIndexHelperFactory = primaryIndexHelperFactory;
+        this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
+        this.numTagFields = numTagFields;
+        this.numSecondaryKeys = numSecondaryKeys;
+        this.numPrimaryKeys = numPrimaryKeys;
+        this.hasBuddyBTree = hasBuddyBTree;
+
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
+        return new LSMSecondaryIndexBulkLoadNodePushable(ctx, partition,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), primaryIndexHelperFactory,
+                secondaryIndexHelperFactory, numTagFields, numSecondaryKeys, 
numPrimaryKeys, hasBuddyBTree);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
new file mode 100644
index 0000000..b2a2fa4
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriter;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+
/**
 * This operator node is used for post-processing tuples scanned from the primary index
 * when creating a new secondary index.
 * The incoming tuples could be either a matter-tuple or an anti-matter tuple.
 * For an anti-matter tuple, it outputs a corresponding anti-matter tuple
 * (if the secondary index has a buddy btree, then the result tuple only has primary key;
 * otherwise, the result tuple has both secondary keys and primary key)
 * For each matter-tuple (here old secondary keys refer to the secondary keys of the previous
 * matter tuple with the same primary key)
 * -If old secondary keys == new secondary keys
 * --If with buddy btree
 * ---generate a deleted-tuple
 * --generate a new key
 * -else
 * --If old secondary keys are null?
 * ---do nothing
 * --else
 * ---delete old secondary keys
 * --If new keys are null?
 * ---do nothing
 * --else
 * ---insert new keys
 *
 * Incoming tuple format:
 * [component pos, anti-matter flag, secondary keys, primary keys, filter values]
 */
public class LSMSecondaryIndexCreationTupleProcessorNodePushable extends AbstractLSMSecondaryIndexCreationNodePushable {
    // prevSourceTuple stores the previous matter tuple
    private final ArrayTupleBuilder prevMatterTupleBuilder;
    private final ArrayTupleReference prevMatterTuple = new ArrayTupleReference();

    // true only while prevMatterTuple holds a valid saved tuple; reset by anti-matter tuples
    private boolean hasPrevMatterTuple;

    private FrameTupleAppender appender;
    // scratch builder for each output tuple
    private ArrayTupleBuilder tb;
    private DataOutput dos;
    // used to write missing secondary keys for buddy-btree deletes; null when no buddy btree
    private final IMissingWriter missingWriter;
    // per-partition count of generated anti-matter tuples, consumed by the bulk load operator
    private DeletedTupleCounter deletedTupleCounter;

    /**
     * Task-level state object counting the anti-matter (deleted) tuples generated per
     * component pos; registered under this partition so the downstream bulk load operator
     * can size each component's bulk loader.
     */
    public static class DeletedTupleCounter extends AbstractStateObject {
        private final Map<Integer, Integer> map = new HashMap<>();

        public DeletedTupleCounter(JobId jobId, int partition) {
            super(jobId, partition);
        }

        // returns 0 for components with no deleted tuples
        public int get(int componentPos) {
            Integer value = map.get(componentPos);
            return value != null ? value : 0;
        }

        public void inc(int componentPos) {
            map.put(componentPos, get(componentPos) + 1);
        }
    }

    public LSMSecondaryIndexCreationTupleProcessorNodePushable(IHyracksTaskContext ctx, int partition,
            RecordDescriptor inputRecDesc, IMissingWriterFactory missingWriterFactory, int numTagFields,
            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException {

        super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);

        this.prevMatterTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFieldCount());

        // the missing writer is only needed to emit missing secondary keys for buddy-btree deletes
        if (this.hasBuddyBTree) {
            missingWriter = missingWriterFactory.createMissingWriter();
        } else {
            missingWriter = null;
        }

    }

    @Override
    public void open() throws HyracksDataException {
        super.open();
        // register the counter so the downstream bulk load operator can look it up by partition
        deletedTupleCounter = new DeletedTupleCounter(ctx.getJobletContext().getJobId(), partition);
        ctx.setStateObject(deletedTupleCounter);
        try {
            tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
            dos = tb.getDataOutput();
            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    @Override
    public void close() throws HyracksDataException {
        HyracksDataException closeException = null;
        try {
            // flush any buffered output tuples before closing the writer
            if (appender.getTupleCount() > 0) {
                appender.write(writer, true);
            }
        } catch (HyracksDataException e) {
            closeException = e;
        }
        try {
            // will definitely be called regardless of exceptions
            writer.close();
        } catch (HyracksDataException th) {
            if (closeException == null) {
                closeException = th;
            } else {
                closeException.addSuppressed(th);
            }
        }
        if (closeException != null) {
            throw closeException;
        }
    }

    @Override
    public void flush() throws HyracksDataException {
        appender.flush(writer);
    }

    @Override
    public void fail() throws HyracksDataException {
        writer.fail();
    }

    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        accessor.reset(buffer);
        int tupleCount = accessor.getTupleCount();
        for (int i = 0; i < tupleCount; i++) {
            try {
                // if both previous value and new value are null, then we skip
                tuple.reset(accessor, i);
                if (isAntiMatterTuple(tuple)) {
                    processAntiMatterTuple(tuple);
                    // an anti-matter tuple invalidates the saved matter tuple
                    hasPrevMatterTuple = false;
                } else {
                    processMatterTuple(tuple);
                    // save the matter tuple
                    TupleUtils.copyTuple(prevMatterTupleBuilder, tuple, recordDesc.getFieldCount());
                    prevMatterTuple.reset(prevMatterTupleBuilder.getFieldEndOffsets(),
                            prevMatterTupleBuilder.getByteArray());
                    hasPrevMatterTuple = true;
                }
            } catch (Exception e) {
                throw HyracksDataException.create(e);
            }
        }
    }

    // Decides, per the class javadoc, which delete/insert tuples a matter tuple produces,
    // comparing it against the previously saved matter tuple with the same primary key.
    private void processMatterTuple(ITupleReference tuple) throws HyracksDataException {

        boolean isNewValueMissing = isSecondaryKeyMissing(tuple);
        // the old value counts as missing if there is no saved tuple, the saved tuple has a
        // different primary key, or its secondary keys are missing/null
        boolean isOldValueMissing = !hasPrevMatterTuple || !equalPrimaryKeys(tuple, prevMatterTuple)
                || isSecondaryKeyMissing(prevMatterTuple);

        if (isNewValueMissing && isOldValueMissing) {
            // if both values are missing, then do nothing
            return;
        }
        // At least one is not null
        if (!isOldValueMissing && equalSecondaryKeys(prevMatterTuple, tuple)) {
            if (hasBuddyBTree) {
                // if the index has buddy btree, then we have to delete the index entry
                // from the older disk components
                // NOTE(review): the anti-matter tuple is tagged with the CURRENT tuple's
                // component pos (not the previous one) — presumably so the delete lands in
                // the newer component; confirm against the bulk load operator
                writeAntiMatterTuple(prevMatterTuple, getComponentPos(tuple));
            }
            // we need to write the new tuple anyway
            writeMatterTuple(tuple);
            return;
        }
        if (!isOldValueMissing) {
            // we need to delete the previous entry
            writeAntiMatterTuple(prevMatterTuple, getComponentPos(tuple));
        }
        if (!isNewValueMissing) {
            // we need to insert the new entry
            writeMatterTuple(tuple);
        }

    }

    private void processAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
        boolean isNewValueMissing = isSecondaryKeyMissing(tuple);
        // if the secondary value is missing (which means the secondary value of the previous matter tuple
        // is also missing), we then simply ignore this tuple since there is nothing to delete
        if (!isNewValueMissing) {
            writeAntiMatterTuple(tuple, getComponentPos(tuple));
        }
    }

    // Emits an anti-matter output tuple: [componentPos, true, secondary keys (or missing
    // when the index has a buddy btree), primary keys, filter values], and bumps the
    // deleted-tuple counter for componentPos.
    private void writeAntiMatterTuple(ITupleReference tuple, int componentPos) throws HyracksDataException {
        deletedTupleCounter.inc(componentPos);
        tb.reset();
        // write tag fields
        tb.addField(IntegerSerializerDeserializer.INSTANCE, componentPos);
        tb.addField(BooleanSerializerDeserializer.INSTANCE, true);
        if (hasBuddyBTree) {
            // the output tuple does not have secondary keys (only primary keys + filter values)
            // write secondary keys (missing)
            for (int i = 0; i < numSecondaryKeys; i++) {
                missingWriter.writeMissing(dos);
                tb.addFieldEndOffset();
            }
        } else {
            // keep the original secondary keys so the delete can be matched in the component
            for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
                tb.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
            }
        }

        // write all remaining fields
        for (int i = numTagFields + numSecondaryKeys; i < recordDesc.getFieldCount(); i++) {
            tb.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
        }

        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
    }

    private void writeMatterTuple(ITupleReference tuple) throws HyracksDataException {
        // simply output the original tuple to the writer
        TupleUtils.copyTuple(tb, tuple, recordDesc.getFieldCount());
        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
    }

    // A secondary key counts as missing if ANY of its fields is MISSING or NULL.
    private boolean isSecondaryKeyMissing(ITupleReference tuple) {
        for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
            if (TypeTagUtil.isType(tuple, i, ATypeTag.SERIALIZED_MISSING_TYPE_TAG)
                    || TypeTagUtil.isType(tuple, i, ATypeTag.SERIALIZED_NULL_TYPE_TAG)) {
                return true;
            }
        }
        return false;
    }

    // Byte-wise comparison of the primary key fields of two tuples.
    private boolean equalPrimaryKeys(ITupleReference tuple1, ITupleReference tuple2) {
        for (int i = numTagFields + numSecondaryKeys; i < numTagFields + numPrimaryKeys + numSecondaryKeys; i++) {
            if (!equalField(tuple1, tuple2, i)) {
                return false;
            }
        }
        return true;
    }

    // Byte-wise comparison of the secondary key fields of two tuples.
    private boolean equalSecondaryKeys(ITupleReference tuple1, ITupleReference tuple2) {
        for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
            if (!equalField(tuple1, tuple2, i)) {
                return false;
            }
        }
        return true;
    }

    private boolean equalField(ITupleReference tuple1, ITupleReference tuple2, int fIdx) {
        return LSMSecondaryUpsertOperatorNodePushable.equals(tuple1.getFieldData(fIdx), tuple1.getFieldStart(fIdx),
                tuple1.getFieldLength(fIdx), tuple2.getFieldData(fIdx), tuple2.getFieldStart(fIdx),
                tuple2.getFieldLength(fIdx));
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
new file mode 100644
index 0000000..68e7603
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor
+        extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IMissingWriterFactory missingWriterFactory;
+
+    private final int numTagFields;
+    private final int numSecondaryKeys;
+    private final int numPrimaryKeys;
+
+    private final boolean hasBuddyBTree;
+
+    public 
LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor(IOperatorDescriptorRegistry
 spec,
+            RecordDescriptor outRecDesc, IMissingWriterFactory 
missingWriterFactory, int numTagFields,
+            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
+        super(spec, 1, 1);
+        this.outRecDescs[0] = outRecDesc;
+        this.missingWriterFactory = missingWriterFactory;
+        this.numTagFields = numTagFields;
+        this.numSecondaryKeys = numSecondaryKeys;
+        this.numPrimaryKeys = numPrimaryKeys;
+        this.hasBuddyBTree = hasBuddyBTree;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
+        return new LSMSecondaryIndexCreationTupleProcessorNodePushable(ctx, 
partition,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), missingWriterFactory, numTagFields,
+                numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorTest.java
 
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorTest.java
new file mode 100644
index 0000000..1db124a
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.TypeTagUtil;
+import 
org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.state.IStateObject;
+import org.apache.hyracks.api.dataflow.value.IMissingWriter;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class LSMSecondaryIndexCreationTupleProcessorTest {
+
+    private static final int FRAME_SIZE = 512;
+
+    private final ISerializerDeserializer[] fields =
+            { IntegerSerializerDeserializer.INSTANCE, 
BooleanSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE, 
IntegerSerializerDeserializer.INSTANCE };
+
+    private final RecordDescriptor recDesc = new RecordDescriptor(fields);
+
+    private final int numTagFields = 2;
+
+    private final int numSecondaryKeys = 1;
+
+    private final int numPrimaryKeys = 1;
+
+    private class ResultFrameWriter implements IFrameWriter {
+        FrameTupleAccessor resultAccessor = new FrameTupleAccessor(recDesc);
+
+        FrameTupleReference tuple = new FrameTupleReference();
+        final List<ITupleReference> resultTuples;
+
+        public ResultFrameWriter(List<ITupleReference> resultTuples) {
+            this.resultTuples = resultTuples;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            resultAccessor.reset(buffer);
+            int count = resultAccessor.getTupleCount();
+            for (int i = 0; i < count; i++) {
+                tuple.reset(resultAccessor, i);
+                resultTuples.add(TupleUtils.copyTuple(tuple));
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+        }
+    }
+
+    @Test
+    public void testBasic() {
+        List<Object[]> inputs = new ArrayList<>();
+        inputs.add(new Object[] { 0, false, 1, 1 });
+        inputs.add(new Object[] { 1, false, 1, 2 });
+        inputs.add(new Object[] { 2, false, 2, 3 });
+
+        List<Object[]> outputs = new ArrayList<>();
+        outputs.add(new Object[] { 0, false, 1, 1 });
+        outputs.add(new Object[] { 1, false, 1, 2 });
+        outputs.add(new Object[] { 2, false, 2, 3 });
+
+        List<Object[]> buddyOutputs = new ArrayList<>();
+        buddyOutputs.add(new Object[] { 0, false, 1, 1 });
+        buddyOutputs.add(new Object[] { 1, false, 1, 2 });
+        buddyOutputs.add(new Object[] { 2, false, 2, 3 });
+        try {
+            runTest(inputs, outputs, new int[] { 0, 0, 0 }, false);
+            runTest(inputs, buddyOutputs, new int[] { 0, 0, 0 }, true);
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUpsertWithSameSecondary() {
+        List<Object[]> inputs = new ArrayList<>();
+        inputs.add(new Object[] { 1, false, 1, 1 });
+        inputs.add(new Object[] { 0, false, 1, 1 });
+
+        List<Object[]> outputs = new ArrayList<>();
+        outputs.add(new Object[] { 1, false, 1, 1 });
+        outputs.add(new Object[] { 0, false, 1, 1 });
+
+        List<Object[]> buddyOutputs = new ArrayList<>();
+        buddyOutputs.add(new Object[] { 1, false, 1, 1 });
+        buddyOutputs.add(new Object[] { 0, true, null, 1 });
+        buddyOutputs.add(new Object[] { 0, false, 1, 1 });
+
+        try {
+            runTest(inputs, outputs, new int[] { 0, 0 }, false);
+            runTest(inputs, buddyOutputs, new int[] { 1, 0 }, true);
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUpsertWithDifferentSecondary() {
+        List<Object[]> inputs = new ArrayList<>();
+        inputs.add(new Object[] { 1, false, 2, 1 });
+        inputs.add(new Object[] { 0, false, 1, 1 });
+
+        List<Object[]> outputs = new ArrayList<>();
+        outputs.add(new Object[] { 1, false, 2, 1 });
+        outputs.add(new Object[] { 0, true, 2, 1 });
+        outputs.add(new Object[] { 0, false, 1, 1 });
+
+        List<Object[]> buddyOutputs = new ArrayList<>();
+        buddyOutputs.add(new Object[] { 1, false, 2, 1 });
+        buddyOutputs.add(new Object[] { 0, true, null, 1 });
+        buddyOutputs.add(new Object[] { 0, false, 1, 1 });
+        try {
+            runTest(inputs, outputs, new int[] { 1, 0 }, false);
+            runTest(inputs, buddyOutputs, new int[] { 1, 0 }, true);
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDelete() {
+        List<Object[]> inputs = new ArrayList<>();
+        inputs.add(new Object[] { 2, false, 1, 1 });
+        inputs.add(new Object[] { 1, true, 1, 1 });
+        inputs.add(new Object[] { 0, false, 2, 1 });
+
+        List<Object[]> outputs = new ArrayList<>();
+        outputs.add(new Object[] { 2, false, 1, 1 });
+        outputs.add(new Object[] { 1, true, 1, 1 });
+        outputs.add(new Object[] { 0, false, 2, 1 });
+
+        List<Object[]> buddyOutputs = new ArrayList<>();
+        buddyOutputs.add(new Object[] { 2, false, 1, 1 });
+        buddyOutputs.add(new Object[] { 1, true, null, 1 });
+        buddyOutputs.add(new Object[] { 0, false, 2, 1 });
+
+        try {
+            runTest(inputs, outputs, new int[] { 0, 1, 0 }, false);
+            runTest(inputs, buddyOutputs, new int[] { 0, 1, 0 }, true);
+
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDeleteNullSecondary() {
+        List<Object[]> inputs = new ArrayList<>();
+        inputs.add(new Object[] { 1, false, null, 1 });
+        inputs.add(new Object[] { 0, true, null, 1 });
+
+        List<Object[]> outputs = new ArrayList<>();
+
+        List<Object[]> buddyOutputs = new ArrayList<>();
+
+        try {
+            runTest(inputs, outputs, new int[] { 0, 0 }, false);
+            runTest(inputs, buddyOutputs, new int[] { 0, 0 }, true);
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testNullSecondary() {
+        List<Object[]> inputs = new ArrayList<>();
+        inputs.add(new Object[] { 2, false, null, 1 });
+        inputs.add(new Object[] { 1, false, 2, 1 });
+        inputs.add(new Object[] { 0, false, null, 1 });
+
+        List<Object[]> outputs = new ArrayList<>();
+        outputs.add(new Object[] { 1, false, 2, 1 });
+        outputs.add(new Object[] { 0, true, 2, 1 });
+
+        List<Object[]> buddyOutputs = new ArrayList<>();
+        buddyOutputs.add(new Object[] { 1, false, 2, 1 });
+        buddyOutputs.add(new Object[] { 0, true, null, 1 });
+        try {
+            runTest(inputs, outputs, new int[] { 1, 0, 0 }, false);
+            runTest(inputs, buddyOutputs, new int[] { 1, 0, 0 }, true);
+        } catch (HyracksDataException e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private void runTest(List<Object[]> inputTuples, List<Object[]> 
expectedTuples, int[] numDeletedTuples,
+            boolean hasBuddyBTree) throws HyracksDataException {
+        List<Object> stateObjects = new ArrayList<>();
+        IHyracksJobletContext jobletContext = 
Mockito.mock(IHyracksJobletContext.class);
+        IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
+        Mockito.when(ctx.getJobletContext()).thenReturn(jobletContext);
+        Mockito.when(ctx.getInitialFrameSize()).thenReturn(FRAME_SIZE);
+        Mockito.when(ctx.allocateFrame(FRAME_SIZE)).thenAnswer(new 
Answer<ByteBuffer>() {
+            @Override
+            public ByteBuffer answer(InvocationOnMock invocation) throws 
Throwable {
+                return ByteBuffer.allocate(FRAME_SIZE);
+            }
+        });
+
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                stateObjects.add(invocation.getArguments()[0]);
+                return null;
+            }
+        }).when(ctx).setStateObject(Mockito.any(IStateObject.class));
+
+        List<ITupleReference> resultTuples = new ArrayList<>();
+        IFrameWriter resultWriter = new ResultFrameWriter(resultTuples);
+
+        LSMSecondaryIndexCreationTupleProcessorNodePushable op =
+                new LSMSecondaryIndexCreationTupleProcessorNodePushable(ctx, 
0, recDesc, MissingWriterFactory.INSTANCE,
+                        numTagFields, numSecondaryKeys, numPrimaryKeys, 
hasBuddyBTree);
+        op.setOutputFrameWriter(0, resultWriter, recDesc);
+
+        op.open();
+
+        IFrame frame = new VSizeFrame(ctx);
+        FrameTupleAppender appender = new FrameTupleAppender(frame, true);
+        // generate input tuples
+        for (Object[] inputTuple : inputTuples) {
+            ITupleReference tuple = buildTuple(inputTuple);
+            appender.append(tuple);
+        }
+        appender.write(op, true);
+        op.close();
+
+        // check results
+        Assert.assertEquals("Check the number of returned tuples", 
expectedTuples.size(), resultTuples.size());
+
+        for (int i = 0; i < expectedTuples.size(); i++) {
+            Object[] expectedTuple = expectedTuples.get(i);
+            ITupleReference resultTuple = resultTuples.get(i);
+
+            // check component pos
+            Assert.assertEquals("Check component position", expectedTuple[0],
+                    IntegerPointable.getInteger(resultTuple.getFieldData(0), 
resultTuple.getFieldStart(0)));
+
+            // check anti-matter flag
+            Assert.assertEquals("Check anti-matter", expectedTuple[1],
+                    BooleanPointable.getBoolean(resultTuple.getFieldData(1), 
resultTuple.getFieldStart(1)));
+
+            if (expectedTuple[2] == null) {
+                Assert.assertTrue("Check secondary key is empty",
+                        TypeTagUtil.isType(resultTuple, 2, 
ATypeTag.SERIALIZED_MISSING_TYPE_TAG));
+            } else {
+                Assert.assertEquals("Check secondary key", expectedTuple[2],
+                        
IntegerPointable.getInteger(resultTuple.getFieldData(2), 
resultTuple.getFieldStart(2)));
+            }
+
+            Assert.assertEquals("Check primary key", expectedTuple[3],
+                    IntegerPointable.getInteger(resultTuple.getFieldData(3), 
resultTuple.getFieldStart(3)));
+        }
+
+        //check num deleted tuples
+        Assert.assertEquals("Check state object", 1, stateObjects.size());
+        DeletedTupleCounter counter = (DeletedTupleCounter) 
stateObjects.get(0);
+        for (int i = 0; i < numDeletedTuples.length; i++) {
+            Assert.assertEquals("Check num of deleted tuples", 
numDeletedTuples[i], counter.get(i));
+        }
+    }
+
+    private ITupleReference buildTuple(Object[] values) throws 
HyracksDataException {
+        ArrayTupleBuilder builder = new ArrayTupleBuilder(numTagFields + 
numSecondaryKeys + numPrimaryKeys);
+        builder.addField(IntegerSerializerDeserializer.INSTANCE, (int) 
values[0]);
+        builder.addField(BooleanSerializerDeserializer.INSTANCE, (boolean) 
values[1]);
+        if (values[2] == null) {
+            IMissingWriter writer = 
MissingWriterFactory.INSTANCE.createMissingWriter();
+            writer.writeMissing(builder.getDataOutput());
+            builder.addFieldEndOffset();
+        } else {
+            builder.addField(IntegerSerializerDeserializer.INSTANCE, (int) 
values[2]);
+        }
+
+        builder.addField(IntegerSerializerDeserializer.INSTANCE, (int) 
values[3]);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        tuple.reset(builder.getFieldEndOffsets(), builder.getByteArray());
+        return tuple;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml
index 2b92a20..3ea29bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml
@@ -63,6 +63,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-dataflow-std</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-common</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
new file mode 100644
index 0000000..291363d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+
+public class LSMBTreeDiskComponentScanOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final IIndexDataflowHelperFactory indexHelperFactory;
+    protected final ISearchOperationCallbackFactory searchCallbackFactory;
+
+    public 
LSMBTreeDiskComponentScanOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
+            IIndexDataflowHelperFactory indexHelperFactory, 
ISearchOperationCallbackFactory searchCallbackFactory) {
+        super(spec, 1, 1);
+        this.indexHelperFactory = indexHelperFactory;
+        this.searchCallbackFactory = searchCallbackFactory;
+        this.outRecDescs[0] = outRecDesc;
+    }
+
+    @Override
+    public LSMBTreeDiskComponentScanOperatorNodePushable 
createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
+        return new LSMBTreeDiskComponentScanOperatorNodePushable(ctx, 
partition,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), indexHelperFactory,
+                searchCallbackFactory);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
new file mode 100644
index 0000000..1f71876
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+
/**
 * Node pushable that scans all disk components of an LSM BTree.
 * Reuses the index search pushable machinery but replaces the per-tuple search with a
 * single disk-component scan per incoming frame.
 */
public class LSMBTreeDiskComponentScanOperatorNodePushable extends IndexSearchOperatorNodePushable {

    public LSMBTreeDiskComponentScanOperatorNodePushable(IHyracksTaskContext ctx, int partition,
            RecordDescriptor inputRecDesc, IIndexDataflowHelperFactory indexHelperFactory,
            ISearchOperationCallbackFactory searchCallbackFactory) throws HyracksDataException {
        // No key fields, no filters, no range search parameters are needed for a full scan.
        super(ctx, inputRecDesc, partition, null, null, indexHelperFactory, false, false, null, searchCallbackFactory,
                false);
    }

    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        try {
            // Open a scan cursor over the disk components and forward everything it returns.
            ((ILSMIndexAccessor) indexAccessor).scanDiskComponents(cursor);
            writeSearchResults(0);
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    @Override
    protected ISearchPredicate createSearchPredicate() {
        // do nothing
        // no need to create search predicate for disk component scan operation
        return null;
    }

    @Override
    protected void resetSearchPredicate(int tupleIndex) {
        // A disk component scan is not driven by input tuples, so there is no predicate to reset.
        throw new UnsupportedOperationException();
    }

    @Override
    protected int getFieldCount() {
        // Output adds 2 fields to the index tuple — presumably the component position and
        // anti-matter flag produced by the scan cursor; TODO confirm against the cursor impl.
        return ((ITreeIndex) index).getFieldCount() + 2;
    }

}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 1c99b17..f6f4828 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -675,7 +675,8 @@ public class ExternalBTreeWithBuddy extends 
AbstractLSMIndex implements ITreeInd
         }
     }
 
-    protected ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
+    @Override
+    public ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
         LSMComponentFileReferences componentFileRefs = 
fileManager.getRelFlushFileReference();
         return createDiskComponent(bulkComponentFactory, 
componentFileRefs.getInsertIndexFileReference(),
                 componentFileRefs.getDeleteIndexFileReference(), 
componentFileRefs.getBloomFilterFileReference(), true);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index e830b3e..ffdfe2a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -455,7 +455,8 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
         return new LSMBTreeBulkLoader(this, fillLevel, verifyInput, 
numElementsHint);
     }
 
-    protected ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
+    @Override
+    public ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
         LSMComponentFileReferences componentFileRefs = 
fileManager.getRelFlushFileReference();
         return createDiskComponent(bulkLoadComponentFactory, 
componentFileRefs.getInsertIndexFileReference(),
                 componentFileRefs.getBloomFilterFileReference(), true);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
index 3c9f709..ab9b5af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
@@ -19,7 +19,6 @@
 
 package org.apache.hyracks.storage.am.lsm.btree.impls;
 
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.BooleanPointable;
@@ -107,13 +106,17 @@ public class LSMBTreeDiskComponentScanCursor extends 
LSMIndexSearchCursor {
             super.next();
             LSMBTreeTupleReference diskTuple = (LSMBTreeTupleReference) 
super.getTuple();
             if (diskTuple.isAntimatter()) {
-                setAntiMatterTuple(diskTuple, outputElement.getCursorIndex());
+                if (setAntiMatterTuple(diskTuple, 
outputElement.getCursorIndex())) {
+                    foundNext = true;
+                    return true;
+                }
             } else {
                 //matter tuple
                 setMatterTuple(diskTuple, outputElement.getCursorIndex());
+                foundNext = true;
+                return true;
             }
-            foundNext = true;
-            return true;
+
         }
 
         return false;
@@ -139,21 +142,23 @@ public class LSMBTreeDiskComponentScanCursor extends 
LSMIndexSearchCursor {
             }
             originalTuple = new PermutingTupleReference(permutation);
         }
-
         //build the matter tuple
         buildTuple(tupleBuilder, diskTuple, cursorIndex, MATTER_TUPLE_FLAG);
         outputTuple.reset(tupleBuilder.getFieldEndOffsets(), 
tupleBuilder.getByteArray());
         originalTuple.reset(outputTuple);
     }
 
-    private void setAntiMatterTuple(ITupleReference diskTuple, int 
cursorIndex) throws HyracksDataException {
+    private boolean setAntiMatterTuple(ITupleReference diskTuple, int 
cursorIndex) throws HyracksDataException {
         if (originalTuple == null || cmp.compare(diskTuple, originalTuple) != 
0) {
-            //This shouldn't happen, because we shouldn't place an anti-matter 
tuple
-            //into the primary index if that tuple is not there
-            throw 
HyracksDataException.create(ErrorCode.CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE);
+            // This could happen sometimes...
+            // Consider insert tuple A into the memory component, and then 
delete it immediately.
+            // We would have -A in the memory component, but there is no tuple 
A in the previous disk components.
+            // But in this case, we can simply ignore it for the scan purpose
+            return false;
         }
         buildTuple(antiMatterTupleBuilder, originalTuple, cursorIndex, 
ANTIMATTER_TUPLE_FLAG);
         outputTuple.reset(antiMatterTupleBuilder.getFieldEndOffsets(), 
antiMatterTupleBuilder.getByteArray());
+        return true;
     }
 
     private void buildTuple(ArrayTupleBuilder builder, ITupleReference 
diskTuple, int cursorIndex,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 9e1f30d..ae2c93d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -159,4 +159,11 @@ public interface ILSMIndex extends IIndex {
             boolean verifyInput, long numElementsHint, boolean 
checkIfEmptyIndex, boolean withFilter,
             boolean cleanupEmptyComponent) throws HyracksDataException;
 
    /**
     * Creates a new disk component to serve as the target of a bulk load operation.
     *
     * @return the newly created disk component that the bulk load writes into
     * @throws HyracksDataException if the component cannot be created
     */
    ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index d5edbb5..30266fb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -549,12 +549,13 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
             componentBulkLoader.abort();
         }
 
-        private ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
-            LSMComponentFileReferences componentFileRefs = 
fileManager.getRelFlushFileReference();
-            return createDiskInvIndexComponent(componentFactory, 
componentFileRefs.getInsertIndexFileReference(),
-                    componentFileRefs.getDeleteIndexFileReference(), 
componentFileRefs.getBloomFilterFileReference(),
-                    true);
-        }
+    }
+
+    @Override
+    public ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
+        LSMComponentFileReferences componentFileRefs = 
fileManager.getRelFlushFileReference();
+        return createDiskInvIndexComponent(componentFactory, 
componentFileRefs.getInsertIndexFileReference(),
+                componentFileRefs.getDeleteIndexFileReference(), 
componentFileRefs.getBloomFilterFileReference(), true);
     }
 
     protected InMemoryInvertedIndex 
createInMemoryInvertedIndex(IVirtualBufferCache virtualBufferCache,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 56af16d..f295f5b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -338,7 +338,8 @@ public class LSMRTree extends AbstractLSMRTree {
                 buddyBTreeFields);
     }
 
-    protected ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
+    @Override
+    public ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
         LSMComponentFileReferences componentFileRefs = 
fileManager.getRelFlushFileReference();
         return createDiskComponent(componentFactory, 
componentFileRefs.getInsertIndexFileReference(),
                 componentFileRefs.getDeleteIndexFileReference(), 
componentFileRefs.getBloomFilterFileReference(), true);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index ae72884..19f0ca0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -311,12 +311,13 @@ public class LSMRTreeWithAntiMatterTuples extends 
AbstractLSMRTree {
             }
         }
 
-        private ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
-            LSMComponentFileReferences relFlushFileRefs = 
fileManager.getRelFlushFileReference();
-            return createDiskComponent(bulkLoaComponentFactory, 
relFlushFileRefs.getInsertIndexFileReference(), null,
-                    null, true);
-        }
+    }
 
+    @Override
+    public ILSMDiskComponent createBulkLoadTarget() throws 
HyracksDataException {
+        LSMComponentFileReferences relFlushFileRefs = 
fileManager.getRelFlushFileReference();
+        return createDiskComponent(bulkLoaComponentFactory, 
relFlushFileRefs.getInsertIndexFileReference(), null, null,
+                true);
     }
 
     @Override

Reply via email to