http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
deleted file mode 100644
index c96ba4c..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.file;
-
-import java.util.List;
-
-import org.apache.asterix.app.external.ExternalIndexingOperations;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.exceptions.AsterixException;
-import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.transactions.IResourceFactory;
-import 
org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import 
org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import 
org.apache.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadataFactory;
-import 
org.apache.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadataFactory;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import 
org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
-import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import org.apache.hyracks.storage.common.file.LocalResource;
-
-@SuppressWarnings("rawtypes")
-public class SecondaryRTreeOperationsHelper extends 
SecondaryIndexOperationsHelper {
-
-    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
-    protected int numNestedSecondaryKeyFields;
-    protected ATypeTag keyType;
-    protected int[] primaryKeyFields;
-    protected int[] rtreeFields;
-    protected boolean isPointMBR;
-    protected RecordDescriptor secondaryRecDescForPointMBR = null;
-
-    protected SecondaryRTreeOperationsHelper(PhysicalOptimizationConfig 
physOptConf,
-            IPropertiesProvider propertiesProvider) {
-        super(physOptConf, propertiesProvider);
-    }
-
-    @Override
-    public JobSpecification buildCreationJobSpec() throws AsterixException, 
AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
-        StorageProperties storageProperties = 
propertiesProvider.getStorageProperties();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        IIndexDataflowHelperFactory indexDataflowHelperFactory;
-        ILocalResourceFactoryProvider localResourceFactoryProvider;
-        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-
-            IBinaryComparatorFactory[] btreeCompFactories = 
getComparatorFactoriesForDeletedKeyBTree();
-
-            //prepare a LocalResourceMetadata which will be stored in NC's 
local resource repository
-            IResourceFactory localResourceMetadata = new 
LSMRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
-                    secondaryComparatorFactories, btreeCompFactories, 
valueProviderFactories, RTreePolicyType.RTREE,
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length),
-                    dataset.getDatasetId(), mergePolicyFactory, 
mergePolicyFactoryProperties, filterTypeTraits,
-                    filterCmpFactories, rtreeFields, primaryKeyFields, 
secondaryFilterFields, isPointMBR);
-            localResourceFactoryProvider =
-                    new 
PersistentLocalResourceFactoryProvider(localResourceMetadata, 
LocalResource.LSMRTreeResource);
-            indexDataflowHelperFactory = new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories,
-                    RTreePolicyType.RTREE, btreeCompFactories,
-                    new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
-                    mergePolicyFactoryProperties, new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length), rtreeFields,
-                    filterTypeTraits, filterCmpFactories, 
secondaryFilterFields, !temp, isPointMBR);
-        } else {
-            // External dataset
-            // Prepare a LocalResourceMetadata which will be stored in NC's 
local resource repository
-            IResourceFactory localResourceMetadata = new 
ExternalRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
-                    secondaryComparatorFactories, 
ExternalIndexingOperations.getBuddyBtreeComparatorFactories(),
-                    valueProviderFactories, RTreePolicyType.RTREE,
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length),
-                    dataset.getDatasetId(), mergePolicyFactory, 
mergePolicyFactoryProperties, primaryKeyFields,
-                    isPointMBR);
-            localResourceFactoryProvider = new 
PersistentLocalResourceFactoryProvider(localResourceMetadata,
-                    LocalResource.ExternalRTreeResource);
-
-            indexDataflowHelperFactory = new 
ExternalRTreeDataflowHelperFactory(valueProviderFactories,
-                    RTreePolicyType.RTREE, 
ExternalIndexingOperations.getBuddyBtreeComparatorFactories(),
-                    mergePolicyFactory, mergePolicyFactoryProperties,
-                    new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length),
-                    storageProperties.getBloomFilterFalsePositiveRate(), new 
int[] { numNestedSecondaryKeyFields },
-                    
ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR);
-        }
-
-        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new 
TreeIndexCreateOperatorDescriptor(spec,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, secondaryTypeTraits, 
secondaryComparatorFactories, null,
-                indexDataflowHelperFactory, localResourceFactoryProvider, 
NoOpOperationCallbackFactory.INSTANCE,
-                LSMIndexUtil.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
secondaryIndexCreateOp,
-                secondaryPartitionConstraint);
-        spec.addRoot(secondaryIndexCreateOp);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    private IBinaryComparatorFactory[] 
getComparatorFactoriesForDeletedKeyBTree() {
-        IBinaryComparatorFactory[] btreeCompFactories = new 
IBinaryComparatorFactory[secondaryTypeTraits.length];;
-        int i = 0;
-        for (; i < secondaryComparatorFactories.length; i++) {
-            btreeCompFactories[i] = secondaryComparatorFactories[i];
-        }
-        for (int j = 0; i < secondaryTypeTraits.length; i++, j++) {
-            btreeCompFactories[i] = primaryComparatorFactories[j];
-        }
-        return btreeCompFactories;
-    }
-
-    @Override
-    protected int getNumSecondaryKeys() {
-        return numNestedSecondaryKeyFields;
-    }
-
-    @Override
-    protected void setSecondaryRecDescAndComparators(IndexType indexType, 
List<List<String>> secondaryKeyFields,
-            List<IAType> secondaryKeyTypes, int gramLength, MetadataProvider 
metadata)
-            throws AlgebricksException, AsterixException {
-        int numSecondaryKeys = secondaryKeyFields.size();
-        if (numSecondaryKeys != 1) {
-            throw new AsterixException("Cannot use " + numSecondaryKeys
-                    + " fields as a key for the R-tree index. There can be 
only one field as a key for the R-tree index.");
-        }
-        Pair<IAType, Boolean> spatialTypePair =
-                Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), 
secondaryKeyFields.get(0), itemType);
-        IAType spatialType = spatialTypePair.first;
-        anySecondaryKeyIsNullable = spatialTypePair.second;
-        if (spatialType == null) {
-            throw new AsterixException("Could not find field " + 
secondaryKeyFields.get(0) + " in the schema.");
-        }
-        isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || 
spatialType.getTypeTag() == ATypeTag.POINT3D;
-        int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        numNestedSecondaryKeyFields = numDimensions * 2;
-        int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? 
numPrimaryKeys : 0;
-        secondaryFieldAccessEvalFactories =
-                metadata.getFormat().createMBRFactory(isEnforcingKeyTypes ? 
enforcedItemType : itemType,
-                        secondaryKeyFields.get(0), recordColumn, 
numDimensions, filterFieldName);
-        secondaryComparatorFactories = new 
IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-        valueProviderFactories = new 
IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-        ISerializerDeserializer[] secondaryRecFields =
-                new ISerializerDeserializer[numPrimaryKeys + 
numNestedSecondaryKeyFields + numFilterFields];
-        ISerializerDeserializer[] enforcedRecFields = new 
ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
-        secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + 
numPrimaryKeys];
-        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
-        IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-        keyType = nestedKeyType.getTypeTag();
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde =
-                    
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType);
-            secondaryRecFields[i] = keySerde;
-            secondaryComparatorFactories[i] =
-                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType,
 true);
-            secondaryTypeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = PrimitiveValueProviderFactory.INSTANCE;
-
-        }
-        // Add serializers and comparators for primary index fields.
-        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-            for (int i = 0; i < numPrimaryKeys; i++) {
-                secondaryRecFields[numNestedSecondaryKeyFields + i] = 
primaryRecDesc.getFields()[i];
-                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = 
primaryRecDesc.getTypeTraits()[i];
-                enforcedRecFields[i] = primaryRecDesc.getFields()[i];
-                enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
-            }
-        } else {
-            for (int i = 0; i < numPrimaryKeys; i++) {
-                secondaryRecFields[numNestedSecondaryKeyFields + i] = 
IndexingConstants.getSerializerDeserializer(i);
-                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = 
IndexingConstants.getTypeTraits(i);
-                enforcedRecFields[i] = 
IndexingConstants.getSerializerDeserializer(i);
-                enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
-            }
-        }
-        enforcedRecFields[numPrimaryKeys] =
-                
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, 
enforcedTypeTraits);
-        if (numFilterFields > 0) {
-            rtreeFields = new int[numNestedSecondaryKeyFields + 
numPrimaryKeys];
-            for (int i = 0; i < rtreeFields.length; i++) {
-                rtreeFields[i] = i;
-            }
-
-            Pair<IAType, Boolean> typePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
-            IAType type = typePair.first;
-            ISerializerDeserializer serde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type);
-            secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = 
serde;
-        }
-        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-        primaryKeyFields = new int[numPrimaryKeys];
-        for (int i = 0; i < primaryKeyFields.length; i++) {
-            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
-        }
-        if (isPointMBR) {
-            int numNestedSecondaryKeyFieldForPointMBR = 
numNestedSecondaryKeyFields / 2;
-            ISerializerDeserializer[] recFieldsForPointMBR = new 
ISerializerDeserializer[numPrimaryKeys
-                    + numNestedSecondaryKeyFieldForPointMBR + numFilterFields];
-            int idx = 0;
-            for (int i = 0; i < numNestedSecondaryKeyFieldForPointMBR; i++) {
-                recFieldsForPointMBR[idx++] = secondaryRecFields[i];
-            }
-            for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
-                recFieldsForPointMBR[idx++] = 
secondaryRecFields[numNestedSecondaryKeyFields + i];
-            }
-            secondaryRecDescForPointMBR = new 
RecordDescriptor(recFieldsForPointMBR);
-        }
-    }
-
-    @Override
-    public JobSpecification buildLoadingJobSpec() throws AsterixException, 
AlgebricksException {
-        /***************************************************
-         * [ About PointMBR Optimization ]
-         * Instead of storing an MBR (4 doubles) for a point (2 doubles) in an RTree 
leaf node, the PointMBR concept is introduced.
-         * PointMBR is a way to store a point as 2 doubles in an RTree leaf node.
-         * This reduces the RTree index size roughly by half.
-         * In order to fully benefit from the PointMBR concept, besides the RTree,
-         * the external sort operator during bulk-loading (from either data 
loading or index creation)
-         * must deal with a point as 2 doubles instead of 4 doubles. Otherwise, 
the external sort will suffer from twice as
-         * many doubles as it actually requires. For this purpose, PointMBR-
specific optimization logic is added as follows:
-         * 1) The CreateMBR function in the assign operator generates 2 doubles 
instead of 4 doubles.
-         * 2) The external sort operator sorts points represented with 2 doubles.
-         * 3) Bulk-loading in the RTree takes 4 doubles by reading the 2 doubles twice 
and then does the same work as in the non-point MBR cases.
-         ***************************************************/
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        int[] fieldPermutation = 
createFieldPermutationForBulkLoadOp(numNestedSecondaryKeyFields);
-        int numNestedSecondaryKeFieldsConsideringPointMBR =
-                isPointMBR ? numNestedSecondaryKeyFields / 2 : 
numNestedSecondaryKeyFields;
-        RecordDescriptor secondaryRecDescConsideringPointMBR =
-                isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc;
-        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-            // Create dummy key provider for feeding the primary index scan.
-            AbstractOperatorDescriptor keyProviderOp = 
createDummyKeyProviderOp(spec);
-
-            // Create primary index scan op.
-            BTreeSearchOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanOp(spec);
-
-            // Assign op.
-            AbstractOperatorDescriptor sourceOp = primaryScanOp;
-            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
-                sourceOp = createCastOp(spec, dataset.getDatasetType());
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
primaryScanOp, 0, sourceOp, 0);
-            }
-            AlgebricksMetaOperatorDescriptor asterixAssignOp = 
createAssignOp(spec, sourceOp,
-                    numNestedSecondaryKeFieldsConsideringPointMBR, 
secondaryRecDescConsideringPointMBR);
-
-            // If any of the secondary fields are nullable, then add a select 
op that filters nulls.
-            AlgebricksMetaOperatorDescriptor selectOp = null;
-            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
-                selectOp = createFilterNullsSelectOp(spec, 
numNestedSecondaryKeFieldsConsideringPointMBR,
-                        secondaryRecDescConsideringPointMBR);
-            }
-
-            // Sort by secondary keys.
-            ExternalSortOperatorDescriptor sortOp =
-                    createSortOp(spec,
-                            new IBinaryComparatorFactory[] { 
MetadataProvider.proposeLinearizer(keyType,
-                                    secondaryComparatorFactories.length) },
-                            isPointMBR ? secondaryRecDescForPointMBR : 
secondaryRecDesc);
-
-            StorageProperties storageProperties = 
propertiesProvider.getStorageProperties();
-
-            IBinaryComparatorFactory[] btreeCompFactories = 
getComparatorFactoriesForDeletedKeyBTree();
-            IIndexDataflowHelperFactory idff = new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
-                    valueProviderFactories, RTreePolicyType.RTREE, 
btreeCompFactories,
-                    new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
-                    mergePolicyFactoryProperties, new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length), rtreeFields,
-                    filterTypeTraits, filterCmpFactories, 
secondaryFilterFields, !temp, isPointMBR);;
-
-            // Create secondary RTree bulk load op.
-            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp =
-                    createTreeIndexBulkLoadOp(spec, fieldPermutation, idff, 
GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
-            AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
-                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, 
new RecordDescriptor[] {});
-            // Connect the operators.
-            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 
0, primaryScanOp, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, 
asterixAssignOp, 0);
-            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
asterixAssignOp, 0, selectOp, 0);
-                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 
0, sortOp, 0);
-            } else {
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
asterixAssignOp, 0, sortOp, 0);
-            }
-            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, 
secondaryBulkLoadOp, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), 
secondaryBulkLoadOp, 0, metaOp, 0);
-            spec.addRoot(metaOp);
-            spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        } else {
-            // External dataset
-            /*
-             * In case of external data, this method is used to build loading 
jobs for both the initial load on index creation
-             * and the transaction load on dataset refresh
-             */
-            // Create external indexing scan operator
-            ExternalDataScanOperatorDescriptor primaryScanOp = 
createExternalIndexingOp(spec);
-            AbstractOperatorDescriptor sourceOp = primaryScanOp;
-            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
-                sourceOp = createCastOp(spec, dataset.getDatasetType());
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
primaryScanOp, 0, sourceOp, 0);
-            }
-            // Assign op.
-            AlgebricksMetaOperatorDescriptor asterixAssignOp = 
createExternalAssignOp(spec,
-                    numNestedSecondaryKeFieldsConsideringPointMBR, 
secondaryRecDescConsideringPointMBR);
-
-            // If any of the secondary fields are nullable, then add a select 
op that filters nulls.
-            AlgebricksMetaOperatorDescriptor selectOp = null;
-            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
-                selectOp = createFilterNullsSelectOp(spec, 
numNestedSecondaryKeFieldsConsideringPointMBR,
-                        secondaryRecDescConsideringPointMBR);
-            }
-
-            // Sort by secondary keys.
-            ExternalSortOperatorDescriptor sortOp =
-                    createSortOp(spec,
-                            new IBinaryComparatorFactory[] { 
MetadataProvider.proposeLinearizer(keyType,
-                                    secondaryComparatorFactories.length) },
-                            isPointMBR ? secondaryRecDescForPointMBR : 
secondaryRecDesc);
-            StorageProperties storageProperties = 
propertiesProvider.getStorageProperties();
-
-            // Create the dataflow helper factory
-            ExternalRTreeDataflowHelperFactory dataflowHelperFactory = new 
ExternalRTreeDataflowHelperFactory(
-                    valueProviderFactories, RTreePolicyType.RTREE, 
primaryComparatorFactories, mergePolicyFactory,
-                    mergePolicyFactoryProperties, new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length),
-                    storageProperties.getBloomFilterFalsePositiveRate(), new 
int[] { numNestedSecondaryKeyFields },
-                    
ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR);
-            // Create secondary RTree bulk load op.
-            IOperatorDescriptor root;
-            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
-            if (externalFiles != null) {
-                // Transaction load
-                secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, 
fieldPermutation, dataflowHelperFactory,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
-                root = secondaryBulkLoadOp;
-            } else {
-                // Initial load
-                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, 
fieldPermutation, dataflowHelperFactory,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
-                AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
-                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
-                        new RecordDescriptor[] { secondaryRecDesc });
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
secondaryBulkLoadOp, 0, metaOp, 0);
-                root = metaOp;
-            }
-
-            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, 
asterixAssignOp, 0);
-            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
asterixAssignOp, 0, selectOp, 0);
-                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 
0, sortOp, 0);
-            } else {
-                spec.connect(new OneToOneConnectorDescriptor(spec), 
asterixAssignOp, 0, sortOp, 0);
-            }
-            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, 
secondaryBulkLoadOp, 0);
-            spec.addRoot(root);
-            spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        }
-        return spec;
-    }
-
-    protected int[] createFieldPermutationForBulkLoadOp(int 
numSecondaryKeyFields) {
-        int[] fieldPermutation = new int[numSecondaryKeyFields + 
numPrimaryKeys + numFilterFields];
-        int numSecondaryKeyFieldsForPointMBR = numSecondaryKeyFields / 2;
-        int end = isPointMBR ? numSecondaryKeyFieldsForPointMBR : 
fieldPermutation.length;
-        for (int i = 0; i < end; i++) {
-            fieldPermutation[i] = i;
-        }
-        if (isPointMBR) {
-            
/*******************************************************************************
-             * For example, suppose that 2d point type data is indexed using 
RTree, there are no
-             * filter fields, and a primary key consists of a single field.
-             * ========== Without PointMBR optimization ==========
-             * If there is no point type optimization, the input operator of 
RTree's TreeIndexBulkLoadOperator
-             * delivers five variables to the TreeIndexBulkLoadOperator as 
follows:
-             * [$var1, $var2, $var3, $var4, $var5]
-             * where $var1 ~ $var4 together represent an MBR of a point object.
-             * Since it is a point object, $var1 and $var3 always have 
identical values. So do $var2 and $var4.
-             * $var5 represents a primary key value.
-             * fieldPermutation variable captures this order correctly by 
putting values in the array as follows:
-             * [0,1,2,3,4]
-             * =========== With PointMBR optimization ===========
-             * With PointMBR optimization, the input operator of RTree's 
TreeIndexBulkLoadOperator
-             * delivers 3 variables to the TreeIndexBulkLoadOperator as 
follows:
-             * [$var1, $var2, $var3]
-             * where $var1 and $var2 together represent an MBR of a point 
object.
-             * $var3 represents a primary key value.
-             * fieldPermutation variable captures this order correctly by 
putting values in the array as follows:
-             * [0,1,0,1,2]
-             * This means that bulkloadOp reads the pair of $var1 and $var2 
twice in order to provide the same
-             * output as when no PointMBR optimization is 
available.
-             * This adjustment is done in this if clause code.
-             
*********************************************************************************/
-            int idx = numSecondaryKeyFieldsForPointMBR;
-            //add the rest of the sk fields for pointMBR
-            for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) {
-                fieldPermutation[idx++] = i;
-            }
-            //add the pk and filter fields
-            end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + 
numFilterFields;
-            for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) {
-                fieldPermutation[idx++] = i;
-            }
-        }
-        return fieldPermutation;
-    }
-
-    @Override
-    public JobSpecification buildCompactJobSpec() throws AsterixException, 
AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
-        StorageProperties storageProperties = 
propertiesProvider.getStorageProperties();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        LSMTreeIndexCompactOperatorDescriptor compactOp;
-        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-            IBinaryComparatorFactory[] btreeCompFactories = 
getComparatorFactoriesForDeletedKeyBTree();;
-            IIndexDataflowHelperFactory idff = new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
-                    valueProviderFactories, RTreePolicyType.RTREE, 
btreeCompFactories,
-                    new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
-                    mergePolicyFactoryProperties, new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length), rtreeFields,
-                    filterTypeTraits, filterCmpFactories, 
secondaryFilterFields, !temp, isPointMBR);
-            compactOp =
-                    new LSMTreeIndexCompactOperatorDescriptor(spec, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER, 
secondaryFileSplitProvider,
-                            secondaryTypeTraits, secondaryComparatorFactories, 
secondaryBloomFilterKeyFields, idff,
-                            NoOpOperationCallbackFactory.INSTANCE, 
LSMIndexUtil.getMetadataPageManagerFactory());
-        } else {
-            // External dataset
-            compactOp =
-                    new LSMTreeIndexCompactOperatorDescriptor(spec, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER, 
secondaryFileSplitProvider,
-                            secondaryTypeTraits, secondaryComparatorFactories, 
secondaryBloomFilterKeyFields,
-                            new 
ExternalRTreeDataflowHelperFactory(valueProviderFactories, 
RTreePolicyType.RTREE,
-                                    primaryComparatorFactories, 
mergePolicyFactory, mergePolicyFactoryProperties,
-                                    new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                    RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                    
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                                    
MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length),
-                                    
storageProperties.getBloomFilterFalsePositiveRate(),
-                                    new int[] { numNestedSecondaryKeyFields },
-                                    
ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR),
-                            NoOpOperationCallbackFactory.INSTANCE, 
LSMIndexUtil.getMetadataPageManagerFactory());
-        }
-
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
-                secondaryPartitionConstraint);
-        spec.addRoot(compactOp);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
new file mode 100644
index 0000000..d0d14c8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.file;
+
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import 
org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import 
org.apache.hyracks.storage.am.common.freepage.AppendOnlyLinkedMetadataPageManagerFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class StorageComponentProvider implements IStorageComponentProvider {
+
+    @Override
+    public IPrimitiveValueProviderFactory getPrimitiveValueProviderFactory() {
+        return PrimitiveValueProviderFactory.INSTANCE;
+    }
+
+    @Override
+    public ITransactionSubsystemProvider getTransactionSubsystemProvider() {
+        return TransactionSubsystemProvider.INSTANCE;
+    }
+
+    @Override
+    public ILSMIOOperationSchedulerProvider getIoOperationSchedulerProvider() {
+        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
+    }
+
+    @Override
+    public IMetadataPageManagerFactory getMetadataPageManagerFactory() {
+        return AppendOnlyLinkedMetadataPageManagerFactory.INSTANCE;
+    }
+
+    @Override
+    public IBinaryComparatorFactoryProvider getComparatorFactoryProvider() {
+        return BinaryComparatorFactoryProvider.INSTANCE;
+    }
+
+    @Override
+    public TypeTraitProvider getTypeTraitProvider() {
+        return TypeTraitProvider.INSTANCE;
+    }
+
+    @Override
+    public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
+        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
+    }
+
+    @Override
+    public IStorageManager getStorageManager() {
+        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index b17a722..7955ad6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -29,13 +29,13 @@ import javax.servlet.Servlet;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.api.http.server.ApiServlet;
-import org.apache.asterix.api.http.server.FullApiServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
 import org.apache.asterix.api.http.server.DdlApiServlet;
 import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
 import org.apache.asterix.api.http.server.FeedServlet;
+import org.apache.asterix.api.http.server.FullApiServlet;
 import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.QueryApiServlet;
 import org.apache.asterix.api.http.server.QueryResultApiServlet;
@@ -46,23 +46,25 @@ import 
org.apache.asterix.api.http.server.ShutdownApiServlet;
 import org.apache.asterix.api.http.server.UpdateApiServlet;
 import org.apache.asterix.api.http.server.VersionApiServlet;
 import org.apache.asterix.api.http.servlet.ServletConstants;
-import org.apache.asterix.app.cc.CompilerExtensionManager;
+import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.cc.ResourceIdManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.utils.LetUtil.Lets;
 import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
-import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
@@ -83,7 +85,8 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
     private static final Logger LOGGER = 
Logger.getLogger(CCApplicationEntryPoint.class.getName());
     private static IAsterixStateProxy proxy;
     protected ICCApplicationContext appCtx;
-    protected CompilerExtensionManager ccExtensionManager;
+    protected CCExtensionManager ccExtensionManager;
+    protected IStorageComponentProvider componentProvider;
     private IJobCapacityController jobCapacityController;
     protected WebManager webManager;
 
@@ -98,17 +101,16 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
         }
 
         appCtx.setThreadFactory(new 
AsterixThreadFactory(appCtx.getThreadFactory(), new 
LifeCycleComponentManager()));
-        GlobalRecoveryManager.instantiate((HyracksConnection) 
getNewHyracksClientConnection());
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ResourceIdManager resourceIdManager = new ResourceIdManager();
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
-        AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), 
GlobalRecoveryManager.instance(),
-                libraryManager, resourceIdManager, () -> 
MetadataManager.INSTANCE);
-        ccExtensionManager = new CompilerExtensionManager(getExtensions());
+        componentProvider = new StorageComponentProvider();
+        GlobalRecoveryManager.instantiate((HyracksConnection) 
getNewHyracksClientConnection(), componentProvider);
+        AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), 
libraryManager, resourceIdManager,
+                () -> MetadataManager.INSTANCE, 
GlobalRecoveryManager.instance());
+        ccExtensionManager = new CCExtensionManager(getExtensions());
         AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
-
         final CCConfig ccConfig = controllerService.getCCConfig();
-
         if (System.getProperty("java.rmi.server.hostname") == null) {
             System.setProperty("java.rmi.server.hostname", 
ccConfig.clusterNetIpAddress);
         }
@@ -119,8 +121,7 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
 
         MetadataManager.initialize(proxy, metadataProperties);
 
-        AppContextInfo.INSTANCE.getCCApplicationContext()
-                .addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
+        
AppContextInfo.INSTANCE.getCCApplicationContext().addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
 
         // create event loop groups
         webManager = new WebManager();
@@ -161,19 +162,19 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
     }
 
     protected HttpServer setupWebServer(ExternalProperties externalProperties) 
throws Exception {
-        HttpServer webServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties
-                .getWebInterfacePort());
+        HttpServer webServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
+                externalProperties.getWebInterfacePort());
         IHyracksClientConnection hcc = getNewHyracksClientConnection();
         webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
-        webServer.addLet(new ApiServlet(webServer.ctx(), new String[] { "/*" 
}, ccExtensionManager
-                .getAqlCompilationProvider(), 
ccExtensionManager.getSqlppCompilationProvider(), ccExtensionManager
-                        .getQueryTranslatorFactory()));
+        webServer.addLet(new ApiServlet(webServer.ctx(), new String[] { "/*" },
+                ccExtensionManager.getAqlCompilationProvider(), 
ccExtensionManager.getSqlppCompilationProvider(),
+                ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider));
         return webServer;
     }
 
     protected HttpServer setupJSONAPIServer(ExternalProperties 
externalProperties) throws Exception {
-        HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties
-                .getAPIServerPort());
+        HttpServer jsonAPIServer =
+                new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties.getAPIServerPort());
         IHyracksClientConnection hcc = getNewHyracksClientConnection();
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_BUILD_PROP_ATTR, 
AppContextInfo.INSTANCE);
@@ -211,8 +212,8 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
     }
 
     protected HttpServer setupQueryWebServer(ExternalProperties 
externalProperties) throws Exception {
-        HttpServer queryWebServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties
-                .getQueryWebInterfacePort());
+        HttpServer queryWebServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
+                externalProperties.getQueryWebInterfacePort());
         IHyracksClientConnection hcc = getNewHyracksClientConnection();
         queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         queryWebServer.addLet(new 
QueryWebInterfaceServlet(queryWebServer.ctx(), new String[] { "/*" }));
@@ -229,8 +230,8 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
     }
 
     protected HttpServer setupFeedServer(ExternalProperties 
externalProperties) throws Exception {
-        HttpServer feedServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties
-                .getFeedServerPort());
+        HttpServer feedServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
+                externalProperties.getFeedServerPort());
         feedServer.setAttribute(HYRACKS_CONNECTION_ATTR, 
getNewHyracksClientConnection());
         feedServer.addLet(new FeedServlet(feedServer.ctx(), new String[] { "/" 
}));
         return feedServer;
@@ -240,35 +241,35 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
         switch (key) {
             case AQL:
                 return new FullApiServlet(server.ctx(), paths, 
ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case AQL_QUERY:
                 return new QueryApiServlet(server.ctx(), paths, 
ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case AQL_UPDATE:
                 return new UpdateApiServlet(server.ctx(), paths, 
ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case AQL_DDL:
                 return new DdlApiServlet(server.ctx(), paths, 
ccExtensionManager.getAqlCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case SQLPP:
                 return new FullApiServlet(server.ctx(), paths, 
ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case SQLPP_QUERY:
                 return new QueryApiServlet(server.ctx(), paths, 
ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case SQLPP_UPDATE:
                 return new UpdateApiServlet(server.ctx(), paths, 
ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case SQLPP_DDL:
                 return new DdlApiServlet(server.ctx(), paths, 
ccExtensionManager.getSqlppCompilationProvider(),
-                        ccExtensionManager.getQueryTranslatorFactory());
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case QUERY_STATUS:
                 return new QueryStatusApiServlet(server.ctx(), paths);
             case QUERY_RESULT:
                 return new QueryResultApiServlet(server.ctx(), paths);
             case QUERY_SERVICE:
-                return new QueryServiceServlet(server.ctx(), paths, 
ccExtensionManager
-                        .getSqlppCompilationProvider(), 
ccExtensionManager.getQueryTranslatorFactory());
+                return new QueryServiceServlet(server.ctx(), paths, 
ccExtensionManager.getSqlppCompilationProvider(),
+                        ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider);
             case CONNECTOR:
                 return new ConnectorApiServlet(server.ctx(), paths);
             case SHUTDOWN:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 41d8b0d..53b577a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -41,7 +41,7 @@ import 
org.apache.asterix.metadata.cluster.AddNodeWorkResponse;
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
 import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.exceptions.HyracksException;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index b1d8dd3..71fff3f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -30,7 +30,7 @@ import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.cluster.AddNodeWork;
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 
 public class ClusterWorkExecutor implements Runnable {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index d437b5b..002e270 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -25,14 +25,14 @@ import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.app.external.ExternalIndexingOperations;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import 
org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.DatasetConfig.TransactionState;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -41,22 +41,25 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
-public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
+public class GlobalRecoveryManager implements IGlobalRecoveryManager {
 
     private static final Logger LOGGER = 
Logger.getLogger(GlobalRecoveryManager.class.getName());
     private static GlobalRecoveryManager instance;
     private static ClusterState state;
+    private final IStorageComponentProvider componentProvider;
     private HyracksConnection hcc;
 
-    private GlobalRecoveryManager(HyracksConnection hcc) {
+    private GlobalRecoveryManager(HyracksConnection hcc, 
IStorageComponentProvider componentProvider) {
         setState(ClusterState.UNUSABLE);
         this.hcc = hcc;
+        this.componentProvider = componentProvider;
     }
 
     @Override
@@ -107,7 +110,7 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryMaanger {
                         List<Dataverse> dataverses = 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
                         for (Dataverse dataverse : dataverses) {
                             if 
(!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME))
 {
-                                MetadataProvider metadataProvider = new 
MetadataProvider(dataverse);
+                                MetadataProvider metadataProvider = new 
MetadataProvider(dataverse, componentProvider);
                                 List<Dataset> datasets = 
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
                                         dataverse.getDataverseName());
                                 for (Dataset dataset : datasets) {
@@ -118,16 +121,16 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryMaanger {
                                                 dataset.getDataverseName(), 
dataset.getDatasetName());
                                         if (!indexes.isEmpty()) {
                                             // Get the state of the dataset
-                                            ExternalDatasetDetails dsd = 
(ExternalDatasetDetails) dataset
-                                                    .getDatasetDetails();
-                                            ExternalDatasetTransactionState 
datasetState = dsd.getState();
-                                            if (datasetState == 
ExternalDatasetTransactionState.BEGIN) {
+                                            ExternalDatasetDetails dsd =
+                                                    (ExternalDatasetDetails) 
dataset.getDatasetDetails();
+                                            TransactionState datasetState = 
dsd.getState();
+                                            if (datasetState == 
TransactionState.BEGIN) {
                                                 List<ExternalFile> files = 
MetadataManager.INSTANCE
                                                         
.getDatasetExternalFiles(mdTxnCtx, dataset);
                                                 // if presumed abort, roll 
backward
                                                 // 1. delete all pending files
                                                 for (ExternalFile file : 
files) {
-                                                    if (file.getPendingOp() != 
ExternalFilePendingOp.PENDING_NO_OP) {
+                                                    if (file.getPendingOp() != 
ExternalFilePendingOp.NO_OP) {
                                                         
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                                                     }
                                                 }
@@ -138,11 +141,11 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryMaanger {
                                                 executeHyracksJob(jobSpec);
                                                 // 3. correct the dataset state
                                                 ((ExternalDatasetDetails) 
dataset.getDatasetDetails())
-                                                        
.setState(ExternalDatasetTransactionState.COMMIT);
+                                                        
.setState(TransactionState.COMMIT);
                                                 
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
                                                 
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                                                 mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-                                            } else if (datasetState == 
ExternalDatasetTransactionState.READY_TO_COMMIT) {
+                                            } else if (datasetState == 
TransactionState.READY_TO_COMMIT) {
                                                 List<ExternalFile> files = 
MetadataManager.INSTANCE
                                                         
.getDatasetExternalFiles(mdTxnCtx, dataset);
                                                 // if ready to commit, roll 
forward
@@ -153,15 +156,15 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryMaanger {
                                                 executeHyracksJob(jobSpec);
                                                 // 2. add pending files in 
metadata
                                                 for (ExternalFile file : 
files) {
-                                                    if (file.getPendingOp() == 
ExternalFilePendingOp.PENDING_ADD_OP) {
+                                                    if (file.getPendingOp() == 
ExternalFilePendingOp.ADD_OP) {
                                                         
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                                                        
file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+                                                        
file.setPendingOp(ExternalFilePendingOp.NO_OP);
                                                         
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
-                                                    } else if (file
-                                                            .getPendingOp() == 
ExternalFilePendingOp.PENDING_DROP_OP) {
+                                                    } else if 
(file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
                                                         // find original file
                                                         for (ExternalFile 
originalFile : files) {
-                                                            if 
(originalFile.getFileName().equals(file.getFileName())) {
+                                                            if 
(originalFile.getFileName()
+                                                                    
.equals(file.getFileName())) {
                                                                 
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
                                                                         file);
                                                                 
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
@@ -170,10 +173,11 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryMaanger {
                                                             }
                                                         }
                                                     } else if (file
-                                                            .getPendingOp() == 
ExternalFilePendingOp.PENDING_APPEND_OP) {
+                                                            .getPendingOp() == 
ExternalFilePendingOp.APPEND_OP) {
                                                         // find original file
                                                         for (ExternalFile 
originalFile : files) {
-                                                            if 
(originalFile.getFileName().equals(file.getFileName())) {
+                                                            if 
(originalFile.getFileName()
+                                                                    
.equals(file.getFileName())) {
                                                                 
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
                                                                         file);
                                                                 
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
@@ -187,7 +191,7 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryMaanger {
                                                 }
                                                 // 3. correct the dataset state
                                                 ((ExternalDatasetDetails) 
dataset.getDatasetDetails())
-                                                        
.setState(ExternalDatasetTransactionState.COMMIT);
+                                                        
.setState(TransactionState.COMMIT);
                                                 
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
                                                 
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                                                 mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
@@ -225,8 +229,8 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryMaanger {
         return instance;
     }
 
-    public static synchronized void instantiate(HyracksConnection hcc) {
-        instance = new GlobalRecoveryManager(hcc);
+    public static synchronized void instantiate(HyracksConnection hcc, 
IStorageComponentProvider componentProvider) {
+        instance = new GlobalRecoveryManager(hcc, componentProvider);
     }
 
     public static synchronized void setState(ClusterState state) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
deleted file mode 100644
index 5a3419a..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.util;
-
-import org.apache.asterix.common.config.CompilerProperties;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
-import org.apache.asterix.runtime.util.AppContextInfo;
-import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import 
org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public class FlushDatasetUtils {
-
-    public static void flushDataset(IHyracksClientConnection hcc, 
MetadataProvider metadataProvider,
-            MetadataTransactionContext mdTxnCtx, String dataverseName, String 
datasetName, String indexName)
-            throws Exception {
-        CompilerProperties compilerProperties = 
AppContextInfo.INSTANCE.getCompilerProperties();
-        int frameSize = compilerProperties.getFrameSize();
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        RecordDescriptor[] rDescs = new RecordDescriptor[] { new 
RecordDescriptor(new ISerializerDeserializer[] {}) };
-        AlgebricksMetaOperatorDescriptor emptySource = new 
AlgebricksMetaOperatorDescriptor(spec, 0, 1,
-                new IPushRuntimeFactory[] { new 
EmptyTupleSourceRuntimeFactory() }, rDescs);
-
-        org.apache.asterix.common.transactions.JobId jobId = 
JobIdFactory.generateJobId();
-        Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
-        FlushDatasetOperatorDescriptor flushOperator = new 
FlushDatasetOperatorDescriptor(spec, jobId,
-                dataset.getDatasetId());
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, 
flushOperator, 0);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, 
datasetName, indexName,
-                        dataset.getDatasetDetails().isTemp());
-        AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
-
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
emptySource,
-                primaryPartitionConstraint);
-
-        JobEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(jobId, true);
-        spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        JobUtils.runJob(hcc, spec, true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
deleted file mode 100644
index fb50b0c..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.util;
-
-import org.apache.asterix.api.common.Job;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class JobUtils {
-
-    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec, boolean waitForCompletion)
-            throws Exception {
-        JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, 
waitForCompletion);
-        return jobIds[0];
-    }
-
-    public static JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] 
jobs, boolean waitForCompletion)
-            throws Exception {
-        JobId[] startedJobIds = new JobId[jobs.length];
-        for (int i = 0; i < jobs.length; i++) {
-            JobSpecification spec = jobs[i].getJobSpec();
-            spec.setMaxReattempts(0);
-            JobId jobId = hcc.startJob(spec);
-            startedJobIds[i] = jobId;
-            if (waitForCompletion) {
-                hcc.waitForCompletion(jobId);
-            }
-        }
-        return startedJobIds;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
deleted file mode 100644
index 50a21bc..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.util;
-
-import org.apache.asterix.app.resource.RequiredCapacityVisitor;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import org.apache.hyracks.api.job.resource.ClusterCapacity;
-import org.apache.hyracks.api.job.resource.IClusterCapacity;
-
-public class ResourceUtils {
-
-    private ResourceUtils() {
-    }
-
-    /**
-     * Calculates the required cluster capacity from a given query plan, the 
computation locations,
-     * the operator memory budgets, and frame size.
-     *
-     * @param plan,
-     *            a given query plan.
-     * @param computationLocations,
-     *            the partitions for computation.
-     * @param sortFrameLimit,
-     *            the frame limit for one sorter partition.
-     * @param groupFrameLimit,
-     *            the frame limit for one group-by partition.
-     * @param joinFrameLimit
-     *            the frame limit for one joiner partition.
-     * @param frameSize
-     *            the frame size used in query execution.
-     * @return the required cluster capacity for executing the query.
-     * @throws AlgebricksException
-     *             if the query plan is malformed.
-     */
-    public static IClusterCapacity getRequiredCompacity(ILogicalPlan plan,
-            AlgebricksAbsolutePartitionConstraint computationLocations, int 
sortFrameLimit, int groupFrameLimit,
-            int joinFrameLimit, int frameSize)
-            throws AlgebricksException {
-        // Creates a cluster capacity visitor.
-        IClusterCapacity clusterCapacity = new ClusterCapacity();
-        RequiredCapacityVisitor visitor = new 
RequiredCapacityVisitor(computationLocations.getLocations().length,
-                sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, 
clusterCapacity);
-
-        // There could be only one root operator for a top-level query plan.
-        ILogicalOperator rootOp = plan.getRoots().get(0).getValue();
-        rootOp.accept(visitor, null);
-        return clusterCapacity;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
new file mode 100644
index 0000000..48fd782
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.utils;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DataverseUtil {
+
+    private DataverseUtil() {
+    }
+
+    public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, 
MetadataProvider metadata) {
+        JobSpecification jobSpec = RuntimeUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
+                metadata.splitAndConstraints(dataverse.getDataverseName());
+        FileRemoveOperatorDescriptor frod =
+                new FileRemoveOperatorDescriptor(jobSpec, 
splitsAndConstraint.first, false);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, 
frod, splitsAndConstraint.second);
+        jobSpec.addRoot(frod);
+        return jobSpec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java
new file mode 100644
index 0000000..07eed0d
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.utils;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.algebra.base.ILangExtension.Language;
+import org.apache.asterix.app.cc.IStatementExecutorExtension;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.metadata.api.IMetadataExtension;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * Provide util methods dealing with extensions
+ */
+public class ExtensionUtil {
+
+    private ExtensionUtil() {
+    }
+
+    /**
+     * Verifies no conflict and return the language compilation provider
+     *
+     * @param lang
+     *            the language for the passed compilation provider
+     * @param cp
+     *            placeholder for compilation provider
+     * @param le
+     *            user defined extension for compilation provider
+     * @return a pair of extension id and extended compilation provider
+     * @throws RuntimeDataException
+     *             if there was a conflict between two extensions
+     */
+    public static Pair<ExtensionId, ILangCompilationProvider> 
extendLangCompilationProvider(Language lang,
+            Pair<ExtensionId, ILangCompilationProvider> cp, ILangExtension le) 
throws RuntimeDataException {
+        if (cp != null && le.getLangCompilationProvider(lang) != null) {
+            throw new 
RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, le.getId(), 
cp.first,
+                    lang.toString());
+        }
+        return (le.getLangCompilationProvider(lang) != null)
+                ? new Pair<>(le.getId(), le.getLangCompilationProvider(lang)) 
: cp;
+    }
+
+    /**
+     * Validate no extension conflict and return statement executor extension
+     *
+     * @param qte
+     *            place holder for statement executor extension
+     * @param extension
+     *            user defined extension
+     * @return the user defined extension
+     * @throws RuntimeDataException
+     *             if extension conflict was detected
+     */
+    public static IStatementExecutorExtension 
extendStatementExecutor(IStatementExecutorExtension qte,
+            IStatementExecutorExtension extension) throws RuntimeDataException 
{
+        if (qte != null) {
+            throw new 
RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, qte.getId(), 
extension.getId(),
+                    IStatementExecutorFactory.class.getSimpleName());
+        }
+        return extension;
+    }
+
+    /**
+     * Validate no extension conflict and extends tuple translator provider
+     *
+     * @param metadataExtension
+     *            place holder for tuple translator provider extension
+     * @param mde
+     *            user defined metadata extension
+     * @return the metadata extension if the extension defines a metadata 
tuple translator, null otherwise
+     * @throws RuntimeDataException
+     *             if an extension conflict was detected
+     */
+    public static IMetadataExtension 
extendTupleTranslatorProvider(IMetadataExtension metadataExtension,
+            IMetadataExtension mde) throws RuntimeDataException {
+        if (metadataExtension != null) {
+            throw new 
RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, 
metadataExtension.getId(),
+                    mde.getId(), IMetadataExtension.class.getSimpleName());
+        }
+        return mde.getMetadataTupleTranslatorProvider() == null ? null : mde;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
new file mode 100644
index 0000000..bc8a79e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.utils;
+
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import 
org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class FlushDatasetUtil {
+    private FlushDatasetUtil() {
+    }
+
+    public static void flushDataset(IHyracksClientConnection hcc, 
MetadataProvider metadataProvider,
+            String dataverseName, String datasetName, String indexName) throws 
Exception {
+        CompilerProperties compilerProperties = 
AppContextInfo.INSTANCE.getCompilerProperties();
+        int frameSize = compilerProperties.getFrameSize();
+        JobSpecification spec = new JobSpecification(frameSize);
+
+        RecordDescriptor[] rDescs = new RecordDescriptor[] { new 
RecordDescriptor(new ISerializerDeserializer[] {}) };
+        AlgebricksMetaOperatorDescriptor emptySource = new 
AlgebricksMetaOperatorDescriptor(spec, 0, 1,
+                new IPushRuntimeFactory[] { new 
EmptyTupleSourceRuntimeFactory() }, rDescs);
+
+        org.apache.asterix.common.transactions.JobId jobId = 
JobIdFactory.generateJobId();
+        Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
+        FlushDatasetOperatorDescriptor flushOperator =
+                new FlushDatasetOperatorDescriptor(spec, jobId, 
dataset.getDatasetId());
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, 
flushOperator, 0);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataverseName, 
datasetName, indexName,
+                        dataset.getDatasetDetails().isTemp());
+        AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
+
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
emptySource,
+                primaryPartitionConstraint);
+
+        JobEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(jobId, true);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+        JobUtils.runJob(hcc, spec, true);
+    }
+
+}

Reply via email to