Repository: asterixdb Updated Branches: refs/heads/master 77109ea4b -> e856e1e8f
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java new file mode 100644 index 0000000..15f8a23 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils; + +import org.apache.asterix.common.config.DatasetConfig.IndexType; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.RecordUtil; +import org.apache.asterix.runtime.formats.FormatUtils; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; +import org.apache.hyracks.algebricks.data.ITypeTraitProvider; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import 
org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; +import org.apache.hyracks.data.std.primitive.ShortPointable; +import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; +import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; + +/** + * Operations helper for correlated secondary inverted indexes (word or n-gram, single-partition or + * length-partitioned). Its loading job scans the primary index's disk components, tokenizes the secondary + * key, sorts the resulting tuples by tag (component position), token, optional partition field and primary + * key, and then bulk-loads the inverted index. + */ public class SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper { + + private IAType secondaryKeyType; + private ITypeTraits[] invListsTypeTraits; + private IBinaryComparatorFactory[] tokenComparatorFactories; + private ITypeTraits[] tokenTypeTraits; + private IBinaryTokenizerFactory tokenizerFactory; + // For tokenization, sorting and loading. Represents <token, primary keys>. 
+ private int numTokenKeyPairFields; + private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories; + private RecordDescriptor tokenKeyPairRecDesc; + /* True for LENGTH_PARTITIONED_WORD_INVIX and LENGTH_PARTITIONED_NGRAM_INVIX; tokens then carry an extra short partitioning field. */ private boolean isPartitioned; + private int[] invertedIndexFields; + private int[] invertedIndexFieldsForNonBulkLoadOps; + private int[] secondaryFilterFieldsForNonBulkLoadOps; + + protected SecondaryCorrelatedInvertedIndexOperationsHelper(Dataset dataset, Index index, + PhysicalOptimizationConfig physOptConf, MetadataProvider metadataProvider) throws AlgebricksException { + super(dataset, index, physOptConf, metadataProvider); + } + + /** + * Validates the index definition (no composite primary key, at most one secondary key field). It then derives + * the record descriptors, type traits, comparators and tokenizer factory used by the assign, tokenizer, sort + * and bulk-load stages. + */ @Override + @SuppressWarnings("rawtypes") + protected void setSecondaryRecDescAndComparators() throws AlgebricksException { + int numSecondaryKeys = index.getKeyFieldNames().size(); + IndexType indexType = index.getIndexType(); + boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); + // Sanity checks. + if (numPrimaryKeys > 1) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX, + indexType, RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), dataset.getDatasetName())); + } + if (numSecondaryKeys > 1) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, numSecondaryKeys, + indexType, 1); + } + if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX + || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { + isPartitioned = true; + } else { + isPartitioned = false; + } + // Prepare record descriptor used in the assign op, and the optional + // select op. 
+ secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields]; + ISerializerDeserializer[] secondaryRecFields = + new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields]; + ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields]; + secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys]; + ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys]; + ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider(); + ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider(); + /* Index of the record in the tagged primary-scan tuple (after the tag and primary-key fields). */ int recordColumn = NUM_TAG_FIELDS + numPrimaryKeys; + if (numSecondaryKeys > 0) { + secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory( + isOverridingKeyFieldTypes ? enforcedItemType : itemType, index.getKeyFieldNames().get(0), + recordColumn); + Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), + index.getKeyFieldNames().get(0), itemType); + secondaryKeyType = keyTypePair.first; + anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second; + ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType); + secondaryRecFields[0] = keySerde; + secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType); + } + if (numFilterFields > 0) { + secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat() + .getFieldAccessEvaluatorFactory(itemType, filterFieldName, recordColumn); + Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType); + IAType type = keyTypePair.first; + ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type); + secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde; + } + /* NOTE(review): only the secondary-key and filter slots of secondaryRecFields are set above; the primary-key slot is left null. */ secondaryRecDesc = new 
RecordDescriptor(secondaryRecFields); + // Comparators and type traits for tokens. + int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1; + tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields]; + tokenTypeTraits = new ITypeTraits[numTokenFields]; + tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType); + tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType); + if (isPartitioned) { + // The partitioning field is hardcoded to be a short *without* an Asterix type tag. + tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); + tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS; + } + // Set tokenizer factory. + // TODO: We might want to expose the hashing option at the AQL level, + // and add the choice to the index metadata. + tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType, + index.getGramLength()); + // Type traits for inverted-list elements. Inverted lists contain + // primary keys. + invListsTypeTraits = new ITypeTraits[numPrimaryKeys]; + if (numPrimaryKeys > 0) { + invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0]; + enforcedRecFields[0] = primaryRecDesc.getFields()[0]; + enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0]; + } + enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType); + enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits); + // For tokenization, sorting and loading. + // One token (+ optional partitioning field) + primary keys. + numTokenKeyPairFields = (!isPartitioned) ? 
1 + numPrimaryKeys : 2 + numPrimaryKeys; + ISerializerDeserializer[] tokenKeyPairFields = + new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields]; + ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields]; + tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields]; + tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType); + tokenKeyPairTypeTraits[0] = tokenTypeTraits[0]; + tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType); + /* Index of the first primary-key field: after the token and, if partitioned, the partitioning field. */ int pkOff = 1; + if (isPartitioned) { + tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE; + tokenKeyPairTypeTraits[1] = tokenTypeTraits[1]; + tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); + pkOff = 2; + } + if (numPrimaryKeys > 0) { + tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0]; + tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0]; + tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0]; + } + if (numFilterFields > 0) { + tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys]; + } + tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits); + if (filterFieldName != null) { + invertedIndexFields = new int[numTokenKeyPairFields]; + for (int i = 0; i < invertedIndexFields.length; i++) { + invertedIndexFields[i] = i; + } + secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields]; + secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys; + invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys]; + for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) { + invertedIndexFieldsForNonBulkLoadOps[i] = i; + } + } + } + + /** Returns the number of token fields per tuple: the token, plus the short partitioning field when length-partitioned. */ @Override + protected int getNumSecondaryKeys() { + return numTokenKeyPairFields - numPrimaryKeys; + } + + /** + * Builds the bulk-load job: dummy key provider, primary disk-component scan, optional cast, assign, + * tuple processor, tokenizer, sort, inverted-index bulk load and a sink, all joined one-to-one. + */ @Override + public 
JobSpecification buildLoadingJobSpec() throws AlgebricksException { + JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); + JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + + // Create dummy key provider for feeding the primary index scan. + IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); + + // Create primary index scan op. + IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), jobId); + + IOperatorDescriptor sourceOp = primaryScanOp; + boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); + int numSecondaryKeys = index.getKeyFieldNames().size(); + if (isOverridingKeyFieldTypes && !enforcedItemType.equals(itemType)) { + sourceOp = createCastOp(spec, dataset.getDatasetType(), index.isEnforced()); + spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); + } + + RecordDescriptor taggedSecondaryRecDesc = getTaggedRecordDescriptor(secondaryRecDesc); + + RecordDescriptor taggedTokenKeyPairRecDesc = getTaggedRecordDescriptor(tokenKeyPairRecDesc); + + AlgebricksMetaOperatorDescriptor asterixAssignOp = + createAssignOp(spec, numSecondaryKeys, taggedSecondaryRecDesc); + + // Generate compensating tuples for upsert + IOperatorDescriptor processorOp = + createTupleProcessorOp(spec, taggedSecondaryRecDesc, numSecondaryKeys, numPrimaryKeys, true); + + // Create a tokenizer op. + AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec); + + // Create a sort op + ExternalSortOperatorDescriptor sortOp = createSortOp(spec, + getTaggedSecondaryComparatorFactories(tokenKeyPairComparatorFactories), taggedTokenKeyPairRecDesc); + + // Create secondary inverted index bulk load op. 
+ AbstractSingleActivityOperatorDescriptor invIndexBulkLoadOp = + createTreeIndexBulkLoadOp(spec, metadataProvider, taggedTokenKeyPairRecDesc, + createFieldPermutationForBulkLoadOp(), getNumSecondaryKeys(), numPrimaryKeys, true); + + AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, + new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {}); + // Connect the operators. + spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, processorOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), processorOp, 0, tokenizerOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0); + spec.addRoot(metaOp); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } + + /** + * Creates the tokenizer. It tokenizes the field right after the tag (the secondary key) and passes the tag, + * primary-key and filter fields through as key fields. It runs under the primary partition constraint. + */ private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException { + int docField = NUM_TAG_FIELDS; + int numSecondaryKeys = index.getKeyFieldNames().size(); + int[] keyFields = new int[NUM_TAG_FIELDS + numPrimaryKeys + numFilterFields]; + // set tag fields + for (int i = 0; i < NUM_TAG_FIELDS; i++) { + keyFields[i] = i; + } + // set primary key + filter fields + for (int i = NUM_TAG_FIELDS; i < keyFields.length; i++) { + keyFields[i] = i + numSecondaryKeys; + } + BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, + getTaggedRecordDescriptor(tokenKeyPairRecDesc), tokenizerFactory, docField, keyFields, isPartitioned, + false, true, MissingWriterFactory.INSTANCE); + 
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp, + primaryPartitionConstraint); + return tokenizerOp; + } + + @Override + protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec, + IBinaryComparatorFactory[] taggedSecondaryComparatorFactories, RecordDescriptor taggedSecondaryRecDesc) { + /** + * after tokenization, the field layout becomes + * [token, num?, tag, primary key, filter value] + * we need to sort on + * [tag, token, num?, primary key] + * (num? is the short partitioning field, present only for length-partitioned indexes) + * NOTE(review): only one tag position is added below, so this assumes NUM_TAG_FIELDS == 1. + */ + int[] taggedSortFields = new int[taggedSecondaryComparatorFactories.length]; + int numSecondaryKeys = getNumSecondaryKeys(); + int idx = 0; + // set component pos fields + taggedSortFields[idx++] = numSecondaryKeys; + // set secondary keys + for (int i = 0; i < numSecondaryKeys; i++) { + taggedSortFields[idx++] = i; + } + // set primary keys + for (int i = 0; i < numPrimaryKeys; i++) { + taggedSortFields[idx++] = i + numSecondaryKeys + NUM_TAG_FIELDS; + } + ExternalSortOperatorDescriptor sortOp = + new ExternalSortOperatorDescriptor(spec, physOptConf.getMaxFramesExternalSort(), taggedSortFields, + taggedSecondaryComparatorFactories, taggedSecondaryRecDesc); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint); + return sortOp; + } + + @Override + protected int[] createFieldPermutationForBulkLoadOp() { + /** + * after tokenization, the field layout becomes + * [token, num?, tag, primary key, filter value] + * we need to restore it back to + * [tag, token, num?, primary key, filter value] + */ + int[] fieldPermutation = new int[NUM_TAG_FIELDS + numTokenKeyPairFields + numFilterFields]; + int numSecondaryKeys = getNumSecondaryKeys(); + int idx = 0; + // set tag fields + for (int i = 0; i < NUM_TAG_FIELDS; i++) { + fieldPermutation[idx++] = i + numSecondaryKeys; + } + // set secondary keys + for (int i = 0; i < numSecondaryKeys; i++) { + fieldPermutation[idx++] = i; + } + // set primary key + filter + for 
(int i = 0; i < numPrimaryKeys + numFilterFields; i++) { + fieldPermutation[idx++] = i + NUM_TAG_FIELDS + numSecondaryKeys; + } + return fieldPermutation; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java new file mode 100644 index 0000000..8fd8a7a --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils; + +import java.util.List; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.formats.nontagged.TypeTraitProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.asterix.runtime.operators.LSMSecondaryIndexBulkLoadOperatorDescriptor; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.JobSpecification; +import 
org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; +import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; + +/** + * Operations helper for correlated secondary RTree indexes. Its loading job scans the primary index's disk + * components, computes the MBR coordinates of the single spatial key, sorts with a linearizer comparator and + * bulk-loads the RTree. + */ @SuppressWarnings("rawtypes") +public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper { + + protected IPrimitiveValueProviderFactory[] valueProviderFactories; + /* Number of MBR coordinate fields: 2 * number of dimensions of the spatial key. */ protected int numNestedSecondaryKeyFields; + protected ATypeTag keyType; + protected int[] primaryKeyFields; + protected int[] rtreeFields; + /* True for POINT/POINT3D keys: assign and sort then carry only numDimensions coordinate fields. */ protected boolean isPointMBR; + /* Built only for point keys; the loading job uses it instead of secondaryRecDesc in that case. */ protected RecordDescriptor secondaryRecDescForPointMBR = null; + + protected SecondaryCorrelatedRTreeOperationsHelper(Dataset dataset, Index index, + PhysicalOptimizationConfig physOptConf, MetadataProvider metadataProvider) throws AlgebricksException { + super(dataset, index, physOptConf, metadataProvider); + } + + /** Returns the number of MBR coordinate fields (2 * number of dimensions). */ @Override + protected int getNumSecondaryKeys() { + return numNestedSecondaryKeyFields; + } + + /** + * Requires exactly one secondary key field. Derives per-coordinate serdes, comparators, type traits and value + * providers for its MBR, followed by the primary-key and optional filter fields. For point keys it also + * builds secondaryRecDescForPointMBR, which holds only numDimensions coordinate fields. + */ @Override + protected void setSecondaryRecDescAndComparators() throws AlgebricksException { + List<List<String>> secondaryKeyFields = index.getKeyFieldNames(); + int numSecondaryKeys = secondaryKeyFields.size(); + boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); + if (numSecondaryKeys != 1) { + throw AsterixException.create(ErrorCode.INDEX_RTREE_MULTIPLE_FIELDS_NOT_ALLOWED, numSecondaryKeys); + } + Pair<IAType, Boolean> spatialTypePair = + Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0), itemType); + IAType spatialType = spatialTypePair.first; + anySecondaryKeyIsNullable = spatialTypePair.second; + isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D; + int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); + numNestedSecondaryKeyFields = numDimensions * 2; + int recordColumn = NUM_TAG_FIELDS + 
numPrimaryKeys; + secondaryFieldAccessEvalFactories = + metadataProvider.getFormat().createMBRFactory(isOverridingKeyFieldTypes ? enforcedItemType : itemType, + secondaryKeyFields.get(0), recordColumn, numDimensions, filterFieldName, isPointMBR); + secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields]; + valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields]; + ISerializerDeserializer[] secondaryRecFields = + new ISerializerDeserializer[numPrimaryKeys + numNestedSecondaryKeyFields + numFilterFields]; + ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields]; + secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys]; + ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys]; + IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); + keyType = nestedKeyType.getTypeTag(); + for (int i = 0; i < numNestedSecondaryKeyFields; i++) { + ISerializerDeserializer keySerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType); + secondaryRecFields[i] = keySerde; + secondaryComparatorFactories[i] = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true); + secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType); + valueProviderFactories[i] = + metadataProvider.getStorageComponentProvider().getPrimitiveValueProviderFactory(); + + } + // Add serializers and comparators for primary index fields. 
+ // only support internal datasets + for (int i = 0; i < numPrimaryKeys; i++) { + secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i]; + secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i]; + enforcedRecFields[i] = primaryRecDesc.getFields()[i]; + enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i]; + } + + enforcedRecFields[numPrimaryKeys] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); + enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits); + if (numFilterFields > 0) { + rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys]; + for (int i = 0; i < rtreeFields.length; i++) { + rtreeFields[i] = i; + } + + Pair<IAType, Boolean> typePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType); + IAType type = typePair.first; + ISerializerDeserializer serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type); + secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = serde; + } + secondaryRecDesc = new RecordDescriptor(secondaryRecFields); + primaryKeyFields = new int[numPrimaryKeys]; + for (int i = 0; i < primaryKeyFields.length; i++) { + primaryKeyFields[i] = i + numNestedSecondaryKeyFields; + } + if (isPointMBR) { + int numNestedSecondaryKeyFieldForPointMBR = numNestedSecondaryKeyFields / 2; + ISerializerDeserializer[] recFieldsForPointMBR = new ISerializerDeserializer[numPrimaryKeys + + numNestedSecondaryKeyFieldForPointMBR + numFilterFields]; + int idx = 0; + for (int i = 0; i < numNestedSecondaryKeyFieldForPointMBR; i++) { + recFieldsForPointMBR[idx++] = secondaryRecFields[i]; + } + for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) { + recFieldsForPointMBR[idx++] = secondaryRecFields[numNestedSecondaryKeyFields + i]; + } + secondaryRecDescForPointMBR = new RecordDescriptor(recFieldsForPointMBR); + } + } + + /** Builds the job that bulk-loads the RTree from the primary index's disk components. */ @Override + public JobSpecification buildLoadingJobSpec() throws 
AsterixException, AlgebricksException { + /*************************************************** + * [ About PointMBR Optimization ] + * Instead of storing an MBR(4 doubles) for a point(2 doubles) in RTree leaf node, + * PointMBR concept is introduced. + * PointMBR is a way to store a point as 2 doubles in RTree leaf node. + * This reduces RTree index size roughly in half. + * In order to fully benefit from the PointMBR concept, besides RTree, + * external sort operator during bulk-loading (from either data loading or index creation) + * must deal with point as 2 doubles instead of 4 doubles. Otherwise, external sort will suffer from twice as + * many doubles as it actually requires. For this purpose, + * PointMBR specific optimization logic is added as follows: + * 1) CreateMBR function in assign operator generates 2 doubles, instead of 4 doubles. + * 2) External sort operator sorts points represented with 2 doubles. + * 3) Bulk-loading in RTree takes 4 doubles by reading 2 doubles twice and then + * does the same work as non-point MBR cases. + ***************************************************/ + JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); + int numNestedSecondaryKeFieldsConsideringPointMBR = + isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields; + RecordDescriptor secondaryRecDescConsideringPointMBR = isPointMBR + ? getTaggedRecordDescriptor(secondaryRecDescForPointMBR) : getTaggedRecordDescriptor(secondaryRecDesc); + + boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); + + /* Only internal datasets are supported; this is checked only when assertions are enabled. */ assert dataset.getDatasetType() == DatasetType.INTERNAL; + + // Create dummy key provider for feeding the primary index scan. + IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); + JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + + // Create primary index scan op. 
+ IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), jobId); + + // Assign op. + IOperatorDescriptor sourceOp = primaryScanOp; + if (isOverridingKeyFieldTypes && !enforcedItemType.equals(itemType)) { + sourceOp = createCastOp(spec, dataset.getDatasetType(), index.isEnforced()); + spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); + } + + AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, + numNestedSecondaryKeFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR); + + // Generate compensating tuples for upsert + IOperatorDescriptor processorOp = createTupleProcessorOp(spec, secondaryRecDescConsideringPointMBR, + numNestedSecondaryKeFieldsConsideringPointMBR, numPrimaryKeys, false); + + /* Sort with one linearizer comparator proposed for the coordinate type and the number of MBR fields. */ ExternalSortOperatorDescriptor sortOp = createSortOp(spec, + getTaggedSecondaryComparatorFactories(new IBinaryComparatorFactory[] { + MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) }), + secondaryRecDescConsideringPointMBR); + + // Create secondary RTree bulk load op. + LSMSecondaryIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, + metadataProvider, secondaryRecDescConsideringPointMBR, createFieldPermutationForBulkLoadOp(), + numNestedSecondaryKeFieldsConsideringPointMBR, numPrimaryKeys, false); + + AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, + new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {}); + + // Connect the operators. 
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, processorOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), processorOp, 0, sortOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); + spec.addRoot(metaOp); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + + return spec; + } + + /** + * For point keys the sorted tuples hold [tag, p_1..p_d, primary key, filter]. The permutation reads the d + * coordinate fields twice, so the bulk loader sees [tag, p_1..p_d, p_1..p_d, primary key, filter], i.e. 2 * d MBR + * fields. For other key types it defers to the superclass implementation. + */ @Override + protected int[] createFieldPermutationForBulkLoadOp() { + if (isPointMBR) { + int[] fieldPermutation = + new int[NUM_TAG_FIELDS + numNestedSecondaryKeyFields + numPrimaryKeys + numFilterFields]; + int idx = 0; + int numSecondaryKeyFieldsForPointMBR = numNestedSecondaryKeyFields / 2; + for (int i = 0; i < NUM_TAG_FIELDS + numSecondaryKeyFieldsForPointMBR; i++) { + fieldPermutation[idx++] = i; + } + //add the coordinate fields again for pointMBR (the point is loaded as a degenerate MBR) + for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) { + fieldPermutation[idx++] = NUM_TAG_FIELDS + i; + } + //add the pk and filter fields + int end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + numFilterFields; + for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) { + fieldPermutation[idx++] = NUM_TAG_FIELDS + i; + } + return fieldPermutation; + } else { + return super.createFieldPermutationForBulkLoadOp(); + } + + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java index fa33790..171d72e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java @@ -263,8 +263,8 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon } protected LSMSecondaryIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec, - MetadataProvider metadataProvider, RecordDescriptor taggedSecondaryRecDesc, int numSecondaryKeys, - int numPrimaryKeys, boolean hasBuddyBtree) throws AlgebricksException { + MetadataProvider metadataProvider, RecordDescriptor taggedSecondaryRecDesc, int[] fieldPermutation, + int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBtree) throws AlgebricksException { IndexDataflowHelperFactory primaryIndexHelperFactory = new IndexDataflowHelperFactory( metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); @@ -273,7 +273,8 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon LSMSecondaryIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMSecondaryIndexBulkLoadOperatorDescriptor(spec, taggedSecondaryRecDesc, primaryIndexHelperFactory, - secondaryIndexHelperFactory, NUM_TAG_FIELDS, numSecondaryKeys, numPrimaryKeys, hasBuddyBtree); + secondaryIndexHelperFactory, fieldPermutation, NUM_TAG_FIELDS, numSecondaryKeys, + numPrimaryKeys, hasBuddyBtree); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); return treeIndexBulkLoadOp; @@ -306,14 +307,16 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon new SecondaryCorrelatedBTreeOperationsHelper(dataset, index, physOptConf, 
metadataProvider); break; case RTREE: - //TODO RTree + indexOperationsHelper = + new SecondaryCorrelatedRTreeOperationsHelper(dataset, index, physOptConf, metadataProvider); + break; case SINGLE_PARTITION_WORD_INVIX: case SINGLE_PARTITION_NGRAM_INVIX: case LENGTH_PARTITIONED_WORD_INVIX: case LENGTH_PARTITIONED_NGRAM_INVIX: - //TODO Inverted Index - //TODO This will be fixed soon - throw new UnsupportedOperationException(); + indexOperationsHelper = new SecondaryCorrelatedInvertedIndexOperationsHelper(dataset, index, + physOptConf, metadataProvider); + break; default: throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, index.getIndexType()); } @@ -321,4 +324,12 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon return indexOperationsHelper; } + protected int[] createFieldPermutationForBulkLoadOp() { + int[] fieldPermutation = new int[NUM_TAG_FIELDS + getNumSecondaryKeys() + numPrimaryKeys + numFilterFields]; + for (int i = 0; i < fieldPermutation.length; i++) { + fieldPermutation[i] = i; + } + return fieldPermutation; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java index cb15b98..b4d8a22 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.config.GlobalConfig; import 
org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; @@ -213,8 +214,8 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); // Create primary index scan op. - IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, - jobId); + IOperatorDescriptor primaryScanOp = + DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, jobId); IOperatorDescriptor sourceOp = primaryScanOp; boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); @@ -270,8 +271,9 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp for (int i = 0; i < primaryKeyFields.length; i++) { primaryKeyFields[i] = numSecondaryKeys + i; } - BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, - tokenizerFactory, docField, primaryKeyFields, isPartitioned, false); + BinaryTokenizerOperatorDescriptor tokenizerOp = + new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField, + primaryKeyFields, isPartitioned, false, false, MissingWriterFactory.INSTANCE); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp, primaryPartitionConstraint); return tokenizerOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java index 6b050af..0d71f71 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AbstractLSMSecondaryIndexCreationNodePushable.java @@ -24,7 +24,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.BooleanPointable; import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; @@ -33,9 +32,6 @@ public abstract class AbstractLSMSecondaryIndexCreationNodePushable protected final IHyracksTaskContext ctx; protected final RecordDescriptor inputRecDesc; - // with tag fields - protected final FrameTupleReference tuple; - protected final int partition; protected final int numTagFields; protected final int numSecondaryKeys; @@ -50,7 +46,6 @@ public abstract class AbstractLSMSecondaryIndexCreationNodePushable boolean hasBuddyBTree) { this.ctx = ctx; this.inputRecDesc = inputRecDesc; - this.tuple = new FrameTupleReference(); this.partition = partition; this.numTagFields = numTagFields; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java index f09bcd2..2eb84fb 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java @@ -29,6 +29,7 @@ import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; @@ -45,7 +46,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; */ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryIndexCreationNodePushable { // with tag fields - + private final PermutingFrameTupleReference tuple; private final PermutingTupleReference sourceTuple; private final PermutingTupleReference deletedKeyTuple; @@ -63,16 +64,17 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc, IIndexDataflowHelperFactory primaryIndexHelperFactory, - IIndexDataflowHelperFactory secondaryIndexHelperFactory, int numTagFields, int numSecondaryKeys, - int numPrimaryKeys, 
boolean hasBuddyBTree) throws HyracksDataException { + IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields, + int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException { super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree); this.primaryIndexHelper = primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.secondaryIndexHelper = secondaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); + this.tuple = new PermutingFrameTupleReference(fieldPermutation); - int[] sourcePermutation = new int[inputRecDesc.getFieldCount() - numTagFields]; + int[] sourcePermutation = new int[fieldPermutation.length - numTagFields]; for (int i = 0; i < sourcePermutation.length; i++) { sourcePermutation[i] = i + numTagFields; } @@ -170,7 +172,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI loadNewComponent(componentPos); currentComponentPos = componentPos; } - if (isAntiMatterTuple(tuple)) { addAntiMatterTuple(tuple); } else { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java index 804b700..4f1dfd7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java @@ -35,6 +35,7 @@ public class 
LSMSecondaryIndexBulkLoadOperatorDescriptor extends AbstractSingleA private final IIndexDataflowHelperFactory primaryIndexHelperFactory; private final IIndexDataflowHelperFactory secondaryIndexHelperFactory; + private final int[] fieldPermutation; private final int numTagFields; private final int numSecondaryKeys; private final int numPrimaryKeys; @@ -43,12 +44,13 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor extends AbstractSingleA public LSMSecondaryIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, IIndexDataflowHelperFactory primaryIndexHelperFactory, - IIndexDataflowHelperFactory secondaryIndexHelperFactory, int numTagFields, int numSecondaryKeys, - int numPrimaryKeys, boolean hasBuddyBTree) { + IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields, + int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) { super(spec, 1, 1); this.outRecDescs[0] = outRecDesc; this.primaryIndexHelperFactory = primaryIndexHelperFactory; this.secondaryIndexHelperFactory = secondaryIndexHelperFactory; + this.fieldPermutation = fieldPermutation; this.numTagFields = numTagFields; this.numSecondaryKeys = numSecondaryKeys; this.numPrimaryKeys = numPrimaryKeys; @@ -61,6 +63,7 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor extends AbstractSingleA IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { return new LSMSecondaryIndexBulkLoadNodePushable(ctx, partition, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), primaryIndexHelperFactory, - secondaryIndexHelperFactory, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree); + secondaryIndexHelperFactory, fieldPermutation, numTagFields, numSecondaryKeys, numPrimaryKeys, + hasBuddyBTree); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java index b2a2fa4..9376d1b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java @@ -36,6 +36,7 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; @@ -69,6 +70,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractStateObject; * [component pos, anti-matter flag, secondary keys, primary keys, filter values] */ public class LSMSecondaryIndexCreationTupleProcessorNodePushable extends AbstractLSMSecondaryIndexCreationNodePushable { + private final FrameTupleReference tuple = new FrameTupleReference(); // prevSourceTuple stores the previous matter tuple private final ArrayTupleBuilder prevMatterTupleBuilder; private final ArrayTupleReference 
prevMatterTuple = new ArrayTupleReference(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java index fe9decd..3ed47e6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.dataflow; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -47,8 +48,13 @@ public class BinaryTokenizerOperatorDescriptor extends AbstractSingleActivityOpe // False: [token, number of token(if a partitioned index), keyfield1, keyfield2 ...] 
private final boolean writeKeyFieldsFirst; + private final boolean writeMissing; + + private final IMissingWriterFactory missingWriterFactory; + public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc, - IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey, boolean writeKeyFieldsFirst) { + IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey, + boolean writeKeyFieldsFirst, boolean writeMissing, IMissingWriterFactory missingWriterFactory) { super(spec, 1, 1); this.tokenizerFactory = tokenizerFactory; this.docField = docField; @@ -56,13 +62,16 @@ public class BinaryTokenizerOperatorDescriptor extends AbstractSingleActivityOpe this.addNumTokensKey = addNumTokensKey; outRecDescs[0] = recDesc; this.writeKeyFieldsFirst = writeKeyFieldsFirst; + this.writeMissing = writeMissing; + this.missingWriterFactory = missingWriterFactory; } @Override public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor( - getActivityId(), 0), outRecDescs[0], tokenizerFactory.createTokenizer(), docField, keyFields, - addNumTokensKey, writeKeyFieldsFirst); + return new BinaryTokenizerOperatorNodePushable(ctx, + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], + tokenizerFactory.createTokenizer(), docField, keyFields, addNumTokensKey, writeKeyFieldsFirst, + writeMissing, missingWriterFactory); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e856e1e8/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java index 002457b..3df185a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java @@ -24,6 +24,8 @@ import java.nio.ByteBuffer; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.util.GrowableArray; @@ -31,6 +33,8 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken; @@ -45,6 +49,9 @@ public class BinaryTokenizerOperatorNodePushable extends 
AbstractUnaryInputUnary private final boolean writeKeyFieldsFirst; private final RecordDescriptor inputRecDesc; private final RecordDescriptor outputRecDesc; + private final boolean writeMissing; + private final IMissingWriter missingWriter; + private final FrameTupleReference tuple = new FrameTupleReference(); private FrameTupleAccessor accessor; private ArrayTupleBuilder builder; @@ -53,7 +60,8 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int docField, int[] keyFields, - boolean addNumTokensKey, boolean writeKeyFieldsFirst) { + boolean addNumTokensKey, boolean writeKeyFieldsFirst, boolean writeMissing, + IMissingWriterFactory missingWriterFactory) { this.ctx = ctx; this.tokenizer = tokenizer; this.docField = docField; @@ -62,6 +70,8 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary this.inputRecDesc = inputRecDesc; this.outputRecDesc = outputRecDesc; this.writeKeyFieldsFirst = writeKeyFieldsFirst; + this.writeMissing = writeMissing; + this.missingWriter = missingWriterFactory.createMissingWriter(); } @Override @@ -79,76 +89,85 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary int tupleCount = accessor.getTupleCount(); for (int i = 0; i < tupleCount; i++) { - short numTokens = 0; + tuple.reset(accessor, i); - tokenizer.reset(accessor.getBuffer().array(), accessor.getTupleStartOffset(i) - + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, docField), - accessor.getFieldLength(i, docField)); + short numTokens = 0; - if (addNumTokensKey) { - // Get the total number of tokens. 
- numTokens = tokenizer.getTokensCount(); + if (!isDocFieldMissing(tuple)) { + tokenizer.reset(tuple.getFieldData(docField), tuple.getFieldStart(docField), + tuple.getFieldLength(docField)); + if (addNumTokensKey) { + // Get the total number of tokens. + numTokens = tokenizer.getTokensCount(); + } + // Write token and data into frame by following the order specified + // in the writeKeyFieldsFirst field. + while (tokenizer.hasNext()) { + tokenizer.next(); + IToken token = tokenizer.getToken(); + writeTuple(token, numTokens, i); + } + } else if (writeMissing) { + writeTuple(null, 0, i); } + } - // Write token and data into frame by following the order specified - // in the writeKeyFieldsFirst field. - while (tokenizer.hasNext()) { - - tokenizer.next(); - - builder.reset(); - - // Writing Order: token, number of token, keyfield1 ... n - if (!writeKeyFieldsFirst) { - try { - IToken token = tokenizer.getToken(); - token.serializeToken(builderData); - - builder.addFieldEndOffset(); - // Add number of tokens if requested. - if (addNumTokensKey) { - builder.getDataOutput().writeShort(numTokens); - builder.addFieldEndOffset(); - } - } catch (IOException e) { - throw new HyracksDataException(e.getMessage()); - } - - for (int k = 0; k < keyFields.length; k++) { - builder.addField(accessor, i, keyFields[k]); - } + } + private void writeTuple(IToken token, int numTokens, int fieldIdx) throws HyracksDataException { + builder.reset(); + + // Writing Order: token, number of token, keyfield1 ... n + if (!writeKeyFieldsFirst) { + try { + if (token != null) { + token.serializeToken(builderData); + builder.addFieldEndOffset(); + } else { + missingWriter.writeMissing(builder.getDataOutput()); + builder.addFieldEndOffset(); } - // Writing Order: keyfield1 ... 
n, token, number of token - else { - - for (int k = 0; k < keyFields.length; k++) { - builder.addField(accessor, i, keyFields[k]); - } - - try { - IToken token = tokenizer.getToken(); - token.serializeToken(builderData); - - builder.addFieldEndOffset(); - // Add number of tokens if requested. - if (addNumTokensKey) { - builder.getDataOutput().writeShort(numTokens); - builder.addFieldEndOffset(); - } - } catch (IOException e) { - throw new HyracksDataException(e.getMessage()); - } + // Add number of tokens if requested. + if (addNumTokensKey) { + builder.getDataOutput().writeShort(numTokens); + builder.addFieldEndOffset(); } + } catch (IOException e) { + throw HyracksDataException.create(e); + } - FrameUtils.appendToWriter(writer, appender, builder.getFieldEndOffsets(), builder.getByteArray(), 0, - builder.getSize()); + for (int k = 0; k < keyFields.length; k++) { + builder.addField(accessor, fieldIdx, keyFields[k]); + } + } + // Writing Order: keyfield1 ... n, token, number of token + else { + for (int k = 0; k < keyFields.length; k++) { + builder.addField(accessor, fieldIdx, keyFields[k]); } + try { + if (token != null) { + token.serializeToken(builderData); + builder.addFieldEndOffset(); + } else { + missingWriter.writeMissing(builder.getDataOutput()); + builder.addFieldEndOffset(); + } + // Add number of tokens if requested. 
+ if (addNumTokensKey) { + builder.getDataOutput().writeShort(numTokens); + builder.addFieldEndOffset(); + } + } catch (IOException e) { + throw HyracksDataException.create(e); + } } + FrameUtils.appendToWriter(writer, appender, builder.getFieldEndOffsets(), builder.getByteArray(), 0, + builder.getSize()); } @Override @@ -169,4 +188,14 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary public void flush() throws HyracksDataException { appender.flush(writer); } + + /** + * Returns whether the doc field is missing (only with a type tag) + * + * @param tuple + * @return + */ + private boolean isDocFieldMissing(ITupleReference tuple) { + return tuple.getFieldLength(docField) <= 1; + } }
