http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
new file mode 100644
index 0000000..f7e569c
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.utils;
+
+import java.io.DataOutput;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import 
org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import 
org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import 
org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
@SuppressWarnings("rawtypes")
// TODO: We should eventually have a hierarchy of classes that can create all
// possible index job specs,
// not just for creation.
public abstract class SecondaryIndexOperationsHelper {
    // --- Compilation/metadata context (set once in the constructor) ---
    protected final PhysicalOptimizationConfig physOptConf;
    protected final MetadataProvider metadataProvider;
    protected final Dataset dataset;
    protected final Index index;
    protected final ARecordType itemType;
    protected final ARecordType metaType;
    protected final ARecordType enforcedItemType;
    protected final ARecordType enforcedMetaType;

    // --- Primary-index side state (populated by init() for INTERNAL datasets) ---
    protected ISerializerDeserializer metaSerde;
    protected ISerializerDeserializer payloadSerde;
    protected IFileSplitProvider primaryFileSplitProvider;
    protected AlgebricksPartitionConstraint primaryPartitionConstraint;

    // --- Secondary-index side state (populated by init()) ---
    protected IFileSplitProvider secondaryFileSplitProvider;
    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
    protected boolean anySecondaryKeyIsNullable = false;
    // Cardinality hint used to size the bulk load.
    protected long numElementsHint;
    protected IBinaryComparatorFactory[] primaryComparatorFactories;
    protected int[] primaryBloomFilterKeyFields;
    protected RecordDescriptor primaryRecDesc;
    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
    protected ITypeTraits[] secondaryTypeTraits;
    protected int[] secondaryBloomFilterKeyFields;
    protected RecordDescriptor secondaryRecDesc;
    protected IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories;
    protected IPropertiesProvider propertiesProvider;
    protected ILSMMergePolicyFactory mergePolicyFactory;
    protected Map<String, String> mergePolicyFactoryProperties;
    protected RecordDescriptor enforcedRecDesc;

    // --- Filter-field state (0 or 1 filter field; see setFilterTypeTraitsAndComparators) ---
    protected int numFilterFields;
    protected List<String> filterFieldName;
    protected ITypeTraits[] filterTypeTraits;
    protected IBinaryComparatorFactory[] filterCmpFactories;
    protected int[] secondaryFilterFields;
    protected int[] primaryFilterFields;
    protected int[] primaryBTreeFields;
    protected int[] secondaryBTreeFields;

    // --- External-dataset state ---
    protected List<ExternalFile> externalFiles;
    protected int numPrimaryKeys;
    // Prevent public construction. Should be created via createIndexCreator().
    /**
     * Stores the compilation context; no metadata access happens here.
     * Derived state (split providers, record descriptors, comparators, merge
     * policy) is populated later by {@code init()}.
     *
     * @param recType the dataset's record type
     * @param metaType the meta record type, or null when the dataset has no meta part
     * @param enforcedType the record type with enforced (open) index fields applied
     * @param enforcedMetaType the enforced variant of the meta type
     */
    protected SecondaryIndexOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf,
            IPropertiesProvider propertiesProvider, MetadataProvider metadataProvider, ARecordType recType,
            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) {
        this.dataset = dataset;
        this.index = index;
        this.physOptConf = physOptConf;
        this.propertiesProvider = propertiesProvider;
        this.metadataProvider = metadataProvider;
        this.itemType = recType;
        this.metaType = metaType;
        this.enforcedItemType = enforcedType;
        this.enforcedMetaType = enforcedMetaType;
    }
+
    /**
     * Factory: instantiates the concrete helper for the index's type (BTree,
     * RTree, or one of the inverted-index variants) and runs {@code init()} on
     * it before returning, so the returned helper is ready to build job specs.
     *
     * @throws AlgebricksException if the index type is unknown or init fails
     */
    public static SecondaryIndexOperationsHelper createIndexOperationsHelper(Dataset dataset, Index index,
            MetadataProvider metadataProvider, PhysicalOptimizationConfig physOptConf, ARecordType recType,
            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) throws AlgebricksException {
        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
        SecondaryIndexOperationsHelper indexOperationsHelper;
        switch (index.getIndexType()) {
            case BTREE:
                indexOperationsHelper =
                        new SecondaryBTreeOperationsHelper(dataset, index, physOptConf, asterixPropertiesProvider,
                                metadataProvider, recType, metaType, enforcedType, enforcedMetaType);
                break;
            case RTREE:
                indexOperationsHelper =
                        new SecondaryRTreeOperationsHelper(dataset, index, physOptConf, asterixPropertiesProvider,
                                metadataProvider, recType, metaType, enforcedType, enforcedMetaType);
                break;
            case SINGLE_PARTITION_WORD_INVIX:
            case SINGLE_PARTITION_NGRAM_INVIX:
            case LENGTH_PARTITIONED_WORD_INVIX:
            case LENGTH_PARTITIONED_NGRAM_INVIX:
                // All four inverted-index flavors share one helper implementation.
                indexOperationsHelper = new SecondaryInvertedIndexOperationsHelper(dataset, index, physOptConf,
                        asterixPropertiesProvider, metadataProvider, recType, metaType, enforcedType,
                        enforcedMetaType);
                break;
            default:
                throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, index.getIndexType());
        }
        indexOperationsHelper.init();
        return indexOperationsHelper;
    }
+
+    public abstract JobSpecification buildCreationJobSpec() throws 
AlgebricksException;
+
+    public abstract JobSpecification buildLoadingJobSpec() throws 
AlgebricksException;
+
+    public abstract JobSpecification buildCompactJobSpec() throws 
AlgebricksException;
+
    /**
     * Populates all derived state from the metadata: serdes, file splits and
     * partition constraints for primary and secondary indexes, record
     * descriptors/comparators, cardinality hint, merge policy, and (if present)
     * the filter field. NOTE: ordering matters — numPrimaryKeys and the primary
     * descriptors must be set before the secondary descriptors, and
     * numFilterFields before setFilterTypeTraitsAndComparators().
     */
    protected void init() throws AlgebricksException {
        payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
        metaSerde =
                metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
                        index.getIndexName(), dataset.getDatasetDetails().isTemp());
        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
        numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
            // Only internal datasets have a primary index and an optional filter field.
            filterFieldName = DatasetUtil.getFilterField(dataset);
            if (filterFieldName != null) {
                numFilterFields = 1;
            } else {
                numFilterFields = 0;
            }
            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
                    metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(),
                            dataset.getDatasetName(), dataset.getDatasetName(), dataset.getDatasetDetails().isTemp());
            primaryFileSplitProvider = primarySplitsAndConstraint.first;
            primaryPartitionConstraint = primarySplitsAndConstraint.second;
            setPrimaryRecDescAndComparators();
        }
        setSecondaryRecDescAndComparators();
        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
        mergePolicyFactory = compactionInfo.first;
        mergePolicyFactoryProperties = compactionInfo.second;
        if (numFilterFields > 0) {
            setFilterTypeTraitsAndComparators();
        }
    }
+
    /**
     * Sets up type traits, comparators, and field positions for the dataset's
     * single filter field (only called when numFilterFields > 0, i.e. == 1).
     * Also records which fields of each index are BTree fields (everything
     * except the trailing filter field).
     */
    protected void setFilterTypeTraitsAndComparators() throws AlgebricksException {
        filterTypeTraits = new ITypeTraits[numFilterFields];
        filterCmpFactories = new IBinaryComparatorFactory[numFilterFields];
        secondaryFilterFields = new int[numFilterFields];
        primaryFilterFields = new int[numFilterFields];
        // Primary BTree fields: [pk..., record]; secondary: [secondary keys..., pk...].
        primaryBTreeFields = new int[numPrimaryKeys + 1];
        secondaryBTreeFields = new int[index.getKeyFieldNames().size() + numPrimaryKeys];
        for (int i = 0; i < primaryBTreeFields.length; i++) {
            primaryBTreeFields[i] = i;
        }
        for (int i = 0; i < secondaryBTreeFields.length; i++) {
            secondaryBTreeFields[i] = i;
        }

        IAType type = itemType.getSubFieldType(filterFieldName);
        filterCmpFactories[0] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
        filterTypeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
        // Filter sits after all keys in both tuples:
        // secondary tuple = [secondary keys..., pk..., filter]
        // primary tuple   = [pk..., record, filter]
        secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
        primaryFilterFields[0] = numPrimaryKeys + 1;
    }
+
+    protected abstract int getNumSecondaryKeys();
+
+    protected void setPrimaryRecDescAndComparators() throws 
AlgebricksException {
+        List<List<String>> partitioningKeys = 
DatasetUtil.getPartitioningKeys(dataset);
+        ISerializerDeserializer[] primaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
+        primaryComparatorFactories = new 
IBinaryComparatorFactory[numPrimaryKeys];
+        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = 
metadataProvider.getFormat().getSerdeProvider();
+        List<Integer> indicators = null;
+        if (dataset.hasMetaPart()) {
+            indicators = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getKeySourceIndicator();
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType =
+                    (indicators == null || indicators.get(i) == 0) ? 
itemType.getSubFieldType(partitioningKeys.get(i))
+                            : 
metaType.getSubFieldType(partitioningKeys.get(i));
+            primaryRecFields[i] = 
serdeProvider.getSerializerDeserializer(keyType);
+            primaryComparatorFactories[i] =
+                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
+            primaryTypeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            primaryBloomFilterKeyFields[i] = i;
+        }
+        primaryRecFields[numPrimaryKeys] = payloadSerde;
+        primaryTypeTraits[numPrimaryKeys] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        if (dataset.hasMetaPart()) {
+            primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
+            primaryTypeTraits[numPrimaryKeys + 1] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        }
+        primaryRecDesc = new RecordDescriptor(primaryRecFields, 
primaryTypeTraits);
+    }
+
+    protected abstract void setSecondaryRecDescAndComparators() throws 
AlgebricksException;
+
+    protected AbstractOperatorDescriptor 
createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
+        // Build dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize dummy value into a field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Add dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { 
IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new 
ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), 
tb.getSize());
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
keyProviderOp,
+                primaryPartitionConstraint);
+        return keyProviderOp;
+    }
+
    /**
     * Creates a full scan (-inf, +inf) over the dataset's primary BTree index.
     * Side effects: generates a fresh JobId, registers it on the metadata
     * provider, and installs a job event listener on the spec.
     */
    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec)
            throws AlgebricksException {
        // -Infinity
        int[] lowKeyFields = null;
        // +Infinity
        int[] highKeyFields = null;
        ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
        JobId jobId = JobIdFactory.generateJobId();
        metadataProvider.setJobId(jobId);
        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
        spec.setJobletEventListenerFactory(jobEventListenerFactory);
        // The primary index's metadata entry is keyed by the dataset name itself.
        Index primaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName());

        // Temp datasets skip transactional search callbacks.
        boolean temp = dataset.getDatasetDetails().isTemp();
        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
                        primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
                dataset.getIndexDataflowHelperFactory(metadataProvider, primaryIndex, itemType, metaType,
                        mergePolicyFactory, mergePolicyFactoryProperties),
                false, false, null, searchCallbackFactory, null, null,
                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());

        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                primaryPartitionConstraint);
        return primarySearchOp;
    }
+
+    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification 
spec, int numSecondaryKeyFields,
+            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
+        int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
+        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys 
+ numFilterFields];
+        for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
+            outColumns[i] = numPrimaryKeys + i;
+        }
+        int projCount = 0;
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            projectionList[projCount++] = numPrimaryKeys + i;
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            projectionList[projCount++] = i;
+        }
+        if (numFilterFields > 0) {
+            projectionList[projCount] = numPrimaryKeys + numSecondaryKeyFields;
+        }
+
+        IScalarEvaluatorFactory[] sefs = new 
IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+            sefs[i] = secondaryFieldAccessEvalFactories[i];
+        }
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, 
sefs, projectionList);
+        AlgebricksMetaOperatorDescriptor asterixAssignOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { 
secondaryRecDesc });
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
asterixAssignOp,
+                primaryPartitionConstraint);
+        return asterixAssignOp;
+    }
+
    /**
     * Creates an assign operator that casts the record field from the stored
     * item type to the enforced type, replacing the record in place. All other
     * fields (primary keys and, if present, the meta record) are projected
     * through unchanged.
     */
    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec, DatasetType dsType) {
        CastTypeDescriptor castFuncDesc = (CastTypeDescriptor) CastTypeDescriptor.FACTORY.createFunctionDescriptor();
        castFuncDesc.setImmutableStates(enforcedItemType, itemType);

        int[] outColumns = new int[1];
        int[] projectionList = new int[(dataset.hasMetaPart() ? 2 : 1) + numPrimaryKeys];
        int recordIdx;
        //external datascan operator returns a record as the first field, instead of the last in internal case
        if (dsType == DatasetType.EXTERNAL) {
            recordIdx = 0;
            outColumns[0] = 0;
        } else {
            recordIdx = numPrimaryKeys;
            outColumns[0] = numPrimaryKeys;
        }
        // Identity projection over the first numPrimaryKeys + 1 fields...
        for (int i = 0; i <= numPrimaryKeys; i++) {
            projectionList[i] = i;
        }
        // ...plus the meta record, when present.
        if (dataset.hasMetaPart()) {
            projectionList[numPrimaryKeys + 1] = numPrimaryKeys + 1;
        }
        IScalarEvaluatorFactory[] castEvalFact =
                new IScalarEvaluatorFactory[] { new ColumnAccessEvalFactory(recordIdx) };
        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
        sefs[0] = castFuncDesc.createEvaluatorFactory(castEvalFact);
        AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
        return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { castAssign },
                new RecordDescriptor[] { enforcedRecDesc });
    }
+
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification 
spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, 
RecordDescriptor secondaryRecDesc) {
+        int[] sortFields = new int[secondaryComparatorFactories.length];
+        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new 
ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, 
secondaryComparatorFactories, secondaryRecDesc);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
    /**
     * Creates the bulk-load operator for the secondary tree index, sized by the
     * cardinality hint and partitioned like the secondary index.
     *
     * @param fieldPermutation mapping from input tuple fields to index fields
     * @param dataflowHelperFactory index dataflow helper for the target index
     * @param fillFactor tree fill factor for the bulk load
     */
    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
            throws AlgebricksException {
        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
                secondaryRecDesc, RuntimeComponentsProvider.RUNTIME_PROVIDER,
                RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
                secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
                fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                secondaryPartitionConstraint);
        return treeIndexBulkLoadOp;
    }
+
+    public AlgebricksMetaOperatorDescriptor 
createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields,
+            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
+        IScalarEvaluatorFactory[] andArgsEvalFactories = new 
IScalarEvaluatorFactory[numSecondaryKeyFields];
+        NotDescriptor notDesc = new NotDescriptor();
+        IsUnknownDescriptor isUnknownDesc = new IsUnknownDescriptor();
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            // Access column i, and apply 'is not null'.
+            ColumnAccessEvalFactory columnAccessEvalFactory = new 
ColumnAccessEvalFactory(i);
+            IScalarEvaluatorFactory isUnknownEvalFactory =
+                    isUnknownDesc.createEvaluatorFactory(new 
IScalarEvaluatorFactory[] { columnAccessEvalFactory });
+            IScalarEvaluatorFactory notEvalFactory =
+                    notDesc.createEvaluatorFactory(new 
IScalarEvaluatorFactory[] { isUnknownEvalFactory });
+            andArgsEvalFactories[i] = notEvalFactory;
+        }
+        IScalarEvaluatorFactory selectCond;
+        if (numSecondaryKeyFields > 1) {
+            // Create conjunctive condition where all secondary index keys must
+            // satisfy 'is not null'.
+            AndDescriptor andDesc = new AndDescriptor();
+            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
+        } else {
+            selectCond = andArgsEvalFactories[0];
+        }
+        StreamSelectRuntimeFactory select =
+                new StreamSelectRuntimeFactory(selectCond, null, 
BinaryBooleanInspector.FACTORY, false, -1, null);
+        AlgebricksMetaOperatorDescriptor asterixSelectOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { 
secondaryRecDesc });
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
asterixSelectOp,
+                primaryPartitionConstraint);
+        return asterixSelectOp;
+    }
+
    // This method creates a source indexing operator for external data
    /**
     * Creates the scan operator that reads external files and emits
     * [record, rid fields...]. Side effect: overwrites
     * primaryPartitionConstraint with this operator's constraint so downstream
     * operators co-locate with the scan.
     */
    protected ExternalScanOperatorDescriptor createExternalIndexingOp(JobSpecification spec)
            throws AlgebricksException {
        // A record + primary keys
        ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + numPrimaryKeys];
        ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys];
        // payload serde and type traits for the record slot
        serdes[0] = payloadSerde;
        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
        //  serdes and type traits for rid fields
        for (int i = 1; i < serdes.length; i++) {
            serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1);
            typeTraits[i] = IndexingConstants.getTypeTraits(i - 1);
        }
        // output record desc
        RecordDescriptor indexerDesc = new RecordDescriptor(serdes, typeTraits);

        // Create the operator and its partition constraits
        Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints;
        try {
            indexingOpAndConstraints = ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider,
                    dataset, itemType, indexerDesc, externalFiles);
        } catch (Exception e) {
            // Wrap any failure from the external-indexing helper in a compile-time exception.
            throw new AlgebricksException(e);
        }
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
                indexingOpAndConstraints.second);

        // Set the primary partition constraints to this partition constraints
        primaryPartitionConstraint = indexingOpAndConstraints.second;
        return indexingOpAndConstraints.first;
    }
+
    /**
     * Creates the assign operator for external datasets. The external scan
     * emits [record(0), rid fields(1..numPrimaryKeys)], so the new secondary
     * key columns are appended after those (offset numPrimaryKeys + 1) and the
     * projection emits [secondary keys..., rid fields...].
     */
    protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys,
            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
        int[] outColumns = new int[numSecondaryKeys];
        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
        for (int i = 0; i < numSecondaryKeys; i++) {
            // +1 skips the record field at position 0.
            outColumns[i] = i + numPrimaryKeys + 1;
            projectionList[i] = i + numPrimaryKeys + 1;
        }

        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
            sefs[i] = secondaryFieldAccessEvalFactories[i];
        }
        //add External RIDs to the projection list
        for (int i = 0; i < numPrimaryKeys; i++) {
            projectionList[numSecondaryKeys + i] = i + 1;
        }

        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
        return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { assign },
                new RecordDescriptor[] { secondaryRecDesc });
    }
+
+    protected ExternalIndexBulkModifyOperatorDescriptor 
createExternalIndexBulkModifyOp(JobSpecification spec,
+            int[] fieldPermutation, IIndexDataflowHelperFactory 
dataflowHelperFactory, float fillFactor)
+            throws AlgebricksException {
+        // create a list of file ids
+        int numOfDeletedFiles = 0;
+        for (ExternalFile file : externalFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+                numOfDeletedFiles++;
+            }
+        }
+        int[] deletedFiles = new int[numOfDeletedFiles];
+        int i = 0;
+        for (ExternalFile file : externalFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+                deletedFiles[i] = file.getFileNumber();
+            }
+        }
+        ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new 
ExternalIndexBulkModifyOperatorDescriptor(
+                spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, secondaryTypeTraits, 
secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, dataflowHelperFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                deletedFiles, fieldPermutation, fillFactor, numElementsHint,
+                
metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
+                secondaryPartitionConstraint);
+        return treeIndexBulkLoadOp;
+    }
+
    /** @return the external files this helper indexes (null for internal datasets). */
    public List<ExternalFile> getExternalFiles() {
        return externalFiles;
    }

    /** Sets the external file list used by the external-index operators. */
    public void setExternalFiles(List<ExternalFile> externalFiles) {
        this.externalFiles = externalFiles;
    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
new file mode 100644
index 0000000..b86004a
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import 
org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadataFactory;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class SecondaryInvertedIndexOperationsHelper extends 
SecondaryIndexOperationsHelper {
+
    // The (single) secondary key's type; inverted indexes only support one key field.
    private IAType secondaryKeyType;
    // Type traits of the inverted-list elements, i.e. the primary key(s).
    private ITypeTraits[] invListsTypeTraits;
    // Comparators/traits for the token field(s); slot 1 is the length-partition
    // field when the index is length-partitioned.
    private IBinaryComparatorFactory[] tokenComparatorFactories;
    private ITypeTraits[] tokenTypeTraits;
    // Factory producing the tokenizer matching the index type (word/ngram).
    private IBinaryTokenizerFactory tokenizerFactory;
    // For tokenization, sorting and loading. Represents <token, primary keys>.
    private int numTokenKeyPairFields;
    private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
    private RecordDescriptor tokenKeyPairRecDesc;
    // True for LENGTH_PARTITIONED_{WORD,NGRAM}_INVIX index types.
    private boolean isPartitioned;
    // Field index arrays used when a filter field is present (bulk load vs. other ops).
    private int[] invertedIndexFields;
    private int[] invertedIndexFieldsForNonBulkLoadOps;
    private int[] secondaryFilterFieldsForNonBulkLoadOps;
+
    /**
     * Creates an operations helper for a secondary inverted (keyword/ngram,
     * optionally length-partitioned) index. Purely delegates to the base class;
     * index-specific state is initialized later in
     * {@code setSecondaryRecDescAndComparators()}.
     */
    protected SecondaryInvertedIndexOperationsHelper(Dataset dataset, Index index,
            PhysicalOptimizationConfig physOptConf, IPropertiesProvider propertiesProvider,
            MetadataProvider metadataProvider, ARecordType recType, ARecordType metaType, ARecordType enforcedType,
            ARecordType enforcedMetaType) {
        super(dataset, index, physOptConf, propertiesProvider, metadataProvider, recType, metaType, enforcedType,
                enforcedMetaType);
    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void setSecondaryRecDescAndComparators() throws 
AlgebricksException {
+        int numSecondaryKeys = index.getKeyFieldNames().size();
+        IndexType indexType = index.getIndexType();
+        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
+        // Sanity checks.
+        if (numPrimaryKeys > 1) {
+            throw new CompilationException(
+                    
ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX, 
indexType,
+                    
RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), 
dataset.getDatasetName()));
+        }
+        if (numSecondaryKeys > 1) {
+            throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, 
numSecondaryKeys,
+                    indexType, 1);
+        }
+        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            isPartitioned = true;
+        } else {
+            isPartitioned = false;
+        }
+        // Prepare record descriptor used in the assign op, and the optional
+        // select op.
+        secondaryFieldAccessEvalFactories = new 
IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys 
+ numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields =
+                new ISerializerDeserializer[1 + numPrimaryKeys + 
numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + 
numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        ISerializerDeserializerProvider serdeProvider = 
FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = 
FormatUtils.getDefaultFormat().getTypeTraitProvider();
+        if (numSecondaryKeys > 0) {
+            secondaryFieldAccessEvalFactories[0] = 
FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
+                    isEnforcingKeyTypes ? enforcedItemType : itemType, 
index.getKeyFieldNames().get(0),
+                    numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                    index.getKeyFieldNames().get(0), itemType);
+            secondaryKeyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || 
keyTypePair.second;
+            ISerializerDeserializer keySerde = 
serdeProvider.getSerializerDeserializer(secondaryKeyType);
+            secondaryRecFields[0] = keySerde;
+            secondaryTypeTraits[0] = 
typeTraitProvider.getTypeTrait(secondaryKeyType);
+        }
+        if (numFilterFields > 0) {
+            secondaryFieldAccessEvalFactories[numSecondaryKeys] = 
FormatUtils.getDefaultFormat()
+                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, 
numPrimaryKeys);
+            Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = keyTypePair.first;
+            ISerializerDeserializer serde = 
serdeProvider.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        // Comparators and type traits for tokens.
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : 
numSecondaryKeys + 1;
+        tokenComparatorFactories = new 
IBinaryComparatorFactory[numTokenFields];
+        tokenTypeTraits = new ITypeTraits[numTokenFields];
+        tokenComparatorFactories[0] = 
NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        tokenTypeTraits[0] = 
NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an 
Asterix type tag.
+            tokenComparatorFactories[1] = 
PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
+        // Set tokenizer factory.
+        // TODO: We might want to expose the hashing option at the AQL level,
+        // and add the choice to the index metadata.
+        tokenizerFactory = 
NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), 
indexType,
+                index.getGramLength());
+        // Type traits for inverted-list elements. Inverted lists contain
+        // primary keys.
+        invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
+        if (numPrimaryKeys > 0) {
+            invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+            enforcedRecFields[0] = primaryRecDesc.getFields()[0];
+            enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
+        }
+        enforcedRecFields[numPrimaryKeys] = 
serdeProvider.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, 
enforcedTypeTraits);
+        // For tokenization, sorting and loading.
+        // One token (+ optional partitioning field) + primary keys.
+        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + 
numPrimaryKeys;
+        ISerializerDeserializer[] tokenKeyPairFields =
+                new ISerializerDeserializer[numTokenKeyPairFields + 
numFilterFields];
+        ITypeTraits[] tokenKeyPairTypeTraits = new 
ITypeTraits[numTokenKeyPairFields];
+        tokenKeyPairComparatorFactories = new 
IBinaryComparatorFactory[numTokenKeyPairFields];
+        tokenKeyPairFields[0] = 
serdeProvider.getSerializerDeserializer(secondaryKeyType);
+        tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
+        tokenKeyPairComparatorFactories[0] = 
NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+        int pkOff = 1;
+        if (isPartitioned) {
+            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
+            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
+            tokenKeyPairComparatorFactories[1] = 
PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            pkOff = 2;
+        }
+        if (numPrimaryKeys > 0) {
+            tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
+            tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
+            tokenKeyPairComparatorFactories[pkOff] = 
primaryComparatorFactories[0];
+        }
+        if (numFilterFields > 0) {
+            tokenKeyPairFields[numPrimaryKeys + pkOff] = 
secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
+        }
+        tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, 
tokenKeyPairTypeTraits);
+        if (filterFieldName != null) {
+            invertedIndexFields = new int[numTokenKeyPairFields];
+            for (int i = 0; i < invertedIndexFields.length; i++) {
+                invertedIndexFields[i] = i;
+            }
+            secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
+            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + 
numPrimaryKeys;
+            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + 
numPrimaryKeys];
+            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; 
i++) {
+                invertedIndexFieldsForNonBulkLoadOps[i] = i;
+            }
+        }
+    }
+
    /**
     * Number of "secondary key" fields in the token/key-pair layout, i.e. the
     * token field plus (when length-partitioned) the partition short — computed
     * as total pair fields minus the primary keys.
     */
    @Override
    protected int getNumSecondaryKeys() {
        return numTokenKeyPairFields - numPrimaryKeys;
    }
+
    /**
     * Builds a Hyracks job that creates (but does not load) the LSM inverted
     * index: registers the local-resource metadata on each NC and adds a single
     * index-create operator constrained to the secondary index's partitions.
     */
    @Override
    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
        JobSpecification spec = RuntimeUtils.createJobSpecification();
        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
        //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
        IResourceFactory localResourceMetadata = new LSMInvertedIndexLocalResourceMetadataFactory(invListsTypeTraits,
                primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned,
                dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
                filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
                invertedIndexFieldsForNonBulkLoadOps, dataset.getIndexOperationTrackerFactory(index),
                dataset.getIoOperationCallbackFactory(index),
                storageComponentProvider.getMetadataPageManagerFactory());
        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                localResourceMetadata, LocalResource.LSMInvertedIndexResource);

        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
        // Single-operator job: the create op is both the source and the root.
        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp =
                new LSMInvertedIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
                        secondaryFileSplitProvider, storageComponentProvider.getIndexLifecycleManagerProvider(),
                        tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories,
                        tokenizerFactory, dataflowHelperFactory, localResourceFactoryProvider,
                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
                                IndexOperation.CREATE, null),
                        storageComponentProvider.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
                secondaryPartitionConstraint);
        spec.addRoot(invIndexCreateOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }
+
    /**
     * Builds the bulk-load job for the inverted index. Pipeline:
     * key-provider -> primary scan -> [cast] -> assign (extract keys)
     * -> [null-filter select] -> tokenizer -> sort -> bulk load -> sink.
     * The optional cast is inserted only when open-type enforcement changes the
     * item type; the null-filter select only when a key is nullable or enforced.
     */
    @Override
    public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
        JobSpecification spec = RuntimeUtils.createJobSpecification();

        // Create dummy key provider for feeding the primary index scan.
        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);

        // Create primary index scan op.
        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);

        AbstractOperatorDescriptor sourceOp = primaryScanOp;
        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
        int numSecondaryKeys = index.getKeyFieldNames().size();
        if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
            // Cast records to the enforced type before key extraction.
            sourceOp = createCastOp(spec, dataset.getDatasetType());
            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
        }
        AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, numSecondaryKeys, secondaryRecDesc);

        // If any of the secondary fields are nullable, then add a select op
        // that filters nulls.
        AlgebricksMetaOperatorDescriptor selectOp = null;
        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
            selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, secondaryRecDesc);
        }

        // Create a tokenizer op.
        AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);

        // Sort by token + primary keys.
        ExternalSortOperatorDescriptor sortOp =
                createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);

        // Create secondary inverted index bulk load op.
        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);

        // Sink that terminates the pipeline (the load op's output is discarded).
        AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
        // Connect the operators.
        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
            // selectOp is non-null exactly under this condition (mirrors the guard above).
            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
            spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0);
        } else {
            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
        }
        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0);
        spec.addRoot(metaOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }
+
    /**
     * Creates the tokenizer operator that expands each input record into
     * one {@code <token, [partition,] primary keys, [filter]>} tuple per token.
     * Constrained to the primary index's partitions since it consumes the scan
     * output.
     */
    private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
        // The document (field to tokenize) is the first field of the input.
        int docField = 0;
        int numSecondaryKeys = index.getKeyFieldNames().size();
        // Despite the name, this carries the primary key fields AND the optional
        // filter field — both are passed through alongside each token.
        int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields];
        for (int i = 0; i < primaryKeyFields.length; i++) {
            primaryKeyFields[i] = numSecondaryKeys + i;
        }
        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
                tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
                primaryPartitionConstraint);
        return tokenizerOp;
    }
+
+    @Override
+    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification 
spec,
+            IBinaryComparatorFactory[] secondaryComparatorFactories, 
RecordDescriptor secondaryRecDesc) {
+        // Sort on token and primary keys.
+        int[] sortFields = new int[numTokenKeyPairFields];
+        for (int i = 0; i < numTokenKeyPairFields; i++) {
+            sortFields[i] = i;
+        }
+        ExternalSortOperatorDescriptor sortOp = new 
ExternalSortOperatorDescriptor(spec,
+                physOptConf.getMaxFramesExternalSort(), sortFields, 
tokenKeyPairComparatorFactories, secondaryRecDesc);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
sortOp, primaryPartitionConstraint);
+        return sortOp;
+    }
+
    /**
     * Creates the bulk-load operator for the inverted index. The field
     * permutation is the identity over all token/key-pair fields plus the
     * optional filter field, since the sorter already produces them in index
     * order. Constrained to the secondary index's partitions.
     */
    private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec)
            throws AlgebricksException {
        int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields];
        for (int i = 0; i < fieldPermutation.length; i++) {
            fieldPermutation[i] = i;
        }
        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
                spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false,
                storageComponentProvider.getStorageManager(), secondaryFileSplitProvider,
                storageComponentProvider.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
                storageComponentProvider.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
                secondaryPartitionConstraint);
        return invIndexBulkLoadOp;
    }
+
    /**
     * Obtains the dataflow helper factory for this index from the dataset,
     * using the current merge policy configuration.
     */
    private IIndexDataflowHelperFactory createDataflowHelperFactory() throws AlgebricksException {
        return dataset.getIndexDataflowHelperFactory(metadataProvider, index, itemType, metaType, mergePolicyFactory,
                mergePolicyFactoryProperties);
    }
+
    /**
     * Builds a job that compacts (FULL_MERGE) the LSM inverted index: a single
     * compact operator constrained to the secondary index's partitions.
     */
    @Override
    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
        JobSpecification spec = RuntimeUtils.createJobSpecification();
        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
        LSMInvertedIndexCompactOperator compactOp =
                new LSMInvertedIndexCompactOperator(spec, storageComponentProvider.getStorageManager(),
                        secondaryFileSplitProvider, storageComponentProvider.getIndexLifecycleManagerProvider(),
                        tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories,
                        tokenizerFactory, dataflowHelperFactory,
                        // FULL_MERGE drives the modification callback for compaction.
                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
                                IndexOperation.FULL_MERGE, null),
                        storageComponentProvider.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
                secondaryPartitionConstraint);

        spec.addRoot(compactOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
new file mode 100644
index 0000000..460b635
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import 
org.apache.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadataFactory;
+import 
org.apache.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadataFactory;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryRTreeOperationsHelper extends 
SecondaryIndexOperationsHelper {
+
    // Value providers extracting MBR coordinates for the R-tree.
    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
    // Number of nested secondary key fields (the flattened MBR dimensions).
    protected int numNestedSecondaryKeyFields;
    // Type tag of the spatial key, used to propose a linearizer.
    protected ATypeTag keyType;
    protected int[] primaryKeyFields;
    protected int[] rtreeFields;
    // True when the indexed MBR degenerates to a point (min == max).
    protected boolean isPointMBR;
    // Record descriptor used instead of secondaryRecDesc when isPointMBR holds;
    // null otherwise.
    protected RecordDescriptor secondaryRecDescForPointMBR = null;
+
    /**
     * Creates an operations helper for a secondary R-tree index. Purely
     * delegates to the base class; R-tree-specific state (value providers,
     * MBR fields, etc.) is initialized elsewhere.
     */
    protected SecondaryRTreeOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf,
            IPropertiesProvider propertiesProvider, MetadataProvider metadataProvider, ARecordType recType,
            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) {
        super(dataset, index, physOptConf, propertiesProvider, metadataProvider, recType, metaType, enforcedType,
                enforcedMetaType);
    }
+
    /**
     * Builds a Hyracks job that creates (but does not load) the secondary
     * R-tree. Internal datasets get an LSM R-tree resource whose deleted-key
     * BTree uses comparators derived from this index; external datasets get an
     * external R-tree resource with buddy-BTree comparators. A single
     * tree-index-create operator is added, constrained to the secondary
     * partitions.
     */
    @Override
    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
        JobSpecification spec = RuntimeUtils.createJobSpecification();
        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
        ILocalResourceFactoryProvider localResourceFactoryProvider;
        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
            IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree();
            //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
            IResourceFactory localResourceMetadata = new LSMRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
                    secondaryComparatorFactories, btreeCompFactories, valueProviderFactories, RTreePolicyType.RTREE,
                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
                    dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
                    filterCmpFactories, rtreeFields, primaryKeyFields, secondaryFilterFields, isPointMBR,
                    dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index),
                    storageComponentProvider.getMetadataPageManagerFactory());
            localResourceFactoryProvider =
                    new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMRTreeResource);
        } else {
            // External dataset
            // Prepare a LocalResourceMetadata which will be stored in NC's local resource repository
            IResourceFactory localResourceMetadata = new ExternalRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
                    secondaryComparatorFactories, ExternalIndexingOperations.getBuddyBtreeComparatorFactories(),
                    valueProviderFactories, RTreePolicyType.RTREE,
                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
                    dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, primaryKeyFields,
                    isPointMBR, dataset.getIndexOperationTrackerFactory(index),
                    dataset.getIoOperationCallbackFactory(index),
                    storageComponentProvider.getMetadataPageManagerFactory());
            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
                    LocalResource.ExternalRTreeResource);
        }

        // Single-operator job: create the tree index on each secondary partition.
        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp =
                new TreeIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
                        storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider,
                        secondaryTypeTraits, secondaryComparatorFactories, null, indexDataflowHelperFactory,
                        localResourceFactoryProvider,
                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
                                IndexOperation.CREATE, null),
                        storageComponentProvider.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
                secondaryPartitionConstraint);
        spec.addRoot(secondaryIndexCreateOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }
+
+    private IBinaryComparatorFactory[] 
getComparatorFactoriesForDeletedKeyBTree() {
+        IBinaryComparatorFactory[] btreeCompFactories = new 
IBinaryComparatorFactory[secondaryTypeTraits.length];
+        int i = 0;
+        for (; i < secondaryComparatorFactories.length; i++) {
+            btreeCompFactories[i] = secondaryComparatorFactories[i];
+        }
+        for (int j = 0; i < secondaryTypeTraits.length; i++, j++) {
+            btreeCompFactories[i] = primaryComparatorFactories[j];
+        }
+        return btreeCompFactories;
+    }
+
    /**
     * Returns the number of nested secondary key fields, i.e. twice the number of
     * dimensions of the indexed spatial type (set by
     * {@link #setSecondaryRecDescAndComparators()}).
     */
    @Override
    protected int getNumSecondaryKeys() {
        return numNestedSecondaryKeyFields;
    }
+
    /**
     * Derives the record descriptors, type traits, comparator and value-provider
     * factories for the secondary R-tree from the single spatial key field of the
     * index. Layout of the secondary record: the 2*numDimensions MBR coordinate
     * fields first, then the primary keys, then the optional filter field. Also
     * computes the pointMBR-optimized descriptor (half the coordinate fields)
     * when the key is a POINT/POINT3D.
     *
     * @throws AlgebricksException if the index does not have exactly one key
     *             field or the key field cannot be resolved in the schema
     */
    @Override
    protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
        int numSecondaryKeys = secondaryKeyFields.size();
        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
        // An R-tree index is defined over exactly one spatial field.
        if (numSecondaryKeys != 1) {
            throw new AsterixException("Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
                    + "There can be only one field as a key for the R-tree index.");
        }
        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
                secondaryKeyFields.get(0), itemType);
        IAType spatialType = spatialTypePair.first;
        anySecondaryKeyIsNullable = spatialTypePair.second;
        if (spatialType == null) {
            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
        }
        // PointMBR optimization applies only to point types (see buildLoadingJobSpec()).
        isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
        // An MBR has a low and a high coordinate per dimension.
        numNestedSecondaryKeyFields = numDimensions * 2;
        // For internal datasets the record sits after the primary keys in the scan
        // output; external scans emit the record first.
        int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
        secondaryFieldAccessEvalFactories =
                metadataProvider.getFormat().createMBRFactory(isEnforcingKeyTypes ? enforcedItemType : itemType,
                        secondaryKeyFields.get(0), recordColumn, numDimensions, filterFieldName);
        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
        // Secondary record: MBR coordinates, then primary keys, then optional filter field.
        ISerializerDeserializer[] secondaryRecFields =
                new ISerializerDeserializer[numPrimaryKeys + numNestedSecondaryKeyFields + numFilterFields];
        ISerializerDeserializer[] enforcedRecFields =
                new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
        secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
        // All MBR coordinate fields share the nested spatial scalar type (e.g. double).
        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
        keyType = nestedKeyType.getTypeTag();
        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
            ISerializerDeserializer keySerde =
                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType);
            secondaryRecFields[i] = keySerde;
            secondaryComparatorFactories[i] =
                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true);
            secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
            valueProviderFactories[i] =
                    metadataProvider.getStorageComponentProvider().getPrimitiveValueProviderFactory();

        }
        // Add serializers and comparators for primary index fields.
        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
            for (int i = 0; i < numPrimaryKeys; i++) {
                secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
                enforcedRecFields[i] = primaryRecDesc.getFields()[i];
                enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
            }
        } else {
            // External datasets use the fixed RID fields as "primary keys".
            for (int i = 0; i < numPrimaryKeys; i++) {
                secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
                enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i);
                enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
            }
        }
        // The enforced record carries the (possibly cast) record itself after the keys.
        enforcedRecFields[numPrimaryKeys] =
                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
        if (numFilterFields > 0) {
            // All MBR + primary-key fields participate in the R-tree tuple.
            rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
            for (int i = 0; i < rtreeFields.length; i++) {
                rtreeFields[i] = i;
            }

            Pair<IAType, Boolean> typePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
            IAType type = typePair.first;
            ISerializerDeserializer serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type);
            // The filter field goes last in the secondary record.
            secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = serde;
        }
        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
        // Primary keys follow the MBR coordinate fields in the secondary record.
        primaryKeyFields = new int[numPrimaryKeys];
        for (int i = 0; i < primaryKeyFields.length; i++) {
            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
        }
        if (isPointMBR) {
            // PointMBR descriptor: only half the coordinate fields (a point's low and
            // high coordinates are identical), then primary keys and filter fields.
            int numNestedSecondaryKeyFieldForPointMBR = numNestedSecondaryKeyFields / 2;
            ISerializerDeserializer[] recFieldsForPointMBR = new ISerializerDeserializer[numPrimaryKeys
                    + numNestedSecondaryKeyFieldForPointMBR + numFilterFields];
            int idx = 0;
            for (int i = 0; i < numNestedSecondaryKeyFieldForPointMBR; i++) {
                recFieldsForPointMBR[idx++] = secondaryRecFields[i];
            }
            for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
                recFieldsForPointMBR[idx++] = secondaryRecFields[numNestedSecondaryKeyFields + i];
            }
            secondaryRecDescForPointMBR = new RecordDescriptor(recFieldsForPointMBR);
        }
    }
+
    /**
     * Builds the bulk-loading job for the secondary R-tree. For internal datasets
     * the pipeline is: key provider -> primary scan -> (cast) -> assign (MBR) ->
     * (null filter) -> sort -> bulk load -> sink. For external datasets the scan
     * comes from the external indexing operator, and the job serves both the
     * initial load (index creation) and the transaction load (dataset refresh).
     *
     * @return the loading job specification
     * @throws AsterixException if building an operator fails
     * @throws AlgebricksException if the dataflow helper factory cannot be built
     */
    @Override
    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
        /***************************************************
         * [ About PointMBR Optimization ]
         * Instead of storing an MBR (4 doubles) for a point (2 doubles) in an RTree leaf node,
         * the PointMBR concept is introduced.
         * PointMBR is a way to store a point as 2 doubles in an RTree leaf node.
         * This reduces RTree index size roughly in half.
         * In order to fully benefit from the PointMBR concept, besides RTree,
         * the external sort operator during bulk-loading (from either data loading or index creation)
         * must deal with a point as 2 doubles instead of 4 doubles. Otherwise, external sort will suffer from twice as
         * many doubles as it actually requires. For this purpose,
         * PointMBR specific optimization logic is added as follows:
         * 1) The CreateMBR function in the assign operator generates 2 doubles, instead of 4 doubles.
         * 2) The external sort operator sorts points represented with 2 doubles.
         * 3) Bulk-loading in RTree takes 4 doubles by reading 2 doubles twice and then
         * does the same work as non-point MBR cases.
         ***************************************************/
        JobSpecification spec = RuntimeUtils.createJobSpecification();
        int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numNestedSecondaryKeyFields);
        // Under pointMBR only half the coordinate fields flow through assign/sort.
        int numNestedSecondaryKeFieldsConsideringPointMBR =
                isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields;
        RecordDescriptor secondaryRecDescConsideringPointMBR =
                isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc;
        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
            // Create dummy key provider for feeding the primary index scan.
            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);

            // Create primary index scan op.
            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);

            // Assign op. If key types are enforced, cast the record first so the
            // MBR is computed over the enforced type.
            AbstractOperatorDescriptor sourceOp = primaryScanOp;
            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
                sourceOp = createCastOp(spec, dataset.getDatasetType());
                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
            }
            AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec,
                    numNestedSecondaryKeFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR);

            // If any of the secondary fields are nullable, then add a select op that filters nulls.
            AlgebricksMetaOperatorDescriptor selectOp = null;
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeFieldsConsideringPointMBR,
                        secondaryRecDescConsideringPointMBR);
            }

            // Sort by secondary keys using a linearizer (Hilbert/Z-order) comparator.
            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
                    new IBinaryComparatorFactory[] {
                            MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) },
                    isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
            // Create secondary RTree bulk load op.
            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation,
                    indexDataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
            AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
            // Connect the operators.
            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
            } else {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
            }
            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
            spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
            spec.addRoot(metaOp);
            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        } else {
            // External dataset
            /*
             * In case of external data, this method is used to build loading jobs for both
             * initial load on index creation
             * and transaction load on dataset refresh
             */
            // Create external indexing scan operator
            ExternalScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
            AbstractOperatorDescriptor sourceOp = primaryScanOp;
            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
                sourceOp = createCastOp(spec, dataset.getDatasetType());
                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
            }
            // Assign op.
            AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec,
                    numNestedSecondaryKeFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR);

            // If any of the secondary fields are nullable, then add a select op that filters nulls.
            AlgebricksMetaOperatorDescriptor selectOp = null;
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeFieldsConsideringPointMBR,
                        secondaryRecDescConsideringPointMBR);
            }

            // Sort by secondary keys.
            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
                    new IBinaryComparatorFactory[] {
                            MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) },
                    isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
            // Create secondary RTree bulk load op.
            IOperatorDescriptor root;
            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
            // externalFiles non-null means this is a transaction (refresh) load.
            if (externalFiles != null) {
                // Transaction load
                secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, fieldPermutation,
                        indexDataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
                root = secondaryBulkLoadOp;
            } else {
                // Initial load
                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, indexDataflowHelperFactory,
                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
                AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
                        new RecordDescriptor[] { secondaryRecDesc });
                spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
                root = metaOp;
            }

            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
            } else {
                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
            }
            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
            spec.addRoot(root);
            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        }
        return spec;
    }
+
+    protected int[] createFieldPermutationForBulkLoadOp(int 
numSecondaryKeyFields) {
+        int[] fieldPermutation = new int[numSecondaryKeyFields + 
numPrimaryKeys + numFilterFields];
+        int numSecondaryKeyFieldsForPointMBR = numSecondaryKeyFields / 2;
+        int end = isPointMBR ? numSecondaryKeyFieldsForPointMBR : 
fieldPermutation.length;
+        for (int i = 0; i < end; i++) {
+            fieldPermutation[i] = i;
+        }
+        if (isPointMBR) {
+            
/*******************************************************************************
+             * For example, suppose that 2d point type data is indexed using 
RTree, there is no
+             * filter fields, and a primary key consists of a single field.
+             * ========== Without PointMBR optimization ==========
+             * If there is no point type optimization, the input operator of 
RTree's TreeIndexBulkLoadOperator
+             * delivers five variables to the TreeIndexBulkLoadOperator as 
follows:
+             * [$var1, $var2, $var3, $var4, $var5]
+             * where $var1 ~ $var4 together represent an MBR of a point object.
+             * Since it is a point object, $var1 and $var3 have always 
identical values. So do $var2 and $var3.
+             * $var5 represents a primary key value.
+             * fieldPermutation variable captures this order correctly by 
putting values in the array as follows:
+             * [0,1,2,3,4]
+             * =========== With PointMBR optimization ===========
+             * With PointMBR optimization, the input operator of RTree's 
TreeIndexBulkLoadOperator
+             * delivers 3 variables to the TreeIndexBulkLoadOperator as 
follows:
+             * [$var1, $var2, $var3]
+             * where $var1 and $var2 together represent an MBR of a point 
object.
+             * $var3 represents a primary key value.
+             * fieldPermutation variable captures this order correctly by 
putting values in the array as follows:
+             * [0,1,0,1,2]
+             * This means that bulkloadOp reads the pair of $var1 and $var2 
twice in order to provide the same
+             * output just like when there were no PointMBR optimization 
available.
+             * This adjustment is done in this if clause code.
+             
*********************************************************************************/
+            int idx = numSecondaryKeyFieldsForPointMBR;
+            //add the rest of the sk fields for pointMBR
+            for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) {
+                fieldPermutation[idx++] = i;
+            }
+            //add the pk and filter fields
+            end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + 
numFilterFields;
+            for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) {
+                fieldPermutation[idx++] = i;
+            }
+        }
+        return fieldPermutation;
+    }
+
+    @Override
+    public JobSpecification buildCompactJobSpec() throws AsterixException, 
AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(
+                metadataProvider, index, itemType, metaType, 
mergePolicyFactory, mergePolicyFactoryProperties);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new 
LSMTreeIndexCompactOperatorDescriptor(spec,
+                
metadataProvider.getStorageComponentProvider().getStorageManager(),
+                
metadataProvider.getStorageComponentProvider().getIndexLifecycleManagerProvider(),
+                secondaryFileSplitProvider, secondaryTypeTraits, 
secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, indexDataflowHelperFactory,
+                
dataset.getModificationCallbackFactory(metadataProvider.getStorageComponentProvider(),
 index, null,
+                        IndexOperation.FULL_MERGE, null),
+                
metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(compactOp);
+        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}

Reply via email to