http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 2f94ad7..fb9901d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -87,6 +87,8 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.asterix.runtime.base.AsterixTupleFilterFactory; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; +import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor; +import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage; import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -133,7 +135,6 @@ import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; @@ -595,9 +596,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> // bulkload?) IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), splitsAndConstraint.first); - TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = - new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory); + LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null, + fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, + indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId()); return new Pair<>(btreeBulkLoad, splitsAndConstraint.second); } catch (MetadataException me) { throw new AlgebricksException(me); @@ -1001,8 +1002,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh); + op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null, + BulkLoadUsage.LOAD, dataset.getDatasetId()); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, null, true, modificationCallbackFactory); @@ -1135,8 +1137,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> IOperatorDescriptor op; if (bulkload) { long numElementsHint = 
getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh); + op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null, + BulkLoadUsage.LOAD, dataset.getDatasetId()); } else if (indexOp == IndexOperation.UPSERT) { op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, filterFactory, modificationCallbackFactory, prevFieldPermutation); @@ -1237,9 +1240,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, + op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, - indexDataflowHelperFactory); + indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId()); } else if (indexOp == IndexOperation.UPSERT) { op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation); @@ -1353,8 +1356,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory); + op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory, + null, BulkLoadUsage.LOAD, 
dataset.getDatasetId()); } else if (indexOp == IndexOperation.UPSERT) { op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 3fec73b..e5f97f0 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -30,6 +30,7 @@ import java.util.stream.IntStream; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.active.IActiveNotificationHandler; import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory; @@ -107,6 +108,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; @@ -505,15 +507,15 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { case BTREE: return getDatasetType() == 
DatasetType.EXTERNAL && !index.getIndexName().equals(IndexingConstants.getFilesIndexName(getDatasetName())) - ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE - : LSMBTreeIOOperationCallbackFactory.INSTANCE; + ? new LSMBTreeWithBuddyIOOperationCallbackFactory(getComponentIdGeneratorFactory()) + : new LSMBTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory()); case RTREE: - return LSMRTreeIOOperationCallbackFactory.INSTANCE; + return new LSMRTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory()); case LENGTH_PARTITIONED_NGRAM_INVIX: case LENGTH_PARTITIONED_WORD_INVIX: case SINGLE_PARTITION_NGRAM_INVIX: case SINGLE_PARTITION_WORD_INVIX: - return LSMInvertedIndexIOOperationCallbackFactory.INSTANCE; + return new LSMInvertedIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory()); default: throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, index.getIndexType().toString()); @@ -532,6 +534,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { : new SecondaryIndexOperationTrackerFactory(getDatasetId()); } + public ILSMComponentIdGeneratorFactory getComponentIdGeneratorFactory() { + return new DatasetLSMComponentIdGeneratorFactory(getDatasetId()); + } + /** * Get search callback factory for this dataset with the passed index and operation * http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index 5dac407..7701f65 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -19,8 +19,6 @@ package org.apache.asterix.metadata.utils; -import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; - import java.util.Collections; import java.util.List; import java.util.Map; @@ -51,6 +49,8 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.evaluators.functions.AndDescriptor; import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor; import org.apache.asterix.runtime.evaluators.functions.NotDescriptor; +import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor; +import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -71,6 +71,8 @@ import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; @@ -124,8 +126,8 @@ public abstract class SecondaryIndexOperationsHelper { this.index = index; this.physOptConf = physOptConf; this.metadataProvider = metadataProvider; - this.itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), - dataset.getItemTypeName()); + 
this.itemType = + (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); this.metaType = DatasetUtil.getMetaType(metadataProvider, dataset); Pair<ARecordType, ARecordType> enforcedTypes = getEnforcedType(index, itemType, metaType); this.enforcedItemType = enforcedTypes.first; @@ -341,11 +343,15 @@ public abstract class SecondaryIndexOperationsHelper { return sortOp; } - protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec, + protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) throws AlgebricksException { - TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec, - secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory); + IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory( + metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); + + LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec, + secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory, + primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); return treeIndexBulkLoadOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java new file mode 100644 index 0000000..74590c7 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.operators; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; + +public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor { + + private static final long serialVersionUID = 1L; + + public enum BulkLoadUsage { + LOAD, + CREATE_INDEX + } + + protected final IIndexDataflowHelperFactory primaryIndexHelperFactory; + + protected final BulkLoadUsage usage; + + protected final int datasetId; + + public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory, + IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId) { + super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, + indexHelperFactory); + this.primaryIndexHelperFactory = primaryIndexHelperFactory; + this.usage = usage; + this.datasetId = datasetId; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition, + fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, + 
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java new file mode 100644 index 0000000..2415556 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.operators; + +import java.util.List; + +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader; +import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; + +public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable { + protected final BulkLoadUsage usage; + + protected final IIndexDataflowHelper primaryIndexHelper; + protected final IDatasetLifecycleManager datasetManager; + protected final int datasetId; + protected final int partition; + + protected ILSMIndex primaryIndex; + + public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory, + IIndexDataflowHelperFactory priamryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition, + int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId) + throws HyracksDataException { + super(indexDataflowHelperFactory, ctx, 
partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, + checkIfEmptyIndex, recDesc); + + if (priamryIndexDataflowHelperFactory != null) { + this.primaryIndexHelper = + priamryIndexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); + } else { + this.primaryIndexHelper = null; + } + this.usage = usage; + this.datasetId = datasetId; + this.partition = partition; + INcApplicationContext ncCtx = + (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); + datasetManager = ncCtx.getDatasetLifecycleManager(); + } + + @Override + protected void initializeBulkLoader() throws HyracksDataException { + ILSMIndex targetIndex = (ILSMIndex) index; + if (usage.equals(BulkLoadUsage.LOAD)) { + // for a loaded dataset, we use the default Id 0 which is guaranteed to be smaller + // than Ids of all memory components + + // TODO handle component Id for datasets loaded multiple times + // TODO move this piece of code to io operation callback + bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent(); + LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata()); + } else { + primaryIndexHelper.open(); + primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance(); + List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents(); + bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + if (!primaryComponents.isEmpty()) { + // TODO move this piece of code to io operation callback + // Ideally, this should be done in io operation callback when a bulk load operation is finished + // However, currently we don't have an extensible callback mechanism to support this + ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(), + 
primaryComponents.get(primaryComponents.size() - 1).getId()); + ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent(); + LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata()); + } + } + } + + @Override + public void close() throws HyracksDataException { + try { + super.close(); + } finally { + if (primaryIndex != null) { + primaryIndexHelper.close(); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java index cee20ce..6d9ec47 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java @@ -19,24 +19,20 @@ package org.apache.asterix.runtime.operators; import java.nio.ByteBuffer; -import java.util.List; import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import 
org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader; +import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; /** * This operator node is used to bulk load incoming tuples (scanned from the primary index) @@ -56,12 +52,9 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI private ILSMIndex primaryIndex; private ILSMIndex secondaryIndex; - private ILSMDiskComponent component; - private ILSMDiskComponentBulkLoader componentBulkLoader; + private LSMIndexDiskComponentBulkLoader componentBulkLoader; private int currentComponentPos = -1; - private ILSMDiskComponent[] diskComponents; - public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc, IIndexDataflowHelperFactory primaryIndexHelperFactory, IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields, @@ -92,7 +85,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI super.open(); primaryIndexHelper.open(); primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance(); - diskComponents = new ILSMDiskComponent[primaryIndex.getDiskComponents().size()]; secondaryIndexHelper.open(); secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance(); @@ -107,8 +99,6 @@ public class 
LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI closeException = e; } - activateComponents(); - try { if (primaryIndexHelper != null) { primaryIndexHelper.close(); @@ -184,24 +174,22 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI } private void endCurrentComponent() throws HyracksDataException { - if (component != null) { - // set disk component id - + if (componentBulkLoader != null) { componentBulkLoader.end(); - diskComponents[currentComponentPos] = component; - componentBulkLoader = null; - component = null; } } private void loadNewComponent(int componentPos) throws HyracksDataException { endCurrentComponent(); - component = secondaryIndex.createBulkLoadTarget(); int numTuples = getNumDeletedTuples(componentPos); - componentBulkLoader = component.createBulkLoader(1.0f, false, numTuples, false, true, true); - + ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos); + componentBulkLoader = + (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false); + ILSMDiskComponent diskComponent = componentBulkLoader.getComponent(); + // TODO move this piece of code to io operation callback + LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata()); } private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException { @@ -220,30 +208,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI } - private void activateComponents() throws HyracksDataException { - List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents(); - for (int i = diskComponents.length - 1; i >= 0; i--) { - // start from the oldest component to the newest component - if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) { - secondaryIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.FLUSH, null, - diskComponents[i]); - - // setting component id has to be 
place between afterOperation and addBulkLoadedComponent, - // since afterOperation would set a flush component id (but it's not invalid) - // and addBulkLoadedComponent would finalize the component - ILSMDiskComponentId primaryComponentId = primaryComponents.get(i).getComponentId(); - //set component id - diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, - LongPointable.FACTORY.createPointable(primaryComponentId.getMinId())); - diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, - LongPointable.FACTORY.createPointable(primaryComponentId.getMaxId())); - - ((AbstractLSMIndex) secondaryIndex).getLsmHarness().addBulkLoadedComponent(diskComponents[i]); - - } - } - } - private int getNumDeletedTuples(int componentPos) { DeletedTupleCounter counter = (DeletedTupleCounter) ctx.getStateObject(partition); return counter.get(componentPos); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java index 5fc07ad..095159b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java @@ -65,7 +65,7 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu index = indexHelper.getIndexInstance(); try { 
writer.open(); - bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + initializeBulkLoader(); } catch (Exception e) { throw HyracksDataException.create(e); } @@ -116,4 +116,8 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu writer.fail(); } } + + protected void initializeBulkLoader() throws HyracksDataException { + bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java index 6083637..a859f68 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java @@ -57,6 +57,7 @@ public class ExternalBTreeLocalResource extends LSMBTreeLocalResource { public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException { IIOManager ioManager = serviceCtx.getIoManager(); FileReference file = ioManager.resolve(path); + ioOpCallbackFactory.initialize(serviceCtx); return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, 
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java index 04b63f9..9422253 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java @@ -60,6 +60,7 @@ public class ExternalBTreeWithBuddyLocalResource extends LSMBTreeLocalResource { public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException { IIOManager ioManager = serviceCtx.getIoManager(); FileReference file = ioManager.resolve(path); + ioOpCallbackFactory.initialize(serviceCtx); return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java index dfa88da..1988736 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java @@ -70,6 +70,7 @@ public class LSMBTreeLocalResource extends LsmResource { IIOManager ioManager = serviceCtx.getIoManager(); FileReference file = ioManager.resolve(path); List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file); + ioOpCallbackFactory.initialize(serviceCtx); //TODO: enable updateAwareness for secondary LSMBTree indexes boolean updateAware = false; return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java index a8e707f..d759167 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java @@ -24,11 +24,10 @@ import java.util.Set; import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; -public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent implements ILSMDiskComponent { +public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent { protected final BTree btree; public LSMBTreeDiskComponent(AbstractLSMIndex lsmIndex, BTree btree, ILSMComponentFilter filter) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java index a60f544..ab8e899 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java @@ -127,4 +127,11 @@ public interface ILSMComponent { * @return index data structure that is the stored in the component */ IIndex getIndex(); + + /** + * + * @return id of the component + * @throws HyracksDataException + */ + 
ILSMComponentId getId() throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java new file mode 100644 index 0000000..5662862 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.api; + +/** + * Stores the id of the disk component, which is a interval (minId, maxId). 
+ * It is generated by {@link ILSMComponentIdGenerator} + * + */ +public interface ILSMComponentId { + public enum IdCompareResult { + UNKNOWN, + LESS_THAN, + GREATER_THAN, + INTERSECT, + INCLUDE + } + + /** + * @return whether the id is missing + */ + boolean missing(); + + IdCompareResult compareTo(ILSMComponentId id); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java new file mode 100644 index 0000000..5dd3061 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.api; + +/** + * This interface generates component Ids for LSM components (both memory and disk components). + */ +public interface ILSMComponentIdGenerator { + + /** + * @return An Id for LSM component + */ + public ILSMComponentId getId(); + + /** + * Refresh the component Id generator to generate the next Id. + * {@link #getId()} would always return the same Id before this method is called. + */ + public void refresh(); + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java new file mode 100644 index 0000000..c0f530b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.api; + +import java.io.Serializable; + +import org.apache.hyracks.api.application.INCServiceContext; + +@FunctionalInterface +public interface ILSMComponentIdGeneratorFactory extends Serializable { + + ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java index 43c5482..bd2bb45 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java @@ -48,14 +48,6 @@ public interface ILSMDiskComponent extends ILSMComponent { int getFileReferenceCount(); /** - * Return the component Id of this disk component from its metadata - * - * @return - * @throws HyracksDataException - */ - ILSMDiskComponentId getComponentId() throws HyracksDataException; - - /** * @return LsmIndex of the component */ 
AbstractLSMIndex getLsmIndex(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java deleted file mode 100644 index 5d38ace..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.storage.am.lsm.common.api; - -import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; - -/** - * Stores the id of the disk component, which is a interval (minId, maxId). 
- * When a disk component is formed by the flush operation, its initial minId and maxId are the same, and - * currently are set as the flush LSN. - * When a disk component is formed by the merge operation, its [minId, maxId] is set as the union of - * all ids of merged disk components. - * - * @author luochen - * - */ -public interface ILSMDiskComponentId { - - public static final long NOT_FOUND = -1; - - public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY = - new MutableArrayValueReference("Component_Id_Min".getBytes()); - - public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY = - new MutableArrayValueReference("Component_Id_Max".getBytes()); - - long getMinId(); - - long getMaxId(); - - default boolean notFound() { - return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java index b291f7c..a9dc50e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java @@ -20,7 +20,15 @@ package org.apache.hyracks.storage.am.lsm.common.api; import java.io.Serializable; -@FunctionalInterface +import org.apache.hyracks.api.application.INCServiceContext; + public interface 
ILSMIOOperationCallbackFactory extends Serializable { + /** + * Initialize the callback factory with the given ncCtx + * + * @param ncCtx + */ + void initialize(INCServiceContext ncCtx); + ILSMIOOperationCallback createIoOpCallback(ILSMIndex index); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java index 13543e4..f892585 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java @@ -101,4 +101,12 @@ public interface ILSMMemoryComponent extends ILSMComponent { * @return the size of the memory component */ long getSize(); + + /** + * Reset the component Id of the memory component after it's recycled + * + * @param newId + * @throws HyracksDataException + */ + void resetId(ILSMComponentId newId) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index a0d1c23..b664102 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -18,19 +18,29 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.logging.Logger; + import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; +import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; import org.apache.hyracks.storage.common.MultiComparator; public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent { + private static final Logger LOGGER = Logger.getLogger(AbstractLSMDiskComponent.class.getName()); + private final DiskComponentMetadata metadata; + // a variable cache of componentId stored in metadata. + // since componentId is immutable, we do not want to read from metadata every time the componentId + // is requested. 
+ private ILSMComponentId componentId; + public AbstractLSMDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager, ILSMComponentFilter filter) { super(lsmIndex, filter); @@ -109,13 +119,23 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl } @Override - public ILSMDiskComponentId getComponentId() throws HyracksDataException { - long minID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, - ILSMDiskComponentId.NOT_FOUND); - long maxID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, - ILSMDiskComponentId.NOT_FOUND); - //TODO: do we need to throw an exception when ID is not found? - return new LSMDiskComponentId(minID, maxID); + public ILSMComponentId getId() throws HyracksDataException { + if (componentId != null) { + return componentId; + } + synchronized (this) { + if (componentId == null) { + componentId = LSMComponentIdUtils.readFrom(metadata); + } + } + if (componentId.missing()) { + // For normal datasets, componentId shouldn't be missing, since otherwise it'll be a bug. + // However, we cannot throw an exception here to be compatible with legacy datasets. + // In this case, the disk component would always get a garbage Id [-1, -1], which makes the + // component Id-based optimization useless but still correct. 
+ LOGGER.warning("Component Id not found from disk component metadata"); + } + return componentId; } /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 2b2fe0d..b0cc318 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -44,6 +44,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; @@ -438,6 +440,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { if (c != EmptyComponent.INSTANCE) { diskComponents.add(0, c); } + assert checkComponentIds(); } @Override @@ -448,6 +451,25 @@ public abstract class AbstractLSMIndex implements ILSMIndex { if (newComponent != 
EmptyComponent.INSTANCE) { diskComponents.add(swapIndex, newComponent); } + assert checkComponentIds(); + } + + /** + * A helper method to ensure disk components have proper Ids (non-decreasing) + * We may get rid of this method once component Id is stablized + * + * @throws HyracksDataException + */ + private boolean checkComponentIds() throws HyracksDataException { + for (int i = 0; i < diskComponents.size() - 1; i++) { + ILSMComponentId id1 = diskComponents.get(i).getId(); + ILSMComponentId id2 = diskComponents.get(i + 1).getId(); + IdCompareResult cmp = id1.compareTo(id2); + if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) { + return false; + } + } + return true; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index b7c3350..0378aae 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -22,9 +22,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; import org.apache.hyracks.storage.common.buffercache.IBufferCache; public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent { @@ -34,6 +37,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im private int writerCount; private boolean requestedToBeActive; private final MemoryComponentMetadata metadata; + private ILSMComponentId componentId; public AbstractLSMMemoryComponent(AbstractLSMIndex lsmIndex, IVirtualBufferCache vbc, boolean isActive, ILSMComponentFilter filter) { @@ -247,6 +251,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im protected void doDeallocate() throws HyracksDataException { getIndex().deactivate(); getIndex().destroy(); + componentId = null; } @Override @@ -259,4 +264,19 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im IBufferCache virtualBufferCache = getIndex().getBufferCache(); return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize(); } + + @Override + public ILSMComponentId getId() { + return componentId; + } + + @Override + public void resetId(ILSMComponentId componentId) throws HyracksDataException { + if (this.componentId != null && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) { + throw new IllegalStateException( + "LSM memory component receives illegal id. 
Old id " + this.componentId + ", new id " + componentId); + } + this.componentId = componentId; + LSMComponentIdUtils.persist(this.componentId, metadata); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java index f2751bf..e3ca9f1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java @@ -25,8 +25,8 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndex; @@ -83,8 +83,8 @@ public class EmptyComponent implements ILSMDiskComponent { } @Override - public ILSMDiskComponentId getComponentId() throws HyracksDataException { - return null; + public ILSMComponentId getId() { + return LSMComponentId.MISSING_COMPONENT_ID; } @Override 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java new file mode 100644 index 0000000..dd86f65 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+/**
+ * An {@link ILSMComponentId} identified by the inclusive id range {@code [minId, maxId]}.
+ * <p>
+ * Instances are mutable through {@link #reset(long, long)}, and {@link #equals(Object)} /
+ * {@link #hashCode()} depend on the current range. An instance should therefore not be reset while it
+ * is used as a hash key. The shared constants below are not meant to be reset either, since every
+ * user would observe the change.
+ */
+public class LSMComponentId implements ILSMComponentId {
+
+    /** Sentinel bound marking an id as missing (see {@link #missing()}). */
+    public static final long NOT_FOUND = -1;
+
+    // Used to handle legacy datasets which do not have a component id
+    public static final ILSMComponentId MISSING_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
+
+    // A default component id used for bulk loaded components
+    public static final ILSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+
+    private long minId;
+
+    private long maxId;
+
+    /**
+     * @param minId
+     *            lower bound of the id range (inclusive)
+     * @param maxId
+     *            upper bound of the id range (inclusive); must not be smaller than {@code minId}
+     */
+    public LSMComponentId(long minId, long maxId) {
+        assert minId <= maxId;
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    /**
+     * Replaces the range of this id in place.
+     * <p>
+     * Enforces the same {@code minId <= maxId} invariant as the constructor (checked when assertions
+     * are enabled), so a reset id can never hold an inverted range.
+     */
+    public void reset(long minId, long maxId) {
+        assert minId <= maxId;
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    public long getMinId() {
+        return this.minId;
+    }
+
+    public long getMaxId() {
+        return this.maxId;
+    }
+
+    /**
+     * @return true if either bound equals {@link #NOT_FOUND}
+     */
+    @Override
+    public boolean missing() {
+        return minId == NOT_FOUND || maxId == NOT_FOUND;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + minId + "," + maxId + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof LSMComponentId)) {
+            return false;
+        }
+        LSMComponentId other = (LSMComponentId) obj;
+        return minId == other.minId && maxId == other.maxId;
+    }
+
+    /**
+     * Compares the id range of this id with that of {@code id}.
+     *
+     * @return {@code UNKNOWN} if this id is missing, or {@code id} is null or missing;
+     *         {@code GREATER_THAN} if this range lies entirely above {@code id}'s range;
+     *         {@code LESS_THAN} if it lies entirely below; {@code INCLUDE} if this range contains
+     *         {@code id}'s range; otherwise {@code INTERSECT}
+     * @throws ClassCastException
+     *             if {@code id} is non-null and not missing but is not an {@link LSMComponentId}
+     */
+    @Override
+    public IdCompareResult compareTo(ILSMComponentId id) {
+        if (this.missing() || id == null || id.missing()) {
+            return IdCompareResult.UNKNOWN;
+        }
+        LSMComponentId componentId = (LSMComponentId) id;
+        if (this.getMinId() > componentId.getMaxId()) {
+            return IdCompareResult.GREATER_THAN;
+        } else if (this.getMaxId() < componentId.getMinId()) {
+            return IdCompareResult.LESS_THAN;
+        } else if (this.getMinId() <= componentId.getMinId() && this.getMaxId() >= componentId.getMaxId()) {
+            // the ranges overlap here; this range covers the other one entirely
+            return IdCompareResult.INCLUDE;
+        } else {
+            return IdCompareResult.INTERSECT;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java new file mode 100644 index 0000000..e174153 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGenerator}.
+ * <p>
+ * Component ids are single-point ranges {@code [ts, ts]}, where {@code ts} is a millisecond timestamp
+ * that is strictly increasing across ids generated by the same instance.
+ */
+public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
+
+    // The last timestamp handed out; -1 means none has been generated yet.
+    protected long previousTimestamp = -1L;
+
+    private ILSMComponentId componentId;
+
+    /**
+     * Creates the generator and immediately generates its first id via {@link #refresh()}, so
+     * {@link #getId()} returns a valid id right after construction.
+     */
+    public LSMComponentIdGenerator() {
+        refresh();
+    }
+
+    /**
+     * Replaces the current id with a new single-point id built from {@link #getCurrentTimestamp()}.
+     */
+    @Override
+    public void refresh() {
+        long ts = getCurrentTimestamp();
+        componentId = new LSMComponentId(ts, ts);
+    }
+
+    /**
+     * @return the id produced by the most recent {@link #refresh()}
+     */
+    @Override
+    public ILSMComponentId getId() {
+        return componentId;
+    }
+
+    /**
+     * Returns a millisecond timestamp strictly greater than any previously returned by this generator.
+     * <p>
+     * Normally this is the current wall-clock time. If the clock has not advanced past the previous
+     * value (same millisecond, or the clock was set backwards), the previous value plus one is used.
+     * The method does not sleep until the clock catches up, for two reasons:
+     * <ul>
+     * <li>A backwards clock jump would block the caller for as long as the jump.</li>
+     * <li>A pending interrupt would turn that sleep loop into a busy spin, because
+     * {@link Thread#sleep(long)} throws immediately while the interrupt flag is set.</li>
+     * </ul>
+     */
+    protected long getCurrentTimestamp() {
+        long timestamp = Math.max(System.currentTimeMillis(), previousTimestamp + 1);
+        previousTimestamp = timestamp;
+        return timestamp;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java new file mode 100644 index 0000000..c55ef19 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+
+/**
+ * Default {@link ILSMComponentIdGeneratorFactory}: every call hands out a brand-new
+ * {@link LSMComponentIdGenerator}.
+ */
+public class LSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory {
+
+    /**
+     * @param serviceCtx
+     *            the service context (not consulted by this implementation)
+     * @return a newly constructed {@link LSMComponentIdGenerator}; no instance is shared between calls
+     */
+    @Override
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+        final ILSMComponentIdGenerator generator = new LSMComponentIdGenerator();
+        return generator;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java deleted file mode 100644 index f448c84..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -package org.apache.hyracks.storage.am.lsm.common.impls; - -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; - -public class LSMDiskComponentId implements ILSMDiskComponentId { - - private final long minId; - - private final long maxId; - - public LSMDiskComponentId(long minId, long maxId) { - this.minId = minId; - this.maxId = maxId; - } - - @Override - public long getMinId() { - return this.minId; - } - - @Override - public long getMaxId() { - return this.maxId; - } - - @Override - public String toString() { - return "[" + minId + "," + maxId + "]"; - } - - @Override - public int hashCode() { - return 31 * Long.hashCode(minId) + Long.hashCode(maxId); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof LSMDiskComponentId)) { - return false; - } - LSMDiskComponentId other = (LSMDiskComponentId) obj; - if (maxId != other.maxId) { - return false; - } - if (minId != other.minId) { - return false; - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 48b6d8f..b0abeb1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -95,15 +95,24 @@ public class LSMHarness implements ILSMHarness { // Before entering the components, prune 
those corner cases that indeed should not proceed. switch (opType) { case FLUSH: + // if the lsm index does not have memory components allocated, then nothing to flush + if (!lsmIndex.isMemoryComponentsAllocated()) { + return false; + } ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0); if (!flushingComponent.isModified()) { - //The mutable component has not been modified by any writer. There is nothing to flush. - //since the component is empty, set its state back to READABLE_WRITABLE if (flushingComponent.getState() == ComponentState.READABLE_UNWRITABLE) { + //The mutable component has not been modified by any writer. There is nothing to flush. + //since the component is empty, set its state back to READABLE_WRITABLE only when it's + //state has been set to READABLE_UNWRITABLE flushingComponent.setState(ComponentState.READABLE_WRITABLE); opTracker.notifyAll(); + + // Call recycled only when we change it's state is reset back to READABLE_WRITABLE + // Otherwise, if the component is in other state, e.g., INACTIVE, or + // READABLE_UNWRITABLE_FLUSHING, it's not considered as being recycled here. 
+ lsmIndex.getIOOperationCallback().recycled(flushingComponent); } - lsmIndex.getIOOperationCallback().recycled(flushingComponent); return false; } if (flushingComponent.getWriterCount() > 0) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java index 08b8bb6..000d5cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java @@ -21,13 +21,14 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.common.IIndexBulkLoader; public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { private final AbstractLSMIndex lsmIndex; private final ILSMDiskComponent component; - private final IIndexBulkLoader componentBulkLoader; + private final ILSMDiskComponentBulkLoader componentBulkLoader; public 
LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint) throws HyracksDataException { @@ -39,11 +40,19 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true); } + public ILSMDiskComponent getComponent() { + return component; + } + @Override public void add(ITupleReference tuple) throws HyracksDataException { componentBulkLoader.add(tuple); } + public void delete(ITupleReference tuple) throws HyracksDataException { + componentBulkLoader.delete(tuple); + } + @Override public void end() throws HyracksDataException { try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java index 09ca553..21d10d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java @@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import java.util.List; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; @@ -37,6 +38,11 @@ public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFac return NoOpIOOperationCallback.INSTANCE; } + @Override + public void initialize(INCServiceContext ncCtx) { + // No op + } + public static class NoOpIOOperationCallback implements ILSMIOOperationCallback { private static final NoOpIOOperationCallback INSTANCE = new NoOpIOOperationCallback();
