http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index e684285..4ebf055 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -19,58 +19,116 @@ package org.apache.asterix.metadata.entities; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.asterix.active.ActiveJobNotificationHandler; +import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; +import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; +import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; +import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; +import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.utils.JobUtils; +import org.apache.asterix.common.utils.JobUtils.ProgressState; +import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.metadata.IDatasetDetails; import 
org.apache.asterix.metadata.MetadataCache; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.api.IMetadataEntity; +import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; +import org.apache.asterix.metadata.utils.ExternalIndexingOperations; +import org.apache.asterix.metadata.utils.IndexUtil; +import org.apache.asterix.metadata.utils.InvertedIndexDataflowHelperFactoryProvider; +import org.apache.asterix.metadata.utils.MetadataConstants; +import org.apache.asterix.metadata.utils.MetadataUtil; +import org.apache.asterix.metadata.utils.RTreeDataflowHelperFactoryProvider; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.utils.RecordUtil; +import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory; +import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory; +import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory; +import 
org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; /** * Metadata describing a dataset. 
*/ public class Dataset implements IMetadataEntity<Dataset> { - /** - * Dataset related operations + /* + * Constants */ - public static final byte OP_READ = 0x00; - public static final byte OP_INSERT = 0x01; - public static final byte OP_DELETE = 0x02; - public static final byte OP_UPSERT = 0x03; - private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(Dataset.class.getName()); + //TODO: Remove Singletons + private static final BTreeDataflowHelperFactoryProvider bTreeDataflowHelperFactoryProvider = + BTreeDataflowHelperFactoryProvider.INSTANCE; + private static final RTreeDataflowHelperFactoryProvider rTreeDataflowHelperFactoryProvider = + RTreeDataflowHelperFactoryProvider.INSTANCE; + private static final InvertedIndexDataflowHelperFactoryProvider invertedIndexDataflowHelperFactoryProvider = + InvertedIndexDataflowHelperFactoryProvider.INSTANCE; + /* + * Members + */ + private final int datasetId; private final String dataverseName; - // Enforced to be unique within a dataverse. private final String datasetName; - // Dataverse of ItemType for this dataset - private final String itemTypeDataverseName; - // Type of items stored in this dataset. - private final String itemTypeName; + private final String recordTypeDataverseName; + private final String recordTypeName; private final String nodeGroupName; - private final String compactionPolicy; + private final String compactionPolicyFactory; + private final Map<String, String> hints; private final Map<String, String> compactionPolicyProperties; private final DatasetType datasetType; private final IDatasetDetails datasetDetails; - // Hints related to cardinatlity of dataset, avg size of tuples etc. 
- private final Map<String, String> hints; - private final int datasetId; - // Type of pending operations with respect to atomic DDL operation + private final String metaTypeDataverseName; + private final String metaTypeName; private int pendingOp; - // Dataverse of Meta ItemType for this dataset. - private final String metaItemTypeDataverseName; - // Type of Meta items stored in this dataset. - private final String metaItemTypeName; - - public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName, + public Dataset(String dataverseName, String datasetName, String recordTypeDataverseName, String recordTypeName, String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints, DatasetType datasetType, int datasetId, int pendingOp) { - this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, null, null, nodeGroupName, - compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType, datasetId, pendingOp); + this(dataverseName, datasetName, recordTypeDataverseName, recordTypeName, /*metaTypeDataverseName*/null, + /*metaTypeName*/null, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, + hints, datasetType, datasetId, pendingOp); } public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName, @@ -79,12 +137,12 @@ public class Dataset implements IMetadataEntity<Dataset> { DatasetType datasetType, int datasetId, int pendingOp) { this.dataverseName = dataverseName; this.datasetName = datasetName; - this.itemTypeName = itemTypeName; - this.itemTypeDataverseName = itemTypeDataverseName; - this.metaItemTypeDataverseName = metaItemTypeDataverseName; - this.metaItemTypeName = metaItemTypeName; + this.recordTypeName = itemTypeName; + this.recordTypeDataverseName = itemTypeDataverseName; + this.metaTypeDataverseName = metaItemTypeDataverseName; + 
this.metaTypeName = metaItemTypeName; this.nodeGroupName = nodeGroupName; - this.compactionPolicy = compactionPolicy; + this.compactionPolicyFactory = compactionPolicy; this.compactionPolicyProperties = compactionPolicyProperties; this.datasetType = datasetType; this.datasetDetails = datasetDetails; @@ -94,10 +152,10 @@ public class Dataset implements IMetadataEntity<Dataset> { } public Dataset(Dataset dataset) { - this(dataset.dataverseName, dataset.datasetName, dataset.itemTypeDataverseName, dataset.itemTypeName, - dataset.metaItemTypeDataverseName, dataset.metaItemTypeName, dataset.nodeGroupName, - dataset.compactionPolicy, dataset.compactionPolicyProperties, dataset.datasetDetails, dataset.hints, - dataset.datasetType, dataset.datasetId, dataset.pendingOp); + this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName, + dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName, + dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails, + dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp); } public String getDataverseName() { @@ -109,11 +167,11 @@ public class Dataset implements IMetadataEntity<Dataset> { } public String getItemTypeName() { - return itemTypeName; + return recordTypeName; } public String getItemTypeDataverseName() { - return itemTypeDataverseName; + return recordTypeDataverseName; } public String getNodeGroupName() { @@ -121,7 +179,7 @@ public class Dataset implements IMetadataEntity<Dataset> { } public String getCompactionPolicy() { - return compactionPolicy; + return compactionPolicyFactory; } public Map<String, String> getCompactionPolicyProperties() { @@ -149,15 +207,15 @@ public class Dataset implements IMetadataEntity<Dataset> { } public String getMetaItemTypeDataverseName() { - return metaItemTypeDataverseName; + return metaTypeDataverseName; } public String getMetaItemTypeName() { - return metaItemTypeName; + return 
metaTypeName; } public boolean hasMetaPart() { - return metaItemTypeDataverseName != null && metaItemTypeName != null; + return metaTypeDataverseName != null && metaTypeName != null; } public void setPendingOp(int pendingOp) { @@ -192,12 +250,327 @@ public class Dataset implements IMetadataEntity<Dataset> { return true; } - public boolean allow(ILogicalOperator topOp, byte operation) { + public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method is meant to be extended return !hasMetaPart(); } + /** + * Drop this dataset + * + * @param metadataProvider + * metadata provider that can be used to get metadata info and runtimes + * @param mdTxnCtx + * the transaction context + * @param jobsToExecute + * a list of jobs to be executed as part of the drop operation + * @param bActiveTxn + * whether the metadata transaction is ongoing + * @param progress + * a mutable progress state used for error handling during the drop operation + * @param hcc + * a client connection to hyracks master for job execution + * @throws Exception + * if an error occur during the drop process or if the dataset can't be dropped for any reason + */ + public void drop(MetadataProvider metadataProvider, MutableObject<MetadataTransactionContext> mdTxnCtx, + List<JobSpecification> jobsToExecute, MutableBoolean bActiveTxn, MutableObject<ProgressState> progress, + IHyracksClientConnection hcc) throws Exception { + Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>(); + if (getDatasetType() == DatasetType.INTERNAL) { + // prepare job spec(s) that would disconnect any active feeds involving the dataset. 
+ IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); + for (IActiveEntityEventsListener listener : activeListeners) { + if (listener.isEntityUsingDataset(dataverseName, datasetName)) { + throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, + RecordUtil.toFullyQualifiedName(dataverseName, datasetName), + listener.getEntityId().toString()); + } + } + // #. prepare jobs to drop the datatset and the indexes in NC + List<Index> indexes = + MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName); + for (int j = 0; j < indexes.size(); j++) { + if (indexes.get(j).isSecondaryIndex()) { + jobsToExecute.add(IndexUtil.dropJob(indexes.get(j), metadataProvider, this)); + } + } + Index primaryIndex = + MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), dataverseName, datasetName, datasetName); + jobsToExecute.add(DatasetUtil.createDropDatasetJobSpec(this, primaryIndex, metadataProvider)); + // #. mark the existing dataset as PendingDropOp + MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName); + MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(), + new Dataset(dataverseName, datasetName, getItemTypeDataverseName(), getItemTypeName(), + getMetaItemTypeDataverseName(), getMetaItemTypeName(), getNodeGroupName(), + getCompactionPolicy(), getCompactionPolicyProperties(), getDatasetDetails(), getHints(), + getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP)); + + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); + bActiveTxn.setValue(false); + progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA); + + // # disconnect the feeds + for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) { + JobUtils.runJob(hcc, p.first, true); + } + + // #. 
run the jobs + for (JobSpecification jobSpec : jobsToExecute) { + JobUtils.runJob(hcc, jobSpec, true); + } + + mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction()); + bActiveTxn.setValue(true); + metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); + } else { + // External dataset + ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(this); + // #. prepare jobs to drop the datatset and the indexes in NC + List<Index> indexes = + MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName); + for (int j = 0; j < indexes.size(); j++) { + if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) { + jobsToExecute.add(IndexUtil.dropJob(indexes.get(j), metadataProvider, this)); + } else { + jobsToExecute.add(DatasetUtil.buildDropFilesIndexJobSpec(metadataProvider, this)); + } + } + + // #. mark the existing dataset as PendingDropOp + MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName); + MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(), + new Dataset(dataverseName, datasetName, getItemTypeDataverseName(), getItemTypeName(), + getNodeGroupName(), getCompactionPolicy(), getCompactionPolicyProperties(), + getDatasetDetails(), getHints(), getDatasetType(), getDatasetId(), + MetadataUtil.PENDING_DROP_OP)); + + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); + bActiveTxn.setValue(false); + progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA); + + // #. run the jobs + for (JobSpecification jobSpec : jobsToExecute) { + JobUtils.runJob(hcc, jobSpec, true); + } + if (!indexes.isEmpty()) { + ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(this); + } + mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction()); + bActiveTxn.setValue(true); + metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); + } + + // #. finally, delete the dataset. 
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName); + // Drop the associated nodegroup + String nodegroup = getNodeGroupName(); + if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) { + MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName); + } + } + + /** + * Create the index dataflow helper factory for a particular index on the dataset + * + * @param mdProvider + * metadata provider to get metadata information, components, and runtimes + * @param index + * the index to get the dataflow helper factory for + * @param recordType + * the record type for the dataset + * @param metaType + * the meta type for the dataset + * @param mergePolicyFactory + * the merge policy factory of the dataset + * @param mergePolicyProperties + * the merge policy properties for the dataset + * @return indexDataflowHelperFactory + * an instance of {@link org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory} + * @throws AlgebricksException + * if dataflow helper factory could not be created + */ + public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider, Index index, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map<String, String> mergePolicyProperties) throws AlgebricksException { + ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(this, recordType); + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(this, + recordType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider()); + switch (index.getIndexType()) { + case BTREE: + return bTreeDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this, index, + recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, + filterCmpFactories); + case RTREE: + return 
rTreeDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this, index, + recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, + filterCmpFactories); + case LENGTH_PARTITIONED_NGRAM_INVIX: + case LENGTH_PARTITIONED_WORD_INVIX: + case SINGLE_PARTITION_NGRAM_INVIX: + case SINGLE_PARTITION_WORD_INVIX: + return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this, + index, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, + filterCmpFactories); + default: + throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, + index.getIndexType().toString()); + } + } + + /** + * Get the IO Operation callback factory for the index which belongs to this dataset + * + * @param index + * the index + * @return ioOperationCallbackFactory + * an instance of {@link org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory} + * to be used with IO operations + * @throws AlgebricksException + * if the factory could not be created for the index/dataset combination + */ + public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException { + switch (index.getIndexType()) { + case BTREE: + return getDatasetType() == DatasetType.EXTERNAL + && !index.getIndexName().equals(BTreeDataflowHelperFactoryProvider.externalFileIndexName(this)) + ? 
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE + : LSMBTreeIOOperationCallbackFactory.INSTANCE; + case RTREE: + return LSMRTreeIOOperationCallbackFactory.INSTANCE; + case LENGTH_PARTITIONED_NGRAM_INVIX: + case LENGTH_PARTITIONED_WORD_INVIX: + case SINGLE_PARTITION_NGRAM_INVIX: + case SINGLE_PARTITION_WORD_INVIX: + return LSMInvertedIndexIOOperationCallbackFactory.INSTANCE; + default: + throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, + index.getIndexType().toString()); + } + } + + /** + * get the IndexOperationTrackerFactory for a particular index on the dataset + * + * @param index + * the index + * @return an instance of {@link org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory} + */ + public ILSMOperationTrackerFactory getIndexOperationTrackerFactory(Index index) { + return index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(getDatasetId()) + : new SecondaryIndexOperationTrackerFactory(getDatasetId()); + } + + /** + * Get search callback factory for this dataset with the passed index and operation + * + * @param index + * the index + * @param jobId + * the job id being compiled + * @param op + * the operation this search is part of + * @param primaryKeyFields + * the primary key fields indexes for locking purposes + * @return + * an instance of {@link org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory} + * @throws AlgebricksException + * if the callback factory could not be created + */ + public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider, + Index index, JobId jobId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException { + if (getDatasetDetails().isTemp()) { + return NoOpOperationCallbackFactory.INSTANCE; + } else if (index.isPrimaryIndex()) { + /** + * Due to the read-committed isolation level, + * we may acquire very short duration lock(i.e., instant lock) for readers. 
+ */ + return (op == IndexOperation.UPSERT) + ? new LockThenSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE) + : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE); + } + return new SecondaryIndexSearchOperationCallbackFactory(); + } + + /** + * Get the modification callback factory associated with this dataset, the passed index, and operation. + * + * @param index + * the index + * @param jobId + * the job id of the job being compiled + * @param op + * the operation performed for this callback + * @param primaryKeyFields + * the indexes of the primary keys (used for lock operations) + * @return + * an instance of {@link org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory} + * @throws AlgebricksException + * If the callback factory could not be created + */ + public IModificationOperationCallbackFactory getModificationCallbackFactory( + IStorageComponentProvider componentProvider, Index index, JobId jobId, IndexOperation op, + int[] primaryKeyFields) throws AlgebricksException { + if (getDatasetDetails().isTemp()) { + return new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), + primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), op, index.resourceType()); + } else if (index.isPrimaryIndex()) { + return op == IndexOperation.UPSERT + ? new UpsertOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(), + hasMetaPart()) + : op == IndexOperation.DELETE || op == IndexOperation.INSERT + ? 
new PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), + primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), op, + index.resourceType(), hasMetaPart()) + : NoOpOperationCallbackFactory.INSTANCE; + } else { + return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT + ? new SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(), + hasMetaPart()) + : NoOpOperationCallbackFactory.INSTANCE; + } + } + @Override public String toString() { - return dataverseName + "." + datasetName; + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.writeValueAsString(toMap()); + } catch (JsonProcessingException e) { + LOGGER.log(Level.WARNING, "Unable to convert map to json String", e); + return dataverseName + "." + datasetName; + } + } + + public Map<String, Object> toMap() { + Map<String, Object> tree = new HashMap<>(); + tree.put("datasetId", Integer.toString(datasetId)); + tree.put("dataverseName", dataverseName); + tree.put("datasetName", datasetName); + tree.put("recordTypeDataverseName", recordTypeDataverseName); + tree.put("recordTypeName", recordTypeName); + tree.put("nodeGroupName", nodeGroupName); + tree.put("compactionPolicyFactory", compactionPolicyFactory); + tree.put("hints", hints); + tree.put("compactionPolicyProperties", compactionPolicyProperties); + tree.put("datasetType", datasetType.name()); + tree.put("datasetDetails", datasetDetails.toString()); + tree.put("metaTypeDataverseName", metaTypeDataverseName); + tree.put("metaTypeName", metaTypeName); + tree.put("pendingOp", MetadataUtil.pendingOpToString(pendingOp)); + return tree; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((datasetName == null) ? 0 : datasetName.hashCode()); + result = prime * result + ((dataverseName == null) ? 
0 : dataverseName.hashCode()); + return result; } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java index cdc982f..b9b4cd9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java @@ -21,17 +21,20 @@ package org.apache.asterix.metadata.entities; import java.io.DataOutput; import java.util.Date; +import java.util.HashMap; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.builders.RecordBuilder; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; +import org.apache.asterix.common.config.DatasetConfig.TransactionState; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes; -import org.apache.asterix.metadata.utils.DatasetUtils; +import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.om.base.ADateTime; import org.apache.asterix.om.base.AInt32; import org.apache.asterix.om.base.AMutableString; @@ -42,17 +45,21 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; 
+import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + public class ExternalDatasetDetails implements IDatasetDetails { private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(ExternalDatasetDetails.class.getName()); private final String adapter; private final Map<String, String> properties; private final long addToCacheTime; private Date lastRefreshTime; - private ExternalDatasetTransactionState state; + private TransactionState state; public ExternalDatasetDetails(String adapter, Map<String, String> properties, Date lastRefreshTime, - ExternalDatasetTransactionState state) { + TransactionState state) { this.properties = properties; this.adapter = adapter; this.addToCacheTime = System.currentTimeMillis(); @@ -103,7 +110,7 @@ public class ExternalDatasetDetails implements IDatasetDetails { String name = property.getKey(); String value = property.getValue(); itemValue.reset(); - DatasetUtils.writePropertyTypeRecord(name, value, itemValue.getDataOutput(), + DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(), MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE); listBuilder.addItem(itemValue); } @@ -143,11 +150,31 @@ public class ExternalDatasetDetails implements IDatasetDetails { this.lastRefreshTime = timestamp; } - public ExternalDatasetTransactionState getState() { + public TransactionState getState() { return state; } - public void setState(ExternalDatasetTransactionState state) { + public void setState(TransactionState state) { this.state = state; } + + @Override + public String toString() { + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.writeValueAsString(toMap()); + } catch (JsonProcessingException e) { + LOGGER.log(Level.WARNING, "Unable to convert map to json String", e); + return getClass().getSimpleName(); + } + } + + private Map<String, Object> toMap() { + Map<String, Object> map = new 
HashMap<>(); + map.put("adapter", adapter); + map.put("properties", properties); + map.put("lastRefreshTime", lastRefreshTime.toString()); + map.put("state", state.name()); + return map; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java index f33a2b6..df47c70 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java @@ -22,13 +22,16 @@ package org.apache.asterix.metadata.entities; import java.util.List; import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.metadata.MetadataCache; import org.apache.asterix.metadata.api.IMetadataEntity; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.AUnionType; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; /** @@ -131,7 +134,7 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { return !isPrimaryIndex(); } - public static Pair<IAType, Boolean> getNonNullableType(IAType keyType) throws AsterixException { + public static Pair<IAType, Boolean> 
getNonNullableType(IAType keyType) { boolean nullable = false; IAType actualKeyType = keyType; if (NonTaggedFormatUtil.isOptional(keyType)) { @@ -142,7 +145,7 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { } public static Pair<IAType, Boolean> getNonNullableOpenFieldType(IAType fieldType, List<String> fieldName, - ARecordType recType) throws AsterixException { + ARecordType recType) throws AlgebricksException { Pair<IAType, Boolean> keyPairType = null; IAType subType = recType; for (int i = 0; i < fieldName.size(); i++) { @@ -159,12 +162,12 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { } public static Pair<IAType, Boolean> getNonNullableKeyFieldType(List<String> expr, ARecordType recType) - throws AsterixException { + throws AlgebricksException { IAType keyType = Index.keyFieldType(expr, recType); return getNonNullableType(keyType); } - private static IAType keyFieldType(List<String> expr, ARecordType recType) throws AsterixException { + private static IAType keyFieldType(List<String> expr, ARecordType recType) throws AlgebricksException { IAType fieldType = recType; fieldType = recType.getSubFieldType(expr); return fieldType; @@ -251,4 +254,20 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { } return false; } + + public byte resourceType() throws CompilationException { + switch (indexType) { + case BTREE: + return ResourceType.LSM_BTREE; + case RTREE: + return ResourceType.LSM_RTREE; + case LENGTH_PARTITIONED_NGRAM_INVIX: + case LENGTH_PARTITIONED_WORD_INVIX: + case SINGLE_PARTITION_NGRAM_INVIX: + case SINGLE_PARTITION_WORD_INVIX: + return ResourceType.LSM_INVERTED_INDEX; + default: + throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, indexType); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java index dbe4a67..138cad7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java @@ -37,7 +37,7 @@ import org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.builders.RecordBuilder; import org.apache.asterix.builders.UnorderedListBuilder; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; +import org.apache.asterix.common.config.DatasetConfig.TransactionState; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataException; @@ -48,7 +48,7 @@ import org.apache.asterix.metadata.entities.ExternalDatasetDetails; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure; import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; -import org.apache.asterix.metadata.utils.DatasetUtils; +import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.om.base.ABoolean; import org.apache.asterix.om.base.ADateTime; import org.apache.asterix.om.base.AInt32; @@ -233,7 +233,7 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { .getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_LAST_REFRESH_TIME_FIELD_INDEX))) .getChrononTime()); // State - 
ExternalDatasetTransactionState state = ExternalDatasetTransactionState + TransactionState state = TransactionState .values()[((AInt32) datasetDetailsRecord.getValueByPos( MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_TRANSACTION_STATE_FIELD_INDEX)) .getIntegerValue()]; @@ -326,7 +326,7 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { String name = property.getKey(); String value = property.getValue(); itemValue.reset(); - DatasetUtils.writePropertyTypeRecord(name, value, itemValue.getDataOutput(), + DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(), MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE); listBuilder.addItem(itemValue); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java index fc7ef60..b81ec29 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java @@ -51,7 +51,7 @@ import org.apache.asterix.om.types.AUnorderedListType; import org.apache.asterix.om.types.AbstractCollectionType; import org.apache.asterix.om.types.AbstractComplexType; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.data.std.util.ArrayBackedValueStorage; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java index 70d18b4..6446e67 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java @@ -29,7 +29,6 @@ import java.util.List; import org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.MetadataException; @@ -39,7 +38,7 @@ import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes; import org.apache.asterix.metadata.entities.BuiltinTypeMap; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.utils.KeyFieldTypeUtils; +import org.apache.asterix.metadata.utils.KeyFieldTypeUtil; import org.apache.asterix.om.base.ABoolean; import org.apache.asterix.om.base.ACollectionCursor; import org.apache.asterix.om.base.AInt32; @@ -53,6 +52,7 @@ import org.apache.asterix.om.types.AOrderedListType; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; +import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; @@ -61,6 +61,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; * Translates an Index metadata entity to an ITupleReference and vice versa. */ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { + private static final long serialVersionUID = 1L; // Field indexes of serialized Index in a tuple. // First key field. public static final int INDEX_DATAVERSENAME_TUPLE_FIELD_INDEX = 0; @@ -82,18 +83,16 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { private transient AOrderedListType int8List = new AOrderedListType(BuiltinType.AINT8, null); private transient ArrayBackedValueStorage nameValue = new ArrayBackedValueStorage(); private transient ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage(); - private transient List<List<String>> searchKey; - private transient List<IAType> searchKeyType; private transient AMutableInt8 aInt8 = new AMutableInt8((byte) 0); @SuppressWarnings("unchecked") - private ISerializerDeserializer<AInt32> intSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AINT32); + private ISerializerDeserializer<AInt32> intSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); @SuppressWarnings("unchecked") - private ISerializerDeserializer<AInt8> int8Serde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AINT8); + private ISerializerDeserializer<AInt8> int8Serde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8); @SuppressWarnings("unchecked") - private ISerializerDeserializer<ARecord> recordSerde = SerializerDeserializerProvider.INSTANCE - 
.getSerializerDeserializer(MetadataRecordTypes.INDEX_RECORDTYPE); + private ISerializerDeserializer<ARecord> recordSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.INDEX_RECORDTYPE); private final MetadataNode metadataNode; private final JobId jobId; @@ -120,14 +119,15 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { IndexType indexStructure = IndexType .valueOf(((AString) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_INDEXSTRUCTURE_FIELD_INDEX)) .getStringValue()); - IACursor fieldNameCursor = ((AOrderedList) rec - .getValueByPos(MetadataRecordTypes.INDEX_ARECORD_SEARCHKEY_FIELD_INDEX)).getCursor(); - List<List<String>> searchKey = new ArrayList<List<String>>(); + IACursor fieldNameCursor = + ((AOrderedList) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_SEARCHKEY_FIELD_INDEX)) + .getCursor(); + List<List<String>> searchKey = new ArrayList<>(); AOrderedList fieldNameList; while (fieldNameCursor.next()) { fieldNameList = (AOrderedList) fieldNameCursor.get(); IACursor nestedFieldNameCursor = (fieldNameList.getCursor()); - List<String> nestedFieldName = new ArrayList<String>(); + List<String> nestedFieldName = new ArrayList<>(); while (nestedFieldNameCursor.next()) { nestedFieldName.add(((AString) nestedFieldNameCursor.get()).getStringValue()); } @@ -138,7 +138,7 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { if (indexKeyTypeFieldPos > 0) { fieldTypeCursor = ((AOrderedList) rec.getValueByPos(indexKeyTypeFieldPos)).getCursor(); } - List<IAType> searchKeyType = new ArrayList<IAType>(searchKey.size()); + List<IAType> searchKeyType = new ArrayList<>(searchKey.size()); while (fieldTypeCursor.next()) { String typeName = ((AString) fieldTypeCursor.get()).getStringValue(); IAType fieldType = BuiltinTypeMap.getTypeFromTypeName(metadataNode, jobId, dvName, typeName, false); @@ -150,8 +150,8 @@ public class IndexTupleTranslator extends 
AbstractTupleTranslator<Index> { if (isEnforcedFieldPos > 0) { isEnforcingKeys = ((ABoolean) rec.getValueByPos(isEnforcedFieldPos)).getBoolean(); } - Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX)) - .getBoolean(); + Boolean isPrimaryIndex = + ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX)).getBoolean(); int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX)) .getIntegerValue(); // Check if there is a gram length as well. @@ -180,8 +180,8 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { Dataset dSet = metadataNode.getDataset(jobId, dvName, dsName); String datatypeName = dSet.getItemTypeName(); String datatypeDataverseName = dSet.getItemTypeDataverseName(); - ARecordType recordDt = (ARecordType) metadataNode.getDatatype(jobId, datatypeDataverseName, datatypeName) - .getDatatype(); + ARecordType recordDt = + (ARecordType) metadataNode.getDatatype(jobId, datatypeDataverseName, datatypeName).getDatatype(); String metatypeName = dSet.getMetaItemTypeName(); String metatypeDataverseName = dSet.getMetaItemTypeDataverseName(); ARecordType metaDt = null; @@ -190,8 +190,8 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { .getDatatype(); } try { - searchKeyType = KeyFieldTypeUtils.getKeyTypes(recordDt, metaDt, searchKey, keyFieldSourceIndicator); - } catch (AsterixException e) { + searchKeyType = KeyFieldTypeUtil.getKeyTypes(recordDt, metaDt, searchKey, keyFieldSourceIndicator); + } catch (AlgebricksException e) { throw new MetadataException(e); } } @@ -242,8 +242,8 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { // write field 4 primaryKeyListBuilder.reset((AOrderedListType) MetadataRecordTypes.INDEX_RECORDTYPE .getFieldTypes()[MetadataRecordTypes.INDEX_ARECORD_SEARCHKEY_FIELD_INDEX]); - this.searchKey = instance.getKeyFieldNames(); - for 
(List<String> field : this.searchKey) { + List<List<String>> searchKey = instance.getKeyFieldNames(); + for (List<String> field : searchKey) { listBuilder.reset(stringList); for (String subField : field) { itemValue.reset(); @@ -293,14 +293,13 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { // write optional field 9 OrderedListBuilder typeListBuilder = new OrderedListBuilder(); typeListBuilder.reset(new AOrderedListType(BuiltinType.ANY, null)); - ArrayBackedValueStorage nameValue = new ArrayBackedValueStorage(); nameValue.reset(); aString.setValue(INDEX_SEARCHKEY_TYPE_FIELD_NAME); stringSerde.serialize(aString, nameValue.getDataOutput()); - this.searchKeyType = instance.getKeyFieldTypes(); - for (IAType type : this.searchKeyType) { + List<IAType> searchKeyType = instance.getKeyFieldTypes(); + for (IAType type : searchKeyType) { itemValue.reset(); aString.setValue(type.getTypeName()); stringSerde.serialize(aString, itemValue.getDataOutput()); @@ -334,7 +333,6 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { } if (needSerialization) { listBuilder.reset(int8List); - ArrayBackedValueStorage nameValue = new ArrayBackedValueStorage(); nameValue.reset(); aString.setValue(INDEX_SEARCHKEY_SOURCE_INDICATOR_FIELD_NAME); stringSerde.serialize(aString, nameValue.getDataOutput()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java new file mode 100644 index 0000000..00c9010 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.feeds; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.asterix.active.ActiveJobNotificationHandler; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveMessage; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.feed.api.IFeedJoint; +import org.apache.asterix.external.feed.management.FeedConnectionId; +import org.apache.asterix.external.feed.management.FeedEventsListener; +import org.apache.asterix.external.feed.message.EndFeedMessage; +import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; +import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; +import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor; +import org.apache.asterix.external.util.FeedConstants; +import org.apache.asterix.external.util.FeedUtils; +import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; +import org.apache.asterix.metadata.declared.MetadataProvider; +import 
org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.common.utils.Triple; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; +import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor; + +/** + * Provides helper method(s) for creating JobSpec for operations on a feed. + */ +public class FeedOperations { + + private FeedOperations() { + } + + /** + * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor. 
+ * + * @param primaryFeed + * @param metadataProvider + * @return a pair holding the Hyracks job specification for receiving data from the external source and the adapter factory + * @throws Exception + */ + public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed, + MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE); + IAdapterFactory adapterFactory; + IOperatorDescriptor feedIngestor; + AlgebricksPartitionConstraint ingesterPc; + Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = + metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor); + feedIngestor = t.first; + ingesterPc = t.second; + adapterFactory = t.third; + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc); + NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc); + spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0); + spec.addRoot(nullSink); + return new Pair<>(spec, adapterFactory); + } + + /** + * Builds the job spec for sending a message to an active feed to disconnect it from + * its source. 
+ */ + public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(FeedConnectionId connectionId) + throws AlgebricksException { + + JobSpecification spec = RuntimeUtils.createJobSpecification(); + IOperatorDescriptor feedMessenger; + AlgebricksPartitionConstraint messengerPc; + List<String> locations = null; + FeedRuntimeType sourceRuntimeType; + try { + FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE + .getActiveEntityListener(connectionId.getFeedId()); + FeedConnectJobInfo cInfo = listener.getFeedConnectJobInfo(connectionId); + IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint(); + IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint(); + + boolean terminateIntakeJob = false; + boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty(); + if (completeDisconnect) { + sourceRuntimeType = FeedRuntimeType.INTAKE; + locations = cInfo.getCollectLocations(); + terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1; + } else { + locations = cInfo.getComputeLocations(); + sourceRuntimeType = FeedRuntimeType.COMPUTE; + } + + Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec, + connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId()); + + feedMessenger = p.first; + messengerPc = p.second; + + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc); + NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc); + spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0); + spec.addRoot(nullSink); + return new Pair<>(spec, terminateIntakeJob); + + } catch (AlgebricksException e) { + throw new AsterixException(e); + } + + } + + private static Pair<IOperatorDescriptor, 
AlgebricksPartitionConstraint> buildSendFeedMessageRuntime( + JobSpecification jobSpec, FeedConnectionId feedConenctionId, IActiveMessage feedMessage, + Collection<String> locations) throws AlgebricksException { + AlgebricksPartitionConstraint partitionConstraint = + new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {})); + FeedMessageOperatorDescriptor feedMessenger = + new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId, feedMessage); + return new Pair<>(feedMessenger, partitionConstraint); + } + + private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime( + JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations, + FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, EntityId sourceFeedId) + throws AlgebricksException { + IActiveMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId, + completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED); + return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations); + } + + public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws AsterixException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations(); + Set<String> nodes = new TreeSet<>(); + for (String node : allCluster.getLocations()) { + nodes.add(node); + } + AlgebricksAbsolutePartitionConstraint locations = + new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()])); + FileSplit[] feedLogFileSplits = + FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); + FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, 
splitsAndConstraint.first, true); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second); + spec.addRoot(frod); + return spec; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java index 185232e..137e625 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java @@ -25,7 +25,7 @@ import org.apache.asterix.om.typecomputer.base.IResultTypeComputer; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.ConstantExpressionUtil; +import org.apache.asterix.om.utils.ConstantExpressionUtil; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
