http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
deleted file mode 100644
index 95ce100..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.cc;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.algebra.base.ILangExtension;
-import org.apache.asterix.algebra.base.ILangExtension.Language;
-import org.apache.asterix.algebra.extension.IAlgebraExtensionManager;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.api.ExtensionId;
-import org.apache.asterix.common.api.IExtension;
-import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
-import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * AsterixDB's implementation of {@code IAlgebraExtensionManager} which takes care of
- * initializing extensions for App and Compilation purposes
- */
-public class CompilerExtensionManager implements IAlgebraExtensionManager {
-
-    private final Map<ExtensionId, IExtension> extensions = new HashMap<>();
-
-    private final IStatementExecutorExtension statementExecutorExtension;
-    private final ILangCompilationProvider aqlCompilationProvider;
-    private final ILangCompilationProvider sqlppCompilationProvider;
-    private final DefaultStatementExecutorFactory 
defaultQueryTranslatorFactory;
-
-    /**
-     * Initialize {@code CompilerExtensionManager} from configuration
-     *
-     * @param list
-     * @throws InstantiationException
-     * @throws IllegalAccessException
-     * @throws ClassNotFoundException
-     * @throws HyracksDataException
-     */
-    public CompilerExtensionManager(List<AsterixExtension> list)
-            throws InstantiationException, IllegalAccessException, 
ClassNotFoundException, HyracksDataException {
-        Pair<ExtensionId, ILangCompilationProvider> aqlcp = null;
-        Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null;
-        IStatementExecutorExtension see = null;
-        defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory();
-
-        if (list != null) {
-            for (AsterixExtension extensionConf : list) {
-                IExtension extension = (IExtension) 
Class.forName(extensionConf.getClassName()).newInstance();
-                extension.configure(extensionConf.getArgs());
-                if (extensions.containsKey(extension.getId())) {
-                    throw new 
RuntimeDataException(ErrorCode.EXTENSION_ID_CONFLICT, extension.getId());
-                }
-                extensions.put(extension.getId(), extension);
-                switch (extension.getExtensionKind()) {
-                    case STATEMENT_EXECUTOR:
-                        see = extendStatementExecutor(see, 
(IStatementExecutorExtension) extension);
-                        break;
-                    case LANG:
-                        ILangExtension le = (ILangExtension) extension;
-                        aqlcp = extendLangCompilationProvider(Language.AQL, 
aqlcp, le);
-                        sqlppcp = 
extendLangCompilationProvider(Language.SQLPP, sqlppcp, le);
-                        break;
-                    default:
-                        break;
-                }
-            }
-        }
-        this.statementExecutorExtension = see;
-        this.aqlCompilationProvider = aqlcp == null ? new 
AqlCompilationProvider() : aqlcp.second;
-        this.sqlppCompilationProvider = sqlppcp == null ? new 
SqlppCompilationProvider() : sqlppcp.second;
-    }
-
-    private Pair<ExtensionId, ILangCompilationProvider> 
extendLangCompilationProvider(Language lang,
-            Pair<ExtensionId, ILangCompilationProvider> cp, ILangExtension le) 
throws HyracksDataException {
-        if (cp != null && le.getLangCompilationProvider(lang) != null) {
-            throw new 
RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, le.getId(), 
cp.first,
-                    lang.toString());
-        }
-        return (le.getLangCompilationProvider(lang) != null)
-                ? new Pair<>(le.getId(), le.getLangCompilationProvider(lang)) 
: cp;
-    }
-
-    private IStatementExecutorExtension 
extendStatementExecutor(IStatementExecutorExtension qte,
-            IStatementExecutorExtension extension) throws HyracksDataException 
{
-        if (qte != null) {
-            throw new 
RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, qte.getId(), 
extension.getId(),
-                    IStatementExecutorFactory.class.getSimpleName());
-        }
-        return extension;
-    }
-
-    public IStatementExecutorFactory getQueryTranslatorFactory() {
-        return statementExecutorExtension == null ? 
defaultQueryTranslatorFactory
-                : statementExecutorExtension.getQueryTranslatorFactory();
-    }
-
-    public ILangCompilationProvider getAqlCompilationProvider() {
-        return aqlCompilationProvider;
-    }
-
-    public ILangCompilationProvider getSqlppCompilationProvider() {
-        return sqlppCompilationProvider;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
index a4b2345..f7b6842 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java
@@ -25,6 +25,7 @@ import org.apache.asterix.translator.IStatementExecutorFactory;
  * An interface for extensions of {@code IStatementExecutor}
  */
 public interface IStatementExecutorExtension extends IExtension {
+
     @Override
     default ExtensionKind getExtensionKind() {
         return ExtensionKind.STATEMENT_EXECUTOR;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
index 7b01169..f43092b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
@@ -23,7 +23,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 
 public class ResourceIdManager implements IResourceIdManager {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
deleted file mode 100644
index a2518ec..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
+++ /dev/null
@@ -1,771 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import 
org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.exceptions.AsterixException;
-import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
-import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.transactions.IResourceFactory;
-import 
org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import 
org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
-import 
org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
-import 
org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
-import 
org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
-import 
org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor;
-import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor;
-import org.apache.asterix.external.provider.AdapterFactoryProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.file.IndexOperations;
-import org.apache.asterix.file.JobSpecificationUtils;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.util.AppContextInfo;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import 
org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadataFactory;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
-import org.apache.hyracks.storage.common.file.LocalResource;
-
-public class ExternalIndexingOperations {
-
-    public static final List<List<String>> FILE_INDEX_FIELD_NAMES = Collections
-            .singletonList(Collections.singletonList(""));
-    public static final List<IAType> FILE_INDEX_FIELD_TYPES = 
Collections.singletonList(BuiltinType.ASTRING);
-
-    private ExternalIndexingOperations() {
-    }
-
-    public static boolean isIndexible(ExternalDatasetDetails ds) {
-        String adapter = ds.getAdapter();
-        if 
(adapter.equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER)) {
-            return true;
-        }
-        return false;
-    }
-
-    public static boolean isRefereshActive(ExternalDatasetDetails ds) {
-        return ds.getState() != ExternalDatasetTransactionState.COMMIT;
-    }
-
-    public static boolean isValidIndexName(String datasetName, String 
indexName) {
-        return 
(!datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName));
-    }
-
-    public static String getFilesIndexName(String datasetName) {
-        return 
datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX);
-    }
-
-    public static int getRIDSize(Dataset dataset) {
-        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) 
dataset.getDatasetDetails());
-        return 
IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
-    }
-
-    public static IBinaryComparatorFactory[] getComparatorFactories(Dataset 
dataset) {
-        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) 
dataset.getDatasetDetails());
-        return 
IndexingConstants.getComparatorFactories(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
-    }
-
-    public static IBinaryComparatorFactory[] 
getBuddyBtreeComparatorFactories() {
-        return IndexingConstants.getBuddyBtreeComparatorFactories();
-    }
-
-    public static ArrayList<ExternalFile> 
getSnapshotFromExternalFileSystem(Dataset dataset)
-            throws AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<>();
-        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) 
dataset.getDatasetDetails();
-        try {
-            // Create the file system object
-            FileSystem fs = 
getFileSystemObject(datasetDetails.getProperties());
-            // Get paths of dataset
-            String path = 
datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH);
-            String[] paths = path.split(",");
-
-            // Add fileStatuses to files
-            for (String aPath : paths) {
-                FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
-                for (int i = 0; i < fileStatuses.length; i++) {
-                    int nextFileNumber = files.size();
-                    if (fileStatuses[i].isDirectory()) {
-                        listSubFiles(dataset, fs, fileStatuses[i], files);
-                    } else {
-                        files.add(new ExternalFile(dataset.getDataverseName(), 
dataset.getDatasetName(), nextFileNumber,
-                                fileStatuses[i].getPath().toUri().getPath(),
-                                new 
Date(fileStatuses[i].getModificationTime()), fileStatuses[i].getLen(),
-                                ExternalFilePendingOp.PENDING_NO_OP));
-                    }
-                }
-            }
-            // Close file system
-            fs.close();
-            if (files.size() == 0) {
-                throw new AlgebricksException("File Snapshot retrieved from 
external file system is empty");
-            }
-            return files;
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("Unable to get list of HDFS files " 
+ e);
-        }
-    }
-
-    /* list all files under the directory
-     * src is expected to be a folder
-     */
-    private static void listSubFiles(Dataset dataset, FileSystem srcFs, 
FileStatus src, ArrayList<ExternalFile> files)
-            throws IOException {
-        Path path = src.getPath();
-        FileStatus[] fileStatuses = srcFs.listStatus(path);
-        for (int i = 0; i < fileStatuses.length; i++) {
-            int nextFileNumber = files.size();
-            if (fileStatuses[i].isDirectory()) {
-                listSubFiles(dataset, srcFs, fileStatuses[i], files);
-            } else {
-                files.add(new ExternalFile(dataset.getDataverseName(), 
dataset.getDatasetName(), nextFileNumber,
-                        fileStatuses[i].getPath().toUri().getPath(), new 
Date(fileStatuses[i].getModificationTime()),
-                        fileStatuses[i].getLen(), 
ExternalFilePendingOp.PENDING_NO_OP));
-            }
-        }
-    }
-
-    public static FileSystem getFileSystemObject(Map<String, String> map) 
throws IOException {
-        Configuration conf = new Configuration();
-        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, 
map.get(ExternalDataConstants.KEY_HDFS_URL).trim());
-        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, 
DistributedFileSystem.class.getName());
-        return FileSystem.get(conf);
-    }
-
-    public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset 
dataset,
-            ArrayList<ExternalFile> externalFilesSnapshot, MetadataProvider 
metadataProvider, boolean createIndex)
-            throws MetadataException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = 
AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = 
asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = 
compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
secondarySplitsAndConstraint = metadataProvider
-                
.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), 
dataset.getDatasetName(),
-                        getFilesIndexName(dataset.getDatasetName()), true);
-        IFileSplitProvider secondaryFileSplitProvider = 
secondarySplitsAndConstraint.first;
-        FilesIndexDescription filesIndexDescription = new 
FilesIndexDescription();
-        IResourceFactory localResourceMetadata = new 
ExternalBTreeLocalResourceMetadataFactory(
-                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, 
filesIndexDescription.FILES_INDEX_COMP_FACTORIES,
-                new int[] { 0 }, false, dataset.getDatasetId(), 
mergePolicyFactory, mergePolicyFactoryProperties);
-        PersistentLocalResourceFactoryProvider localResourceFactoryProvider =
-                new PersistentLocalResourceFactoryProvider(
-                        localResourceMetadata, 
LocalResource.ExternalBTreeResource);
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new 
ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties,
-                new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), 
true);
-        ExternalFilesIndexOperatorDescriptor externalFilesOp = new 
ExternalFilesIndexOperatorDescriptor(spec,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, indexDataflowHelperFactory, 
localResourceFactoryProvider,
-                externalFilesSnapshot, createIndex, 
LSMIndexUtil.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
externalFilesOp,
-                secondarySplitsAndConstraint.second);
-        spec.addRoot(externalFilesOp);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    /**
-     * This method create an indexing operator that index records in HDFS
-     *
-     * @param jobSpec
-     * @param itemType
-     * @param dataset
-     * @param files
-     * @param indexerDesc
-     * @return
-     * @throws AlgebricksException
-     * @throws HyracksDataException
-     * @throws Exception
-     */
-    private static Pair<ExternalDataScanOperatorDescriptor, 
AlgebricksPartitionConstraint>
-            getExternalDataIndexingOperator(
-                    MetadataProvider metadataProvider, JobSpecification 
jobSpec, IAType itemType, Dataset dataset,
-                    List<ExternalFile> files, RecordDescriptor indexerDesc)
-                    throws HyracksDataException, AlgebricksException {
-        ExternalDatasetDetails externalDatasetDetails = 
(ExternalDatasetDetails) dataset.getDatasetDetails();
-        Map<String, String> configuration = 
externalDatasetDetails.getProperties();
-        IAdapterFactory adapterFactory = 
AdapterFactoryProvider.getIndexingAdapterFactory(
-                metadataProvider.getLibraryManager(), 
externalDatasetDetails.getAdapter(), configuration,
-                (ARecordType) itemType, files, true, null);
-        return new Pair<>(new ExternalDataScanOperatorDescriptor(jobSpec, 
indexerDesc, adapterFactory),
-                adapterFactory.getPartitionConstraint());
-    }
-
-    public static Pair<ExternalDataScanOperatorDescriptor, 
AlgebricksPartitionConstraint> createExternalIndexingOp(
-            JobSpecification spec, MetadataProvider metadataProvider, Dataset 
dataset, ARecordType itemType,
-            RecordDescriptor indexerDesc, List<ExternalFile> files) throws 
HyracksDataException, AlgebricksException {
-        if (files == null) {
-            files = 
MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(),
 dataset);
-        }
-        return getExternalDataIndexingOperator(metadataProvider, spec, 
itemType, dataset, files, indexerDesc);
-    }
-
-    /**
-     * At the end of this method, we expect to have 4 sets as follows:
-     * metadataFiles should contain only the files that are appended in their original state
-     * addedFiles should contain new files that has number assigned starting after the max original file number
-     * deletedFiles should contain files that are no longer there in the file system
-     * appendedFiles should have the new file information of existing files
-     * The method should return false in case of zero delta
-     *
-     * @param dataset
-     * @param metadataFiles
-     * @param addedFiles
-     * @param deletedFiles
-     * @param appendedFiles
-     * @return
-     * @throws MetadataException
-     * @throws AlgebricksException
-     */
-    public static boolean isDatasetUptodate(Dataset dataset, 
List<ExternalFile> metadataFiles,
-            List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, 
List<ExternalFile> appendedFiles)
-            throws MetadataException, AlgebricksException {
-        boolean uptodate = true;
-        int newFileNumber = metadataFiles.get(metadataFiles.size() - 
1).getFileNumber() + 1;
-
-        ArrayList<ExternalFile> fileSystemFiles = 
getSnapshotFromExternalFileSystem(dataset);
-
-        // Loop over file system files < taking care of added files >
-        for (ExternalFile fileSystemFile : fileSystemFiles) {
-            boolean fileFound = false;
-            Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
-            while (mdFilesIterator.hasNext()) {
-                ExternalFile metadataFile = mdFilesIterator.next();
-                if 
(fileSystemFile.getFileName().equals(metadataFile.getFileName())) {
-                    // Same file name
-                    if 
(fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime()))
 {
-                        // Same timestamp
-                        if (fileSystemFile.getSize() == 
metadataFile.getSize()) {
-                            // Same size -> no op
-                            mdFilesIterator.remove();
-                            fileFound = true;
-                        } else {
-                            // Different size -> append op
-                            
metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
-                            
fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
-                            appendedFiles.add(fileSystemFile);
-                            fileFound = true;
-                            uptodate = false;
-                        }
-                    } else {
-                        // Same file name, Different file mod date -> delete and add
-                        
metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
-                        deletedFiles
-                                .add(new 
ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), 0,
-                                        metadataFile.getFileName(), 
metadataFile.getLastModefiedTime(),
-                                        metadataFile.getSize(), 
ExternalFilePendingOp.PENDING_DROP_OP));
-                        
fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
-                        fileSystemFile.setFileNumber(newFileNumber);
-                        addedFiles.add(fileSystemFile);
-                        newFileNumber++;
-                        fileFound = true;
-                        uptodate = false;
-                    }
-                }
-                if (fileFound) {
-                    break;
-                }
-            }
-            if (!fileFound) {
-                // File not stored previously in metadata -> pending add op
-                
fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
-                fileSystemFile.setFileNumber(newFileNumber);
-                addedFiles.add(fileSystemFile);
-                newFileNumber++;
-                uptodate = false;
-            }
-        }
-
-        // Done with files from external file system -> metadata files now contain both deleted files and appended ones
-        // first, correct number assignment to deleted and updated files
-        for (ExternalFile deletedFile : deletedFiles) {
-            deletedFile.setFileNumber(newFileNumber);
-            newFileNumber++;
-        }
-        for (ExternalFile appendedFile : appendedFiles) {
-            appendedFile.setFileNumber(newFileNumber);
-            newFileNumber++;
-        }
-
-        // include the remaining deleted files
-        Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
-        while (mdFilesIterator.hasNext()) {
-            ExternalFile metadataFile = mdFilesIterator.next();
-            if (metadataFile.getPendingOp() == 
ExternalFilePendingOp.PENDING_NO_OP) {
-                
metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
-                deletedFiles.add(new 
ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
-                        newFileNumber, metadataFile.getFileName(), 
metadataFile.getLastModefiedTime(),
-                        metadataFile.getSize(), metadataFile.getPendingOp()));
-                newFileNumber++;
-                uptodate = false;
-            }
-        }
-        return uptodate;
-    }
-
-    public static Dataset createTransactionDataset(Dataset dataset) {
-        ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) 
dataset.getDatasetDetails();
-        ExternalDatasetDetails dsd = new 
ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(),
-                originalDsd.getTimestamp(), 
ExternalDatasetTransactionState.BEGIN);
-        Dataset transactionDatset = new Dataset(dataset.getDataverseName(), 
dataset.getDatasetName(),
-                dataset.getItemTypeDataverseName(), dataset.getItemTypeName(), 
dataset.getNodeGroupName(),
-                dataset.getCompactionPolicy(), 
dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(),
-                DatasetType.EXTERNAL, dataset.getDatasetId(), 
dataset.getPendingOp());
-        return transactionDatset;
-    }
-
-    public static boolean isFileIndex(Index index) {
-        return 
(index.getIndexName().equals(getFilesIndexName(index.getDatasetName())));
-    }
-
-    public static JobSpecification 
buildDropFilesIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
-            MetadataProvider metadataProvider, Dataset dataset) throws 
AlgebricksException, MetadataException {
-        String dataverseName = indexDropStmt.getDataverseName() == null ? 
metadataProvider.getDefaultDataverseName()
-                : indexDropStmt.getDataverseName();
-        String datasetName = indexDropStmt.getDatasetName();
-        String indexName = indexDropStmt.getIndexName();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint = metadataProvider
-                
.splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, 
indexName, true);
-        StorageProperties storageProperties = 
AppContextInfo.INSTANCE.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        IndexDropOperatorDescriptor btreeDrop = new 
IndexDropOperatorDescriptor(spec,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first,
-                new LSMBTreeDataflowHelperFactory(new 
AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        compactionInfo.first, compactionInfo.second,
-                        new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), 
false, null, null, null, null, !temp),
-                LSMIndexUtil.getMetadataPageManagerFactory());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
btreeDrop,
-                splitsAndConstraint.second);
-        spec.addRoot(btreeDrop);
-
-        return spec;
-    }
-
-    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, 
List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, 
List<ExternalFile> appendedFiles,
-            MetadataProvider metadataProvider) throws MetadataException, 
AlgebricksException {
-        ArrayList<ExternalFile> files = new ArrayList<>();
-        for (ExternalFile file : metadataFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                files.add(file);
-            } else if (file.getPendingOp() == 
ExternalFilePendingOp.PENDING_APPEND_OP) {
-                for (ExternalFile appendedFile : appendedFiles) {
-                    if (appendedFile.getFileName().equals(file.getFileName())) 
{
-                        files.add(new ExternalFile(file.getDataverseName(), 
file.getDatasetName(), file.getFileNumber(),
-                                file.getFileName(), 
file.getLastModefiedTime(), appendedFile.getSize(),
-                                ExternalFilePendingOp.PENDING_NO_OP));
-                    }
-                }
-            }
-        }
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
-        Collections.sort(files);
-        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, 
false);
-    }
-
-    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, 
List<ExternalFile> metadataFiles,
-            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, 
List<ExternalFile> appendedFiles,
-            MetadataProvider metadataProvider) throws AsterixException, 
AlgebricksException {
-        // Create files list
-        ArrayList<ExternalFile> files = new ArrayList<>();
-
-        for (ExternalFile metadataFile : metadataFiles) {
-            if (metadataFile.getPendingOp() != 
ExternalFilePendingOp.PENDING_APPEND_OP) {
-                files.add(metadataFile);
-            } else {
-                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
-                files.add(metadataFile);
-            }
-        }
-        // add new files
-        for (ExternalFile file : addedFiles) {
-            files.add(file);
-        }
-        // add appended files
-        for (ExternalFile file : appendedFiles) {
-            files.add(file);
-        }
-
-        CompiledCreateIndexStatement ccis = new 
CompiledCreateIndexStatement(index.getIndexName(),
-                index.getDataverseName(), index.getDatasetName(), 
index.getKeyFieldNames(), index.getKeyFieldTypes(),
-                index.isEnforcingKeyFileds(), index.getGramLength(), 
index.getIndexType());
-        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, 
null, null, null, metadataProvider, files);
-    }
-
-    public static JobSpecification buildCommitJob(Dataset ds, List<Index> 
indexes, MetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = 
AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = 
asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        boolean temp = ds.getDatasetDetails().isTemp();
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = 
compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
filesIndexSplitsAndConstraint = metadataProvider
-                
.splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), 
ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = 
filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = 
getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, 
storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new 
IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> 
btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> 
rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), 
index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
indexSplitsAndConstraint = metadataProvider
-                        
.splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), 
ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    
btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, 
mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, 
spec));
-                    btreeInfos.add(new 
IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    
rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, 
mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, 
metadataProvider, spec));
-                    rtreeInfos.add(new 
IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesCommitOperatorDescriptor op = new 
ExternalDatasetIndexesCommitOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, 
btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    private static ExternalBTreeDataflowHelperFactory 
getFilesIndexDataflowHelperFactory(Dataset ds,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyFactoryProperties,
-            StorageProperties storageProperties, JobSpecification spec) {
-        return new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, 
mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    private static ExternalBTreeWithBuddyDataflowHelperFactory 
getBTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyFactoryProperties,
-            StorageProperties storageProperties, JobSpecification spec) {
-        return new 
ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, 
mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] 
{ index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static ExternalRTreeDataflowHelperFactory 
getRTreeDataflowHelperFactory(Dataset ds, Index index,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyFactoryProperties,
-            StorageProperties storageProperties, MetadataProvider 
metadataProvider, JobSpecification spec)
-            throws AlgebricksException, AsterixException {
-        int numPrimaryKeys = getRIDSize(ds);
-        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
-        secondaryKeyFields.size();
-        ARecordType itemType = (ARecordType) 
metadataProvider.findType(ds.getItemTypeDataverseName(),
-                ds.getItemTypeName());
-        Pair<IAType, Boolean> spatialTypePair = 
Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
-        IAType spatialType = spatialTypePair.first;
-        if (spatialType == null) {
-            throw new AsterixException("Could not find field " + 
secondaryKeyFields.get(0) + " in the schema.");
-        }
-        boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || 
spatialType.getTypeTag() == ATypeTag.POINT3D;
-        int numDimensions = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-        int numNestedSecondaryKeyFields = numDimensions * 2;
-        IPrimitiveValueProviderFactory[] valueProviderFactories =
-                new 
IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-        IBinaryComparatorFactory[] secondaryComparatorFactories =
-                new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-
-        ISerializerDeserializer[] secondaryRecFields = new 
ISerializerDeserializer[numPrimaryKeys
-                + numNestedSecondaryKeyFields];
-        ITypeTraits[] secondaryTypeTraits = new 
ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IAType nestedKeyType = 
NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-        ATypeTag keyType = nestedKeyType.getTypeTag();
-
-        keyType = nestedKeyType.getTypeTag();
-        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = 
SerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            secondaryRecFields[i] = keySerde;
-
-            secondaryComparatorFactories[i] = 
BinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(nestedKeyType, true);
-            secondaryTypeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-            valueProviderFactories[i] = PrimitiveValueProviderFactory.INSTANCE;
-        }
-        // Add serializers and comparators for primary index fields.
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            secondaryRecFields[numNestedSecondaryKeyFields + i] = 
IndexingConstants.getSerializerDeserializer(i);
-            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = 
IndexingConstants.getTypeTraits(i);
-        }
-        int[] primaryKeyFields = new int[numPrimaryKeys];
-        for (int i = 0; i < primaryKeyFields.length; i++) {
-            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
-        }
-
-        return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, 
RTreePolicyType.RTREE,
-                getBuddyBtreeComparatorFactories(), mergePolicyFactory, 
mergePolicyFactoryProperties,
-                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                MetadataProvider.proposeLinearizer(keyType, 
secondaryComparatorFactories.length),
-                storageProperties.getBloomFilterFalsePositiveRate(), new int[] 
{ index.getKeyFieldNames().size() },
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true, 
isPointMBR);
-    }
-
-    public static JobSpecification buildAbortOp(Dataset ds, List<Index> 
indexes, MetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = 
AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = 
asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = 
compactionInfo.second;
-
-        boolean temp = ds.getDatasetDetails().isTemp();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
filesIndexSplitsAndConstraint = metadataProvider
-                
.splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), 
ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = 
filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = 
getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, 
storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new 
IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> 
btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> 
rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), 
index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
indexSplitsAndConstraint = metadataProvider
-                        
.splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), 
ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    
btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, 
mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, 
spec));
-                    btreeInfos.add(new 
IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    
rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, 
mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, 
metadataProvider, spec));
-                    rtreeInfos.add(new 
IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesAbortOperatorDescriptor op = new 
ExternalDatasetIndexesAbortOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, 
btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-
-    }
-
-    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> 
indexes, MetadataProvider metadataProvider)
-            throws AlgebricksException, AsterixException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = 
AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = 
asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils.getMergePolicyFactory(ds,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = 
compactionInfo.second;
-        boolean temp = ds.getDatasetDetails().isTemp();
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
filesIndexSplitsAndConstraint = metadataProvider
-                
.splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), 
ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
-        IFileSplitProvider filesIndexSplitProvider = 
filesIndexSplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = 
getFilesIndexDataflowHelperFactory(ds,
-                mergePolicyFactory, mergePolicyFactoryProperties, 
storageProperties, spec);
-        IndexInfoOperatorDescriptor filesIndexInfo = new 
IndexInfoOperatorDescriptor(filesIndexSplitProvider,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER);
-
-        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> 
btreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>();
-        ArrayList<ExternalRTreeDataflowHelperFactory> 
rtreeDataflowHelperFactories = new ArrayList<>();
-        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>();
-
-        for (Index index : indexes) {
-            if (isValidIndexName(index.getDatasetName(), 
index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
indexSplitsAndConstraint = metadataProvider
-                        
.splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), 
ds.getDatasetName(),
-                                index.getIndexName(), temp);
-                if (index.getIndexType() == IndexType.BTREE) {
-                    
btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, 
mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, 
spec));
-                    btreeInfos.add(new 
IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                } else if (index.getIndexType() == IndexType.RTREE) {
-                    
rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, 
mergePolicyFactory,
-                            mergePolicyFactoryProperties, storageProperties, 
metadataProvider, spec));
-                    rtreeInfos.add(new 
IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            RuntimeComponentsProvider.RUNTIME_PROVIDER));
-                }
-            }
-        }
-
-        ExternalDatasetIndexesRecoverOperatorDescriptor op = new 
ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
-                filesIndexDataflowHelperFactory, filesIndexInfo, 
btreeDataflowHelperFactories, btreeInfos,
-                rtreeDataflowHelperFactories, rtreeInfos);
-
-        spec.addRoot(op);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
-                filesIndexSplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, 
MetadataProvider metadataProvider)
-            throws MetadataException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IPropertiesProvider asterixPropertiesProvider = 
AppContextInfo.INSTANCE;
-        StorageProperties storageProperties = 
asterixPropertiesProvider.getStorageProperties();
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtils.getMergePolicyFactory(dataset,
-                metadataProvider.getMetadataTxnContext());
-        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
-        Map<String, String> mergePolicyFactoryProperties = 
compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
secondarySplitsAndConstraint = metadataProvider
-                
.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), 
dataset.getDatasetName(),
-                        getFilesIndexName(dataset.getDatasetName()), true);
-        IFileSplitProvider secondaryFileSplitProvider = 
secondarySplitsAndConstraint.first;
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new 
ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties,
-                new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                storageProperties.getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), 
true);
-        FilesIndexDescription filesIndexDescription = new 
FilesIndexDescription();
-        LSMTreeIndexCompactOperatorDescriptor compactOp = new 
LSMTreeIndexCompactOperatorDescriptor(spec,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                secondaryFileSplitProvider, 
filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
-                filesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 
0 }, indexDataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE, 
LSMIndexUtil.getMetadataPageManagerFactory());
-        spec.addRoot(compactOp);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
-                secondarySplitsAndConstraint.second);
-        spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index 95fe68c..4680465 100755
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -43,11 +43,11 @@ import org.apache.asterix.external.library.LibraryAdapter;
 import org.apache.asterix.external.library.LibraryFunction;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Library;
+import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 
 public class ExternalLibraryUtils {
@@ -81,7 +81,7 @@ public class ExternalLibraryUtils {
                         // get library file
                         File libraryDir = new File(installLibDir.getAbsolutePath() + File.separator + dataverse
                                 + File.separator + library);
-                        // install if needed (i,e, add the functions, adapters, datasources, parsers to the metadata) <Not required for use>
+                        // install if needed (i,e, add the functions, adapters, datasources, parsers to the metadata)
                         installLibraryIfNeeded(dataverse, libraryDir, uninstalledLibs);
                     }
                 }
@@ -96,7 +96,7 @@ public class ExternalLibraryUtils {
      * @throws Exception
      */
     private static Map<String, List<String>> uninstallLibraries() throws Exception {
-        Map<String, List<String>> uninstalledLibs = new HashMap<String, List<String>>();
+        Map<String, List<String>> uninstalledLibs = new HashMap<>();
         // get the directory of the un-install libraries
         File uninstallLibDir = getLibraryUninstallDir();
         String[] uninstallLibNames;
@@ -116,7 +116,7 @@ public class ExternalLibraryUtils {
                 // add the library to the list of uninstalled libraries
                 List<String> uinstalledLibsInDv = uninstalledLibs.get(dataverse);
                 if (uinstalledLibsInDv == null) {
-                    uinstalledLibsInDv = new ArrayList<String>();
+                    uinstalledLibsInDv = new ArrayList<>();
                     uninstalledLibs.put(dataverse, uinstalledLibsInDv);
                 }
                 uinstalledLibsInDv.add(libName);
@@ -172,7 +172,8 @@ public class ExternalLibraryUtils {
                 // belong to the library?
                 if (adapter.getAdapterIdentifier().getName().startsWith(libraryName + "#")) {
                     // remove adapter <! we didn't check if there are feeds which use this adapter>
-                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier().getName());
+                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse,
+                            adapter.getAdapterIdentifier().getName());
                 }
             }
             // drop the library itself
@@ -203,7 +204,8 @@ public class ExternalLibraryUtils {
             Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
             if (libraryInMetadata != null && !wasUninstalled) {
                 // exists in metadata and was not un-installed, we return.
-                // Another place which shows that our metadata transactions are broken (we didn't call commit before!!!)
+                // Another place which shows that our metadata transactions are broken
+                // (we didn't call commit before!!!)
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 return;
             }
@@ -235,13 +237,13 @@ public class ExternalLibraryUtils {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
             if (dv == null) {
                 MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverse,
-                        NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, IMetadataEntity.PENDING_NO_OP));
+                        NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, MetadataUtil.PENDING_NO_OP));
             }
             // Add functions
             if (library.getLibraryFunctions() != null) {
                 for (LibraryFunction function : library.getLibraryFunctions().getLibraryFunction()) {
                     String[] fargs = function.getArguments().trim().split(",");
-                    List<String> args = new ArrayList<String>();
+                    List<String> args = new ArrayList<>();
                     for (String arg : fargs) {
                         args.add(arg);
                     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
deleted file mode 100644
index 34746d3..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveMessage;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
-import org.apache.asterix.external.feed.message.EndFeedMessage;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.file.JobSpecificationUtils;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.runtime.util.ClusterStateManager;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-
-/**
- * Provides helper method(s) for creating JobSpec for operations on a feed.
- */
-public class FeedOperations {
-
-    /**
-     * Builds the job spec for ingesting a (primary) feed from its external 
source via the feed adaptor.
-     *
-     * @param primaryFeed
-     * @param metadataProvider
-     * @return JobSpecification the Hyracks job specification for receiving 
data from external source
-     * @throws Exception
-     */
-    public static Pair<JobSpecification, IAdapterFactory> 
buildFeedIntakeJobSpec(Feed primaryFeed,
-            MetadataProvider metadataProvider, FeedPolicyAccessor 
policyAccessor) throws Exception {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
-        IAdapterFactory adapterFactory = null;
-        IOperatorDescriptor feedIngestor;
-        AlgebricksPartitionConstraint ingesterPc;
-        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, 
IAdapterFactory> t =
-                metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, 
policyAccessor);
-        feedIngestor = t.first;
-        ingesterPc = t.second;
-        adapterFactory = t.third;
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
feedIngestor, ingesterPc);
-        NullSinkOperatorDescriptor nullSink = new 
NullSinkOperatorDescriptor(spec);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
nullSink, ingesterPc);
-        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, 
nullSink, 0);
-        spec.addRoot(nullSink);
-        return new Pair<>(spec, adapterFactory);
-    }
-
-    /**
-     * Builds the job spec for sending message to an active feed to disconnect 
it from the
-     * its source.
-     */
-    public static Pair<JobSpecification, Boolean> 
buildDisconnectFeedJobSpec(MetadataProvider metadataProvider,
-            FeedConnectionId connectionId) throws AsterixException, 
AlgebricksException {
-
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IOperatorDescriptor feedMessenger;
-        AlgebricksPartitionConstraint messengerPc;
-        List<String> locations = null;
-        FeedRuntimeType sourceRuntimeType;
-        try {
-            FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
-                    .getActiveEntityListener(connectionId.getFeedId());
-            FeedConnectJobInfo cInfo = 
listener.getFeedConnectJobInfo(connectionId);
-            IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
-            IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
-
-            boolean terminateIntakeJob = false;
-            boolean completeDisconnect = computeFeedJoint == null || 
computeFeedJoint.getReceivers().isEmpty();
-            if (completeDisconnect) {
-                sourceRuntimeType = FeedRuntimeType.INTAKE;
-                locations = cInfo.getCollectLocations();
-                terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 
1;
-            } else {
-                locations = cInfo.getComputeLocations();
-                sourceRuntimeType = FeedRuntimeType.COMPUTE;
-            }
-
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = 
buildDisconnectFeedMessengerRuntime(spec,
-                    connectionId, locations, sourceRuntimeType, 
completeDisconnect, sourceFeedJoint.getOwnerFeedId());
-
-            feedMessenger = p.first;
-            messengerPc = p.second;
-
-            
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
feedMessenger, messengerPc);
-            NullSinkOperatorDescriptor nullSink = new 
NullSinkOperatorDescriptor(spec);
-            
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
nullSink, messengerPc);
-            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 
0, nullSink, 0);
-            spec.addRoot(nullSink);
-            return new Pair<>(spec, terminateIntakeJob);
-
-        } catch (AlgebricksException e) {
-            throw new AsterixException(e);
-        }
-
-    }
-
-    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildSendFeedMessageRuntime(
-            JobSpecification jobSpec, FeedConnectionId feedConenctionId, 
IActiveMessage feedMessage,
-            Collection<String> locations) throws AlgebricksException {
-        AlgebricksPartitionConstraint partitionConstraint =
-                new 
AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-        FeedMessageOperatorDescriptor feedMessenger =
-                new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId, 
feedMessage);
-        return new Pair<>(feedMessenger, partitionConstraint);
-    }
-
-    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildDisconnectFeedMessengerRuntime(
-            JobSpecification jobSpec, FeedConnectionId feedConenctionId, 
List<String> locations,
-            FeedRuntimeType sourceFeedRuntimeType, boolean 
completeDisconnection, EntityId sourceFeedId)
-            throws AlgebricksException {
-        IActiveMessage feedMessage = new EndFeedMessage(feedConenctionId, 
sourceFeedRuntimeType, sourceFeedId,
-                completeDisconnection, 
EndFeedMessage.EndMessageType.DISCONNECT_FEED);
-        return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, 
feedMessage, locations);
-    }
-
-    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws 
Exception {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        AlgebricksAbsolutePartitionConstraint allCluster = 
ClusterStateManager.INSTANCE.getClusterLocations();
-        Set<String> nodes = new TreeSet<>();
-        for (String node : allCluster.getLocations()) {
-            nodes.add(node);
-        }
-        AlgebricksAbsolutePartitionConstraint locations =
-                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new 
String[nodes.size()]));
-        FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(feed.getDataverseName(), 
feed.getFeedName(), locations);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
-        FileRemoveOperatorDescriptor frod = new 
FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, 
splitsAndConstraint.second);
-        spec.addRoot(frod);
-        return spec;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index 6be7af9..ec7c239 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -24,17 +24,19 @@ import java.util.List;
 
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.feed.api.IFeedWork;
 import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import 
org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus;
+import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
@@ -48,6 +50,7 @@ public class FeedWorkCollection {
 
     private static Logger LOGGER = 
Logger.getLogger(FeedWorkCollection.class.getName());
     private static final ILangCompilationProvider compilationProvider = new 
AqlCompilationProvider();
+    private static final IStorageComponentProvider storageComponentProvider = 
new StorageComponentProvider();
 
     /**
      * The task of subscribing to a feed to obtain data.
@@ -91,7 +94,8 @@ public class FeedWorkCollection {
                     List<Statement> statements = new ArrayList<>();
                     statements.add(dataverseDecl);
                     statements.add(subscribeStmt);
-                    IStatementExecutor translator = 
qtFactory.create(statements, pc, compilationProvider);
+                    IStatementExecutor translator = 
qtFactory.create(statements, pc, compilationProvider,
+                            storageComponentProvider);
                     
translator.compileAndExecute(AppContextInfo.INSTANCE.getHcc(), null,
                             QueryTranslator.ResultDelivery.IMMEDIATE);
                     if (LOGGER.isEnabledFor(Level.INFO)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b1ca062..b114e8c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -29,26 +29,26 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.BuildProperties;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.CompilerProperties;
-import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.FeedProperties;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.context.FileMapManager;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.FileMapManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -62,6 +62,7 @@ import 
org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -74,7 +75,6 @@ import 
org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
-import 
org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
@@ -99,13 +99,12 @@ import 
org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
-public class NCAppRuntimeContext implements IAppRuntimeContext, 
IPropertiesProvider {
+public class NCAppRuntimeContext implements IAppRuntimeContext {
     private static final Logger LOGGER = 
Logger.getLogger(NCAppRuntimeContext.class.getName());
 
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCApplicationContext ncApplicationContext;
     private final IResourceIdFactory resourceIdFactory;
-
     private CompilerProperties compilerProperties;
     private ExternalProperties externalProperties;
     private MetadataProperties metadataProperties;
@@ -115,7 +114,6 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext, IPropertiesProvi
     private BuildProperties buildProperties;
     private ReplicationProperties replicationProperties;
     private MessagingProperties messagingProperties;
-
     private ThreadExecutor threadExecutor;
     private IDatasetLifecycleManager datasetLifecycleManager;
     private IFileMapManager fileMapManager;
@@ -136,14 +134,14 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext, IPropertiesProvi
 
     private final ILibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
+    private final IStorageComponentProvider componentProvider;
 
     public NCAppRuntimeContext(INCApplicationContext ncApplicationContext, 
List<AsterixExtension> extensions)
-            throws AsterixException, InstantiationException, 
IllegalAccessException,
-            ClassNotFoundException, IOException {
+            throws AsterixException, InstantiationException, 
IllegalAccessException, ClassNotFoundException,
+            IOException {
         List<AsterixExtension> allExtensions = new ArrayList<>();
         this.ncApplicationContext = ncApplicationContext;
-        PropertiesAccessor propertiesAccessor =
-                
PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
+        PropertiesAccessor propertiesAccessor = 
PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
         compilerProperties = new CompilerProperties(propertiesAccessor);
         externalProperties = new ExternalProperties(propertiesAccessor);
         metadataProperties = new MetadataProperties(propertiesAccessor);
@@ -159,6 +157,7 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext, IPropertiesProvi
         }
         allExtensions.addAll(new 
ExtensionProperties(propertiesAccessor).getExtensions());
         ncExtensionManager = new NCExtensionManager(allExtensions);
+        componentProvider = new StorageComponentProvider();
         resourceIdFactory = new 
GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 
@@ -181,16 +180,15 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext, IPropertiesProvi
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
         ILocalResourceRepositoryFactory 
persistentLocalResourceRepositoryFactory =
-                new PersistentLocalResourceRepositoryFactory(
-                        ioManager, ncApplicationContext.getNodeId(), 
metadataProperties);
+                new PersistentLocalResourceRepositoryFactory(ioManager, 
ncApplicationContext.getNodeId(),
+                        metadataProperties);
 
-        localResourceRepository = (PersistentLocalResourceRepository) 
persistentLocalResourceRepositoryFactory
-                .createRepository();
+        localResourceRepository =
+                (PersistentLocalResourceRepository) 
persistentLocalResourceRepositoryFactory.createRepository();
 
-        IAppRuntimeContextProvider asterixAppRuntimeContextProvider =
-                new AppRuntimeContextProviderForRecovery(this);
-        txnSubsystem = new 
TransactionSubsystem(ncApplicationContext.getNodeId(), 
asterixAppRuntimeContextProvider,
-                txnProperties);
+        IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new 
AppRuntimeContextProviderForRecovery(this);
+        txnSubsystem = new TransactionSubsystem(ncApplicationContext, 
ncApplicationContext.getNodeId(),
+                asterixAppRuntimeContextProvider, txnProperties);
 
         IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
         SystemState systemState = recoveryMgr.getSystemState();
@@ -448,9 +446,9 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext, IPropertiesProvi
         // This way we can delay the registration of the metadataNode until
         // it is completely initialized.
         MetadataManager.initialize(proxy, MetadataNode.INSTANCE);
-        MetadataBootstrap.startUniverse(this, ncApplicationContext, 
newUniverse);
+        MetadataBootstrap.startUniverse(ncApplicationContext, newUniverse);
         MetadataBootstrap.startDDLRecovery();
-        ncExtensionManager.initializeMetadata();
+        ncExtensionManager.initializeMetadata(ncApplicationContext);
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Metadata node bound");
@@ -473,4 +471,9 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext, IPropertiesProvi
         return ncExtensionManager;
     }
 
+    @Override
+    public IStorageComponentProvider getStorageComponentProvider() {
+        return componentProvider;
+    }
+
 }

Reply via email to