http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
new file mode 100644
index 0000000..80792b5
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.rmi.RemoteException;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.CompactionPolicy;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class DatasetUtil {
+    private static final Logger LOGGER = Logger.getLogger(DatasetUtil.class.getName());
+    /*
+     * Dataset related operations
+     */
+    public static final byte OP_READ = 0x00;
+    public static final byte OP_INSERT = 0x01;
+    public static final byte OP_DELETE = 0x02;
+    public static final byte OP_UPSERT = 0x03;
+
+    private DatasetUtil() {
+    }
+
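+    /**
+     * Builds one binary comparator factory per primary-key field. External
+     * datasets use the fixed RID-field comparators; internal datasets pick the
+     * key type from the record or the meta record via the key source indicator.
+     */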
+    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset,
+            ARecordType itemType, ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+            throws AlgebricksException {
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            // Get comparators for RID fields.
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                try {
+                    bcfs[i] = IndexingConstants.getComparatorFactory(i);
+                } catch (AsterixException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        } else {
+            InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails();
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1)
+                        ? metaItemType.getSubFieldType(partitioningKeys.get(i))
+                        : itemType.getSubFieldType(partitioningKeys.get(i));
+                bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+            }
+        }
+        return bcfs;
+    }
+
+    public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AlgebricksException("not implemented");
+        }
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        int[] bloomFilterKeyFields = new int[partitioningKeys.size()];
+        for (int i = 0; i < partitioningKeys.size(); ++i) {
+            bloomFilterKeyFields[i] = i;
+        }
+        return bloomFilterKeyFields;
+    }
+
+    public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType,
+            IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AlgebricksException("not implemented");
+        }
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()];
+        for (int i = 0; i < partitioningKeys.size(); i++) {
+            IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+            bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType);
+        }
+        return bhffs;
+    }
+
+    public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType)
+            throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AlgebricksException("not implemented");
+        }
+        List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+        int numKeys = partitioningKeys.size();
+        ITypeTraits[] typeTraits;
+        if (metaItemType != null) {
+            typeTraits = new ITypeTraits[numKeys + 2];
+            List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            typeTraits[numKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaItemType);
+            for (int i = 0; i < numKeys; i++) {
+                IAType keyType;
+                if (indicator.get(i) == 0) {
+                    keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+                } else {
+                    keyType = metaItemType.getSubFieldType(partitioningKeys.get(i));
+                }
+                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+        } else {
+            typeTraits = new ITypeTraits[numKeys + 1];
+            for (int i = 0; i < numKeys; i++) {
+                IAType keyType;
+                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
+                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+        }
+        typeTraits[numKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        return typeTraits;
+    }
+
+    public static List<List<String>> getPartitioningKeys(Dataset dataset) {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return IndexingConstants
+                    .getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+        }
+        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+    }
+
+    public static List<String> getFilterField(Dataset dataset) {
+        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
+    }
+
+    public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
+            ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+            throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
+        IAType type = itemType.getSubFieldType(filterField);
+        bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
+        return bcfs;
+    }
+
+    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType)
+            throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+        ITypeTraits[] typeTraits = new ITypeTraits[1];
+        IAType type = itemType.getSubFieldType(filterField);
+        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
+        return typeTraits;
+    }
+
+    public static int[] createFilterFields(Dataset dataset) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+        List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+        int numKeys = partitioningKeys.size();
+
+        int[] filterFields = new int[1];
+        filterFields[0] = numKeys + 1;
+        return filterFields;
+    }
+
+    public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException {
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return null;
+        }
+
+        List<String> filterField = getFilterField(dataset);
+        if (filterField == null) {
+            return null;
+        }
+
+        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
+        int valueFields = dataset.hasMetaPart() ? 2 : 1;
+        int[] btreeFields = new int[partitioningKeys.size() + valueFields];
+        for (int i = 0; i < btreeFields.length; ++i) {
+            btreeFields[i] = i;
+        }
+        return btreeFields;
+    }
+
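+    /**
+     * Returns the position of fieldExpr among the dataset's partitioning keys,
+     * or -1 if it is not a top-level partitioning key.
+     */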
+    public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
+        List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset);
+        for (int i = 0; i < partitioningKeys.size(); i++) {
+            if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
+        String policyName = dataset.getCompactionPolicy();
+        CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
+                MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
+        String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
+        ILSMMergePolicyFactory mergePolicyFactory;
+        try {
+            mergePolicyFactory =
+                    (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
+            if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
+                ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
+            }
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new AlgebricksException(e);
+        }
+        Map<String, String> properties = dataset.getCompactionPolicyProperties();
+        return new Pair<>(mergePolicyFactory, properties);
+    }
+
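+    /**
+     * Serializes a (name, value) string pair as a two-field record of the given
+     * record type and writes it to the output.
+     */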
+    @SuppressWarnings("unchecked")
+    public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
+            throws HyracksDataException {
+        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
+        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+        propertyRecordBuilder.reset(recordType);
+        AMutableString aString = new AMutableString("");
+        ISerializerDeserializer<AString> stringSerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(name);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(0, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(value);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(1, fieldValue);
+
+        propertyRecordBuilder.write(out, true);
+    }
+
+    public static ARecordType getMetaType(MetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException {
+        if (dataset.hasMetaPart()) {
+            return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
+                    dataset.getMetaItemTypeName());
+        }
+        return null;
+    }
+
+    public static JobSpecification createDropDatasetJobSpec(Dataset dataset, Index primaryIndex,
+            MetadataProvider metadataProvider)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName();
+        LOGGER.info("DROP DATASETPATH: " + datasetPath);
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return RuntimeUtils.createJobSpecification();
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
+                        dataset.getDatasetName(), temp);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        IndexDropOperatorDescriptor primaryBtreeDrop =
+                new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
+                        indexDataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+                splitsAndConstraint.second);
+        specPrimary.addRoot(primaryBtreeDrop);
+        return specPrimary;
+    }
+
+    public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException {
+        String indexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName());
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), indexName, true);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, compactionInfo.first, compactionInfo.second);
+        IndexDropOperatorDescriptor btreeDrop =
+                new IndexDropOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
+                        dataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+        return spec;
+    }
+
+    public static JobSpecification dropDatasetJobSpec(Dataset dataset, Index primaryIndex,
+            MetadataProvider metadataProvider)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName();
+        LOGGER.info("DROP DATASETPATH: " + datasetPath);
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return RuntimeUtils.createJobSpecification();
+        }
+
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
+                        dataset.getDatasetName(), temp);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        IndexDropOperatorDescriptor primaryBtreeDrop =
+                new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first,
+                        indexDataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+                splitsAndConstraint.second);
+
+        specPrimary.addRoot(primaryBtreeDrop);
+
+        return specPrimary;
+    }
+
+    public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+            MetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        String dataverseName = dataverse.getDataverseName();
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+                datasetName, datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        // get meta item type
+        ARecordType metaItemType = null;
+        if (dataset.hasMetaPart()) {
+            metaItemType = (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
+                    dataset.getMetaItemTypeName());
+        }
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
+                itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
+        int[] bloomFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
+
+        ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType);
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        int[] filterFields = DatasetUtil.createFilterFields(dataset);
+        int[] btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+        FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < fs.length; i++) {
+            sb.append(fs[i] + " ");
+        }
+        LOGGER.info("CREATING File Splits: " + sb.toString());
+
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        // prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+        IResourceFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory(typeTraits,
+                comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
+                compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
+                dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index),
+                storageComponentProvider.getMetadataPageManagerFactory());
+        ILocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                index, itemType, metaItemType, compactionInfo.first, compactionInfo.second);
+        TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                dataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
+                storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
+                splitsAndConstraint.second);
+        spec.addRoot(indexCreateOp);
+        return spec;
+    }
+
+    public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
+            MetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+        String dataverseName = dataverse.getDataverseName();
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        ARecordType metaItemType = DatasetUtil.getMetaType(metadataProvider, dataset);
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
+                itemType, metaItemType, format.getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
+        int[] bloomFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), datasetName, datasetName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                index, itemType, metaItemType, compactionInfo.first, compactionInfo.second);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE,
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                splitsAndConstraint.second);
+        spec.addRoot(compactOp);
+        return spec;
+    }
+}
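
As a quick illustration of the slot arithmetic used above by computeTupleTypeTraits, createFilterFields, and createBTreeFieldsWhenThereisAFilter, here is a minimal, self-contained sketch in plain Java (not AsterixDB code; all names are illustrative): primary keys occupy slots 0..numKeys-1, the record payload sits at slot numKeys, and the filter value is addressed one slot past the record.

public class PrimaryIndexLayoutSketch {
    public static void main(String[] args) {
        int numKeys = 2;          // e.g. a two-field primary key
        boolean hasMeta = false;  // no meta part in this example

        // Slot count mirrors computeTupleTypeTraits: keys + record (+ meta record).
        int slots = numKeys + (hasMeta ? 2 : 1);
        int recordSlot = numKeys;      // record payload follows the keys
        int filterSlot = numKeys + 1;  // mirrors createFilterFields: filterFields[0] = numKeys + 1

        System.out.println("slots=" + slots + ", record=" + recordSlot + ", filter=" + filterSlot);
    }
}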

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
deleted file mode 100644
index 18baab1..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata.utils;
-
-import java.io.DataOutput;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.CompactionPolicy;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-
-public class DatasetUtils {
-    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
-            ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
-            throws AlgebricksException {
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            // Get comparators for RID fields.
-            for (int i = 0; i < partitioningKeys.size(); i++) {
-                try {
-                    bcfs[i] = IndexingConstants.getComparatorFactory(i);
-                } catch (AsterixException e) {
-                    throw new AlgebricksException(e);
-                }
-            }
-        } else {
-            InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails();
-            for (int i = 0; i < partitioningKeys.size(); i++) {
-                IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1)
-                        ? metaItemType.getSubFieldType(partitioningKeys.get(i))
-                        : itemType.getSubFieldType(partitioningKeys.get(i));
-                bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
-            }
-        }
-        return bcfs;
-    }
-
-    public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AlgebricksException("not implemented");
-        }
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        int[] bloomFilterKeyFields = new int[partitioningKeys.size()];
-        for (int i = 0; i < partitioningKeys.size(); ++i) {
-            bloomFilterKeyFields[i] = i;
-        }
-        return bloomFilterKeyFields;
-    }
-
-    public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType,
-            IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AlgebricksException("not implemented");
-        }
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()];
-        for (int i = 0; i < partitioningKeys.size(); i++) {
-            IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i));
-            bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType);
-        }
-        return bhffs;
-    }
-
-    public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType)
-            throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            throw new AlgebricksException("not implemented");
-        }
-        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        int numKeys = partitioningKeys.size();
-        ITypeTraits[] typeTraits;
-        if (metaItemType != null) {
-            typeTraits = new ITypeTraits[numKeys + 2];
-            List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
-            typeTraits[numKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaItemType);
-            for (int i = 0; i < numKeys; i++) {
-                IAType keyType;
-                if (indicator.get(i) == 0) {
-                    keyType = itemType.getSubFieldType(partitioningKeys.get(i));
-                } else {
-                    keyType = metaItemType.getSubFieldType(partitioningKeys.get(i));
-                }
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-        } else {
-            typeTraits = new ITypeTraits[numKeys + 1];
-            for (int i = 0; i < numKeys; i++) {
-                IAType keyType;
-                keyType = itemType.getSubFieldType(partitioningKeys.get(i));
-                typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-        }
-        typeTraits[numKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        return typeTraits;
-    }
-
-    public static List<List<String>> getPartitioningKeys(Dataset dataset) {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
-        }
-        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
-    }
-
-    public static List<String> getFilterField(Dataset dataset) {
-        return (((InternalDatasetDetails) dataset.getDatasetDetails())).getFilterField();
-    }
-
-    public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
-            ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
-            throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-        IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
-        IAType type = itemType.getSubFieldType(filterField);
-        bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
-        return bcfs;
-    }
-
-    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType)
-            throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-        ITypeTraits[] typeTraits = new ITypeTraits[1];
-        IAType type = itemType.getSubFieldType(filterField);
-        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
-        return typeTraits;
-    }
-
-    public static int[] createFilterFields(Dataset dataset) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        int numKeys = partitioningKeys.size();
-
-        int[] filterFields = new int[1];
-        filterFields[0] = numKeys + 1;
-        return filterFields;
-    }
-
-    public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException {
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return null;
-        }
-
-        List<String> filterField = getFilterField(dataset);
-        if (filterField == null) {
-            return null;
-        }
-
-        List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
-        int valueFields = dataset.hasMetaPart() ? 2 : 1;
-        int[] btreeFields = new int[partitioningKeys.size() + valueFields];
-        for (int i = 0; i < btreeFields.length; ++i) {
-            btreeFields[i] = i;
-        }
-        return btreeFields;
-    }
-
-    public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) {
-        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        for (int i = 0; i < partitioningKeys.size(); i++) {
-            if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
-    public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
-            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
-        String policyName = dataset.getCompactionPolicy();
-        CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
-                MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
-        String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory;
-        try {
-            mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
-            if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
-                ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
-            }
-        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
-            throw new AlgebricksException(e);
-        }
-        Map<String, String> properties = dataset.getCompactionPolicyProperties();
-        return new Pair<ILSMMergePolicyFactory, Map<String, String>>(mergePolicyFactory, properties);
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
-            throws HyracksDataException {
-        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
-        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        propertyRecordBuilder.reset(recordType);
-        AMutableString aString = new AMutableString("");
-        ISerializerDeserializer<AString> stringSerde =
-                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
-
-        // write field 0
-        fieldValue.reset();
-        aString.setValue(name);
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(0, fieldValue);
-
-        // write field 1
-        fieldValue.reset();
-        aString.setValue(value);
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(1, fieldValue);
-
-        propertyRecordBuilder.write(out, true);
-    }
-
-    public static ARecordType getMetaType(MetadataProvider metadataProvider, Dataset dataset)
-            throws AlgebricksException {
-        if (dataset.hasMetaPart()) {
-            return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
-                    dataset.getMetaItemTypeName());
-        }
-        return null;
-    }
-}
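
The getMergePolicyFactory logic that moves from the removed DatasetUtils to the new DatasetUtil is a small reflective factory. The following self-contained sketch shows the same pattern under simplified, hypothetical types (these are stand-ins, not the real ILSMMergePolicyFactory/CorrelatedPrefixMergePolicyFactory interfaces):

interface MergePolicyFactory {
    String getName();
}

class CorrelatedPrefixFactory implements MergePolicyFactory {
    private int datasetId;

    @Override
    public String getName() {
        return "correlated-prefix";
    }

    void setDatasetID(int datasetId) {
        this.datasetId = datasetId;
    }
}

public class MergePolicyLoaderSketch {
    static MergePolicyFactory load(String className, int datasetId) throws Exception {
        // Instantiate by class name, then apply the policy-specific configuration,
        // the same shape as getMergePolicyFactory above.
        MergePolicyFactory factory = (MergePolicyFactory) Class.forName(className).newInstance();
        if ("correlated-prefix".equals(factory.getName())) {
            ((CorrelatedPrefixFactory) factory).setDatasetID(datasetId);
        }
        return factory;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(load(CorrelatedPrefixFactory.class.getName(), 42).getName());
    }
}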

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
index 17a3ebe..2e35ed4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
@@ -34,11 +34,11 @@ import org.apache.asterix.metadata.entities.Dataset;
  * @author alamouda
  */
 public class ExternalDatasetsRegistry {
-    public static ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry();
-    private ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister;
+    public static final ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry();
+    private final ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister;
 
     private ExternalDatasetsRegistry() {
-        globalRegister = new ConcurrentHashMap<String, ExternalDatasetAccessManager>();
+        globalRegister = new ConcurrentHashMap<>();
     }
 
     /**
@@ -59,12 +59,12 @@ public class ExternalDatasetsRegistry {
 
     public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider metadataProvider) {
 
-        Map<String, Integer> locks = null;
+        Map<String, Integer> locks;
         String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName();
         // check first if the lock was acquired already
         locks = metadataProvider.getLocks();
         if (locks == null) {
-            locks = new HashMap<String, Integer>();
+            locks = new HashMap<>();
             metadataProvider.setLocks(locks);
         } else {
             // if dataset was accessed already by this job, return the registered version
@@ -130,7 +130,10 @@ public class ExternalDatasetsRegistry {
             // if dataset was accessed already by this job, return the registered version
             Set<Entry<String, Integer>> aquiredLocks = locks.entrySet();
             for (Entry<String, Integer> entry : aquiredLocks) {
-                globalRegister.get(entry.getKey()).queryEnd(entry.getValue());
+                ExternalDatasetAccessManager accessManager = globalRegister.get(entry.getKey());
+                if (accessManager != null) {
+                    accessManager.queryEnd(entry.getValue());
+                }
             }
             locks.clear();
         }
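
The functional change in this hunk is the null guard around globalRegister.get(...): an entry keyed by "dataverse.dataset" may be absent by the time a query releases its locks. A minimal sketch of the pattern follows (illustrative names; Runnable stands in for the real ExternalDatasetAccessManager API):

import java.util.concurrent.ConcurrentHashMap;

public class RegistryLookupSketch {
    private final ConcurrentHashMap<String, Runnable> register = new ConcurrentHashMap<>();

    void release(String key) {
        // Dereferencing register.get(key) directly throws a NullPointerException
        // when the entry is absent; checking first makes the release a no-op.
        Runnable entry = register.get(key);
        if (entry != null) {
            entry.run();
        }
    }

    public static void main(String[] args) {
        RegistryLookupSketch r = new RegistryLookupSketch();
        r.register.put("dataverse.dataset", () -> System.out.println("query end"));
        r.release("dataverse.dataset");
        r.release("missing"); // safely ignored instead of throwing
    }
}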

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
new file mode 100644
index 0000000..249f035
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.DatasetConfig.TransactionState;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.transactions.IResourceFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
+import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadataFactory;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+public class ExternalIndexingOperations {
+    private static final Logger LOGGER = Logger.getLogger(ExternalIndexingOperations.class.getName());
+    public static final List<List<String>> FILE_INDEX_FIELD_NAMES =
+            Collections.unmodifiableList(Collections.singletonList(Collections.singletonList("")));
+    public static final List<IAType> FILE_INDEX_FIELD_TYPES =
+            Collections.unmodifiableList(Collections.singletonList(BuiltinType.ASTRING));
+
+    private ExternalIndexingOperations() {
+    }
+
+    public static boolean isIndexible(ExternalDatasetDetails ds) {
+        String adapter = ds.getAdapter();
+        return adapter.equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER);
+    }
+
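+    /** A refresh is considered in progress until the dataset's transaction state returns to COMMIT. */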
+    public static boolean isRefereshActive(ExternalDatasetDetails ds) {
+        return ds.getState() != TransactionState.COMMIT;
+    }
+
+    public static boolean isValidIndexName(String datasetName, String indexName) {
+        return !datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName);
+    }
+
+    public static int getRIDSize(Dataset dataset) {
+        ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
+    }
+
+    public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) {
+        ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        return IndexingConstants.getComparatorFactories(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
+    }
+
+    public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() {
+        return IndexingConstants.getBuddyBtreeComparatorFactories();
+    }
+
+    public static List<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset) throws AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<>();
+        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        try {
+            // Create the file system object
+            FileSystem fs = getFileSystemObject(datasetDetails.getProperties());
+            // Get paths of dataset
+            String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH);
+            String[] paths = path.split(",");
+
+            // Add fileStatuses to files
+            for (String aPath : paths) {
+                FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
+                for (int i = 0; i < fileStatuses.length; i++) {
+                    int nextFileNumber = files.size();
+                    handleFile(dataset, files, fs, fileStatuses[i], nextFileNumber);
+                }
+            }
+            // Close file system
+            fs.close();
+            if (files.isEmpty()) {
+                throw new AlgebricksException("File Snapshot retrieved from 
external file system is empty");
+            }
+            return files;
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Exception while trying to get snapshot 
from external system", e);
+            throw new AlgebricksException("Unable to get list of HDFS files " 
+ e);
+        }
+    }
+
+    private static void handleFile(Dataset dataset, List<ExternalFile> files, FileSystem fs, FileStatus fileStatus,
+            int nextFileNumber) throws IOException {
+        if (fileStatus.isDirectory()) {
+            listSubFiles(dataset, fs, fileStatus, files);
+        } else {
+            files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                    fileStatus.getPath().toUri().getPath(), new Date(fileStatus.getModificationTime()),
+                    fileStatus.getLen(), ExternalFilePendingOp.NO_OP));
+        }
+    }
+
+    /*
+     * Lists all files under the directory; src is expected to be a folder.
+     */
+    private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, List<ExternalFile> files)
+            throws IOException {
+        Path path = src.getPath();
+        FileStatus[] fileStatuses = srcFs.listStatus(path);
+        for (int i = 0; i < fileStatuses.length; i++) {
+            int nextFileNumber = files.size();
+            if (fileStatuses[i].isDirectory()) {
+                listSubFiles(dataset, srcFs, fileStatuses[i], files);
+            } else {
+                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                        fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()),
+                        fileStatuses[i].getLen(), ExternalFilePendingOp.NO_OP));
+            }
+        }
+    }
+
+    public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException {
+        Configuration conf = new Configuration();
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim());
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName());
+        return FileSystem.get(conf);
+    }
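+
+    // Editorial sketch (not part of the original patch): the map passed in is the external
+    // dataset's properties, so a minimal caller could look like the following. The namenode
+    // address and the "/data" path are hypothetical.
+    //
+    //   Map<String, String> props = new HashMap<>();
+    //   props.put(ExternalDataConstants.KEY_HDFS_URL, "hdfs://namenode:8020");
+    //   FileSystem fs = ExternalIndexingOperations.getFileSystemObject(props);
+    //   try {
+    //       FileStatus[] statuses = fs.listStatus(new Path("/data"));
+    //   } finally {
+    //       fs.close();
+    //   }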
+
+    public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset,
+            List<ExternalFile> externalFilesSnapshot, MetadataProvider metadataProvider, boolean createIndex)
+            throws AlgebricksException {
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), IndexingConstants.getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IResourceFactory localResourceMetadata = new ExternalBTreeLocalResourceMetadataFactory(
+                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
+                FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, false, dataset.getDatasetId(),
+                mergePolicyFactory, mergePolicyFactoryProperties, dataset.getIndexOperationTrackerFactory(fileIndex),
+                dataset.getIoOperationCallbackFactory(fileIndex),
+                storageComponentProvider.getMetadataPageManagerFactory());
+        PersistentLocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.ExternalBTreeResource);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        ExternalFilesIndexOperatorDescriptor externalFilesOp =
+                new ExternalFilesIndexOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider,
+                        dataflowHelperFactory, localResourceFactoryProvider, externalFilesSnapshot, createIndex,
+                        storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp,
+                secondarySplitsAndConstraint.second);
+        spec.addRoot(externalFilesOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
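+
+    // Editorial note: createIndex appears to distinguish first-time creation of the files
+    // index from replication into an existing one; buildFilesIndexUpdateOp below calls this
+    // builder with createIndex == false when refreshing an already-built files index.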
+
+    /**
+     * Creates an indexing operator that indexes records in HDFS.
+     *
+     * @param metadataProvider the metadata provider
+     * @param jobSpec the job specification that the operator is added to
+     * @param itemType the record type of the dataset's records
+     * @param dataset the external dataset to be indexed
+     * @param files the external files to index
+     * @param indexerDesc the record descriptor of the indexer's output
+     * @return a pair of the scan operator descriptor and its partition constraint
+     * @throws HyracksDataException
+     * @throws AlgebricksException
+     */
+    private static Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> getIndexingOperator(
+            MetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
+            List<ExternalFile> files, RecordDescriptor indexerDesc) throws HyracksDataException, AlgebricksException {
+        ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        Map<String, String> configuration = externalDatasetDetails.getProperties();
+        IAdapterFactory adapterFactory =
+                AdapterFactoryProvider.getIndexingAdapterFactory(metadataProvider.getLibraryManager(),
+                        externalDatasetDetails.getAdapter(), configuration, (ARecordType) itemType, files, true, null);
+        return new Pair<>(new ExternalScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
+                adapterFactory.getPartitionConstraint());
+    }
+
+    public static Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
+            JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
+            RecordDescriptor indexerDesc, List<ExternalFile> files) throws HyracksDataException, AlgebricksException {
+        return getIndexingOperator(metadataProvider, spec, itemType, dataset,
+                files == null ? MetadataManager.INSTANCE
+                        .getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset) : files,
+                indexerDesc);
+    }
+
+    /**
+     * At the end of this method, we expect to have four sets, as follows:
+     * metadataFiles retains only the files that have a pending operation: appended files in
+     * their original recorded state, plus files marked to be dropped;
+     * addedFiles contains new files, with file numbers assigned starting after the maximum
+     * original file number;
+     * deletedFiles contains files that are no longer present in the file system;
+     * appendedFiles contains the new file information for existing files that grew.
+     * The method returns true only in the case of zero delta.
+     *
+     * @param dataset the external dataset to check
+     * @param metadataFiles the files currently recorded in the metadata (modified in place)
+     * @param addedFiles output list of newly added files
+     * @param deletedFiles output list of deleted files
+     * @param appendedFiles output list of appended files
+     * @return true if the dataset is up to date (zero delta), false otherwise
+     * @throws AlgebricksException
+     */
+    public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles)
+            throws AlgebricksException {
+        boolean uptodate = true;
+        int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1;
+
+        List<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset);
+
+        // Loop over file system files, taking care of added files
+        for (ExternalFile fileSystemFile : fileSystemFiles) {
+            boolean fileFound = false;
+            Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+            while (mdFilesIterator.hasNext()) {
+                ExternalFile metadataFile = mdFilesIterator.next();
+                if (!fileSystemFile.getFileName().equals(metadataFile.getFileName())) {
+                    continue;
+                }
+                // Same file name
+                if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) {
+                    // Same timestamp
+                    if (fileSystemFile.getSize() == metadataFile.getSize()) {
+                        // Same size -> no op
+                        mdFilesIterator.remove();
+                        fileFound = true;
+                    } else {
+                        // Different size -> append op
+                        metadataFile.setPendingOp(ExternalFilePendingOp.APPEND_OP);
+                        fileSystemFile.setPendingOp(ExternalFilePendingOp.APPEND_OP);
+                        appendedFiles.add(fileSystemFile);
+                        fileFound = true;
+                        uptodate = false;
+                    }
+                } else {
+                    // Same file name, different modification date -> delete and add
+                    metadataFile.setPendingOp(ExternalFilePendingOp.DROP_OP);
+                    deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
+                            0, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), metadataFile.getSize(),
+                            ExternalFilePendingOp.DROP_OP));
+                    fileSystemFile.setPendingOp(ExternalFilePendingOp.ADD_OP);
+                    fileSystemFile.setFileNumber(newFileNumber);
+                    addedFiles.add(fileSystemFile);
+                    newFileNumber++;
+                    fileFound = true;
+                    uptodate = false;
+                }
+                if (fileFound) {
+                    break;
+                }
+            }
+            if (!fileFound) {
+                // File not stored previously in metadata -> pending add op
+                fileSystemFile.setPendingOp(ExternalFilePendingOp.ADD_OP);
+                fileSystemFile.setFileNumber(newFileNumber);
+                addedFiles.add(fileSystemFile);
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+
+        // Done with files from the external file system; metadataFiles now contains both deleted and appended files.
+        // First, correct the number assignment of deleted and appended files.
+        for (ExternalFile deletedFile : deletedFiles) {
+            deletedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+        for (ExternalFile appendedFile : appendedFiles) {
+            appendedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+
+        // Include the remaining deleted files
+        Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+        while (mdFilesIterator.hasNext()) {
+            ExternalFile metadataFile = mdFilesIterator.next();
+            if (metadataFile.getPendingOp() == ExternalFilePendingOp.NO_OP) {
+                metadataFile.setPendingOp(ExternalFilePendingOp.DROP_OP);
+                deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
+                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
+                        metadataFile.getSize(), metadataFile.getPendingOp()));
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+        return uptodate;
+    }
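+
+    // Editorial sketch (not part of the original patch): a refresh path would fetch the
+    // recorded files, let isDatasetUptodate compute the delta into the three output lists,
+    // and skip the refresh on a zero delta. The mdTxnCtx name below is hypothetical.
+    //
+    //   List<ExternalFile> metadataFiles = MetadataManager.INSTANCE
+    //           .getDatasetExternalFiles(mdTxnCtx, dataset);
+    //   List<ExternalFile> added = new ArrayList<>();
+    //   List<ExternalFile> deleted = new ArrayList<>();
+    //   List<ExternalFile> appended = new ArrayList<>();
+    //   if (isDatasetUptodate(dataset, metadataFiles, added, deleted, appended)) {
+    //       return; // nothing changed in the external file system
+    //   }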
+
+    public static Dataset createTransactionDataset(Dataset dataset) {
+        ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(),
+                originalDsd.getTimestamp(), TransactionState.BEGIN);
+        return new Dataset(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getItemTypeDataverseName(),
+                dataset.getItemTypeName(), dataset.getNodeGroupName(), dataset.getCompactionPolicy(),
+                dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(), DatasetType.EXTERNAL,
+                dataset.getDatasetId(), dataset.getPendingOp());
+    }
+
+    public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider metadataProvider, Dataset dataset)
+            throws AlgebricksException {
+        String indexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName());
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), indexName, true);
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, compactionInfo.first, compactionInfo.second);
+        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
+                metadataProvider.getStorageComponentProvider().getStorageManager(),
+                metadataProvider.getStorageComponentProvider().getIndexLifecycleManagerProvider(),
+                splitsAndConstraint.first, dataflowHelperFactory,
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+
+        return spec;
+    }
+
+    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<>();
+        for (ExternalFile file : metadataFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+                files.add(file);
+            } else if (file.getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
+                for (ExternalFile appendedFile : appendedFiles) {
+                    if (appendedFile.getFileName().equals(file.getFileName())) {
+                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(),
+                                file.getFileNumber(), file.getFileName(), file.getLastModefiedTime(),
+                                appendedFile.getSize(), ExternalFilePendingOp.NO_OP));
+                    }
+                }
+            }
+        }
+        files.addAll(addedFiles);
+        Collections.sort(files);
+        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false);
+    }
+
+    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        // Create the files list: every metadata file is included, but files with a pending
+        // append have their pending op cleared first.
+        ArrayList<ExternalFile> files = new ArrayList<>();
+        for (ExternalFile metadataFile : metadataFiles) {
+            if (metadataFile.getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
+                metadataFile.setPendingOp(ExternalFilePendingOp.NO_OP);
+            }
+            files.add(metadataFile);
+        }
+        // Add new files
+        files.addAll(addedFiles);
+        // Add appended files
+        files.addAll(appendedFiles);
+        return IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, index, null, null, null, null, metadataProvider, files);
+    }
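+
+    // Editorial sketch (not part of the original patch): with the delta in hand, a caller
+    // would build one update job for the files index and one per secondary index. The
+    // secondaryIndexes list below is hypothetical.
+    //
+    //   JobSpecification filesIndexJob =
+    //           buildFilesIndexUpdateOp(ds, metadataFiles, added, appended, metadataProvider);
+    //   for (Index index : secondaryIndexes) {
+    //       JobSpecification indexJob =
+    //               buildIndexUpdateOp(ds, index, metadataFiles, added, appended, metadataProvider);
+    //   }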
+
+    public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
+        boolean temp = ds.getDatasetDetails().isTemp();
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                ds.getDataverseName(), ds.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>();
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
+                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                        metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+                treeDataflowHelperFactories.add(indexDataflowHelperFactory);
+                treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                        RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER));
+            }
+        }
+
+        ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+
+        boolean temp = ds.getDatasetDetails().isTemp();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                ds.getDataverseName(), ds.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>();
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
+                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                        metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+                treeDataflowHelperFactories.add(indexDataflowHelperFactory);
+                treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                        RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER));
+            }
+        }
+
+        ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        boolean temp = ds.getDatasetDetails().isTemp();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                ds.getDataverseName(), ds.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>();
+        ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>();
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
+                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
+                        metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+                treeDataflowHelperFactories.add(indexDataflowHelperFactory);
+                treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                        RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER));
+            }
+        }
+
+        ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
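+
+    // Editorial note: buildCommitJob, buildAbortOp, and buildRecoverOp share one shape: each
+    // gathers the files index plus every valid secondary index of the dataset and hands them
+    // to a single commit, abort, or recover operator. Together with the BEGIN state set in
+    // createTransactionDataset above, they cover the lifecycle of a refresh transaction.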
+
+    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, MetadataProvider metadataProvider,
+            IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
+                metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), IndexingConstants.getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+
+        String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
+        Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName);
+        IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
+        FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
+                FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, dataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE, storageComponentProvider.getMetadataPageManagerFactory());
+        spec.addRoot(compactOp);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondarySplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static boolean isFileIndex(Index index) {
+        return index.getIndexName().equals(IndexingConstants.getFilesIndexName(index.getDatasetName()));
+    }
+}
