http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7238db9..1beaed0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -30,24 +30,17 @@ import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.IApplicationContextInfo;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import 
org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor;
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
-import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
-import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import 
org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
@@ -55,9 +48,9 @@ import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import 
org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor;
-import 
org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
 import 
org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -79,29 +72,26 @@ import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import 
org.apache.asterix.runtime.operators.LSMInvertedIndexUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMTreeUpsertOperatorDescriptor;
-import org.apache.asterix.runtime.util.AppContextInfo;
-import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
 import 
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
@@ -148,6 +138,7 @@ import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeseri
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -155,22 +146,18 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 
 public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
 
+    private final IStorageComponentProvider storaegComponentProvider;
+    private final ITransactionSubsystemProvider txnSubsystemProvider;
+    private final IMetadataPageManagerFactory metadataPageManagerFactory;
+    private final IPrimitiveValueProviderFactory primitiveValueProviderFactory;
     private final StorageProperties storageProperties;
     private final ILibraryManager libraryManager;
     private final Dataverse defaultDataverse;
@@ -188,10 +175,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     private boolean isTemporaryDatasetWriteJob = true;
     private boolean blockingOperatorDisabled = false;
 
-    public MetadataProvider(Dataverse defaultDataverse) {
+    public MetadataProvider(Dataverse defaultDataverse, 
IStorageComponentProvider componentProvider) {
         this.defaultDataverse = defaultDataverse;
-        this.storageProperties = 
AppContextInfo.INSTANCE.getStorageProperties();
-        this.libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
+        this.storaegComponentProvider = componentProvider;
+        storageProperties = AppContextInfo.INSTANCE.getStorageProperties();
+        libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
+        txnSubsystemProvider = 
componentProvider.getTransactionSubsystemProvider();
+        metadataPageManagerFactory = 
componentProvider.getMetadataPageManagerFactory();
+        primitiveValueProviderFactory = 
componentProvider.getPrimitiveValueProviderFactory();
     }
 
     public String getPropertyValue(String propertyName) {
@@ -408,8 +399,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildLoadableDatasetScan(
             JobSpecification jobSpec, IAdapterFactory adapterFactory, 
RecordDescriptor rDesc)
             throws AlgebricksException {
-        ExternalDataScanOperatorDescriptor dataScanner = new 
ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
-                adapterFactory);
+        ExternalScanOperatorDescriptor dataScanner =
+                new ExternalScanOperatorDescriptor(jobSpec, rDesc, 
adapterFactory);
         AlgebricksPartitionConstraint constraint;
         try {
             constraint = adapterFactory.getPartitionConstraint();
@@ -437,8 +428,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, 
IAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor 
policyAccessor) throws Exception {
         Triple<IAdapterFactory, RecordDescriptor, 
IDataSourceAdapter.AdapterType> factoryOutput;
-        factoryOutput = 
FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, 
mdTxnCtx,
-                libraryManager);
+        factoryOutput =
+                FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, 
policyAccessor, mdTxnCtx, libraryManager);
         ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, 
primaryFeed.getAdapterConfiguration(),
                 ExternalDataConstants.KEY_TYPE_NAME);
         IAdapterFactory adapterFactory = factoryOutput.first;
@@ -454,6 +445,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, 
primaryFeed, libraryName,
                         adapterFactory.getClass().getName(), recordType, 
policyAccessor, factoryOutput.second);
                 break;
+            default:
+                break;
         }
 
         AlgebricksPartitionConstraint partitionConstraint = 
adapterFactory.getPartitionConstraint();
@@ -461,10 +454,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildBtreeRuntime(JobSpecification jobSpec,
-            List<LogicalVariable> outputVars, IOperatorSchema opSchema, 
IVariableTypeEnvironment typeEnv,
-            JobGenContext context, boolean retainInput, boolean retainMissing, 
Dataset dataset, String indexName,
-            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, 
boolean highKeyInclusive,
-            Object implConfig, int[] minFilterFieldIndexes, int[] 
maxFilterFieldIndexes) throws AlgebricksException {
+            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, 
JobGenContext context, boolean retainInput,
+            boolean retainMissing, Dataset dataset, String indexName, int[] 
lowKeyFields, int[] highKeyFields,
+            boolean lowKeyInclusive, boolean highKeyInclusive, int[] 
minFilterFieldIndexes,
+            int[] maxFilterFieldIndexes) throws AlgebricksException {
         boolean isSecondary = true;
         int numSecondaryKeys = 0;
         try {
@@ -474,43 +467,38 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             if (primaryIndex != null && (dataset.getDatasetType() != 
DatasetType.EXTERNAL)) {
                 isSecondary = !indexName.equals(primaryIndex.getIndexName());
             }
-            int numPrimaryKeys = 
DatasetUtils.getPartitioningKeys(dataset).size();
+            Index theIndex = isSecondary ? 
MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName) : primaryIndex;
+            int numPrimaryKeys = 
DatasetUtil.getPartitioningKeys(dataset).size();
             RecordDescriptor outputRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
             int[] bloomFilterKeyFields;
             ITypeTraits[] typeTraits;
             IBinaryComparatorFactory[] comparatorFactories;
 
-            ARecordType itemType = (ARecordType) 
this.findType(dataset.getItemTypeDataverseName(),
-                    dataset.getItemTypeName());
+            ARecordType itemType =
+                    (ARecordType) 
this.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
             ARecordType metaType = null;
             List<Integer> primaryKeyIndicators = null;
             if (dataset.hasMetaPart()) {
-                metaType = (ARecordType) 
findType(dataset.getMetaItemTypeDataverseName(),
-                        dataset.getMetaItemTypeName());
+                metaType =
+                        (ARecordType) 
findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
                 primaryKeyIndicators = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getKeySourceIndicator();
             }
 
-            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] btreeFields = null;
-
+            ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(dataset, itemType);
+            int[] filterFields;
+            int[] btreeFields;
             if (isSecondary) {
-                Index secondaryIndex = 
MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                        dataset.getDatasetName(), indexName);
-                numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
+                numSecondaryKeys = theIndex.getKeyFieldNames().size();
                 bloomFilterKeyFields = new int[numSecondaryKeys];
                 for (int i = 0; i < numSecondaryKeys; i++) {
                     bloomFilterKeyFields[i] = i;
                 }
                 Pair<IBinaryComparatorFactory[], ITypeTraits[]> 
comparatorFactoriesAndTypeTraits =
-                        
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-                                secondaryIndex.getKeyFieldNames(), 
secondaryIndex.getKeyFieldTypes(),
-                                DatasetUtils.getPartitioningKeys(dataset), 
itemType, dataset.getDatasetType(),
-                                dataset.hasMetaPart(), primaryKeyIndicators,
-                                secondaryIndex.getKeyFieldSourceIndicators(),
-                                metaType);
+                        
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(theIndex.getKeyFieldNames(),
+                                theIndex.getKeyFieldTypes(), 
DatasetUtil.getPartitioningKeys(dataset), itemType,
+                                dataset.getDatasetType(), 
dataset.hasMetaPart(), primaryKeyIndicators,
+                                theIndex.getKeyFieldSourceIndicators(), 
metaType);
                 comparatorFactories = comparatorFactoriesAndTypeTraits.first;
                 typeTraits = comparatorFactoriesAndTypeTraits.second;
                 if (filterTypeTraits != null) {
@@ -528,77 +516,58 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     bloomFilterKeyFields[i] = i;
                 }
                 // get meta item type
-                ARecordType metaItemType = DatasetUtils.getMetaType(this, 
dataset);
-                typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, 
itemType, metaItemType);
-                comparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, 
metaItemType,
+                ARecordType metaItemType = DatasetUtil.getMetaType(this, 
dataset);
+                typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, 
itemType, metaItemType);
+                comparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(dataset, itemType, 
metaItemType,
                         context.getBinaryComparatorFactoryProvider());
-                filterFields = DatasetUtils.createFilterFields(dataset);
-                btreeFields = 
DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
             }
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            spPc = 
splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), 
dataset.getDatasetName(),
-                    indexName, temp);
+            spPc = getSplitProviderAndConstraints(dataset.getDataverseName(), 
dataset.getDatasetName(), indexName,
+                    temp);
 
             ISearchOperationCallbackFactory searchCallbackFactory;
             if (isSecondary) {
                 searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
                         : new SecondaryIndexSearchOperationCallbackFactory();
             } else {
-                JobId jobId = ((JobEventListenerFactory) 
jobSpec.getJobletEventListenerFactory()).getJobId();
                 int datasetId = dataset.getDatasetId();
                 int[] primaryKeyFields = new int[numPrimaryKeys];
                 for (int i = 0; i < numPrimaryKeys; i++) {
                     primaryKeyFields[i] = i;
                 }
 
-                ITransactionSubsystemProvider txnSubsystemProvider = new 
TransactionSubsystemProvider();
-
                 /**
                  * Due to the read-committed isolation level,
                  * we may acquire very short duration lock(i.e., instant lock) 
for readers.
                  */
                 searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                        : new 
PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields,
-                                txnSubsystemProvider, ResourceType.LSM_BTREE);
+                        : new 
PrimaryIndexInstantSearchOperationCallbackFactory(
+                                ((JobEventListenerFactory) 
jobSpec.getJobletEventListenerFactory()).getJobId(),
+                                datasetId, primaryKeyFields, 
txnSubsystemProvider, ResourceType.LSM_BTREE);
             }
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             RuntimeComponentsProvider rtcProvider = 
RuntimeComponentsProvider.RUNTIME_PROVIDER;
             BTreeSearchOperatorDescriptor btreeSearchOp;
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
                 btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, 
outputRecDesc,
-                        appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                        spPc.first, typeTraits, comparatorFactories, 
bloomFilterKeyFields, lowKeyFields, highKeyFields,
+                        appContext.getStorageManager(), 
appContext.getIndexLifecycleManagerProvider(), spPc.first,
+                        typeTraits, comparatorFactories, bloomFilterKeyFields, 
lowKeyFields, highKeyFields,
                         lowKeyInclusive, highKeyInclusive,
-                        new LSMBTreeDataflowHelperFactory(new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                                compactionInfo.first, compactionInfo.second,
-                                isSecondary ? new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
-                                        : new 
PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                rtcProvider, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                
storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, 
filterTypeTraits,
-                                filterCmpFactories, btreeFields, filterFields, 
!temp),
+                        dataset.getIndexDataflowHelperFactory(this, theIndex, 
itemType, metaType, compactionInfo.first,
+                                compactionInfo.second),
                         retainInput, retainMissing, 
context.getMissingWriterFactory(), searchCallbackFactory,
-                        minFilterFieldIndexes, maxFilterFieldIndexes, 
LSMIndexUtil
-                                .getMetadataPageManagerFactory());
+                        minFilterFieldIndexes, maxFilterFieldIndexes, 
metadataPageManagerFactory);
             } else {
-                // External dataset <- use the btree with buddy btree->
-                // Be Careful of Key Start Index ?
-                int[] buddyBreeFields = new int[] { numSecondaryKeys };
-                ExternalBTreeWithBuddyDataflowHelperFactory 
indexDataflowHelperFactory =
-                        new ExternalBTreeWithBuddyDataflowHelperFactory(
-                                compactionInfo.first, compactionInfo.second,
-                                new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                                
getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
-                                
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), 
!temp);
+                IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
+                        theIndex, itemType, metaType, compactionInfo.first, 
compactionInfo.second);
                 btreeSearchOp = new 
ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
                         rtcProvider, spPc.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, 
indexDataflowHelperFactory, retainInput,
-                        retainMissing, context.getMissingWriterFactory(), 
searchCallbackFactory, LSMIndexUtil
-                                .getMetadataPageManagerFactory());
+                        retainMissing, context.getMissingWriterFactory(), 
searchCallbackFactory,
+                        metadataPageManagerFactory);
             }
             return new Pair<>(btreeSearchOp, spPc.second);
         } catch (MetadataException me) {
@@ -611,8 +580,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             JobGenContext context, boolean retainInput, boolean retainMissing, 
Dataset dataset, String indexName,
             int[] keyFields, int[] minFilterFieldIndexes, int[] 
maxFilterFieldIndexes) throws AlgebricksException {
         try {
-            ARecordType recType = (ARecordType) 
findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-            int numPrimaryKeys = 
DatasetUtils.getPartitioningKeys(dataset).size();
+            ARecordType recType =
+                    (ARecordType) findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
+            int numPrimaryKeys = 
DatasetUtil.getPartitioningKeys(dataset).size();
 
             boolean temp = dataset.getDatasetDetails().isTemp();
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
@@ -629,19 +599,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                         "Cannot use " + numSecondaryKeys + " fields as a key 
for the R-tree index. "
                                 + "There can be only one field as a key for 
the R-tree index.");
             }
-            Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
-                    secondaryKeyFields.get(0), recType);
+            Pair<IAType, Boolean> keyTypePair =
+                    
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), 
secondaryKeyFields.get(0), recType);
             IAType keyType = keyTypePair.first;
             if (keyType == null) {
                 throw new AlgebricksException("Could not find field " + 
secondaryKeyFields.get(0) + " in the schema.");
             }
             int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
-            boolean isPointMBR = keyType.getTypeTag() == ATypeTag.POINT || 
keyType.getTypeTag() == ATypeTag.POINT3D;
             int numNestedSecondaryKeyFields = numDimensions * 2;
             IPrimitiveValueProviderFactory[] valueProviderFactories =
                     new 
IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
             for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-                valueProviderFactories[i] = 
PrimitiveValueProviderFactory.INSTANCE;
+                valueProviderFactories[i] = primitiveValueProviderFactory;
             }
 
             RecordDescriptor outputRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
@@ -655,27 +624,24 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ITypeTraits[] typeTraits = 
JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
                     numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, 
context);
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                    
splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
-                            dataset.getDatasetName(), indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = 
getSplitProviderAndConstraints(
+                    dataset.getDataverseName(), dataset.getDatasetName(), 
indexName, temp);
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
-                metaType = (ARecordType) 
findType(dataset.getMetaItemTypeDataverseName(),
-                        dataset.getMetaItemTypeName());
+                metaType =
+                        (ARecordType) 
findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
             }
 
-            IBinaryComparatorFactory[] primaryComparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(
+            IBinaryComparatorFactory[] primaryComparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(
                     dataset, recType, metaType, 
context.getBinaryComparatorFactoryProvider());
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int i = 0; i < btreeFields.length; i++) {
                 btreeFields[i] = i + numNestedSecondaryKeyFields;
             }
 
-            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] rtreeFields = null;
+            ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(dataset, recType);
+            int[] filterFields;
+            int[] rtreeFields;
             if (filterTypeTraits != null) {
                 filterFields = new int[1];
                 filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys;
@@ -684,46 +650,26 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     rtreeFields[i] = i;
                 }
             }
-
-            IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            ISearchOperationCallbackFactory searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                    : new SecondaryIndexSearchOperationCallbackFactory();
-
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            ISearchOperationCallbackFactory searchCallbackFactory =
+                    temp ? NoOpOperationCallbackFactory.INSTANCE : new 
SecondaryIndexSearchOperationCallbackFactory();
             RTreeSearchOperatorDescriptor rtreeSearchOp;
+            IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
+                    secondaryIndex, recType, metaType, compactionInfo.first, 
compactionInfo.second);
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = 
getMergedComparatorFactories(
-                        comparatorFactories, primaryComparatorFactories);
-                IIndexDataflowHelperFactory idff = new 
LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
-                        valueProviderFactories, RTreePolicyType.RTREE, 
deletedKeyBTreeCompFactories,
-                        new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
-                        compactionInfo.second, new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                        
MetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), 
comparatorFactories.length),
-                        rtreeFields, filterTypeTraits, filterCmpFactories, 
filterFields, !temp, isPointMBR);
                 rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, 
outputRecDesc,
-                        appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                        spPc.first, typeTraits, comparatorFactories, 
keyFields, idff, retainInput, retainMissing,
-                        context.getMissingWriterFactory(), 
searchCallbackFactory, minFilterFieldIndexes,
-                        maxFilterFieldIndexes, 
LSMIndexUtil.getMetadataPageManagerFactory());
+                        appContext.getStorageManager(), 
appContext.getIndexLifecycleManagerProvider(), spPc.first,
+                        typeTraits, comparatorFactories, keyFields, 
indexDataflowHelperFactory, retainInput,
+                        retainMissing, context.getMissingWriterFactory(), 
searchCallbackFactory, minFilterFieldIndexes,
+                        maxFilterFieldIndexes, metadataPageManagerFactory);
             } else {
-                // External Dataset
-                ExternalRTreeDataflowHelperFactory indexDataflowHelperFactory 
= new ExternalRTreeDataflowHelperFactory(
-                        valueProviderFactories, RTreePolicyType.RTREE,
-                        IndexingConstants.getBuddyBtreeComparatorFactories(), 
compactionInfo.first,
-                        compactionInfo.second, new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                        proposeLinearizer(nestedKeyType.getTypeTag(), 
comparatorFactories.length),
-                        
getStorageProperties().getBloomFilterFalsePositiveRate(),
-                        new int[] { numNestedSecondaryKeyFields },
-                        
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), 
!temp, isPointMBR);
                 // Create the operator
                 rtreeSearchOp = new 
ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
-                        appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                        spPc.first, typeTraits, comparatorFactories, 
keyFields, indexDataflowHelperFactory, retainInput,
-                        retainMissing, context.getMissingWriterFactory(), 
searchCallbackFactory, LSMIndexUtil
-                                .getMetadataPageManagerFactory());
+                        appContext.getStorageManager(), 
appContext.getIndexLifecycleManagerProvider(), spPc.first,
+                        typeTraits, comparatorFactories, keyFields, 
indexDataflowHelperFactory, retainInput,
+                        retainMissing, context.getMissingWriterFactory(), 
searchCallbackFactory,
+                        metadataPageManagerFactory);
             }
 
             return new Pair<>(rtreeSearchOp, spPc.second);
@@ -741,8 +687,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         File outFile = new File(fs.getPath());
         String nodeId = fs.getNodeName();
 
-        SinkWriterRuntimeFactory runtime = new 
SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
-                getWriterFactory(), inputDesc);
+        SinkWriterRuntimeFactory runtime =
+                new SinkWriterRuntimeFactory(printColumns, printerFactories, 
outFile, getWriterFactory(), inputDesc);
         AlgebricksPartitionConstraint apc = new 
AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
         return new Pair<>(runtime, apc);
     }
@@ -776,7 +722,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
 
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
         int numKeys = keys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
 
         // move key fields to front
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
@@ -797,56 +743,40 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         try {
             boolean temp = dataset.getDatasetDetails().isTemp();
             isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
             String indexName = primaryIndex.getIndexName();
-
-            ARecordType metaType = null;
-            if (dataset.hasMetaPart()) {
-                metaType = (ARecordType) 
findType(dataset.getMetaItemTypeDataverseName(),
-                        dataset.getMetaItemTypeName());
-            }
-
+            ARecordType metaType = dataset.hasMetaPart()
+                    ? (ARecordType) 
findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
+                    : null;
             String itemTypeName = dataset.getItemTypeName();
             ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
                     .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), 
itemTypeName).getDatatype();
-            ITypeTraits[] typeTraits = 
DatasetUtils.computeTupleTypeTraits(dataset, itemType, null);
-            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+            ITypeTraits[] typeTraits = 
DatasetUtil.computeTupleTypeTraits(dataset, itemType, null);
+            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaType, 
context.getBinaryComparatorFactoryProvider());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
-                            datasetName, indexName, temp);
+                    
getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), 
datasetName, indexName,
+                            temp);
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
-
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
 
-            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = 
DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
             // TODO
             // figure out the right behavior of the bulkload and then give the
             // right callback
             // (ex. what's the expected behavior when there is an error during
             // bulkload?)
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new 
TreeIndexBulkLoadOperatorDescriptor(spec, null,
-                    appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                    splitsAndConstraint.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                    GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, true,
-                    new LSMBTreeDataflowHelperFactory(new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                            compactionInfo.first, compactionInfo.second,
-                            new 
PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                            
storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
-                            filterCmpFactories, btreeFields, filterFields, 
!temp), LSMIndexUtil
-                                    .getMetadataPageManagerFactory());
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+                    new TreeIndexBulkLoadOperatorDescriptor(spec, null, 
appContext.getStorageManager(),
+                            appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                            comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
+                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
+                            numElementsHint, true, 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+                                    metaType, compactionInfo.first, 
compactionInfo.second),
+                            metadataPageManagerFactory);
             return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
@@ -957,10 +887,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
      * @param dataset
      * @return Number of elements that will be used to create a bloom filter 
per
      *         dataset per partition
-     * @throws MetadataException
      * @throws AlgebricksException
      */
-    public long getCardinalityPerPartitionHint(Dataset dataset) throws 
MetadataException, AlgebricksException {
+    public long getCardinalityPerPartitionHint(Dataset dataset) throws 
AlgebricksException {
         String numElementsHintString = 
dataset.getHints().get(DatasetCardinalityHint.NAME);
         long numElementsHint;
         if (numElementsHintString == null) {
@@ -969,34 +898,32 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             numElementsHint = Long.parseLong(numElementsHintString);
         }
         int numPartitions = 0;
-        List<String> nodeGroup = 
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                .getNodeNames();
+        List<String> nodeGroup =
+                MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
dataset.getNodeGroupName()).getNodeNames();
         for (String nd : nodeGroup) {
             numPartitions += 
ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
         }
-        numElementsHint = numElementsHint / numPartitions;
-        return numElementsHint;
+        return numElementsHint / numPartitions;
     }
 
     protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, 
String adapterName,
-            Map<String, String> configuration, ARecordType itemType, boolean 
isPKAutoGenerated,
-            List<List<String>> primaryKeys, ARecordType metaType) throws 
AlgebricksException {
+            Map<String, String> configuration, ARecordType itemType, 
ARecordType metaType) throws AlgebricksException {
         try {
             configuration.put(ExternalDataConstants.KEY_DATAVERSE, 
dataset.getDataverseName());
             IAdapterFactory adapterFactory = 
AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
                     configuration, itemType, metaType);
 
             // check to see if dataset is indexed
-            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
-                    dataset.getDatasetName(),
-                    
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
+            Index filesIndex =
+                    MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(), dataset.getDatasetName(),
+                            
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
 
             if (filesIndex != null && filesIndex.getPendingOp() == 0) {
                 // get files
                 List<ExternalFile> files = 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
                 Iterator<ExternalFile> iterator = files.iterator();
                 while (iterator.hasNext()) {
-                    if (iterator.next().getPendingOp() != 
ExternalFilePendingOp.PENDING_NO_OP) {
+                    if (iterator.next().getPendingOp() != 
ExternalFilePendingOp.NO_OP) {
                         iterator.remove();
                     }
                 }
@@ -1018,27 +945,27 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 numKeyFields / 2);
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitProviderAndPartitionConstraintsForDataset(
-            String dataverseName, String datasetName, String targetIdxName, 
boolean temp) throws AlgebricksException {
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getSplitProviderAndConstraints(String dataverseName,
+            String datasetName, String targetIdxName, boolean temp) throws 
AlgebricksException {
         FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, 
datasetName, targetIdxName, temp);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitProviderAndPartitionConstraintsForDataverse(
-            String dataverse) {
-        return 
SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitAndConstraints(String dataverse) {
+        return 
SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
     }
 
     public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, 
String dataverseName, String datasetName,
             String targetIdxName, boolean temp) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, 
dataverseName, datasetName, targetIdxName, temp);
+        return SplitsAndConstraintsUtil.getDatasetSplits(mdTxnCtx, 
dataverseName, datasetName, targetIdxName, temp);
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, 
String dataverseName, String adapterName)
             throws MetadataException {
         DatasourceAdapter adapter;
         // search in default namespace (built-in adapter)
-        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+        adapter =
+                MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
 
         // search in dataverse (user-defined adapter)
         if (adapter == null) {
@@ -1052,61 +979,61 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitProviderAndPartitionConstraintsForFilesIndex(
-            String dataverseName, String datasetName, String targetIdxName, 
boolean create) throws AlgebricksException {
-        return 
SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx,
 dataverseName,
-                datasetName, targetIdxName, create);
+            String dataverseName, String datasetName, String targetIdxName, 
boolean create)
+            throws AlgebricksException {
+        return 
SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, 
dataverseName, datasetName,
+                targetIdxName, create);
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDataLookupRuntime(
-            JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, 
int[] ridIndexes, boolean retainInput,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> 
outputVars, IOperatorSchema opSchema,
-            JobGenContext context, MetadataProvider metadataProvider, boolean 
retainMissing)
-            throws AlgebricksException {
+            JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, 
boolean retainInput,
+            IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, 
JobGenContext context,
+            MetadataProvider metadataProvider, boolean retainMissing) throws 
AlgebricksException {
         try {
             // Get data type
-            IAType itemType;
-            itemType = 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                    dataset.getDataverseName(), 
dataset.getItemTypeName()).getDatatype();
-
-            // Create the adapter factory <- right now there is only one. if 
there are more in the future, we can create
-            // a map->
+            ARecordType itemType =
+                    (ARecordType) 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                            dataset.getDataverseName(), 
dataset.getItemTypeName()).getDatatype();
+            ARecordType metaType = null;
+            if (dataset.hasMetaPart()) {
+                metaType =
+                        (ARecordType) MetadataManager.INSTANCE
+                                
.getDatatype(metadataProvider.getMetadataTxnContext(),
+                                        
dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
+                                .getDatatype();
+            }
             ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) 
dataset.getDatasetDetails();
-            LookupAdapterFactory<?> adapterFactory = 
AdapterFactoryProvider.getLookupAdapterFactory(libraryManager,
-                    datasetDetails.getProperties(), (ARecordType) itemType, 
ridIndexes, retainInput, retainMissing,
-                    context.getMissingWriterFactory());
+            LookupAdapterFactory<?> adapterFactory =
+                    
AdapterFactoryProvider.getLookupAdapterFactory(libraryManager, 
datasetDetails.getProperties(),
+                            itemType, ridIndexes, retainInput, retainMissing, 
context.getMissingWriterFactory());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
             try {
-                compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
+                compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
             } catch (MetadataException e) {
                 throw new AlgebricksException(" Unabel to create merge policy 
factory for external dataset", e);
             }
 
             boolean temp = datasetDetails.isTemp();
+            String fileIndexName = 
BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+            Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
+                    dataset.getDatasetName(), fileIndexName);
             // Create the file index data flow helper
-            ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = 
new ExternalBTreeDataflowHelperFactory(
-                    compactionInfo.first, compactionInfo.second,
-                    new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    
metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
-                    
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, 
metadataProvider), !temp);
-
+            IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
+                    fileIndex, itemType, metaType, compactionInfo.first, 
compactionInfo.second);
             // Create the out record descriptor, appContext and 
fileSplitProvider for the files index
             RecordDescriptor outRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             spPc = 
metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
-                    dataset.getDatasetName(),
-                    
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX),
 false);
-            ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                    : new SecondaryIndexSearchOperationCallbackFactory();
+                    dataset.getDatasetName(), fileIndexName, false);
+            ISearchOperationCallbackFactory searchOpCallbackFactory =
+                    temp ? NoOpOperationCallbackFactory.INSTANCE : new 
SecondaryIndexSearchOperationCallbackFactory();
             // Create the operator
             ExternalLookupOperatorDescriptor op = new 
ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, retainInput, 
appContext.getIndexLifecycleManagerProvider(),
-                    appContext.getStorageManagerInterface(), spPc.first, 
dataset.getDatasetId(),
-                    
metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), 
searchOpCallbackFactory,
-                    retainMissing, context.getMissingWriterFactory(), 
LSMIndexUtil
-                            .getMetadataPageManagerFactory());
+                    appContext.getStorageManager(), spPc.first, 
searchOpCallbackFactory, retainMissing,
+                    context.getMissingWriterFactory(), 
metadataPageManagerFactory);
             return new Pair<>(op, spPc.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
@@ -1129,7 +1056,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
 
         int numKeys = primaryKeys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
         int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : 
additionalNonFilterFields.size();
         // Move key fields to front. [keys, record, filters]
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + 
numOfAdditionalFields];
@@ -1156,7 +1083,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 fieldPermutation[i++] = idx;
             }
         }
-
         try {
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
@@ -1166,15 +1092,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             String itemTypeDataverseName = dataset.getItemTypeDataverseName();
             ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
                     .getDatatype(mdTxnCtx, itemTypeDataverseName, 
itemTypeName).getDatatype();
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = 
DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = 
DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, 
context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
 datasetName,
-                            indexName, temp);
-
+                    
getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), 
datasetName, indexName,
+                            temp);
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
@@ -1183,13 +1108,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 primaryKeyFields[i] = i;
             }
 
-            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = 
DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
-            TransactionSubsystemProvider txnSubsystemProvider = new 
TransactionSubsystemProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
                     ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             primaryKeyFields, txnSubsystemProvider, 
IndexOperation.UPSERT, ResourceType.LSM_BTREE)
@@ -1199,18 +1117,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             LockThenSearchOperationCallbackFactory searchCallbackFactory = new 
LockThenSearchOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new 
LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), 
compactionInfo.first, compactionInfo.second,
-                    new 
PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), true, 
filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+                    metaItemType, compactionInfo.first, compactionInfo.second);
             LSMTreeUpsertOperatorDescriptor op;
 
-            ITypeTraits[] outputTypeTraits = new 
ITypeTraits[recordDesc.getFieldCount()
-                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            ITypeTraits[] outputTypeTraits =
+                    new ITypeTraits[recordDesc.getFieldCount() + 
(dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
             ISerializerDeserializer[] outputSerDes = new 
ISerializerDeserializer[recordDesc.getFieldCount()
                     + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
 
@@ -1220,23 +1134,23 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             f++;
             // add the previous meta second
             if (dataset.hasMetaPart()) {
-                outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(
-                        metaItemType);
+                outputSerDes[f] =
+                        
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
                 outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
                 f++;
             }
             // add the previous filter third
             int fieldIdx = -1;
             if (numFilterFields > 0) {
-                String filterField = 
DatasetUtils.getFilterField(dataset).get(0);
+                String filterField = 
DatasetUtil.getFilterField(dataset).get(0);
                 for (i = 0; i < itemType.getFieldNames().length; i++) {
                     if (itemType.getFieldNames()[i].equals(filterField)) {
                         break;
                     }
                 }
                 fieldIdx = i;
-                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType
-                        .getFieldTypes()[fieldIdx]);
+                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
                 outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider()
                         
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
                 f++;
@@ -1247,11 +1161,11 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             }
 
             RecordDescriptor outputRecordDesc = new 
RecordDescriptor(outputSerDes, outputTypeTraits);
-            op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc,
-                    appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                    splitsAndConstraint.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                    idfh, null, true, indexName, 
context.getMissingWriterFactory(), modificationCallbackFactory,
-                    searchCallbackFactory, null, 
LSMIndexUtil.getMetadataPageManagerFactory());
+            op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, 
appContext.getStorageManager(),
+                    appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                    comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, idfh, null, true, indexName,
+                    context.getMissingWriterFactory(), 
modificationCallbackFactory, searchCallbackFactory, null,
+                    metadataPageManagerFactory);
             op.setType(itemType);
             op.setFilterIndex(fieldIdx);
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1271,8 +1185,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         ISerializerDeserializer<?> payloadSerde = 
format.getSerdeProvider().getSerializerDeserializer(itemType);
         RecordDescriptor scannerDesc = new RecordDescriptor(new 
ISerializerDeserializer[] { payloadSerde });
 
-        ExternalDataScanOperatorDescriptor dataScanner = new 
ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
-                adapterFactory);
+        ExternalScanOperatorDescriptor dataScanner =
+                new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, 
adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
@@ -1298,12 +1212,12 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
 
         int i = 0;
         for (; i < sidxKeyFieldCount; ++i) {
-            Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
-                    sidxKeyFieldNames.get(i),
-                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 
1) ? metaType : recType);
+            Pair<IAType, Boolean> keyPairType =
+                    
Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), 
sidxKeyFieldNames.get(i),
+                            (hasMeta && 
secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
             IAType keyType = keyPairType.first;
-            comparatorFactories[i] = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
+            comparatorFactories[i] =
+                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
             typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -1325,8 +1239,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             } catch (AsterixException e) {
                 throw new AlgebricksException(e);
             }
-            comparatorFactories[i] = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
+            comparatorFactories[i] =
+                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
             typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -1340,13 +1254,13 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             List<LogicalVariable> additionalNonFilteringFields) throws 
AlgebricksException {
 
         String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataSource.getId().getDataverseName(),
-                datasetName);
+        Dataset dataset =
+                MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataSource.getId().getDataverseName(), datasetName);
         boolean temp = dataset.getDatasetDetails().isTemp();
         isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
 
         int numKeys = keys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
         // Move key fields to front.
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
                 + (additionalNonFilteringFields == null ? 0 : 
additionalNonFilteringFields.size())];
@@ -1375,60 +1289,51 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     dataset.getDatasetName(), dataset.getDatasetName());
             String indexName = primaryIndex.getIndexName();
             ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName()).getDatatype();
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = 
DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName())
+                    .getDatatype();
+            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = 
DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, 
context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    
splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
 datasetName,
-                            indexName, temp);
+                    
getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), 
datasetName, indexName,
+                            temp);
 
             // prepare callback
-            JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numKeys];
             for (i = 0; i < numKeys; i++) {
                 primaryKeyFields[i] = i;
             }
-
-            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = 
DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
-            TransactionSubsystemProvider txnSubsystemProvider = new 
TransactionSubsystemProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(
+                            ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
                             primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE)
-                    : new 
PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields,
-                            txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE, dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new 
LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), 
compactionInfo.first, compactionInfo.second,
-                    new 
PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), true, 
filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
+                    : new PrimaryIndexModificationOperationCallbackFactory(
+                            ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
+                            primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE,
+                            dataset.hasMetaPart());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+                    metaItemType, compactionInfo.first, compactionInfo.second);
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManagerInterface(),
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, 
numElementsHint, true, idfh, LSMIndexUtil
-                                .getMetadataPageManagerFactory());
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, 
numElementsHint, true, idfh,
+                        metadataPageManagerFactory);
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc,
-                        appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp, idfh, null, true, 
indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE, 
LSMIndexUtil.getMetadataPageManagerFactory());
+                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
+                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, null, true,
+                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (MetadataException me) {
@@ -1496,7 +1401,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
 
         int numKeys = primaryKeys.size() + secondaryKeys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
 
         // generate field permutations
         int[] fieldPermutation = new int[numKeys + numFilterFields];
@@ -1546,22 +1451,23 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
 
         String itemTypeName = dataset.getItemTypeName();
-        IAType itemType;
+        ARecordType itemType;
         try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, 
dataset.getItemTypeDataverseName(), itemTypeName)
-                    .getDatatype();
+            itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), 
itemTypeName).getDatatype();
             validateRecordType(itemType);
-            ARecordType recType = (ARecordType) itemType;
+            ARecordType metaType = dataset.hasMetaPart()
+                    ? (ARecordType) 
MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+                            dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName()).getDatatype()
+                    : null;
 
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
 
-            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] btreeFields = null;
+            ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(dataset, itemType);
+            int[] filterFields;
+            int[] btreeFields;
             if (filterTypeTraits != null) {
                 filterFields = new int[1];
                 filterFields[0] = numKeys;
@@ -1577,73 +1483,60 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             IBinaryComparatorFactory[] comparatorFactories = new 
IBinaryComparatorFactory[numKeys];
             for (i = 0; i < secondaryKeys.size(); ++i) {
                 Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                        secondaryKeyNames.get(i), recType);
+                        secondaryKeyNames.get(i), itemType);
                 IAType keyType = keyPairType.first;
-                comparatorFactories[i] = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                        true);
+                comparatorFactories[i] =
+                        
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
                 typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
             }
-            List<List<String>> partitioningKeys = 
DatasetUtils.getPartitioningKeys(dataset);
+            List<List<String>> partitioningKeys = 
DatasetUtil.getPartitioningKeys(dataset);
             for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                        true);
+                IAType keyType = itemType.getSubFieldType(partitioningKey);
+                comparatorFactories[i] =
+                        
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
                 typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    
splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, 
indexName, temp);
+                    getSplitProviderAndConstraints(dataverseName, datasetName, 
indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
-            TransactionSubsystemProvider txnSubsystemProvider = new 
TransactionSubsystemProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
                     ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
+                            ResourceType.LSM_BTREE)
                     : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
+                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new 
LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), 
compactionInfo.first, compactionInfo.second,
-                    new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), 
false, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
+                    metaType, compactionInfo.first, compactionInfo.second);
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManagerInterface(),
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, idfh, LSMIndexUtil
-                                .getMetadataPageManagerFactory());
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, idfh,
+                        metadataPageManagerFactory);
             } else if (indexOp == IndexOperation.UPSERT) {
-                op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, idfh, filterFactory, false, 
indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE, 
prevFieldPermutation, LSMIndexUtil
-                                .getMetadataPageManagerFactory());
+                op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
+                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, idfh, filterFactory, false,
+                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        prevFieldPermutation, metadataPageManagerFactory);
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc,
-                        appContext.getStorageManagerInterface(), 
appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, 
comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp,
-                        new LSMBTreeDataflowHelperFactory(new 
AsterixVirtualBufferCacheProvider(datasetId),
-                                compactionInfo.first, compactionInfo.second,
-                                new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                
storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
-                                filterCmpFactories, btreeFields, filterFields, 
!temp),
-                        filterFactory, false, indexName, null, 
modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE, 
LSMIndexUtil.getMetadataPageManagerFactory());
+                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
+                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, filterFactory,
+                        false, indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
@@ -1672,11 +1565,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     dataset.getDatasetName(), indexName);
             List<List<String>> secondaryKeyExprs = 
secondaryIndex.getKeyFieldNames();
             List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
-                    secondaryKeyExprs.get(0), recType);
+            Pair<IAType, Boolean> keyPairType =
+                    
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), 
secondaryKeyExprs.get(0), recType);
             IAType spatialType = keyPairType.first;
-            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
-                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
             int dimension = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
             int numSecondaryKeys = dimension * 2;
             int numPrimaryKeys = primaryKeys.size();
@@ -1684,7 +1575,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
             IBinaryComparatorFactory[] comparatorFactories = new 
IBinaryComparatorFactory[numSecondaryKeys];
 
-            int numFilterFields = DatasetUtils.getFilterField(dataset) == null 
? 0 : 1;
+            int numFilterFields = DatasetUtil.getFilterField(dataset) == null 
? 0 : 1;
             int[] fieldPermutation = new int[numKeys + numFilterFields];
             int[] modificationCallbackPrimaryKeyFields = new 
int[primaryKeys.size()];
             int i = 0;
@@ -1735,33 +1626,31 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             IPrimitiveValueProviderFactory[] valueProviderFactories =
                     new IPrimitiveValueProviderFactory[numSecondaryKeys];
             for (i = 0; i < numSecondaryKeys; i++) {
-                comparatorFactories[i] = 
BinaryComparatorFactoryProvider.INSTANCE
-                        .getBinaryComparatorFactory(nestedKeyType, true);
+                comparatorFactories[i] =
+                        
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType,
 true);
                 typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-                valueProviderFactories[i] = 
PrimitiveValueProviderFactory.INSTANCE;
+                valueProviderFactories[i] = primitiveValueProviderFactory;
             }
-            List<List<String>> partitioningKeys = 
DatasetUtils.getPartitioningKeys(dataset);
+            List<List<String>> partitioningKeys = 
DatasetUtil.getPartitioningKeys(dataset);
             for (List<String> partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getSubFieldType(partitioningKey);
                 typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            IBinaryComparatorFactory[] primaryComparatorFactories = 
DatasetUtils.computeKeysBinaryComparatorFactories(
+            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
+            IBinaryComparatorFactory[] primaryComparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(
                     dataset, recType, metaItemType, 
context.getBinaryComparatorFactoryProvider());
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    
splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, 
indexName, temp);
+                    getSplitProviderAndConstraints(dataverseName, datasetName, 
indexName, temp);
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
             }
 
-            ITypeTraits[] filterTypeTraits = 
DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] rtreeFields = null;
+            ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(dataset, recType);
+            int[] filterFields;
+            int[] rtreeFields;
   

<TRUNCATED>

Reply via email to