http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm new file mode 100644 index 0000000..4b9eb7d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm @@ -0,0 +1 @@ +{"results":"successful"} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm new file mode 100644 index 0000000..3a8dfe0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm @@ -0,0 +1 @@ +6005 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm new file mode 100644 index 0000000..3c6303a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm @@ -0,0 +1 @@ +{ "DatasetName": "LineItem", "GroupName": "LineItem_2", "rebalanceCount": 2 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm new file mode 100644 index 0000000..3a8dfe0 --- /dev/null +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm @@ -0,0 +1 @@ +6005 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm new file mode 100644 index 0000000..4b9eb7d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm @@ -0,0 +1 @@ +{"results":"successful"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm new file mode 100644 index 0000000..4f0990e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm @@ -0,0 +1 @@ 
+{"temp":false,"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/LineItem_idx_LineItem"}]} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm new file mode 100644 index 0000000..3a8dfe0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm @@ -0,0 +1 @@ +6005 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm new file mode 100644 index 0000000..2760db0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm @@ -0,0 +1 @@ +{ "DatasetName": "LineItem", "GroupName": "LineItem_1", "rebalanceCount": 1 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm new file mode 100644 index 0000000..4b9eb7d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm @@ -0,0 +1 @@ +{"results":"successful"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm new file mode 100644 index 0000000..44c244c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm @@ -0,0 +1 @@ 
+{"temp":false,"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/2/ LineItem_idx_LineItem"}]} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index f06a0b7..35b7d4c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -100,6 +100,7 @@ public class ErrorCode { public static final int COMPILATION_BAD_QUERY_PARAMETER_VALUE = 1037; public static final int COMPILATION_ILLEGAL_STATE = 1038; public static final int COMPILATION_TWO_PHASE_LOCKING_VIOLATION = 1039; + public static final int DATASET_ID_EXHAUSTED = 1040; 
// Feed errors public static final int DATAFLOW_ILLEGAL_STATE = 3001; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java index 3047ef5..60e6060 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java @@ -32,6 +32,7 @@ public class Servlets { public static final String QUERY_RESULT = "/query/service/result/*"; public static final String QUERY_SERVICE = "/query/service"; public static final String CONNECTOR = "/connector"; + public static final String REBALANCE = "/admin/rebalance"; public static final String SHUTDOWN = "/admin/shutdown"; public static final String VERSION = "/admin/version"; public static final String RUNNING_REQUESTS = "/admin/requests/running/*"; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 1aa1474..2e98abd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -37,7 +37,6 @@ public class StoragePathUtil { public static final String PARTITION_DIR_PREFIX = "partition_"; public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp"; public 
static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_"; - public static final String ADAPTER_INSTANCE_PREFIX = "adapter_"; private StoragePathUtil() { } @@ -61,16 +60,18 @@ public class StoragePathUtil { return storageDirName + File.separator + StoragePathUtil.PARTITION_DIR_PREFIX + partitonId; } - public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) { - return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName)); + public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName, + long rebalanceCount) { + return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName, rebalanceCount)); } public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) { return dataverseName + File.separator + fullIndexName; } - private static String prepareFullIndexName(String datasetName, String idxName) { - return datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName; + private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceCount) { + return (rebalanceCount == 0 ? 
"" : rebalanceCount + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR + + idxName; } public static int getPartitionNumFromName(String name) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index bba3a43..1f80fad 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -86,6 +86,7 @@ 1037 = Invalid query parameter %1$s -- value has to be greater than or equal to %2$s bytes 1038 = Illegal state. %1$s 1039 = Two-phase locking violation -- locks can not be acquired after unlocking +1040 = Dataset id space is exhausted # Feed Errors 3001 = Illegal state. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java index 01ade10..c84a5bd 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.metadata.api.IMetadataEntity; import org.apache.asterix.metadata.entities.CompactionPolicy; 
@@ -39,9 +38,9 @@ import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.Library; import org.apache.asterix.metadata.entities.NodeGroup; +import org.apache.asterix.metadata.utils.IndexUtil; /** * Caches metadata entities such that the MetadataManager does not have to @@ -161,10 +160,7 @@ public class MetadataCache { // Add the primary index associated with the dataset, if the dataset is an // internal dataset. if (dataset.getDatasetType() == DatasetType.INTERNAL) { - InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails(); - Index index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), - dataset.getDatasetName(), IndexType.BTREE, id.getPartitioningKey(), - id.getKeySourceIndicator(), id.getPrimaryKeyType(), false, true, dataset.getPendingOp()); + Index index = IndexUtil.getPrimaryIndex(dataset); addIndexIfNotExistsInternal(index); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 272cced..673a5ae 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -335,10 +335,6 @@ public class MetadataBootstrap { String resourceName = metadataPartitionPath + File.separator + 
index.getFileNameRelativePath(); FileReference file = ioManager.getFileReference(metadataDeviceId, resourceName); index.setFile(file); - // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for - // a dataset that was not yet created - List<IVirtualBufferCache> virtualBufferCaches = appContext.getDatasetLifecycleManager() - .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum()); ITypeTraits[] typeTraits = index.getTypeTraits(); IBinaryComparatorFactory[] cmpFactories = index.getKeyBinaryComparatorFactory(); int[] bloomFilterKeyFields = index.getBloomFilterKeyFields(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java index ee5c9f3..4103a2c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java @@ -19,7 +19,6 @@ package org.apache.asterix.metadata.bootstrap; -import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -27,6 +26,7 @@ import java.util.List; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ImmutableDatasetId; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; import 
org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; @@ -232,7 +232,9 @@ public class MetadataIndex implements IMetadataIndex { @Override public String getFileNameRelativePath() { - return getDataverseName() + File.separator + getIndexedDatasetName() + "_idx_" + getIndexName(); + // The rebalance count for metadata dataset is always 0. + return StoragePathUtil.prepareDataverseIndexName(getDataverseName(), getIndexedDatasetName(), getIndexName(), + 0); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java index b647bb7..49b32c0 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java @@ -124,6 +124,11 @@ public class MetadataManagerUtil { return new DefaultNodeGroupDomain(partitions); } + public static List<String> findNodes(MetadataTransactionContext mdTxnCtx, String nodeGroupName) + throws AlgebricksException { + return MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName).getNodeNames(); + } + public static Feed findFeed(MetadataTransactionContext mdTxnCtx, String dataverse, String feedName) throws AlgebricksException { try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 69f4e03..e0cfc28 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -82,7 +82,6 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.asterix.runtime.base.AsterixTupleFilterFactory; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor; import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -302,10 +301,22 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName); } + public List<String> findNodes(String nodeGroupName) throws AlgebricksException { + return MetadataManagerUtil.findNodes(mdTxnCtx, nodeGroupName); + } + public IAType findType(String dataverse, String typeName) throws AlgebricksException { return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName); } + public IAType findType(Dataset dataset) throws AlgebricksException { + return findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + } + + public IAType findMetaType(Dataset dataset) throws AlgebricksException { + return findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); + } + public Feed findFeed(String dataverse, String feedName) throws AlgebricksException { return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName); } @@ -381,17 +392,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, 
String> return new Pair<>(dataScanner, constraint); } - public IDataFormat getDataFormat(String dataverseName) throws CompilationException { - Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); - IDataFormat format; - try { - format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance(); - } catch (Exception e) { - throw new CompilationException(e); - } - return format; - } - public Dataverse findDataverse(String dataverseName) throws CompilationException { return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); } @@ -760,10 +760,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse); } - public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, - String targetIdxName, boolean temp) throws AlgebricksException { - return SplitsAndConstraintsUtil.getDatasetSplits(findDataset(dataverseName, datasetName), mdTxnCtx, - targetIdxName, temp); + public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) + throws AlgebricksException { + return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName) @@ -860,80 +859,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> fieldPermutation[i++] = idx; } } - try { - Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), - dataset.getDatasetName(), dataset.getDatasetName()); - String itemTypeName = dataset.getItemTypeName(); - String itemTypeDataverseName = dataset.getItemTypeDataverseName(); - ARecordType itemType = (ARecordType) MetadataManager.INSTANCE - .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype(); - ARecordType 
metaItemType = DatasetUtil.getMetaType(this, dataset); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset); - // prepare callback - JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); - int[] primaryKeyFields = new int[numKeys]; - for (i = 0; i < numKeys; i++) { - primaryKeyFields[i] = i; - } - - boolean hasSecondaries = MetadataManager.INSTANCE - .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).size() > 1; - - IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); - ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); - IIndexDataflowHelperFactory idfh = - new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first); - LSMPrimaryUpsertOperatorDescriptor op; - ITypeTraits[] outputTypeTraits = - new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; - ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() - + (dataset.hasMetaPart() ? 
2 : 1) + numFilterFields]; - - // add the previous record first - int f = 0; - outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType); - f++; - // add the previous meta second - if (dataset.hasMetaPart()) { - outputSerDes[f] = - FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType); - outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType); - f++; - } - // add the previous filter third - int fieldIdx = -1; - if (numFilterFields > 0) { - String filterField = DatasetUtil.getFilterField(dataset).get(0); - for (i = 0; i < itemType.getFieldNames().length; i++) { - if (itemType.getFieldNames()[i].equals(filterField)) { - break; - } - } - fieldIdx = i; - outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider() - .getTypeTrait(itemType.getFieldTypes()[fieldIdx]); - outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider() - .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]); - f++; - } - for (int j = 0; j < recordDesc.getFieldCount(); j++) { - outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j]; - outputSerDes[j + f] = recordDesc.getFields()[j]; - } - RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits); - op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh, - context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, - dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries); - return new Pair<>(op, splitsAndConstraint.second); - - } catch (MetadataException me) { - throw new AlgebricksException(me); - } + return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation, + context.getMissingWriterFactory()); } + public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime( JobSpecification jobSpec, 
IAType itemType, IAdapterFactory adapterFactory, IDataFormat format) throws AlgebricksException { @@ -1635,15 +1565,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds) throws AlgebricksException { - FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(), - ds.getDatasetDetails().isTemp()); - return StoragePathUtil.splitProviderAndPartitionConstraints(splits); + return getSplitProviderAndConstraints(ds, ds.getDatasetName()); } public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds, String indexName) throws AlgebricksException { - FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), indexName, - ds.getDatasetDetails().isTemp()); + FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName); return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 34fa7bb..4b31767 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.IntStream; import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.IActiveEntityEventsListener; @@ -44,6 
+45,9 @@ import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.formats.nontagged.TypeTraitProvider; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataCache; import org.apache.asterix.metadata.MetadataManager; @@ -75,17 +79,23 @@ import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondar import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory; import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory; import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory; +import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; import org.apache.hyracks.algebricks.data.ITypeTraitProvider; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import 
org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; @@ -133,7 +143,9 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { private final IDatasetDetails datasetDetails; private final String metaTypeDataverseName; private final String metaTypeName; + private final long rebalanceCount; private int pendingOp; + /* * Transient (For caching) */ @@ -151,6 +163,31 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints, DatasetType datasetType, int datasetId, int pendingOp) { + this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName, + metaItemTypeName, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, + datasetType, datasetId, pendingOp, 0L); + } + + public Dataset(Dataset dataset) { + this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName, + dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName, + dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails, + dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount); + } + + public Dataset(Dataset dataset, boolean forRebalance, String targetNodeGroupName) { + this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName, + dataset.metaTypeDataverseName, dataset.metaTypeName, targetNodeGroupName, + 
dataset.compactionPolicyFactory, + dataset.compactionPolicyProperties, dataset.datasetDetails, dataset.hints, dataset.datasetType, + forRebalance ? DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : dataset.datasetId, + dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 : dataset.rebalanceCount); + } + + public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName, + String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy, + Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints, + DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount) { this.dataverseName = dataverseName; this.datasetName = datasetName; this.recordTypeName = itemTypeName; @@ -165,13 +202,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { this.datasetId = datasetId; this.pendingOp = pendingOp; this.hints = hints; - } - - public Dataset(Dataset dataset) { - this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName, - dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName, - dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails, - dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp); + this.rebalanceCount = rebalanceCount; } public String getDataverseName() { @@ -230,6 +261,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { return metaTypeName; } + public long getRebalanceCount() { + return rebalanceCount; + } + public boolean hasMetaPart() { return metaTypeDataverseName != null && metaTypeName != null; } @@ -376,7 +411,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { // Drop the associated nodegroup String nodegroup = getNodeGroupName(); if 
(!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) { - MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName); + MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), nodegroup); } } @@ -591,31 +626,26 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { return Objects.hash(dataverseName, datasetName); } - public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields, - MetadataProvider metadataProvider, int[] datasetPartitions, boolean isSink) { - return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields, - metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions, - isSink); - } - /** - * Get the index dataflow helper factory for the dataset's primary index + * Gets the commit runtime factory for inserting/upserting/deleting operations on this dataset. * - * @param mdProvider - * an instance of metadata provider that is used to fetch metadata information + * @param metadataProvider, + * the metadata provider. + * @param jobId, + * the AsterixDB job id for transaction management. + * @param primaryKeyFieldPermutation, + * the primary key field permutation according to the input. + * @param isSink, + * whether this commit runtime is the last operator in the pipeline. + * @return the commit runtime factory for inserting/upserting/deleting operations on this dataset. 
* @throws AlgebricksException */ - public IResourceFactory getResourceFactory(MetadataProvider mdProvider) throws AlgebricksException { - if (getDatasetType() != DatasetType.INTERNAL) { - throw new AlgebricksException(ErrorCode.ASTERIX, - ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX, getDatasetType()); - } - Index index = mdProvider.getIndex(getDataverseName(), getDatasetName(), getDatasetName()); - ARecordType recordType = (ARecordType) mdProvider.findType(getItemTypeDataverseName(), getItemTypeName()); - ARecordType metaType = (ARecordType) mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName()); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = - DatasetUtil.getMergePolicyFactory(this, mdProvider.getMetadataTxnContext()); - return getResourceFactory(mdProvider, index, recordType, metaType, compactionInfo.first, compactionInfo.second); + public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, JobId jobId, + int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException { + int[] datasetPartitions = getDatasetPartitions(metadataProvider); + return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFieldPermutation, + metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions, + isSink); } public IFrameOperationCallbackFactory getFrameOpCallbackFactory() { @@ -659,6 +689,57 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { return typeTraits; } + /** + * Gets the record descriptor for primary records of this dataset. + * + * @param metadataProvider, + * the metadata provider. + * @return the record descriptor for primary records of this dataset. 
+ * @throws AlgebricksException + */ + public RecordDescriptor getPrimaryRecordDescriptor(MetadataProvider metadataProvider) throws AlgebricksException { + List<List<String>> partitioningKeys = getPrimaryKeys(); + int numPrimaryKeys = partitioningKeys.size(); + ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1 + + (hasMetaPart() ? 1 : 0)]; + ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (hasMetaPart() ? 1 : 0)]; + ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider(); + List<Integer> indicators = null; + if (hasMetaPart()) { + indicators = ((InternalDatasetDetails) getDatasetDetails()).getKeySourceIndicator(); + } + ARecordType itemType = (ARecordType) metadataProvider.findType(this); + ARecordType metaType = (ARecordType) metadataProvider.findMetaType(this); + + // Set the serde/traits for primary keys + for (int i = 0; i < numPrimaryKeys; i++) { + IAType keyType = (indicators == null || indicators.get(i) == 0) + ? itemType.getSubFieldType(partitioningKeys.get(i)) + : metaType.getSubFieldType(partitioningKeys.get(i)); + primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType); + primaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); + } + + // Set the serde and traits for the record field + primaryRecFields[numPrimaryKeys] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); + primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); + if (hasMetaPart()) { + // Set the serde and traits for the meta record field + primaryRecFields[numPrimaryKeys + 1] = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(metaType); + primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType); + } + return new RecordDescriptor(primaryRecFields, primaryTypeTraits); + } + + /** + * Gets the comparator factories for the primary key fields of this dataset.
+ * + * @param metadataProvider, + * the metadata provider. + * @return the comparator factories for the primary key fields of this dataset. + * @throws AlgebricksException + */ public IBinaryComparatorFactory[] getPrimaryComparatorFactories(MetadataProvider metadataProvider, ARecordType recordType, ARecordType metaType) throws AlgebricksException { IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); @@ -671,22 +752,53 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { indicators = ((InternalDatasetDetails) getDatasetDetails()).getKeySourceIndicator(); } for (int i = 0; i < numPrimaryKeys; i++) { + IAType keyType = (indicators == null || indicators.get(i) == 0) + ? recordType.getSubFieldType(partitioningKeys.get(i)) + : metaType.getSubFieldType(partitioningKeys.get(i)); + cmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true); + } + return cmpFactories; + } + + /** + * Gets the hash function factories for the primary key fields of this dataset. + * + * @param metadataProvider, + * the metadata provider. + * @return the hash function factories for the primary key fields of this dataset. + * @throws AlgebricksException + */ + public IBinaryHashFunctionFactory[] getPrimaryHashFunctionFactories(MetadataProvider metadataProvider) + throws AlgebricksException { + ARecordType recordType = (ARecordType) metadataProvider.findType(this); + ARecordType metaType = (ARecordType) metadataProvider.findMetaType(this); + List<List<String>> partitioningKeys = getPrimaryKeys(); + int numPrimaryKeys = partitioningKeys.size(); + IBinaryHashFunctionFactory[] hashFuncFactories = new IBinaryHashFunctionFactory[numPrimaryKeys]; + List<Integer> indicators = null; + if (hasMetaPart()) { + indicators = ((InternalDatasetDetails) getDatasetDetails()).getKeySourceIndicator(); + } + for (int i = 0; i < numPrimaryKeys; i++) { IAType keyType = (indicators == null || indicators.get(i) == 0) ? 
recordType.getSubFieldType(partitioningKeys.get(i)) : metaType.getSubFieldType(partitioningKeys.get(i)); - cmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true); + hashFuncFactories[i] = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(keyType); } - return cmpFactories; + return hashFuncFactories; } @Override public int[] getPrimaryBloomFilterFields() { List<List<String>> partitioningKeys = getPrimaryKeys(); int numPrimaryKeys = partitioningKeys.size(); - int[] bloomFilterKeyFields = new int[numPrimaryKeys]; - for (int i = 0; i < numPrimaryKeys; i++) { - bloomFilterKeyFields[i] = i; - } - return bloomFilterKeyFields; + return IntStream.range(0, numPrimaryKeys).toArray(); + } + + // Gets an array of partition numbers for this dataset. + protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException { + FileSplit[] splitsForDataset = metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, + getDatasetName()); + return IntStream.range(0, splitsForDataset.length).toArray(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java index df47c70..56c3e5f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java @@ -41,7 +41,6 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { private static final long serialVersionUID = 1L; public static final int RECORD_INDICATOR = 0; - public static final int META_INDICATOR = 1; private final String 
dataverseName; // Enforced to be unique within a dataverse. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java index c3c5023..b9464ed 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java @@ -51,8 +51,10 @@ import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.om.base.ABoolean; import org.apache.asterix.om.base.ADateTime; import org.apache.asterix.om.base.AInt32; +import org.apache.asterix.om.base.AInt64; import org.apache.asterix.om.base.AInt8; import org.apache.asterix.om.base.AMutableInt32; +import org.apache.asterix.om.base.AMutableInt64; import org.apache.asterix.om.base.AMutableString; import org.apache.asterix.om.base.AOrderedList; import org.apache.asterix.om.base.ARecord; @@ -73,26 +75,26 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; */ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { private static final long serialVersionUID = 1L; - // Field indexes of serialized Dataset in a tuple. - // First key field. - public static final int DATASET_DATAVERSENAME_TUPLE_FIELD_INDEX = 0; - // Second key field. - public static final int DATASET_DATASETNAME_TUPLE_FIELD_INDEX = 1; // Payload field containing serialized Dataset. 
public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2; + private static final String REBALANCE_ID_FIELD_NAME = "rebalanceCount"; @SuppressWarnings("unchecked") protected final ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.DATASET_RECORDTYPE); protected final transient AMutableInt32 aInt32; protected final transient ISerializerDeserializer<AInt32> aInt32Serde; + protected final transient AMutableInt64 aBigInt; + protected final transient ISerializerDeserializer<AInt64> aBigIntSerde; protected final transient ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage(); @SuppressWarnings("unchecked") protected DatasetTupleTranslator(boolean getTuple) { super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount()); aInt32 = new AMutableInt32(-1); + aBigInt = new AMutableInt64(-1); aInt32Serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); + aBigIntSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); } @Override @@ -107,7 +109,6 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { } protected Dataset createDatasetFromARecord(ARecord datasetRecord) throws HyracksDataException { - String dataverseName = ((AString) datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATAVERSENAME_FIELD_INDEX)) .getStringValue(); @@ -258,9 +259,14 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { metaTypeName = ((AString) datasetRecord.getValueByPos(metaTypeNameIndex)).getStringValue(); } + // Read the rebalance count if there is one. + int rebalanceCountIndex = datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME); + long rebalanceCount = rebalanceCountIndex >= 0 + ? 
((AInt64) datasetRecord.getValueByPos(rebalanceCountIndex)).getLongValue() : 0; + return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType, - datasetId, pendingOp); + datasetId, pendingOp, rebalanceCount); } @Override @@ -409,6 +415,16 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> { stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(fieldName, fieldValue); } + if (dataset.getRebalanceCount() > 0) { + // Adds the field rebalanceCount. + fieldName.reset(); + aString.setValue(REBALANCE_ID_FIELD_NAME); + stringSerde.serialize(aString, fieldName.getDataOutput()); + fieldValue.reset(); + aBigInt.setValue(dataset.getRebalanceCount()); + aBigIntSerde.serialize(aBigInt, fieldValue.getDataOutput()); + recordBuilder.addField(fieldName, fieldValue); + } } protected void writeDatasetDetailsRecordType(IARecordBuilder recordBuilder, Dataset dataset, DataOutput dataOutput) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 098645e..6801427 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -19,7 +19,6 @@ package org.apache.asterix.metadata.utils; import java.io.DataOutput; -import java.io.File; import java.rmi.RemoteException; import java.util.List; import java.util.Map; @@ -29,11 +28,17 @@ import org.apache.asterix.builders.IARecordBuilder; import
org.apache.asterix.builders.RecordBuilder; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.context.ITransactionSubsystemProvider; +import org.apache.asterix.common.context.TransactionSubsystemProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; +import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -47,25 +52,40 @@ import org.apache.asterix.om.base.AString; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.formats.FormatUtils; +import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; +import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor; import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import 
org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; +import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor; +import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; import 
org.apache.hyracks.storage.common.IResourceFactory; @@ -217,8 +237,7 @@ public class DatasetUtil { public static JobSpecification dropDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { - String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName(); - LOGGER.info("DROP DATASETPATH: " + datasetPath); + LOGGER.info("DROP DATASET: " + dataset); if (dataset.getDatasetType() == DatasetType.EXTERNAL) { return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); } @@ -249,41 +268,32 @@ public class DatasetUtil { return spec; } - public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName, - MetadataProvider metadataProvider) throws AlgebricksException { - String dataverseName = dataverse.getDataverseName(); - Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); - if (dataset == null) { - throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName); - } - Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, datasetName); - ARecordType itemType = - (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + public static JobSpecification createDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider) + throws AlgebricksException { + Index index = IndexUtil.getPrimaryIndex(dataset); + ARecordType itemType = (ARecordType) metadataProvider.findType(dataset); // get meta item type ARecordType metaItemType = null; if (dataset.hasMetaPart()) { - metaItemType = (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(), - dataset.getMetaItemTypeName()); + metaItemType = (ARecordType) metadataProvider.findMetaType(dataset); } JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider + .getSplitProviderAndConstraints(dataset); FileSplit[] fs = splitsAndConstraint.first.getFileSplits(); StringBuilder sb = new StringBuilder(); for (int i = 0; i < fs.length; i++) { sb.append(fs[i] + " "); } LOGGER.info("CREATING File Splits: " + sb.toString()); - - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = - DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, + metadataProvider.getMetadataTxnContext()); //prepare a LocalResourceMetadata which will be stored in NC's local resource repository IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType, compactionInfo.first, compactionInfo.second); - IndexBuilderFactory indexBuilderFactory = - new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), - splitsAndConstraint.first, resourceFactory, !dataset.isTemp()); + IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory( + metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first, + resourceFactory, !dataset.isTemp()); IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, splitsAndConstraint.second); @@ -292,7 +302,7 @@ public class DatasetUtil { } public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName, - MetadataProvider metadataProvider) throws AsterixException, 
AlgebricksException { + MetadataProvider metadataProvider) throws AlgebricksException { String dataverseName = dataverse.getDataverseName(); Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); if (dataset == null) { @@ -313,6 +323,180 @@ public class DatasetUtil { return spec; } + /** + * Creates a primary index scan operator for a given dataset. + * + * @param spec, + * the job specification. + * @param metadataProvider, + * the metadata provider. + * @param dataset, + * the dataset to scan. + * @param jobId, + * the AsterixDB job id for transaction management. + * @return a primary index scan operator. + * @throws AlgebricksException + */ + public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider, + Dataset dataset, JobId jobId) throws AlgebricksException { + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider + .getSplitProviderAndConstraints(dataset); + IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first; + AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; + // -Infinity + int[] lowKeyFields = null; + // +Infinity + int[] highKeyFields = null; + ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE; + boolean temp = dataset.getDatasetDetails().isTemp(); + ISearchOperationCallbackFactory searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE + : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(), + dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider, + IRecoveryManager.ResourceType.LSM_BTREE); + IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( + metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); + BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, + dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true, + indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp, + primaryPartitionConstraint); + return primarySearchOp; + } + + /** + * Creates a primary index upsert operator for a given dataset. + * + * @param spec, + * the job specification. + * @param metadataProvider, + * the metadata provider. + * @param dataset, + * the dataset to upsert. + * @param inputRecordDesc, + * the record descriptor for an input tuple. + * @param fieldPermutation, + * the field permutation according to the input. + * @param missingWriterFactory, + * the factory for customizing missing value serialization. + * @return a primary index upsert operator and its location constraints. + * @throws AlgebricksException + */ + public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp( + JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc, + int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException { + int numKeys = dataset.getPrimaryKeys().size(); + int numFilterFields = DatasetUtil.getFilterField(dataset) == null ?
0 : 1; + ARecordType itemType = (ARecordType) metadataProvider.findType(dataset); + ARecordType metaItemType = (ARecordType) metadataProvider.findMetaType(dataset); + try { + Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), + dataset.getDatasetName()); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider + .getSplitProviderAndConstraints(dataset); + + // prepare callback + JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); + int[] primaryKeyFields = new int[numKeys]; + for (int i = 0; i < numKeys; i++) { + primaryKeyFields[i] = i; + } + boolean hasSecondaries = metadataProvider + .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1; + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( + storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); + ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( + storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); + IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), splitsAndConstraint.first); + LSMPrimaryUpsertOperatorDescriptor op; + ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; + ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount() + + (dataset.hasMetaPart() ? 
2 : 1) + numFilterFields]; + + // add the previous record first + int f = 0; + outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType); + f++; + // add the previous meta second + if (dataset.hasMetaPart()) { + outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider() + .getSerializerDeserializer(metaItemType); + outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType); + f++; + } + // add the previous filter third + int fieldIdx = -1; + if (numFilterFields > 0) { + String filterField = DatasetUtil.getFilterField(dataset).get(0); + String[] fieldNames = itemType.getFieldNames(); + int i = 0; + for (; i < fieldNames.length; i++) { + if (fieldNames[i].equals(filterField)) { + break; + } + } + fieldIdx = i; + outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider() + .getTypeTrait(itemType.getFieldTypes()[fieldIdx]); + outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider() + .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]); + f++; + } + for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) { + outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j]; + outputSerDes[j + f] = inputRecordDesc.getFields()[j]; + } + RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits); + op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh, + missingWriterFactory, modificationCallbackFactory, searchCallbackFactory, + dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries); + return new Pair<>(op, splitsAndConstraint.second); + } catch (MetadataException me) { + throw new AlgebricksException(me); + } + } + + /** + * Creates a dummy key provider operator for the primary index scan. + * + * @param spec, + * the job specification. + * @param dataset, + * the dataset to scan. + * @param metadataProvider, + * the metadata provider. 
+ * @return a dummy key provider operator. + * @throws AlgebricksException + */ + public static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset, + MetadataProvider metadataProvider) throws AlgebricksException { + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider + .getSplitProviderAndConstraints(dataset); + AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; + + // Build dummy tuple containing one field with a dummy value inside. + ArrayTupleBuilder tb = new ArrayTupleBuilder(1); + DataOutput dos = tb.getDataOutput(); + tb.reset(); + try { + // Serialize dummy value into a field. + IntegerSerializerDeserializer.INSTANCE.serialize(0, dos); + } catch (HyracksDataException e) { + throw new AsterixException(e); + } + // Add dummy field. + tb.addFieldEndOffset(); + ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE }; + RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers); + ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec, + keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp, + primaryPartitionConstraint); + return keyProviderOp; + } + public static boolean isFullyQualifiedName(String datasetName) { return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name can't start with a . 
} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index 6d07fc7..96ca8d7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -21,18 +21,24 @@ package org.apache.asterix.metadata.utils; import java.util.Collections; import java.util.List; +import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.config.OptimizationConfUtil; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; +import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobSpecification; public class IndexUtil { @@ -53,6 +59,13 @@ public class 
IndexUtil { return secondaryFilterFields(dataset, index, filterTypeTraits); } + public static Index getPrimaryIndex(Dataset dataset) { + InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails(); + return new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(), + DatasetConfig.IndexType.BTREE, id.getPartitioningKey(), id.getKeySourceIndicator(), + id.getPrimaryKeyType(), false, true, dataset.getPendingOp()); + } + public static int[] getBtreeFieldsIfFiltered(Dataset dataset, Index index) throws AlgebricksException { if (index.isPrimaryIndex()) { return DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset); @@ -144,4 +157,22 @@ public class IndexUtil { physicalOptimizationConfig, recType, metaType, enforcedType, enforcedMetaType); return secondaryIndexHelper.buildCompactJobSpec(); } + + /** + * Binds a job event listener to the job specification. + * + * @param spec, + * the job specification. + * @param metadataProvider, + * the metadata provider. + * @return the AsterixDB job id for transaction management. 
+ */ + public static JobId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) { + JobId jobId = JobIdFactory.generateJobId(); + metadataProvider.setJobId(jobId); + boolean isWriteTransaction = metadataProvider.isWriteTransaction(); + IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction); + spec.setJobletEventListenerFactory(jobEventListenerFactory); + return jobId; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java index 1f7914d..b31bd47 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -51,7 +52,6 @@ import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; @@ -128,13 +128,16 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations return spec; } else { // Create dummy key provider for feeding the primary index scan. - AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec); + IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, + metadataProvider); + JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. - BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec); + IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, + jobId); // Assign op. 
- AbstractOperatorDescriptor sourceOp = primaryScanOp; + IOperatorDescriptor sourceOp = primaryScanOp; if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) { sourceOp = createCastOp(spec, dataset.getDatasetType()); spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index 86e3911..b437798 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -19,19 +19,13 @@ package org.apache.asterix.metadata.utils; -import java.io.DataOutput; import java.util.List; import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; -import org.apache.asterix.common.context.ITransactionSubsystemProvider; -import org.apache.asterix.common.context.TransactionSubsystemProvider; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; -import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; import 
org.apache.asterix.external.operators.ExternalIndexBulkLoadOperatorDescriptor; @@ -52,9 +46,6 @@ import org.apache.asterix.runtime.evaluators.functions.AndDescriptor; import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor; import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor; import org.apache.asterix.runtime.evaluators.functions.NotDescriptor; -import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; -import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -71,21 +62,11 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; 
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; @SuppressWarnings("rawtypes") @@ -262,59 +243,13 @@ public abstract class SecondaryIndexOperationsHelper { primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); if (dataset.hasMetaPart()) { primaryRecFields[numPrimaryKeys + 1] = payloadSerde; - primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); + primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType); } primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits); } protected abstract void setSecondaryRecDescAndComparators() throws AlgebricksException; - protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException { - // Build dummy tuple containing one field with a dummy value inside. - ArrayTupleBuilder tb = new ArrayTupleBuilder(1); - DataOutput dos = tb.getDataOutput(); - tb.reset(); - try { - // Serialize dummy value into a field. - IntegerSerializerDeserializer.INSTANCE.serialize(0, dos); - } catch (HyracksDataException e) { - throw new AsterixException(e); - } - // Add dummy field. 
- tb.addFieldEndOffset(); - ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE }; - RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers); - ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec, - keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp, - primaryPartitionConstraint); - return keyProviderOp; - } - - protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException { - // -Infinity - int[] lowKeyFields = null; - // +Infinity - int[] highKeyFields = null; - ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE; - JobId jobId = JobIdFactory.generateJobId(); - metadataProvider.setJobId(jobId); - boolean isWriteTransaction = metadataProvider.isWriteTransaction(); - IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction); - spec.setJobletEventListenerFactory(jobEventListenerFactory); - boolean temp = dataset.getDatasetDetails().isTemp(); - ISearchOperationCallbackFactory searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE - : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(), - primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); - IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); - BTreeSearchOperatorDescriptor primarySearchOp = - new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, lowKeyFields, highKeyFields, true, true, - indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp, - primaryPartitionConstraint); - return primarySearchOp; - } protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec, int numSecondaryKeyFields, RecordDescriptor secondaryRecDesc) throws AlgebricksException {
