http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
deleted file mode 100644
index c362e2e..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.file;
-
-import java.io.DataOutput;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.app.external.ExternalIndexingOperations;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.context.TransactionSubsystemProvider;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.exceptions.AsterixException;
-import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.runtime.util.AppContextInfo;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import 
org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
-import 
org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
-import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
-import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
-import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import 
org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IJobletEventListenerFactory;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import 
org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-
-@SuppressWarnings("rawtypes")
-// TODO: We should eventually have a hierarchy of classes that can create all
-// possible index job specs,
-// not just for creation.
-public abstract class SecondaryIndexOperationsHelper {
-    protected final PhysicalOptimizationConfig physOptConf;
-
-    protected int numPrimaryKeys;
-    protected int numSecondaryKeys;
-    protected MetadataProvider metadataProvider;
-    protected String dataverseName;
-    protected String datasetName;
-    protected Dataset dataset;
-    protected ARecordType itemType;
-    protected ARecordType metaType;
-    protected List<Integer> keySourceIndicators;
-    protected ISerializerDeserializer metaSerde;
-    protected ISerializerDeserializer payloadSerde;
-    protected IFileSplitProvider primaryFileSplitProvider;
-    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
-    protected IFileSplitProvider secondaryFileSplitProvider;
-    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
-    protected String secondaryIndexName;
-    protected boolean anySecondaryKeyIsNullable = false;
-    protected boolean isEnforcingKeyTypes = false;
-
-    protected long numElementsHint;
-    protected IBinaryComparatorFactory[] primaryComparatorFactories;
-    protected int[] primaryBloomFilterKeyFields;
-    protected RecordDescriptor primaryRecDesc;
-    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
-    protected ITypeTraits[] secondaryTypeTraits;
-    protected int[] secondaryBloomFilterKeyFields;
-    protected RecordDescriptor secondaryRecDesc;
-    protected IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories;
-
-    protected IPropertiesProvider propertiesProvider;
-    protected ILSMMergePolicyFactory mergePolicyFactory;
-    protected Map<String, String> mergePolicyFactoryProperties;
-    protected RecordDescriptor enforcedRecDesc;
-    protected ARecordType enforcedItemType;
-    protected ARecordType enforcedMetaType;
-    protected int numFilterFields;
-    protected List<String> filterFieldName;
-    protected ITypeTraits[] filterTypeTraits;
-    protected IBinaryComparatorFactory[] filterCmpFactories;
-    protected int[] secondaryFilterFields;
-    protected int[] primaryFilterFields;
-    protected int[] primaryBTreeFields;
-    protected int[] secondaryBTreeFields;
-    protected List<ExternalFile> externalFiles;
-
-    // Prevent public construction. Should be created via createIndexCreator().
-    protected SecondaryIndexOperationsHelper(PhysicalOptimizationConfig 
physOptConf,
-            IPropertiesProvider propertiesProvider) {
-        this.physOptConf = physOptConf;
-        this.propertiesProvider = propertiesProvider;
-    }
-
-    public static SecondaryIndexOperationsHelper 
createIndexOperationsHelper(IndexType indexType, String dataverseName,
-            String datasetName, String indexName, List<List<String>> 
secondaryKeyFields, List<IAType> secondaryKeyTypes,
-            boolean isEnforced, int gramLength, MetadataProvider 
metadataProvider,
-            PhysicalOptimizationConfig physOptConf, ARecordType recType, 
ARecordType metaType,
-            List<Integer> keySourceIndicators, ARecordType enforcedType) 
throws AsterixException, AlgebricksException {
-        IPropertiesProvider asterixPropertiesProvider = 
AppContextInfo.INSTANCE;
-        SecondaryIndexOperationsHelper indexOperationsHelper = null;
-        switch (indexType) {
-            case BTREE: {
-                indexOperationsHelper = new 
SecondaryBTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            case RTREE: {
-                indexOperationsHelper = new 
SecondaryRTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            case SINGLE_PARTITION_WORD_INVIX:
-            case SINGLE_PARTITION_NGRAM_INVIX:
-            case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                indexOperationsHelper =
-                        new 
SecondaryInvertedIndexOperationsHelper(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            default: {
-                throw new AsterixException("Unknown Index Type: " + indexType);
-            }
-        }
-        indexOperationsHelper.init(indexType, dataverseName, datasetName, 
indexName, secondaryKeyFields,
-                secondaryKeyTypes, isEnforced, gramLength, metadataProvider, 
recType, metaType, keySourceIndicators,
-                enforcedType);
-        return indexOperationsHelper;
-    }
-
-    public abstract JobSpecification buildCreationJobSpec() throws 
AsterixException, AlgebricksException;
-
-    public abstract JobSpecification buildLoadingJobSpec() throws 
AsterixException, AlgebricksException;
-
-    public abstract JobSpecification buildCompactJobSpec() throws 
AsterixException, AlgebricksException;
-
-    protected void init(IndexType indexType, String dvn, String dsn, String 
in, List<List<String>> secondaryKeyFields,
-            List<IAType> secondaryKeyTypes, boolean isEnforced, int 
gramLength, MetadataProvider metadataProvider,
-            ARecordType aRecType, ARecordType metaType, List<Integer> 
keySourceIndicators, ARecordType enforcedType)
-            throws AsterixException, AlgebricksException {
-        this.metadataProvider = metadataProvider;
-        dataverseName = dvn == null ? 
metadataProvider.getDefaultDataverseName() : dvn;
-        datasetName = dsn;
-        secondaryIndexName = in;
-        isEnforcingKeyTypes = isEnforced;
-        dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AsterixException("Unknown dataset " + datasetName);
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        itemType = aRecType;
-        this.metaType = metaType;
-        this.keySourceIndicators = keySourceIndicators;
-        enforcedItemType = enforcedType;
-        payloadSerde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-        metaSerde = metaType == null ? null
-                : 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
-        numSecondaryKeys = secondaryKeyFields.size();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, 
datasetName, secondaryIndexName, temp);
-        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            numPrimaryKeys = ExternalIndexingOperations.getRIDSize(dataset);
-        } else {
-            filterFieldName = DatasetUtils.getFilterField(dataset);
-            if (filterFieldName != null) {
-                numFilterFields = 1;
-            } else {
-                numFilterFields = 0;
-            }
-
-            numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint = metadataProvider
-                    
.splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, 
datasetName, temp);
-            primaryFileSplitProvider = primarySplitsAndConstraint.first;
-            primaryPartitionConstraint = primarySplitsAndConstraint.second;
-            setPrimaryRecDescAndComparators();
-        }
-        setSecondaryRecDescAndComparators(indexType, secondaryKeyFields, 
secondaryKeyTypes, gramLength,
-                metadataProvider);
-        numElementsHint = 
metadataProvider.getCardinalityPerPartitionHint(dataset);
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                DatasetUtils.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
-        mergePolicyFactory = compactionInfo.first;
-        mergePolicyFactoryProperties = compactionInfo.second;
-
-        if (numFilterFields > 0) {
-            setFilterTypeTraitsAndComparators();
-        }
-    }
-
-    protected void setFilterTypeTraitsAndComparators() throws 
AlgebricksException {
-        filterTypeTraits = new ITypeTraits[numFilterFields];
-        filterCmpFactories = new IBinaryComparatorFactory[numFilterFields];
-        secondaryFilterFields = new int[numFilterFields];
-        primaryFilterFields = new int[numFilterFields];
-        primaryBTreeFields = new int[numPrimaryKeys + 1];
-        secondaryBTreeFields = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < primaryBTreeFields.length; i++) {
-            primaryBTreeFields[i] = i;
-        }
-        for (int i = 0; i < secondaryBTreeFields.length; i++) {
-            secondaryBTreeFields[i] = i;
-        }
-
-        IAType type = itemType.getSubFieldType(filterFieldName);
-        filterCmpFactories[0] = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
-        filterTypeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
-        secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
-        primaryFilterFields[0] = numPrimaryKeys + 1;
-    }
-
-    protected abstract int getNumSecondaryKeys();
-
-    protected void setPrimaryRecDescAndComparators() throws 
AlgebricksException {
-        List<List<String>> partitioningKeys = 
DatasetUtils.getPartitioningKeys(dataset);
-        int numPrimaryKeys = partitioningKeys.size();
-        ISerializerDeserializer[] primaryRecFields =
-                new ISerializerDeserializer[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + 
(dataset.hasMetaPart() ? 1 : 0)];
-        primaryComparatorFactories = new 
IBinaryComparatorFactory[numPrimaryKeys];
-        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = 
metadataProvider.getFormat().getSerdeProvider();
-        List<Integer> indicators = null;
-        if (dataset.hasMetaPart()) {
-            indicators = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getKeySourceIndicator();
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            IAType keyType =
-                    (indicators == null || indicators.get(i) == 0) ? 
itemType.getSubFieldType(partitioningKeys.get(i))
-                            : 
metaType.getSubFieldType(partitioningKeys.get(i));
-            primaryRecFields[i] = 
serdeProvider.getSerializerDeserializer(keyType);
-            primaryComparatorFactories[i] =
-                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
-            primaryTypeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            primaryBloomFilterKeyFields[i] = i;
-        }
-        primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        if (dataset.hasMetaPart()) {
-            primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
-            primaryTypeTraits[numPrimaryKeys + 1] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        }
-        primaryRecDesc = new RecordDescriptor(primaryRecFields, 
primaryTypeTraits);
-    }
-
-    protected abstract void setSecondaryRecDescAndComparators(IndexType 
indexType,
-            List<List<String>> secondaryKeyFields, List<IAType> 
secondaryKeyTypes, int gramLength,
-            MetadataProvider metadataProvider) throws AlgebricksException, 
AsterixException;
-
-    protected AbstractOperatorDescriptor 
createDummyKeyProviderOp(JobSpecification spec)
-            throws AsterixException, AlgebricksException {
-        // Build dummy tuple containing one field with a dummy value inside.
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        try {
-            // Serialize dummy value into a field.
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
-        // Add dummy field.
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { 
IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new 
ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), 
tb.getSize());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
keyProviderOp,
-                primaryPartitionConstraint);
-        return keyProviderOp;
-    }
-
-    protected BTreeSearchOperatorDescriptor 
createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
-        // -Infinity
-        int[] lowKeyFields = null;
-        // +Infinity
-        int[] highKeyFields = null;
-        ITransactionSubsystemProvider txnSubsystemProvider = new 
TransactionSubsystemProvider();
-        JobId jobId = JobIdFactory.generateJobId();
-        metadataProvider.setJobId(jobId);
-        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
-        IJobletEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(jobId, isWriteTransaction);
-        spec.setJobletEventListenerFactory(jobEventListenerFactory);
-
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        ISearchOperationCallbackFactory searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, 
dataset.getDatasetId(),
-                        primaryBloomFilterKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
-        StorageProperties storageProperties = 
propertiesProvider.getStorageProperties();
-        BTreeSearchOperatorDescriptor primarySearchOp = new 
BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), 
primaryComparatorFactories,
-                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, 
true, true,
-                new LSMBTreeDataflowHelperFactory(new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        mergePolicyFactory, mergePolicyFactoryProperties,
-                        new 
PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), 
true, filterTypeTraits, filterCmpFactories,
-                        primaryBTreeFields, primaryFilterFields, !temp),
-                false, false, null, searchCallbackFactory, null, null, 
LSMIndexUtil
-                        .getMetadataPageManagerFactory());
-
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
primarySearchOp,
-                primaryPartitionConstraint);
-        return primarySearchOp;
-    }
-
-    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification 
spec,
-            AbstractOperatorDescriptor primaryScanOp, int 
numSecondaryKeyFields, RecordDescriptor secondaryRecDesc)
-            throws AlgebricksException {
-        int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
-        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys 
+ numFilterFields];
-        for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
-            outColumns[i] = numPrimaryKeys + i;
-        }
-        int projCount = 0;
-        for (int i = 0; i < numSecondaryKeyFields; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i;
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-        if (numFilterFields > 0) {
-            projectionList[projCount++] = numPrimaryKeys + 
numSecondaryKeyFields;
-        }
-
-        IScalarEvaluatorFactory[] sefs = new 
IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
-            sefs[i] = secondaryFieldAccessEvalFactories[i];
-        }
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, 
sefs, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { 
secondaryRecDesc });
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
asterixAssignOp,
-                primaryPartitionConstraint);
-        return asterixAssignOp;
-    }
-
-    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification 
spec, DatasetType dsType) {
-        CastTypeDescriptor castFuncDesc = (CastTypeDescriptor) 
CastTypeDescriptor.FACTORY.createFunctionDescriptor();
-        castFuncDesc.setImmutableStates(enforcedItemType, itemType);
-
-        int[] outColumns = new int[1];
-        int[] projectionList = new int[(dataset.hasMetaPart() ? 2 : 1) + 
numPrimaryKeys];
-        int recordIdx;
-        //external datascan operator returns a record as the first field, 
instead of the last in internal case
-        if (dsType == DatasetType.EXTERNAL) {
-            recordIdx = 0;
-            outColumns[0] = 0;
-        } else {
-            recordIdx = numPrimaryKeys;
-            outColumns[0] = numPrimaryKeys;
-        }
-        for (int i = 0; i <= numPrimaryKeys; i++) {
-            projectionList[i] = i;
-        }
-        if (dataset.hasMetaPart()) {
-            projectionList[numPrimaryKeys + 1] = numPrimaryKeys + 1;
-        }
-        IScalarEvaluatorFactory[] castEvalFact =
-                new IScalarEvaluatorFactory[] { new 
ColumnAccessEvalFactory(recordIdx) };
-        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
-        sefs[0] = castFuncDesc.createEvaluatorFactory(castEvalFact);
-        AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, 
sefs, projectionList);
-        AlgebricksMetaOperatorDescriptor castRecAssignOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { castAssign }, new 
RecordDescriptor[] { enforcedRecDesc });
-
-        return castRecAssignOp;
-    }
-
-    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification 
spec,
-            IBinaryComparatorFactory[] secondaryComparatorFactories, 
RecordDescriptor secondaryRecDesc) {
-        int[] sortFields = new int[secondaryComparatorFactories.length];
-        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
-            sortFields[i] = i;
-        }
-        ExternalSortOperatorDescriptor sortOp = new 
ExternalSortOperatorDescriptor(spec,
-                physOptConf.getMaxFramesExternalSort(), sortFields, 
secondaryComparatorFactories, secondaryRecDesc);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
sortOp, primaryPartitionConstraint);
-        return sortOp;
-    }
-
-    protected TreeIndexBulkLoadOperatorDescriptor 
createTreeIndexBulkLoadOp(JobSpecification spec,
-            int[] fieldPermutation, IIndexDataflowHelperFactory 
dataflowHelperFactory, float fillFactor)
-            throws MetadataException, AlgebricksException {
-        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new 
TreeIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
secondaryFileSplitProvider,
-                secondaryRecDesc.getTypeTraits(), 
secondaryComparatorFactories, secondaryBloomFilterKeyFields,
-                fieldPermutation, fillFactor, false, numElementsHint, false, 
dataflowHelperFactory, LSMIndexUtil
-                        .getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
-                secondaryPartitionConstraint);
-        return treeIndexBulkLoadOp;
-    }
-
-    public AlgebricksMetaOperatorDescriptor 
createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields,
-            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
-        IScalarEvaluatorFactory[] andArgsEvalFactories = new 
IScalarEvaluatorFactory[numSecondaryKeyFields];
-        NotDescriptor notDesc = new NotDescriptor();
-        IsUnknownDescriptor isUnknownDesc = new IsUnknownDescriptor();
-        for (int i = 0; i < numSecondaryKeyFields; i++) {
-            // Access column i, and apply 'is not null'.
-            ColumnAccessEvalFactory columnAccessEvalFactory = new 
ColumnAccessEvalFactory(i);
-            IScalarEvaluatorFactory isUnknownEvalFactory =
-                    isUnknownDesc.createEvaluatorFactory(new 
IScalarEvaluatorFactory[] { columnAccessEvalFactory });
-            IScalarEvaluatorFactory notEvalFactory =
-                    notDesc.createEvaluatorFactory(new 
IScalarEvaluatorFactory[] { isUnknownEvalFactory });
-            andArgsEvalFactories[i] = notEvalFactory;
-        }
-        IScalarEvaluatorFactory selectCond = null;
-        if (numSecondaryKeyFields > 1) {
-            // Create conjunctive condition where all secondary index keys must
-            // satisfy 'is not null'.
-            AndDescriptor andDesc = new AndDescriptor();
-            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
-        } else {
-            selectCond = andArgsEvalFactories[0];
-        }
-        StreamSelectRuntimeFactory select = new 
StreamSelectRuntimeFactory(selectCond, null,
-                BinaryBooleanInspector.FACTORY, false, -1, null);
-        AlgebricksMetaOperatorDescriptor asterixSelectOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { 
secondaryRecDesc });
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
asterixSelectOp,
-                primaryPartitionConstraint);
-        return asterixSelectOp;
-    }
-
-    // This method creates a source indexing operator for external data
-    protected ExternalDataScanOperatorDescriptor 
createExternalIndexingOp(JobSpecification spec)
-            throws AlgebricksException, AsterixException {
-        // A record + primary keys
-        ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + 
numPrimaryKeys];
-        ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys];
-        // payload serde and type traits for the record slot
-        serdes[0] = payloadSerde;
-        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        //  serdes and type traits for rid fields
-        for (int i = 1; i < serdes.length; i++) {
-            serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1);
-            typeTraits[i] = IndexingConstants.getTypeTraits(i - 1);
-        }
-        // output record desc
-        RecordDescriptor indexerDesc = new RecordDescriptor(serdes, 
typeTraits);
-
-        // Create the operator and its partition constraits
-        Pair<ExternalDataScanOperatorDescriptor, 
AlgebricksPartitionConstraint> indexingOpAndConstraints;
-        try {
-            indexingOpAndConstraints = 
ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider,
-                    dataset, itemType, indexerDesc, externalFiles);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
indexingOpAndConstraints.first,
-                indexingOpAndConstraints.second);
-
-        // Set the primary partition constraints to this partition constraints
-        primaryPartitionConstraint = indexingOpAndConstraints.second;
-        return indexingOpAndConstraints.first;
-    }
-
-    protected AlgebricksMetaOperatorDescriptor 
createExternalAssignOp(JobSpecification spec, int numSecondaryKeys,
-            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
-        int[] outColumns = new int[numSecondaryKeys];
-        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            outColumns[i] = i + numPrimaryKeys + 1;
-            projectionList[i] = i + numPrimaryKeys + 1;
-        }
-
-        IScalarEvaluatorFactory[] sefs = new 
IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
-            sefs[i] = secondaryFieldAccessEvalFactories[i];
-        }
-        //add External RIDs to the projection list
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[numSecondaryKeys + i] = i + 1;
-        }
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, 
sefs, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { 
secondaryRecDesc });
-        return asterixAssignOp;
-    }
-
-    protected ExternalIndexBulkModifyOperatorDescriptor 
createExternalIndexBulkModifyOp(JobSpecification spec,
-            int[] fieldPermutation, IIndexDataflowHelperFactory 
dataflowHelperFactory, float fillFactor)
-            throws MetadataException, AlgebricksException {
-        // create a list of file ids
-        int numOfDeletedFiles = 0;
-        for (ExternalFile file : externalFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                numOfDeletedFiles++;
-            }
-        }
-        int[] deletedFiles = new int[numOfDeletedFiles];
-        int i = 0;
-        for (ExternalFile file : externalFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                deletedFiles[i] = file.getFileNumber();
-            }
-        }
-        ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new 
ExternalIndexBulkModifyOperatorDescriptor(
-                spec, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
secondaryFileSplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, secondaryBloomFilterKeyFields, 
dataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE, deletedFiles, 
fieldPermutation, fillFactor, numElementsHint,
-                LSMIndexUtil.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
-                secondaryPartitionConstraint);
-        return treeIndexBulkLoadOp;
-    }
-
-    public List<ExternalFile> getExternalFiles() {
-        return externalFiles;
-    }
-
-    public void setExternalFiles(List<ExternalFile> externalFiles) {
-        this.externalFiles = externalFiles;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
deleted file mode 100644
index 3f5a9e9..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.file;
-
-import java.util.List;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.exceptions.AsterixException;
-import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import org.apache.asterix.common.transactions.IResourceFactory;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import 
org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadataFactory;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.primitive.ShortPointable;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import org.apache.hyracks.storage.common.file.LocalResource;
-
/**
 * Builds Hyracks job specifications for operations on LSM inverted secondary
 * indexes (keyword and n-gram, optionally length-partitioned): index creation,
 * bulk loading from the primary index, and compaction.
 * <p>
 * The bulk-load pipeline produced by {@link #buildLoadingJobSpec()} is:
 * dummy key provider -> primary index scan -> (optional cast) -> assign
 * secondary keys -> (optional null filter) -> tokenizer -> sort on
 * &lt;token, [partition-length,] primary keys&gt; -> inverted-index bulk load.
 */
public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper {

    // Asterix type of the (single) secondary key being tokenized.
    private IAType secondaryKeyType;
    // Type traits of inverted-list elements (the primary keys).
    private ITypeTraits[] invListsTypeTraits;
    // Comparators/traits for the token field(s); index 1 is the length
    // partition field when the index is length-partitioned.
    private IBinaryComparatorFactory[] tokenComparatorFactories;
    private ITypeTraits[] tokenTypeTraits;
    private IBinaryTokenizerFactory tokenizerFactory;
    // For tokenization, sorting and loading. Represents <token, primary keys>.
    private int numTokenKeyPairFields;
    private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
    private RecordDescriptor tokenKeyPairRecDesc;
    // True for LENGTH_PARTITIONED_* index types (adds a short partition field).
    private boolean isPartitioned;
    // Field-index mappings used when the index carries a filter field.
    private int[] invertedIndexFields;
    private int[] invertedIndexFieldsForNonBulkLoadOps;
    private int[] secondaryFilterFieldsForNonBulkLoadOps;

    protected SecondaryInvertedIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
            IPropertiesProvider propertiesProvider) {
        super(physOptConf, propertiesProvider);
    }

    /**
     * Computes record descriptors, type traits, comparators, and the tokenizer
     * factory for the inverted index from the dataset/index metadata.
     * <p>
     * Populates {@code secondaryRecDesc} (secondary key + primary keys +
     * optional filter), the token-related arrays, and
     * {@code tokenKeyPairRecDesc} used between the tokenizer, sorter, and
     * bulk-load operators.
     *
     * @throws AsterixException if the dataset has a composite primary key or
     *         more than one secondary key field — inverted indexes support
     *         only a single key on both sides
     */
    @Override
    @SuppressWarnings("rawtypes")
    protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
            List<IAType> secondaryKeyTypes, int gramLength, MetadataProvider metadata)
            throws AlgebricksException, AsterixException {
        // Sanity checks.
        if (numPrimaryKeys > 1) {
            throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
        }
        if (numSecondaryKeys > 1) {
            throw new AsterixException("Cannot create composite inverted index on multiple fields.");
        }
        // Length-partitioned variants add an extra short "length" field to
        // every token so lists are partitioned by token length.
        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
            isPartitioned = true;
        } else {
            isPartitioned = false;
        }
        // Prepare record descriptor used in the assign op, and the optional
        // select op.
        secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
        ISerializerDeserializer[] secondaryRecFields =
                new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields];
        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
        if (numSecondaryKeys > 0) {
            // Evaluator that extracts the secondary key from the record; when
            // key types are enforced, extract from the enforced (cast) type.
            secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
                    isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(0), numPrimaryKeys);
            Pair<IAType, Boolean> keyTypePair =
                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), itemType);
            secondaryKeyType = keyTypePair.first;
            // keyTypePair.second is true when the key may be missing/null, in
            // which case the load pipeline adds a null-filtering select op.
            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType);
            secondaryRecFields[0] = keySerde;
            secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType);
        }
        if (numFilterFields > 0) {
            // The filter field rides along after the primary keys.
            secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat()
                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
            IAType type = keyTypePair.first;
            ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
        }
        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
        // Comparators and type traits for tokens.
        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
        tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
        tokenTypeTraits = new ITypeTraits[numTokenFields];
        tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
        tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
        if (isPartitioned) {
            // The partitioning field is hardcoded to be a short *without* an
            // Asterix type tag.
            tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
        }
        // Set tokenizer factory.
        // TODO: We might want to expose the hashing option at the AQL level,
        // and add the choice to the index metadata.
        tokenizerFactory =
                NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType, gramLength);
        // Type traits for inverted-list elements. Inverted lists contain
        // primary keys.
        invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
        if (numPrimaryKeys > 0) {
            invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
            enforcedRecFields[0] = primaryRecDesc.getFields()[0];
            enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
        }
        enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
        // For tokenization, sorting and loading.
        // One token (+ optional partitioning field) + primary keys.
        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
        ISerializerDeserializer[] tokenKeyPairFields =
                new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields];
        ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
        tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
        tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
        tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
        tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
        // pkOff = index of the first primary-key field in the token/key pair
        // record (shifted by one when a partition-length field is present).
        int pkOff = 1;
        if (isPartitioned) {
            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
            tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
            pkOff = 2;
        }
        if (numPrimaryKeys > 0) {
            tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
            tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
            tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0];
        }
        if (numFilterFields > 0) {
            // Reuse the filter field serde computed for secondaryRecFields.
            tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
        }
        tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
        if (filterFieldName != null) {
            // Identity mappings describing where index fields / filter fields
            // sit for bulk-load vs. non-bulk-load (modification) operations.
            invertedIndexFields = new int[numTokenKeyPairFields];
            for (int i = 0; i < invertedIndexFields.length; i++) {
                invertedIndexFields[i] = i;
            }
            secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys;
            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys];
            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) {
                invertedIndexFieldsForNonBulkLoadOps[i] = i;
            }
        }

    }

    // The "secondary keys" of the sorted token stream are everything except
    // the primary keys (i.e. token + optional partition-length field).
    @Override
    protected int getNumSecondaryKeys() {
        return numTokenKeyPairFields - numPrimaryKeys;
    }

    /**
     * Builds a job that creates (initializes) the empty LSM inverted index,
     * registering its local-resource metadata on each NC.
     */
    @Override
    public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
        JobSpecification spec = JobSpecificationUtils.createJobSpecification();

        //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
        IResourceFactory localResourceMetadata = new LSMInvertedIndexLocalResourceMetadataFactory(invListsTypeTraits,
                primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned,
                dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
                filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
                invertedIndexFieldsForNonBulkLoadOps);
        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                localResourceMetadata, LocalResource.LSMInvertedIndexResource);

        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp =
                new LSMInvertedIndexCreateOperatorDescriptor(spec, RuntimeComponentsProvider.RUNTIME_PROVIDER,
                        secondaryFileSplitProvider, RuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits,
                        tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
                        dataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
                        LSMIndexUtil.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
                secondaryPartitionConstraint);
        spec.addRoot(invIndexCreateOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }

    /**
     * Builds the bulk-load job: scan the primary index, extract and (if
     * needed) cast/null-filter the secondary key, tokenize it, sort the
     * resulting token/primary-key pairs, and bulk load the inverted index.
     */
    @Override
    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
        JobSpecification spec = JobSpecificationUtils.createJobSpecification();

        // Create dummy key provider for feeding the primary index scan.
        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);

        // Create primary index scan op.
        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);

        AbstractOperatorDescriptor sourceOp = primaryScanOp;
        // Cast records to the enforced type first when key types are enforced.
        if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
            sourceOp = createCastOp(spec, dataset.getDatasetType());
            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
        }
        AlgebricksMetaOperatorDescriptor asterixAssignOp =
                createAssignOp(spec, sourceOp, numSecondaryKeys, secondaryRecDesc);

        // If any of the secondary fields are nullable, then add a select op
        // that filters nulls.
        AlgebricksMetaOperatorDescriptor selectOp = null;
        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
            selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, secondaryRecDesc);
        }

        // Create a tokenizer op.
        AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);

        // Sort by token + primary keys.
        ExternalSortOperatorDescriptor sortOp =
                createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);

        // Create secondary inverted index bulk load op.
        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);

        // Sink that terminates the pipeline (bulk load produces no output tuples).
        AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
        // Connect the operators.
        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
            spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0);
        } else {
            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
        }
        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0);
        spec.addRoot(metaOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }

    /**
     * Creates the tokenizer operator: document field 0 is tokenized, and the
     * primary-key (plus optional filter) fields are forwarded with each token.
     */
    private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
        int docField = 0;
        int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields];
        for (int i = 0; i < primaryKeyFields.length; i++) {
            primaryKeyFields[i] = numSecondaryKeys + i;
        }
        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc,
                tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
                primaryPartitionConstraint);
        return tokenizerOp;
    }

    /**
     * Creates the external sort over all token/primary-key pair fields.
     * Note: intentionally sorts with {@code tokenKeyPairComparatorFactories},
     * ignoring the {@code secondaryComparatorFactories} parameter.
     */
    @Override
    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
        // Sort on token and primary keys.
        int[] sortFields = new int[numTokenKeyPairFields];
        for (int i = 0; i < numTokenKeyPairFields; i++) {
            sortFields[i] = i;
        }
        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
                physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories, secondaryRecDesc);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
        return sortOp;
    }

    /**
     * Creates the bulk-load operator for the inverted index. The field
     * permutation is the identity over token/key-pair plus filter fields.
     */
    private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
        int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields];
        for (int i = 0; i < fieldPermutation.length; i++) {
            fieldPermutation[i] = i;
        }
        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
                spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false,
                RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
                RuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
                LSMIndexUtil.getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
                secondaryPartitionConstraint);
        return invIndexBulkLoadOp;
    }

    /**
     * Creates the dataflow helper factory, choosing the partitioned variant
     * for length-partitioned index types. Durability (!temp) follows whether
     * the dataset is temporary.
     */
    private IIndexDataflowHelperFactory createDataflowHelperFactory() {
        StorageProperties storageProperties = propertiesProvider.getStorageProperties();
        boolean temp = dataset.getDatasetDetails().isTemp();
        if (!isPartitioned) {
            return new LSMInvertedIndexDataflowHelperFactory(
                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                    RuntimeComponentsProvider.RUNTIME_PROVIDER,
                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                    storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
                    filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
                    invertedIndexFieldsForNonBulkLoadOps, !temp);
        } else {
            return new PartitionedLSMInvertedIndexDataflowHelperFactory(
                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                    RuntimeComponentsProvider.RUNTIME_PROVIDER,
                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                    storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
                    filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
                    invertedIndexFieldsForNonBulkLoadOps, !temp);
        }
    }

    /**
     * Builds a job consisting of a single compact operator that merges the
     * LSM components of the inverted index.
     */
    @Override
    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
        JobSpecification spec = JobSpecificationUtils.createJobSpecification();

        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
        LSMInvertedIndexCompactOperator compactOp =
                new LSMInvertedIndexCompactOperator(spec, RuntimeComponentsProvider.RUNTIME_PROVIDER,
                        secondaryFileSplitProvider, RuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits,
                        tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
                        dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil
                                .getMetadataPageManagerFactory());
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
                secondaryPartitionConstraint);

        spec.addRoot(compactOp);
        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
        return spec;
    }
}

Reply via email to