http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e684285..4ebf055 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -19,58 +19,116 @@
 
 package org.apache.asterix.metadata.entities;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.common.utils.JobUtils.ProgressState;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataCache;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
+import org.apache.asterix.metadata.utils.IndexUtil;
+import 
org.apache.asterix.metadata.utils.InvertedIndexDataflowHelperFactoryProvider;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.metadata.utils.RTreeDataflowHelperFactoryProvider;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.utils.RecordUtil;
+import 
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.job.JobSpecification;
+import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * Metadata describing a dataset.
  */
 public class Dataset implements IMetadataEntity<Dataset> {
 
-    /**
-     * Dataset related operations
+    /*
+     * Constants
      */
-    public static final byte OP_READ = 0x00;
-    public static final byte OP_INSERT = 0x01;
-    public static final byte OP_DELETE = 0x02;
-    public static final byte OP_UPSERT = 0x03;
-
     private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = 
Logger.getLogger(Dataset.class.getName());
+    //TODO: Remove Singletons
+    private static final BTreeDataflowHelperFactoryProvider 
bTreeDataflowHelperFactoryProvider =
+            BTreeDataflowHelperFactoryProvider.INSTANCE;
+    private static final RTreeDataflowHelperFactoryProvider 
rTreeDataflowHelperFactoryProvider =
+            RTreeDataflowHelperFactoryProvider.INSTANCE;
+    private static final InvertedIndexDataflowHelperFactoryProvider 
invertedIndexDataflowHelperFactoryProvider =
+            InvertedIndexDataflowHelperFactoryProvider.INSTANCE;
+    /*
+     * Members
+     */
+    private final int datasetId;
     private final String dataverseName;
-    // Enforced to be unique within a dataverse.
     private final String datasetName;
-    // Dataverse of ItemType for this dataset
-    private final String itemTypeDataverseName;
-    // Type of items stored in this dataset.
-    private final String itemTypeName;
+    private final String recordTypeDataverseName;
+    private final String recordTypeName;
     private final String nodeGroupName;
-    private final String compactionPolicy;
+    private final String compactionPolicyFactory;
+    private final Map<String, String> hints;
     private final Map<String, String> compactionPolicyProperties;
     private final DatasetType datasetType;
     private final IDatasetDetails datasetDetails;
-    // Hints related to cardinatlity of dataset, avg size of tuples etc.
-    private final Map<String, String> hints;
-    private final int datasetId;
-    // Type of pending operations with respect to atomic DDL operation
+    private final String metaTypeDataverseName;
+    private final String metaTypeName;
     private int pendingOp;
 
-    // Dataverse of Meta ItemType for this dataset.
-    private final String metaItemTypeDataverseName;
-    // Type of Meta items stored in this dataset.
-    private final String metaItemTypeName;
-
-    public Dataset(String dataverseName, String datasetName, String 
itemTypeDataverseName, String itemTypeName,
+    public Dataset(String dataverseName, String datasetName, String 
recordTypeDataverseName, String recordTypeName,
             String nodeGroupName, String compactionPolicy, Map<String, String> 
compactionPolicyProperties,
             IDatasetDetails datasetDetails, Map<String, String> hints, 
DatasetType datasetType, int datasetId,
             int pendingOp) {
-        this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, 
null, null, nodeGroupName,
-                compactionPolicy, compactionPolicyProperties, datasetDetails, 
hints, datasetType, datasetId, pendingOp);
+        this(dataverseName, datasetName, recordTypeDataverseName, 
recordTypeName, /*metaTypeDataverseName*/null,
+                /*metaTypeName*/null, nodeGroupName, compactionPolicy, 
compactionPolicyProperties, datasetDetails,
+                hints, datasetType, datasetId, pendingOp);
     }
 
     public Dataset(String dataverseName, String datasetName, String 
itemTypeDataverseName, String itemTypeName,
@@ -79,12 +137,12 @@ public class Dataset implements IMetadataEntity<Dataset> {
             DatasetType datasetType, int datasetId, int pendingOp) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
-        this.itemTypeName = itemTypeName;
-        this.itemTypeDataverseName = itemTypeDataverseName;
-        this.metaItemTypeDataverseName = metaItemTypeDataverseName;
-        this.metaItemTypeName = metaItemTypeName;
+        this.recordTypeName = itemTypeName;
+        this.recordTypeDataverseName = itemTypeDataverseName;
+        this.metaTypeDataverseName = metaItemTypeDataverseName;
+        this.metaTypeName = metaItemTypeName;
         this.nodeGroupName = nodeGroupName;
-        this.compactionPolicy = compactionPolicy;
+        this.compactionPolicyFactory = compactionPolicy;
         this.compactionPolicyProperties = compactionPolicyProperties;
         this.datasetType = datasetType;
         this.datasetDetails = datasetDetails;
@@ -94,10 +152,10 @@ public class Dataset implements IMetadataEntity<Dataset> {
     }
 
     public Dataset(Dataset dataset) {
-        this(dataset.dataverseName, dataset.datasetName, 
dataset.itemTypeDataverseName, dataset.itemTypeName,
-                dataset.metaItemTypeDataverseName, dataset.metaItemTypeName, 
dataset.nodeGroupName,
-                dataset.compactionPolicy, dataset.compactionPolicyProperties, 
dataset.datasetDetails, dataset.hints,
-                dataset.datasetType, dataset.datasetId, dataset.pendingOp);
+        this(dataset.dataverseName, dataset.datasetName, 
dataset.recordTypeDataverseName, dataset.recordTypeName,
+                dataset.metaTypeDataverseName, dataset.metaTypeName, 
dataset.nodeGroupName,
+                dataset.compactionPolicyFactory, 
dataset.compactionPolicyProperties, dataset.datasetDetails,
+                dataset.hints, dataset.datasetType, dataset.datasetId, 
dataset.pendingOp);
     }
 
     public String getDataverseName() {
@@ -109,11 +167,11 @@ public class Dataset implements IMetadataEntity<Dataset> {
     }
 
     public String getItemTypeName() {
-        return itemTypeName;
+        return recordTypeName;
     }
 
     public String getItemTypeDataverseName() {
-        return itemTypeDataverseName;
+        return recordTypeDataverseName;
     }
 
     public String getNodeGroupName() {
@@ -121,7 +179,7 @@ public class Dataset implements IMetadataEntity<Dataset> {
     }
 
     public String getCompactionPolicy() {
-        return compactionPolicy;
+        return compactionPolicyFactory;
     }
 
     public Map<String, String> getCompactionPolicyProperties() {
@@ -149,15 +207,15 @@ public class Dataset implements IMetadataEntity<Dataset> {
     }
 
     public String getMetaItemTypeDataverseName() {
-        return metaItemTypeDataverseName;
+        return metaTypeDataverseName;
     }
 
     public String getMetaItemTypeName() {
-        return metaItemTypeName;
+        return metaTypeName;
     }
 
     public boolean hasMetaPart() {
-        return metaItemTypeDataverseName != null && metaItemTypeName != null;
+        return metaTypeDataverseName != null && metaTypeName != null;
     }
 
     public void setPendingOp(int pendingOp) {
@@ -192,12 +250,327 @@ public class Dataset implements IMetadataEntity<Dataset> 
{
         return true;
     }
 
-    public boolean allow(ILogicalOperator topOp, byte operation) {
+    public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: 
this method is meant to be extended
         return !hasMetaPart();
     }
 
+    /**
+     * Drop this dataset
+     *
+     * @param metadataProvider
+     *            metadata provider that can be used to get metadata info and 
runtimes
+     * @param mdTxnCtx
+     *            the transaction context
+     * @param jobsToExecute
+     *            a list of jobs to be executed as part of the drop operation
+     * @param bActiveTxn
+     *            whether the metadata transaction is ongoing
+     * @param progress
+     *            a mutable progress state used for error handling during the 
drop operation
+     * @param hcc
+     *            a client connection to hyracks master for job execution
+     * @throws Exception
+     *             if an error occurs during the drop process or if the dataset 
can't be dropped for any reason
+     */
+    public void drop(MetadataProvider metadataProvider, 
MutableObject<MetadataTransactionContext> mdTxnCtx,
+            List<JobSpecification> jobsToExecute, MutableBoolean bActiveTxn, 
MutableObject<ProgressState> progress,
+            IHyracksClientConnection hcc) throws Exception {
+        Map<FeedConnectionId, Pair<JobSpecification, Boolean>> 
disconnectJobList = new HashMap<>();
+        if (getDatasetType() == DatasetType.INTERNAL) {
+            // prepare job spec(s) that would disconnect any active feeds 
involving the dataset.
+            IActiveEntityEventsListener[] activeListeners = 
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+            for (IActiveEntityEventsListener listener : activeListeners) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName)) 
{
+                    throw new 
CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
+                            RecordUtil.toFullyQualifiedName(dataverseName, 
datasetName),
+                            listener.getEntityId().toString());
+                }
+            }
+            // #. prepare jobs to drop the dataset and the indexes in NC
+            List<Index> indexes =
+                    
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, 
datasetName);
+            for (int j = 0; j < indexes.size(); j++) {
+                if (indexes.get(j).isSecondaryIndex()) {
+                    jobsToExecute.add(IndexUtil.dropJob(indexes.get(j), 
metadataProvider, this));
+                }
+            }
+            Index primaryIndex =
+                    MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), 
dataverseName, datasetName, datasetName);
+            jobsToExecute.add(DatasetUtil.createDropDatasetJobSpec(this, 
primaryIndex, metadataProvider));
+            // #. mark the existing dataset as PendingDropOp
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), 
dataverseName, datasetName);
+            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
+                    new Dataset(dataverseName, datasetName, 
getItemTypeDataverseName(), getItemTypeName(),
+                            getMetaItemTypeDataverseName(), 
getMetaItemTypeName(), getNodeGroupName(),
+                            getCompactionPolicy(), 
getCompactionPolicyProperties(), getDatasetDetails(), getHints(),
+                            getDatasetType(), getDatasetId(), 
MetadataUtil.PENDING_DROP_OP));
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+            bActiveTxn.setValue(false);
+            
progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
+
+            // # disconnect the feeds
+            for (Pair<JobSpecification, Boolean> p : 
disconnectJobList.values()) {
+                JobUtils.runJob(hcc, p.first, true);
+            }
+
+            // #. run the jobs
+            for (JobSpecification jobSpec : jobsToExecute) {
+                JobUtils.runJob(hcc, jobSpec, true);
+            }
+
+            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
+            bActiveTxn.setValue(true);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
+        } else {
+            // External dataset
+            ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(this);
+            // #. prepare jobs to drop the dataset and the indexes in NC
+            List<Index> indexes =
+                    
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, 
datasetName);
+            for (int j = 0; j < indexes.size(); j++) {
+                if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
+                    jobsToExecute.add(IndexUtil.dropJob(indexes.get(j), 
metadataProvider, this));
+                } else {
+                    
jobsToExecute.add(DatasetUtil.buildDropFilesIndexJobSpec(metadataProvider, 
this));
+                }
+            }
+
+            // #. mark the existing dataset as PendingDropOp
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), 
dataverseName, datasetName);
+            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
+                    new Dataset(dataverseName, datasetName, 
getItemTypeDataverseName(), getItemTypeName(),
+                            getNodeGroupName(), getCompactionPolicy(), 
getCompactionPolicyProperties(),
+                            getDatasetDetails(), getHints(), getDatasetType(), 
getDatasetId(),
+                            MetadataUtil.PENDING_DROP_OP));
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+            bActiveTxn.setValue(false);
+            
progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
+
+            // #. run the jobs
+            for (JobSpecification jobSpec : jobsToExecute) {
+                JobUtils.runJob(hcc, jobSpec, true);
+            }
+            if (!indexes.isEmpty()) {
+                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(this);
+            }
+            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
+            bActiveTxn.setValue(true);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
+        }
+
+        // #. finally, delete the dataset.
+        MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), 
dataverseName, datasetName);
+        // Drop the associated nodegroup
+        String nodegroup = getNodeGroupName();
+        if 
(!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME))
 {
+            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), 
dataverseName + ":" + datasetName);
+        }
+    }
+
+    /**
+     * Create the index dataflow helper factory for a particular index on the 
dataset
+     *
+     * @param mdProvider
+     *            metadata provider to get metadata information, components, 
and runtimes
+     * @param index
+     *            the index to get the dataflow helper factory for
+     * @param recordType
+     *            the record type for the dataset
+     * @param metaType
+     *            the meta type for the dataset
+     * @param mergePolicyFactory
+     *            the merge policy factory of the dataset
+     * @param mergePolicyProperties
+     *            the merge policy properties for the dataset
+     * @return indexDataflowHelperFactory
+     *         an instance of {@link 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory}
+     * @throws AlgebricksException
+     *             if dataflow helper factory could not be created
+     */
+    public IIndexDataflowHelperFactory 
getIndexDataflowHelperFactory(MetadataProvider mdProvider, Index index,
+            ARecordType recordType, ARecordType metaType, 
ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties) throws 
AlgebricksException {
+        ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(this, recordType);
+        IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtil.computeFilterBinaryComparatorFactories(this,
+                recordType, 
mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+        switch (index.getIndexType()) {
+            case BTREE:
+                return 
bTreeDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, 
this, index,
+                        recordType, metaType, mergePolicyFactory, 
mergePolicyProperties, filterTypeTraits,
+                        filterCmpFactories);
+            case RTREE:
+                return 
rTreeDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, 
this, index,
+                        recordType, metaType, mergePolicyFactory, 
mergePolicyProperties, filterTypeTraits,
+                        filterCmpFactories);
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case SINGLE_PARTITION_WORD_INVIX:
+                return 
invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider,
 this,
+                        index, recordType, metaType, mergePolicyFactory, 
mergePolicyProperties, filterTypeTraits,
+                        filterCmpFactories);
+            default:
+                throw new 
CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
+                        index.getIndexType().toString());
+        }
+    }
+
+    /**
+     * Get the IO Operation callback factory for the index which belongs to 
this dataset
+     *
+     * @param index
+     *            the index
+     * @return ioOperationCallbackFactory
+     *         an instance of {@link 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory}
+     *         to be used with IO operations
+     * @throws AlgebricksException
+     *             if the factory could not be created for the index/dataset 
combination
+     */
+    public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index 
index) throws AlgebricksException {
+        switch (index.getIndexType()) {
+            case BTREE:
+                return getDatasetType() == DatasetType.EXTERNAL
+                        && 
!index.getIndexName().equals(BTreeDataflowHelperFactoryProvider.externalFileIndexName(this))
+                                ? 
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
+                                : LSMBTreeIOOperationCallbackFactory.INSTANCE;
+            case RTREE:
+                return LSMRTreeIOOperationCallbackFactory.INSTANCE;
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case SINGLE_PARTITION_WORD_INVIX:
+                return LSMInvertedIndexIOOperationCallbackFactory.INSTANCE;
+            default:
+                throw new 
CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
+                        index.getIndexType().toString());
+        }
+    }
+
+    /**
+     * Get the IndexOperationTrackerFactory for a particular index on the 
dataset
+     *
+     * @param index
+     *            the index
+     * @return an instance of {@link 
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory}
+     */
+    public ILSMOperationTrackerFactory getIndexOperationTrackerFactory(Index 
index) {
+        return index.isPrimaryIndex() ? new 
PrimaryIndexOperationTrackerFactory(getDatasetId())
+                : new SecondaryIndexOperationTrackerFactory(getDatasetId());
+    }
+
+    /**
+     * Get search callback factory for this dataset with the passed index and 
operation
+     *
+     * @param index
+     *            the index
+     * @param jobId
+     *            the job id being compiled
+     * @param op
+     *            the operation this search is part of
+     * @param primaryKeyFields
+     *            the primary key fields indexes for locking purposes
+     * @return
+     *         an instance of {@link 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory}
+     * @throws AlgebricksException
+     *             if the callback factory could not be created
+     */
+    public ISearchOperationCallbackFactory 
getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
+            Index index, JobId jobId, IndexOperation op, int[] 
primaryKeyFields) throws AlgebricksException {
+        if (getDatasetDetails().isTemp()) {
+            return NoOpOperationCallbackFactory.INSTANCE;
+        } else if (index.isPrimaryIndex()) {
+            /**
+             * Due to the read-committed isolation level,
+             * we may acquire very short duration lock(i.e., instant lock) for 
readers.
+             */
+            return (op == IndexOperation.UPSERT)
+                    ? new LockThenSearchOperationCallbackFactory(jobId, 
getDatasetId(), primaryKeyFields,
+                            
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE)
+                    : new 
PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), 
primaryKeyFields,
+                            
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE);
+        }
+        return new SecondaryIndexSearchOperationCallbackFactory();
+    }
+
+    /**
+     * Get the modification callback factory associated with this dataset, the 
passed index, and operation.
+     *
+     * @param index
+     *            the index
+     * @param jobId
+     *            the job id of the job being compiled
+     * @param op
+     *            the operation performed for this callback
+     * @param primaryKeyFields
+     *            the indexes of the primary keys (used for lock operations)
+     * @return
+     *         an instance of {@link 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory}
+     * @throws AlgebricksException
+     *             If the callback factory could not be created
+     */
+    public IModificationOperationCallbackFactory 
getModificationCallbackFactory(
+            IStorageComponentProvider componentProvider, Index index, JobId 
jobId, IndexOperation op,
+            int[] primaryKeyFields) throws AlgebricksException {
+        if (getDatasetDetails().isTemp()) {
+            return new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, 
getDatasetId(),
+                    primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType());
+        } else if (index.isPrimaryIndex()) {
+            return op == IndexOperation.UPSERT
+                    ? new UpsertOperationCallbackFactory(jobId, 
getDatasetId(), primaryKeyFields,
+                            
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
+                            hasMetaPart())
+                    : op == IndexOperation.DELETE || op == 
IndexOperation.INSERT
+                            ? new 
PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
+                                    primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(), op,
+                                    index.resourceType(), hasMetaPart())
+                            : NoOpOperationCallbackFactory.INSTANCE;
+        } else {
+            return op == IndexOperation.DELETE || op == IndexOperation.INSERT 
|| op == IndexOperation.UPSERT
+                    ? new 
SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), 
primaryKeyFields,
+                            
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
+                            hasMetaPart())
+                    : NoOpOperationCallbackFactory.INSTANCE;
+        }
+    }
+
     @Override
     public String toString() {
-        return dataverseName + "." + datasetName;
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            return mapper.writeValueAsString(toMap());
+        } catch (JsonProcessingException e) {
+            LOGGER.log(Level.WARNING, "Unable to convert map to json String", 
e);
+            return dataverseName + "." + datasetName;
+        }
+    }
+
+    public Map<String, Object> toMap() {
+        Map<String, Object> tree = new HashMap<>();
+        tree.put("datasetId", Integer.toString(datasetId));
+        tree.put("dataverseName", dataverseName);
+        tree.put("datasetName", datasetName);
+        tree.put("recordTypeDataverseName", recordTypeDataverseName);
+        tree.put("recordTypeName", recordTypeName);
+        tree.put("nodeGroupName", nodeGroupName);
+        tree.put("compactionPolicyFactory", compactionPolicyFactory);
+        tree.put("hints", hints);
+        tree.put("compactionPolicyProperties", compactionPolicyProperties);
+        tree.put("datasetType", datasetType.name());
+        tree.put("datasetDetails", datasetDetails.toString());
+        tree.put("metaTypeDataverseName", metaTypeDataverseName);
+        tree.put("metaTypeName", metaTypeName);
+        tree.put("pendingOp", MetadataUtil.pendingOpToString(pendingOp));
+        return tree;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((datasetName == null) ? 0 : 
datasetName.hashCode());
+        result = prime * result + ((dataverseName == null) ? 0 : 
dataverseName.hashCode());
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
index cdc982f..b9b4cd9 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
@@ -21,17 +21,20 @@ package org.apache.asterix.metadata.entities;
 
 import java.io.DataOutput;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import 
org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import org.apache.asterix.common.config.DatasetConfig.TransactionState;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
-import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AMutableString;
@@ -42,17 +45,21 @@ import 
org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 public class ExternalDatasetDetails implements IDatasetDetails {
 
     private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = 
Logger.getLogger(ExternalDatasetDetails.class.getName());
     private final String adapter;
     private final Map<String, String> properties;
     private final long addToCacheTime;
     private Date lastRefreshTime;
-    private ExternalDatasetTransactionState state;
+    private TransactionState state;
 
     public ExternalDatasetDetails(String adapter, Map<String, String> 
properties, Date lastRefreshTime,
-            ExternalDatasetTransactionState state) {
+            TransactionState state) {
         this.properties = properties;
         this.adapter = adapter;
         this.addToCacheTime = System.currentTimeMillis();
@@ -103,7 +110,7 @@ public class ExternalDatasetDetails implements 
IDatasetDetails {
             String name = property.getKey();
             String value = property.getValue();
             itemValue.reset();
-            DatasetUtils.writePropertyTypeRecord(name, value, 
itemValue.getDataOutput(),
+            DatasetUtil.writePropertyTypeRecord(name, value, 
itemValue.getDataOutput(),
                     
MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE);
             listBuilder.addItem(itemValue);
         }
@@ -143,11 +150,31 @@ public class ExternalDatasetDetails implements 
IDatasetDetails {
         this.lastRefreshTime = timestamp;
     }
 
-    public ExternalDatasetTransactionState getState() {
+    public TransactionState getState() {
         return state;
     }
 
-    public void setState(ExternalDatasetTransactionState state) {
+    public void setState(TransactionState state) {
         this.state = state;
     }
+
+    @Override
+    public String toString() {
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            return mapper.writeValueAsString(toMap());
+        } catch (JsonProcessingException e) {
+            LOGGER.log(Level.WARNING, "Unable to convert map to json String", 
e);
+            return getClass().getSimpleName();
+        }
+    }
+
+    private Map<String, Object> toMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("adapter", adapter);
+        map.put("properties", properties);
+        map.put("lastRefreshTime", lastRefreshTime.toString());
+        map.put("state", state.name());
+        return map;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index f33a2b6..df47c70 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -22,13 +22,16 @@ package org.apache.asterix.metadata.entities;
 import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.metadata.MetadataCache;
 import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
 /**
@@ -131,7 +134,7 @@ public class Index implements IMetadataEntity<Index>, 
Comparable<Index> {
         return !isPrimaryIndex();
     }
 
-    public static Pair<IAType, Boolean> getNonNullableType(IAType keyType) 
throws AsterixException {
+    public static Pair<IAType, Boolean> getNonNullableType(IAType keyType) {
         boolean nullable = false;
         IAType actualKeyType = keyType;
         if (NonTaggedFormatUtil.isOptional(keyType)) {
@@ -142,7 +145,7 @@ public class Index implements IMetadataEntity<Index>, 
Comparable<Index> {
     }
 
     public static Pair<IAType, Boolean> getNonNullableOpenFieldType(IAType 
fieldType, List<String> fieldName,
-            ARecordType recType) throws AsterixException {
+            ARecordType recType) throws AlgebricksException {
         Pair<IAType, Boolean> keyPairType = null;
         IAType subType = recType;
         for (int i = 0; i < fieldName.size(); i++) {
@@ -159,12 +162,12 @@ public class Index implements IMetadataEntity<Index>, 
Comparable<Index> {
     }
 
     public static Pair<IAType, Boolean> 
getNonNullableKeyFieldType(List<String> expr, ARecordType recType)
-            throws AsterixException {
+            throws AlgebricksException {
         IAType keyType = Index.keyFieldType(expr, recType);
         return getNonNullableType(keyType);
     }
 
-    private static IAType keyFieldType(List<String> expr, ARecordType recType) 
throws AsterixException {
+    private static IAType keyFieldType(List<String> expr, ARecordType recType) 
throws AlgebricksException {
         IAType fieldType = recType;
         fieldType = recType.getSubFieldType(expr);
         return fieldType;
@@ -251,4 +254,20 @@ public class Index implements IMetadataEntity<Index>, 
Comparable<Index> {
         }
         return false;
     }
+
+    public byte resourceType() throws CompilationException {
+        switch (indexType) {
+            case BTREE:
+                return ResourceType.LSM_BTREE;
+            case RTREE:
+                return ResourceType.LSM_RTREE;
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case SINGLE_PARTITION_WORD_INVIX:
+                return ResourceType.LSM_INVERTED_INDEX;
+            default:
+                throw new 
CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, indexType);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index dbe4a67..138cad7 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -37,7 +37,7 @@ import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.builders.UnorderedListBuilder;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import 
org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import org.apache.asterix.common.config.DatasetConfig.TransactionState;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataException;
@@ -48,7 +48,7 @@ import 
org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import 
org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import 
org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
-import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.AInt32;
@@ -233,7 +233,7 @@ public class DatasetTupleTranslator extends 
AbstractTupleTranslator<Dataset> {
                         
.getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_LAST_REFRESH_TIME_FIELD_INDEX)))
                                 .getChrononTime());
                 // State
-                ExternalDatasetTransactionState state = 
ExternalDatasetTransactionState
+                TransactionState state = TransactionState
                         .values()[((AInt32) datasetDetailsRecord.getValueByPos(
                                 
MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_TRANSACTION_STATE_FIELD_INDEX))
                                         .getIntegerValue()];
@@ -326,7 +326,7 @@ public class DatasetTupleTranslator extends 
AbstractTupleTranslator<Dataset> {
                 String name = property.getKey();
                 String value = property.getValue();
                 itemValue.reset();
-                DatasetUtils.writePropertyTypeRecord(name, value, 
itemValue.getDataOutput(),
+                DatasetUtil.writePropertyTypeRecord(name, value, 
itemValue.getDataOutput(),
                         
MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
                 listBuilder.addItem(itemValue);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java
index fc7ef60..b81ec29 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java
@@ -51,7 +51,7 @@ import org.apache.asterix.om.types.AUnorderedListType;
 import org.apache.asterix.om.types.AbstractCollectionType;
 import org.apache.asterix.om.types.AbstractComplexType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
index 70d18b4..6446e67 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
@@ -29,7 +29,6 @@ import java.util.List;
 
 import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.MetadataException;
@@ -39,7 +38,7 @@ import 
org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.entities.BuiltinTypeMap;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.KeyFieldTypeUtils;
+import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ACollectionCursor;
 import org.apache.asterix.om.base.AInt32;
@@ -53,6 +52,7 @@ import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -61,6 +61,7 @@ import 
org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
  * Translates an Index metadata entity to an ITupleReference and vice versa.
  */
 public class IndexTupleTranslator extends AbstractTupleTranslator<Index> {
+    private static final long serialVersionUID = 1L;
     // Field indexes of serialized Index in a tuple.
     // First key field.
     public static final int INDEX_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
@@ -82,18 +83,16 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
     private transient AOrderedListType int8List = new 
AOrderedListType(BuiltinType.AINT8, null);
     private transient ArrayBackedValueStorage nameValue = new 
ArrayBackedValueStorage();
     private transient ArrayBackedValueStorage itemValue = new 
ArrayBackedValueStorage();
-    private transient List<List<String>> searchKey;
-    private transient List<IAType> searchKeyType;
     private transient AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
     @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt32> intSerde = 
SerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
+    private ISerializerDeserializer<AInt32> intSerde =
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
     @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt8> int8Serde = 
SerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT8);
+    private ISerializerDeserializer<AInt8> int8Serde =
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
     @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<ARecord> recordSerde = 
SerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(MetadataRecordTypes.INDEX_RECORDTYPE);
+    private ISerializerDeserializer<ARecord> recordSerde =
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.INDEX_RECORDTYPE);
     private final MetadataNode metadataNode;
     private final JobId jobId;
 
@@ -120,14 +119,15 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
         IndexType indexStructure = IndexType
                 .valueOf(((AString) 
rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_INDEXSTRUCTURE_FIELD_INDEX))
                         .getStringValue());
-        IACursor fieldNameCursor = ((AOrderedList) rec
-                
.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_SEARCHKEY_FIELD_INDEX)).getCursor();
-        List<List<String>> searchKey = new ArrayList<List<String>>();
+        IACursor fieldNameCursor =
+                ((AOrderedList) 
rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_SEARCHKEY_FIELD_INDEX))
+                        .getCursor();
+        List<List<String>> searchKey = new ArrayList<>();
         AOrderedList fieldNameList;
         while (fieldNameCursor.next()) {
             fieldNameList = (AOrderedList) fieldNameCursor.get();
             IACursor nestedFieldNameCursor = (fieldNameList.getCursor());
-            List<String> nestedFieldName = new ArrayList<String>();
+            List<String> nestedFieldName = new ArrayList<>();
             while (nestedFieldNameCursor.next()) {
                 nestedFieldName.add(((AString) 
nestedFieldNameCursor.get()).getStringValue());
             }
@@ -138,7 +138,7 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
         if (indexKeyTypeFieldPos > 0) {
             fieldTypeCursor = ((AOrderedList) 
rec.getValueByPos(indexKeyTypeFieldPos)).getCursor();
         }
-        List<IAType> searchKeyType = new ArrayList<IAType>(searchKey.size());
+        List<IAType> searchKeyType = new ArrayList<>(searchKey.size());
         while (fieldTypeCursor.next()) {
             String typeName = ((AString) 
fieldTypeCursor.get()).getStringValue();
             IAType fieldType = 
BuiltinTypeMap.getTypeFromTypeName(metadataNode, jobId, dvName, typeName, 
false);
@@ -150,8 +150,8 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
         if (isEnforcedFieldPos > 0) {
             isEnforcingKeys = ((ABoolean) 
rec.getValueByPos(isEnforcedFieldPos)).getBoolean();
         }
-        Boolean isPrimaryIndex = ((ABoolean) 
rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
-                .getBoolean();
+        Boolean isPrimaryIndex =
+                ((ABoolean) 
rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX)).getBoolean();
         int pendingOp = ((AInt32) 
rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
                 .getIntegerValue();
         // Check if there is a gram length as well.
@@ -180,8 +180,8 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
             Dataset dSet = metadataNode.getDataset(jobId, dvName, dsName);
             String datatypeName = dSet.getItemTypeName();
             String datatypeDataverseName = dSet.getItemTypeDataverseName();
-            ARecordType recordDt = (ARecordType) 
metadataNode.getDatatype(jobId, datatypeDataverseName, datatypeName)
-                    .getDatatype();
+            ARecordType recordDt =
+                    (ARecordType) metadataNode.getDatatype(jobId, 
datatypeDataverseName, datatypeName).getDatatype();
             String metatypeName = dSet.getMetaItemTypeName();
             String metatypeDataverseName = dSet.getMetaItemTypeDataverseName();
             ARecordType metaDt = null;
@@ -190,8 +190,8 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
                         .getDatatype();
             }
             try {
-                searchKeyType = KeyFieldTypeUtils.getKeyTypes(recordDt, 
metaDt, searchKey, keyFieldSourceIndicator);
-            } catch (AsterixException e) {
+                searchKeyType = KeyFieldTypeUtil.getKeyTypes(recordDt, metaDt, 
searchKey, keyFieldSourceIndicator);
+            } catch (AlgebricksException e) {
                 throw new MetadataException(e);
             }
         }
@@ -242,8 +242,8 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
         // write field 4
         primaryKeyListBuilder.reset((AOrderedListType) 
MetadataRecordTypes.INDEX_RECORDTYPE
                 
.getFieldTypes()[MetadataRecordTypes.INDEX_ARECORD_SEARCHKEY_FIELD_INDEX]);
-        this.searchKey = instance.getKeyFieldNames();
-        for (List<String> field : this.searchKey) {
+        List<List<String>> searchKey = instance.getKeyFieldNames();
+        for (List<String> field : searchKey) {
             listBuilder.reset(stringList);
             for (String subField : field) {
                 itemValue.reset();
@@ -293,14 +293,13 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
             // write optional field 9
             OrderedListBuilder typeListBuilder = new OrderedListBuilder();
             typeListBuilder.reset(new AOrderedListType(BuiltinType.ANY, null));
-            ArrayBackedValueStorage nameValue = new ArrayBackedValueStorage();
             nameValue.reset();
             aString.setValue(INDEX_SEARCHKEY_TYPE_FIELD_NAME);
 
             stringSerde.serialize(aString, nameValue.getDataOutput());
 
-            this.searchKeyType = instance.getKeyFieldTypes();
-            for (IAType type : this.searchKeyType) {
+            List<IAType> searchKeyType = instance.getKeyFieldTypes();
+            for (IAType type : searchKeyType) {
                 itemValue.reset();
                 aString.setValue(type.getTypeName());
                 stringSerde.serialize(aString, itemValue.getDataOutput());
@@ -334,7 +333,6 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> {
         }
         if (needSerialization) {
             listBuilder.reset(int8List);
-            ArrayBackedValueStorage nameValue = new ArrayBackedValueStorage();
             nameValue.reset();
             aString.setValue(INDEX_SEARCHKEY_SOURCE_INDICATOR_FIELD_NAME);
             stringSerde.serialize(aString, nameValue.getDataOutput());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java
new file mode 100644
index 0000000..00c9010
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.feeds;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveMessage;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedEventsListener;
+import org.apache.asterix.external.feed.message.EndFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+
+/**
+ * Provides helper method(s) for creating JobSpec for operations on a feed.
+ */
+public class FeedOperations {
+
+    private FeedOperations() {
+    }
+
+    /**
+     * Builds the job spec for ingesting a (primary) feed from its external 
source via the feed adaptor.
+     *
+     * @param primaryFeed
+     * @param metadataProvider
+     * @return JobSpecification the Hyracks job specification for receiving 
data from external source
+     * @throws Exception
+     */
+    public static Pair<JobSpecification, IAdapterFactory> 
buildFeedIntakeJobSpec(Feed primaryFeed,
+            MetadataProvider metadataProvider, FeedPolicyAccessor 
policyAccessor) throws Exception {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+        IAdapterFactory adapterFactory;
+        IOperatorDescriptor feedIngestor;
+        AlgebricksPartitionConstraint ingesterPc;
+        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, 
IAdapterFactory> t =
+                metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, 
policyAccessor);
+        feedIngestor = t.first;
+        ingesterPc = t.second;
+        adapterFactory = t.third;
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
feedIngestor, ingesterPc);
+        NullSinkOperatorDescriptor nullSink = new 
NullSinkOperatorDescriptor(spec);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
nullSink, ingesterPc);
+        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, 
nullSink, 0);
+        spec.addRoot(nullSink);
+        return new Pair<>(spec, adapterFactory);
+    }
+
+    /**
+     * Builds the job spec for sending message to an active feed to disconnect 
it from the
+     * its source.
+     */
+    public static Pair<JobSpecification, Boolean> 
buildDisconnectFeedJobSpec(FeedConnectionId connectionId)
+            throws AlgebricksException {
+
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IOperatorDescriptor feedMessenger;
+        AlgebricksPartitionConstraint messengerPc;
+        List<String> locations = null;
+        FeedRuntimeType sourceRuntimeType;
+        try {
+            FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
+                    .getActiveEntityListener(connectionId.getFeedId());
+            FeedConnectJobInfo cInfo = 
listener.getFeedConnectJobInfo(connectionId);
+            IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
+            IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
+
+            boolean terminateIntakeJob = false;
+            boolean completeDisconnect = computeFeedJoint == null || 
computeFeedJoint.getReceivers().isEmpty();
+            if (completeDisconnect) {
+                sourceRuntimeType = FeedRuntimeType.INTAKE;
+                locations = cInfo.getCollectLocations();
+                terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 
1;
+            } else {
+                locations = cInfo.getComputeLocations();
+                sourceRuntimeType = FeedRuntimeType.COMPUTE;
+            }
+
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = 
buildDisconnectFeedMessengerRuntime(spec,
+                    connectionId, locations, sourceRuntimeType, 
completeDisconnect, sourceFeedJoint.getOwnerFeedId());
+
+            feedMessenger = p.first;
+            messengerPc = p.second;
+
+            
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
feedMessenger, messengerPc);
+            NullSinkOperatorDescriptor nullSink = new 
NullSinkOperatorDescriptor(spec);
+            
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
nullSink, messengerPc);
+            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 
0, nullSink, 0);
+            spec.addRoot(nullSink);
+            return new Pair<>(spec, terminateIntakeJob);
+
+        } catch (AlgebricksException e) {
+            throw new AsterixException(e);
+        }
+
+    }
+
+    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildSendFeedMessageRuntime(
+            JobSpecification jobSpec, FeedConnectionId feedConenctionId, 
IActiveMessage feedMessage,
+            Collection<String> locations) throws AlgebricksException {
+        AlgebricksPartitionConstraint partitionConstraint =
+                new 
AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+        FeedMessageOperatorDescriptor feedMessenger =
+                new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId, 
feedMessage);
+        return new Pair<>(feedMessenger, partitionConstraint);
+    }
+
+    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildDisconnectFeedMessengerRuntime(
+            JobSpecification jobSpec, FeedConnectionId feedConenctionId, 
List<String> locations,
+            FeedRuntimeType sourceFeedRuntimeType, boolean 
completeDisconnection, EntityId sourceFeedId)
+            throws AlgebricksException {
+        IActiveMessage feedMessage = new EndFeedMessage(feedConenctionId, 
sourceFeedRuntimeType, sourceFeedId,
+                completeDisconnection, 
EndFeedMessage.EndMessageType.DISCONNECT_FEED);
+        return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, 
feedMessage, locations);
+    }
+
+    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws 
AsterixException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        AlgebricksAbsolutePartitionConstraint allCluster = 
ClusterStateManager.INSTANCE.getClusterLocations();
+        Set<String> nodes = new TreeSet<>();
+        for (String node : allCluster.getLocations()) {
+            nodes.add(node);
+        }
+        AlgebricksAbsolutePartitionConstraint locations =
+                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new 
String[nodes.size()]));
+        FileSplit[] feedLogFileSplits =
+                FeedUtils.splitsForAdapter(feed.getDataverseName(), 
feed.getFeedName(), locations);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
+                
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+        FileRemoveOperatorDescriptor frod = new 
FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, 
splitsAndConstraint.second);
+        spec.addRoot(frod);
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
index 185232e..137e625 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
@@ -25,7 +25,7 @@ import 
org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.ConstantExpressionUtil;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;

Reply via email to