Repository: asterixdb Updated Branches: refs/heads/master 86cbec537 -> a7fa05bb0
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java new file mode 100644 index 0000000..f09bcd2 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.operators; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; + +/** + * This operator node is used to bulk load incoming tuples (scanned from the primary index) + * into multiple disk components of the secondary index. 
+ * Incoming tuple format: + * [component pos, anti-matter flag, secondary keys, primary keys, filter values] + */ +public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryIndexCreationNodePushable { + // with tag fields + + private final PermutingTupleReference sourceTuple; + private final PermutingTupleReference deletedKeyTuple; + + private final IIndexDataflowHelper primaryIndexHelper; + private final IIndexDataflowHelper secondaryIndexHelper; + + private ILSMIndex primaryIndex; + private ILSMIndex secondaryIndex; + + private ILSMDiskComponent component; + private ILSMDiskComponentBulkLoader componentBulkLoader; + private int currentComponentPos = -1; + + private ILSMDiskComponent[] diskComponents; + + public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc, + IIndexDataflowHelperFactory primaryIndexHelperFactory, + IIndexDataflowHelperFactory secondaryIndexHelperFactory, int numTagFields, int numSecondaryKeys, + int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException { + super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree); + + this.primaryIndexHelper = + primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); + this.secondaryIndexHelper = + secondaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); + + int[] sourcePermutation = new int[inputRecDesc.getFieldCount() - numTagFields]; + for (int i = 0; i < sourcePermutation.length; i++) { + sourcePermutation[i] = i + numTagFields; + } + sourceTuple = new PermutingTupleReference(sourcePermutation); + + int[] deletedKeyPermutation = new int[inputRecDesc.getFieldCount() - numTagFields - numSecondaryKeys]; + for (int i = 0; i < deletedKeyPermutation.length; i++) { + deletedKeyPermutation[i] = i + numTagFields + numSecondaryKeys; + } + deletedKeyTuple = new PermutingTupleReference(deletedKeyPermutation); + } + + 
@Override + public void open() throws HyracksDataException { + super.open(); + primaryIndexHelper.open(); + primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance(); + diskComponents = new ILSMDiskComponent[primaryIndex.getImmutableComponents().size()]; + secondaryIndexHelper.open(); + secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance(); + + } + + @Override + public void close() throws HyracksDataException { + HyracksDataException closeException = null; + try { + endCurrentComponent(); + } catch (HyracksDataException e) { + closeException = e; + } + + activateComponents(); + + try { + if (primaryIndexHelper != null) { + primaryIndexHelper.close(); + } + } catch (HyracksDataException e) { + if (closeException == null) { + closeException = e; + } else { + closeException.addSuppressed(e); + } + } + + try { + if (secondaryIndexHelper != null) { + secondaryIndexHelper.close(); + } + } catch (HyracksDataException e) { + if (closeException == null) { + closeException = e; + } else { + closeException.addSuppressed(e); + } + } + + try { + // will definitely be called regardless of exceptions + writer.close(); + } catch (HyracksDataException th) { + if (closeException == null) { + closeException = th; + } else { + closeException.addSuppressed(th); + } + } + + if (closeException != null) { + throw closeException; + } + } + + @Override + public void flush() throws HyracksDataException { + writer.flush(); + } + + @Override + public void fail() throws HyracksDataException { + writer.fail(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + accessor.reset(buffer); + int tupleCount = accessor.getTupleCount(); + for (int i = 0; i < tupleCount; i++) { + try { + // if both previous value and new value are null, then we skip + tuple.reset(accessor, i); + int componentPos = getComponentPos(tuple); + if (componentPos != currentComponentPos) { + loadNewComponent(componentPos); + currentComponentPos = componentPos; + } + 
+ if (isAntiMatterTuple(tuple)) { + addAntiMatterTuple(tuple); + } else { + addMatterTuple(tuple); + } + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + } + + private void endCurrentComponent() throws HyracksDataException { + if (component != null) { + // set disk component id + + componentBulkLoader.end(); + diskComponents[currentComponentPos] = component; + + componentBulkLoader = null; + component = null; + } + } + + private void loadNewComponent(int componentPos) throws HyracksDataException { + endCurrentComponent(); + + component = secondaryIndex.createBulkLoadTarget(); + componentBulkLoader = secondaryIndex.createComponentBulkLoader(component, 1.0f, false, + getNumDeletedTuples(componentPos), false, true, true); + + } + + private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException { + if (hasBuddyBTree) { + deletedKeyTuple.reset(tuple); + componentBulkLoader.delete(deletedKeyTuple); + } else { + sourceTuple.reset(tuple); + componentBulkLoader.delete(sourceTuple); + } + } + + private void addMatterTuple(ITupleReference tuple) throws HyracksDataException { + sourceTuple.reset(tuple); + componentBulkLoader.add(sourceTuple); + + } + + private void activateComponents() throws HyracksDataException { + List<ILSMDiskComponent> primaryComponents = primaryIndex.getImmutableComponents(); + for (int i = diskComponents.length - 1; i >= 0; i--) { + // start from the oldest component to the newest component + if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) { + secondaryIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, diskComponents[i]); + + // setting component id has to be place between afterOperation and addBulkLoadedComponent, + // since afterOperation would set a flush component id (but it's not invalid) + // and addBulkLoadedComponent would finalize the component + ILSMDiskComponentId primaryComponentId = primaryComponents.get(i).getComponentId(); + //set component id 
+ diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, + LongPointable.FACTORY.createPointable(primaryComponentId.getMinId())); + diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, + LongPointable.FACTORY.createPointable(primaryComponentId.getMaxId())); + + ((AbstractLSMIndex) secondaryIndex).getLsmHarness().addBulkLoadedComponent(diskComponents[i]); + + } + } + } + + private int getNumDeletedTuples(int componentPos) { + DeletedTupleCounter counter = (DeletedTupleCounter) ctx.getStateObject(partition); + return counter.get(componentPos); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java new file mode 100644 index 0000000..804b700 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.asterix.runtime.operators;

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;

/**
 * Operator descriptor that creates {@link LSMSecondaryIndexBulkLoadNodePushable} runtimes,
 * which bulk load processed tuples into the disk components of a secondary index.
 */
public class LSMSecondaryIndexBulkLoadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {

    private static final long serialVersionUID = 1L;

    // dataflow helpers used by the runtime to open the primary and secondary indexes
    private final IIndexDataflowHelperFactory primaryIndexHelperFactory;
    private final IIndexDataflowHelperFactory secondaryIndexHelperFactory;

    // tuple layout: [tag fields | secondary keys | primary keys | filter values]
    private final int numTagFields;
    private final int numSecondaryKeys;
    private final int numPrimaryKeys;
    private final boolean hasBuddyBTree;

    public LSMSecondaryIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
            IIndexDataflowHelperFactory primaryIndexHelperFactory,
            IIndexDataflowHelperFactory secondaryIndexHelperFactory, int numTagFields, int numSecondaryKeys,
            int numPrimaryKeys, boolean hasBuddyBTree) {
        super(spec, 1, 1);
        this.numTagFields = numTagFields;
        this.numSecondaryKeys = numSecondaryKeys;
        this.numPrimaryKeys = numPrimaryKeys;
        this.hasBuddyBTree = hasBuddyBTree;
        this.primaryIndexHelperFactory = primaryIndexHelperFactory;
        this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
        this.outRecDescs[0] = outRecDesc;
    }

    @Override
    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
        return new LSMSecondaryIndexBulkLoadNodePushable(ctx, partition, inputRecDesc, primaryIndexHelperFactory,
                secondaryIndexHelperFactory, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators; + +import java.io.DataOutput; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.TypeTagUtil; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; +import org.apache.hyracks.dataflow.common.utils.TupleUtils; +import org.apache.hyracks.dataflow.std.base.AbstractStateObject; + +/** + * This operator node is used for post-processing tuples scanned from the primary index + * when creating a new secondary index. + * The incoming tuples could be either a matter-tuple or an anti-matter tuple. 
 * For an anti-matter tuple, it outputs a corresponding anti-matter tuple
 * (if the secondary index has a buddy btree, then the result tuple only has primary key;
 * otherwise, the result tuple has both secondary keys and primary key)
 * For each matter-tuple (here old secondary keys refer to the secondary keys of the previous
 * matter tuple with the same primary key)
 * -If old secondary keys == new secondary keys
 * --If with buddy btree
 * ---generate a deleted-tuple
 * --generate a new key
 * -else
 * --If old secondary keys are null?
 * ---do nothing
 * --else
 * ---delete old secondary keys
 * --If new keys are null?
 * ---do nothing
 * --else
 * ---insert new keys
 *
 * Incoming tuple format:
 * [component pos, anti-matter flag, secondary keys, primary keys, filter values]
 */
public class LSMSecondaryIndexCreationTupleProcessorNodePushable extends AbstractLSMSecondaryIndexCreationNodePushable {
    // prevMatterTuple stores a copy of the previous matter tuple, so that a later tuple with
    // the same primary key can be compared against it
    private final ArrayTupleBuilder prevMatterTupleBuilder;
    private final ArrayTupleReference prevMatterTuple = new ArrayTupleReference();

    // whether prevMatterTuple currently holds a valid matter tuple (reset by anti-matter tuples)
    private boolean hasPrevMatterTuple;

    private FrameTupleAppender appender;
    // builder used to assemble each output tuple; dos is its backing data output
    private ArrayTupleBuilder tb;
    private DataOutput dos;
    // writes MISSING placeholders for secondary keys; only needed when the index has a buddy btree
    private final IMissingWriter missingWriter;
    private DeletedTupleCounter deletedTupleCounter;

    /**
     * Task-level state object counting, per component position, how many anti-matter
     * (deleted) tuples were emitted. The downstream bulk-load operator reads these counts
     * via {@code ctx.getStateObject(partition)} to size its component bulk loaders.
     */
    public static class DeletedTupleCounter extends AbstractStateObject {
        private final Map<Integer, Integer> map = new HashMap<>();

        public DeletedTupleCounter(JobId jobId, int partition) {
            super(jobId, partition);
        }

        // returns 0 for component positions that never saw a deleted tuple
        public int get(int componentPos) {
            Integer value = map.get(componentPos);
            return value != null ? value : 0;
        }

        public void inc(int componentPos) {
            map.put(componentPos, get(componentPos) + 1);
        }
    }

    public LSMSecondaryIndexCreationTupleProcessorNodePushable(IHyracksTaskContext ctx, int partition,
            RecordDescriptor inputRecDesc, IMissingWriterFactory missingWriterFactory, int numTagFields,
            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException {

        super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);

        this.prevMatterTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFieldCount());

        if (this.hasBuddyBTree) {
            missingWriter = missingWriterFactory.createMissingWriter();
        } else {
            missingWriter = null;
        }

    }

    @Override
    public void open() throws HyracksDataException {
        super.open();
        // register the deleted-tuple counter so the downstream bulk-load operator can find it
        deletedTupleCounter = new DeletedTupleCounter(ctx.getJobletContext().getJobId(), partition);
        ctx.setStateObject(deletedTupleCounter);
        try {
            tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
            dos = tb.getDataOutput();
            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    @Override
    public void close() throws HyracksDataException {
        HyracksDataException closeException = null;
        try {
            // flush any buffered output tuples before closing the writer
            if (appender.getTupleCount() > 0) {
                appender.write(writer, true);
            }
        } catch (HyracksDataException e) {
            closeException = e;
        }
        try {
            // will definitely be called regardless of exceptions
            writer.close();
        } catch (HyracksDataException th) {
            if (closeException == null) {
                closeException = th;
            } else {
                closeException.addSuppressed(th);
            }
        }
        if (closeException != null) {
            throw closeException;
        }
    }

    @Override
    public void flush() throws HyracksDataException {
        appender.flush(writer);
    }

    @Override
    public void fail() throws HyracksDataException {
        writer.fail();
    }

    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        accessor.reset(buffer);
        int tupleCount = accessor.getTupleCount();
        for (int i = 0; i < tupleCount; i++) {
            try {
                // if both previous value and new value are null, then we skip
                tuple.reset(accessor, i);
                if (isAntiMatterTuple(tuple)) {
                    processAntiMatterTuple(tuple);
                    // an anti-matter tuple invalidates the remembered matter tuple
                    hasPrevMatterTuple = false;
                } else {
                    processMatterTuple(tuple);
                    // save the matter tuple (deep copy, since the frame buffer will be reused)
                    TupleUtils.copyTuple(prevMatterTupleBuilder, tuple, recordDesc.getFieldCount());
                    prevMatterTuple.reset(prevMatterTupleBuilder.getFieldEndOffsets(),
                            prevMatterTupleBuilder.getByteArray());
                    hasPrevMatterTuple = true;
                }
            } catch (Exception e) {
                throw HyracksDataException.create(e);
            }
        }
    }

    /**
     * Handles a matter tuple: deletes the stale secondary entry of the previous matter tuple
     * with the same primary key (if any) and inserts the new entry, skipping MISSING/NULL keys.
     */
    private void processMatterTuple(ITupleReference tuple) throws HyracksDataException {

        boolean isNewValueMissing = isSecondaryKeyMissing(tuple);
        // the old value counts as missing if there is no remembered tuple, it has a different
        // primary key, or its secondary keys are MISSING/NULL
        boolean isOldValueMissing = !hasPrevMatterTuple || !equalPrimaryKeys(tuple, prevMatterTuple)
                || isSecondaryKeyMissing(prevMatterTuple);

        if (isNewValueMissing && isOldValueMissing) {
            // if both values are missing, then do nothing
            return;
        }
        // At least one is not null
        if (!isOldValueMissing && equalSecondaryKeys(prevMatterTuple, tuple)) {
            if (hasBuddyBTree) {
                // if the index has buddy btree, then we have to delete the index entry
                // from the older disk components
                writeAntiMatterTuple(prevMatterTuple, getComponentPos(tuple));
            }
            // we need to write the new tuple anyway
            writeMatterTuple(tuple);
            return;
        }
        if (!isOldValueMissing) {
            // we need to delete the previous entry
            writeAntiMatterTuple(prevMatterTuple, getComponentPos(tuple));
        }
        if (!isNewValueMissing) {
            // we need to insert the new entry
            writeMatterTuple(tuple);
        }

    }

    /**
     * Handles an anti-matter tuple by emitting a corresponding delete, unless its secondary
     * key is missing (in which case there is nothing to delete).
     */
    private void processAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
        boolean isNewValueMissing = isSecondaryKeyMissing(tuple);
        // if the secondary value is missing (which means the secondary value of the previous matter tuple
        // is also missing), we then simply ignore this tuple since there is nothing to delete
        if (!isNewValueMissing) {
            writeAntiMatterTuple(tuple, getComponentPos(tuple));
        }
    }

    /**
     * Emits an anti-matter (delete) tuple targeted at {@code componentPos} and bumps the
     * deleted-tuple counter. With a buddy btree the secondary keys are replaced by MISSING
     * placeholders; otherwise they are copied through.
     */
    private void writeAntiMatterTuple(ITupleReference tuple, int componentPos) throws HyracksDataException {
        deletedTupleCounter.inc(componentPos);
        tb.reset();
        // write tag fields: [component pos, anti-matter flag = true]
        tb.addField(IntegerSerializerDeserializer.INSTANCE, componentPos);
        tb.addField(BooleanSerializerDeserializer.INSTANCE, true);
        if (hasBuddyBTree) {
            // the output tuple does not have secondary keys (only primary keys + filter values)
            // write secondary keys (missing)
            for (int i = 0; i < numSecondaryKeys; i++) {
                missingWriter.writeMissing(dos);
                tb.addFieldEndOffset();
            }
        } else {
            // copy the secondary key fields through
            for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
                tb.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
            }
        }

        // write all remaining fields (primary keys + filter values)
        for (int i = numTagFields + numSecondaryKeys; i < recordDesc.getFieldCount(); i++) {
            tb.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
        }

        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
    }

    private void writeMatterTuple(ITupleReference tuple) throws HyracksDataException {
        // simply output the original tuple to the writer
        TupleUtils.copyTuple(tb, tuple, recordDesc.getFieldCount());
        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
    }

    // a secondary key is "missing" if any of its fields is MISSING or NULL
    private boolean isSecondaryKeyMissing(ITupleReference tuple) {
        for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
            if (TypeTagUtil.isType(tuple, i, ATypeTag.SERIALIZED_MISSING_TYPE_TAG)
                    || TypeTagUtil.isType(tuple, i, ATypeTag.SERIALIZED_NULL_TYPE_TAG)) {
                return true;
            }
        }
        return false;
    }

    // compares the primary key fields of two tuples byte-wise
    private boolean equalPrimaryKeys(ITupleReference tuple1, ITupleReference tuple2) {
        for (int i = numTagFields + numSecondaryKeys; i < numTagFields + numPrimaryKeys + numSecondaryKeys; i++) {
            if (!equalField(tuple1, tuple2, i)) {
                return false;
            }
        }
        return true;
    }

    // compares the secondary key fields of two tuples byte-wise
    private boolean equalSecondaryKeys(ITupleReference tuple1, ITupleReference tuple2) {
        for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
            if (!equalField(tuple1, tuple2, i)) {
                return false;
            }
        }
        return true;
    }

    private boolean equalField(ITupleReference tuple1, ITupleReference tuple2, int fIdx) {
        return LSMSecondaryUpsertOperatorNodePushable.equals(tuple1.getFieldData(fIdx), tuple1.getFieldStart(fIdx),
                tuple1.getFieldLength(fIdx), tuple2.getFieldData(fIdx), tuple2.getFieldStart(fIdx),
                tuple2.getFieldLength(fIdx));
    }
}
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.asterix.runtime.operators;

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;

/**
 * Operator descriptor that creates {@link LSMSecondaryIndexCreationTupleProcessorNodePushable}
 * runtimes, which post-process primary-index scan tuples before secondary-index bulk loading.
 */
public class LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor
        extends AbstractSingleActivityOperatorDescriptor {

    private static final long serialVersionUID = 1L;

    // produces MISSING placeholders for secondary keys when the index has a buddy btree
    private final IMissingWriterFactory missingWriterFactory;

    // tuple layout: [tag fields | secondary keys | primary keys | filter values]
    private final int numTagFields;
    private final int numSecondaryKeys;
    private final int numPrimaryKeys;
    private final boolean hasBuddyBTree;

    public LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor(IOperatorDescriptorRegistry spec,
            RecordDescriptor outRecDesc, IMissingWriterFactory missingWriterFactory, int numTagFields,
            int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
        super(spec, 1, 1);
        this.missingWriterFactory = missingWriterFactory;
        this.hasBuddyBTree = hasBuddyBTree;
        this.numTagFields = numTagFields;
        this.numSecondaryKeys = numSecondaryKeys;
        this.numPrimaryKeys = numPrimaryKeys;
        this.outRecDescs[0] = outRecDesc;
    }

    @Override
    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
        return new LSMSecondaryIndexCreationTupleProcessorNodePushable(ctx, partition, inputRecDesc,
                missingWriterFactory, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);
    }
}
+ */ + +package org.apache.asterix.runtime.operators; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.TypeTagUtil; +import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksJobletContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.state.IStateObject; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.primitive.BooleanPointable; +import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; +import org.apache.hyracks.dataflow.common.utils.TupleUtils; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public 
class LSMSecondaryIndexCreationTupleProcessorTest { + + private static final int FRAME_SIZE = 512; + + private final ISerializerDeserializer[] fields = + { IntegerSerializerDeserializer.INSTANCE, BooleanSerializerDeserializer.INSTANCE, + IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }; + + private final RecordDescriptor recDesc = new RecordDescriptor(fields); + + private final int numTagFields = 2; + + private final int numSecondaryKeys = 1; + + private final int numPrimaryKeys = 1; + + private class ResultFrameWriter implements IFrameWriter { + FrameTupleAccessor resultAccessor = new FrameTupleAccessor(recDesc); + + FrameTupleReference tuple = new FrameTupleReference(); + final List<ITupleReference> resultTuples; + + public ResultFrameWriter(List<ITupleReference> resultTuples) { + this.resultTuples = resultTuples; + } + + @Override + public void open() throws HyracksDataException { + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + resultAccessor.reset(buffer); + int count = resultAccessor.getTupleCount(); + for (int i = 0; i < count; i++) { + tuple.reset(resultAccessor, i); + resultTuples.add(TupleUtils.copyTuple(tuple)); + } + } + + @Override + public void fail() throws HyracksDataException { + + } + + @Override + public void close() throws HyracksDataException { + } + } + + @Test + public void testBasic() { + List<Object[]> inputs = new ArrayList<>(); + inputs.add(new Object[] { 0, false, 1, 1 }); + inputs.add(new Object[] { 1, false, 1, 2 }); + inputs.add(new Object[] { 2, false, 2, 3 }); + + List<Object[]> outputs = new ArrayList<>(); + outputs.add(new Object[] { 0, false, 1, 1 }); + outputs.add(new Object[] { 1, false, 1, 2 }); + outputs.add(new Object[] { 2, false, 2, 3 }); + + List<Object[]> buddyOutputs = new ArrayList<>(); + buddyOutputs.add(new Object[] { 0, false, 1, 1 }); + buddyOutputs.add(new Object[] { 1, false, 1, 2 }); + buddyOutputs.add(new Object[] { 2, false, 2, 3 }); + 
try { + runTest(inputs, outputs, new int[] { 0, 0, 0 }, false); + runTest(inputs, buddyOutputs, new int[] { 0, 0, 0 }, true); + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testUpsertWithSameSecondary() { + List<Object[]> inputs = new ArrayList<>(); + inputs.add(new Object[] { 1, false, 1, 1 }); + inputs.add(new Object[] { 0, false, 1, 1 }); + + List<Object[]> outputs = new ArrayList<>(); + outputs.add(new Object[] { 1, false, 1, 1 }); + outputs.add(new Object[] { 0, false, 1, 1 }); + + List<Object[]> buddyOutputs = new ArrayList<>(); + buddyOutputs.add(new Object[] { 1, false, 1, 1 }); + buddyOutputs.add(new Object[] { 0, true, null, 1 }); + buddyOutputs.add(new Object[] { 0, false, 1, 1 }); + + try { + runTest(inputs, outputs, new int[] { 0, 0 }, false); + runTest(inputs, buddyOutputs, new int[] { 1, 0 }, true); + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testUpsertWithDifferentSecondary() { + List<Object[]> inputs = new ArrayList<>(); + inputs.add(new Object[] { 1, false, 2, 1 }); + inputs.add(new Object[] { 0, false, 1, 1 }); + + List<Object[]> outputs = new ArrayList<>(); + outputs.add(new Object[] { 1, false, 2, 1 }); + outputs.add(new Object[] { 0, true, 2, 1 }); + outputs.add(new Object[] { 0, false, 1, 1 }); + + List<Object[]> buddyOutputs = new ArrayList<>(); + buddyOutputs.add(new Object[] { 1, false, 2, 1 }); + buddyOutputs.add(new Object[] { 0, true, null, 1 }); + buddyOutputs.add(new Object[] { 0, false, 1, 1 }); + try { + runTest(inputs, outputs, new int[] { 1, 0 }, false); + runTest(inputs, buddyOutputs, new int[] { 1, 0 }, true); + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDelete() { + List<Object[]> inputs = new ArrayList<>(); + inputs.add(new Object[] { 2, false, 1, 1 }); + inputs.add(new Object[] { 1, true, 1, 1 
}); + inputs.add(new Object[] { 0, false, 2, 1 }); + + List<Object[]> outputs = new ArrayList<>(); + outputs.add(new Object[] { 2, false, 1, 1 }); + outputs.add(new Object[] { 1, true, 1, 1 }); + outputs.add(new Object[] { 0, false, 2, 1 }); + + List<Object[]> buddyOutputs = new ArrayList<>(); + buddyOutputs.add(new Object[] { 2, false, 1, 1 }); + buddyOutputs.add(new Object[] { 1, true, null, 1 }); + buddyOutputs.add(new Object[] { 0, false, 2, 1 }); + + try { + runTest(inputs, outputs, new int[] { 0, 1, 0 }, false); + runTest(inputs, buddyOutputs, new int[] { 0, 1, 0 }, true); + + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDeleteNullSecondary() { + List<Object[]> inputs = new ArrayList<>(); + inputs.add(new Object[] { 1, false, null, 1 }); + inputs.add(new Object[] { 0, true, null, 1 }); + + List<Object[]> outputs = new ArrayList<>(); + + List<Object[]> buddyOutputs = new ArrayList<>(); + + try { + runTest(inputs, outputs, new int[] { 0, 0 }, false); + runTest(inputs, buddyOutputs, new int[] { 0, 0 }, true); + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testNullSecondary() { + List<Object[]> inputs = new ArrayList<>(); + inputs.add(new Object[] { 2, false, null, 1 }); + inputs.add(new Object[] { 1, false, 2, 1 }); + inputs.add(new Object[] { 0, false, null, 1 }); + + List<Object[]> outputs = new ArrayList<>(); + outputs.add(new Object[] { 1, false, 2, 1 }); + outputs.add(new Object[] { 0, true, 2, 1 }); + + List<Object[]> buddyOutputs = new ArrayList<>(); + buddyOutputs.add(new Object[] { 1, false, 2, 1 }); + buddyOutputs.add(new Object[] { 0, true, null, 1 }); + try { + runTest(inputs, outputs, new int[] { 1, 0, 0 }, false); + runTest(inputs, buddyOutputs, new int[] { 1, 0, 0 }, true); + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + private void 
runTest(List<Object[]> inputTuples, List<Object[]> expectedTuples, int[] numDeletedTuples, + boolean hasBuddyBTree) throws HyracksDataException { + List<Object> stateObjects = new ArrayList<>(); + IHyracksJobletContext jobletContext = Mockito.mock(IHyracksJobletContext.class); + IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class); + Mockito.when(ctx.getJobletContext()).thenReturn(jobletContext); + Mockito.when(ctx.getInitialFrameSize()).thenReturn(FRAME_SIZE); + Mockito.when(ctx.allocateFrame(FRAME_SIZE)).thenAnswer(new Answer<ByteBuffer>() { + @Override + public ByteBuffer answer(InvocationOnMock invocation) throws Throwable { + return ByteBuffer.allocate(FRAME_SIZE); + } + }); + + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + stateObjects.add(invocation.getArguments()[0]); + return null; + } + }).when(ctx).setStateObject(Mockito.any(IStateObject.class)); + + List<ITupleReference> resultTuples = new ArrayList<>(); + IFrameWriter resultWriter = new ResultFrameWriter(resultTuples); + + LSMSecondaryIndexCreationTupleProcessorNodePushable op = + new LSMSecondaryIndexCreationTupleProcessorNodePushable(ctx, 0, recDesc, MissingWriterFactory.INSTANCE, + numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree); + op.setOutputFrameWriter(0, resultWriter, recDesc); + + op.open(); + + IFrame frame = new VSizeFrame(ctx); + FrameTupleAppender appender = new FrameTupleAppender(frame, true); + // generate input tuples + for (Object[] inputTuple : inputTuples) { + ITupleReference tuple = buildTuple(inputTuple); + appender.append(tuple); + } + appender.write(op, true); + op.close(); + + // check results + Assert.assertEquals("Check the number of returned tuples", expectedTuples.size(), resultTuples.size()); + + for (int i = 0; i < expectedTuples.size(); i++) { + Object[] expectedTuple = expectedTuples.get(i); + ITupleReference resultTuple = resultTuples.get(i); + + // check component pos 
+ Assert.assertEquals("Check component position", expectedTuple[0], + IntegerPointable.getInteger(resultTuple.getFieldData(0), resultTuple.getFieldStart(0))); + + // check anti-matter flag + Assert.assertEquals("Check anti-matter", expectedTuple[1], + BooleanPointable.getBoolean(resultTuple.getFieldData(1), resultTuple.getFieldStart(1))); + + if (expectedTuple[2] == null) { + Assert.assertTrue("Check secondary key is empty", + TypeTagUtil.isType(resultTuple, 2, ATypeTag.SERIALIZED_MISSING_TYPE_TAG)); + } else { + Assert.assertEquals("Check secondary key", expectedTuple[2], + IntegerPointable.getInteger(resultTuple.getFieldData(2), resultTuple.getFieldStart(2))); + } + + Assert.assertEquals("Check primary key", expectedTuple[3], + IntegerPointable.getInteger(resultTuple.getFieldData(3), resultTuple.getFieldStart(3))); + } + + //check num deleted tuples + Assert.assertEquals("Check state object", 1, stateObjects.size()); + DeletedTupleCounter counter = (DeletedTupleCounter) stateObjects.get(0); + for (int i = 0; i < numDeletedTuples.length; i++) { + Assert.assertEquals("Check num of deleted tuples", numDeletedTuples[i], counter.get(i)); + } + } + + private ITupleReference buildTuple(Object[] values) throws HyracksDataException { + ArrayTupleBuilder builder = new ArrayTupleBuilder(numTagFields + numSecondaryKeys + numPrimaryKeys); + builder.addField(IntegerSerializerDeserializer.INSTANCE, (int) values[0]); + builder.addField(BooleanSerializerDeserializer.INSTANCE, (boolean) values[1]); + if (values[2] == null) { + IMissingWriter writer = MissingWriterFactory.INSTANCE.createMissingWriter(); + writer.writeMissing(builder.getDataOutput()); + builder.addFieldEndOffset(); + } else { + builder.addField(IntegerSerializerDeserializer.INSTANCE, (int) values[2]); + } + + builder.addField(IntegerSerializerDeserializer.INSTANCE, (int) values[3]); + ArrayTupleReference tuple = new ArrayTupleReference(); + tuple.reset(builder.getFieldEndOffsets(), builder.getByteArray()); + return 
tuple; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml index 2b92a20..3ea29bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/pom.xml @@ -63,6 +63,11 @@ </dependency> <dependency> <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-dataflow-std</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-storage-am-common</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java new file mode 100644 index 0000000..291363d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.storage.am.lsm.btree.dataflow; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; + +public class LSMBTreeDiskComponentScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + + private static final long serialVersionUID = 1L; + + protected final IIndexDataflowHelperFactory indexHelperFactory; + protected final ISearchOperationCallbackFactory searchCallbackFactory; + + public LSMBTreeDiskComponentScanOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory) { + super(spec, 1, 1); + this.indexHelperFactory = indexHelperFactory; + this.searchCallbackFactory = searchCallbackFactory; + this.outRecDescs[0] = outRecDesc; + } + + @Override + public LSMBTreeDiskComponentScanOperatorNodePushable 
createPushRuntime(final IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + return new LSMBTreeDiskComponentScanOperatorNodePushable(ctx, partition, + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), indexHelperFactory, + searchCallbackFactory); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java new file mode 100644 index 0000000..1f71876 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.btree.dataflow; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; +import org.apache.hyracks.storage.common.ISearchPredicate; + +public class LSMBTreeDiskComponentScanOperatorNodePushable extends IndexSearchOperatorNodePushable { + + public LSMBTreeDiskComponentScanOperatorNodePushable(IHyracksTaskContext ctx, int partition, + RecordDescriptor inputRecDesc, IIndexDataflowHelperFactory indexHelperFactory, + ISearchOperationCallbackFactory searchCallbackFactory) throws HyracksDataException { + super(ctx, inputRecDesc, partition, null, null, indexHelperFactory, false, false, null, searchCallbackFactory, + false); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + try { + ((ILSMIndexAccessor) indexAccessor).scanDiskComponents(cursor); + writeSearchResults(0); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + @Override + protected ISearchPredicate createSearchPredicate() { + // do nothing + // no need to create search predicate for disk component scan operation + return null; + } + + @Override + protected void resetSearchPredicate(int tupleIndex) { + throw new UnsupportedOperationException(); + } + + @Override + protected int getFieldCount() { + return ((ITreeIndex) index).getFieldCount() + 2; + } + +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index 1c99b17..f6f4828 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -675,7 +675,8 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd } } - protected ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { + @Override + public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); return createDiskComponent(bulkComponentFactory, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index e830b3e..ffdfe2a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -455,7 +455,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { return new LSMBTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint); } - protected ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { + @Override + public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java index 3c9f709..ab9b5af 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java @@ -19,7 +19,6 @@ package 
org.apache.hyracks.storage.am.lsm.btree.impls; -import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.data.std.primitive.BooleanPointable; @@ -107,13 +106,17 @@ public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor { super.next(); LSMBTreeTupleReference diskTuple = (LSMBTreeTupleReference) super.getTuple(); if (diskTuple.isAntimatter()) { - setAntiMatterTuple(diskTuple, outputElement.getCursorIndex()); + if (setAntiMatterTuple(diskTuple, outputElement.getCursorIndex())) { + foundNext = true; + return true; + } } else { //matter tuple setMatterTuple(diskTuple, outputElement.getCursorIndex()); + foundNext = true; + return true; } - foundNext = true; - return true; + } return false; @@ -139,21 +142,23 @@ public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor { } originalTuple = new PermutingTupleReference(permutation); } - //build the matter tuple buildTuple(tupleBuilder, diskTuple, cursorIndex, MATTER_TUPLE_FLAG); outputTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); originalTuple.reset(outputTuple); } - private void setAntiMatterTuple(ITupleReference diskTuple, int cursorIndex) throws HyracksDataException { + private boolean setAntiMatterTuple(ITupleReference diskTuple, int cursorIndex) throws HyracksDataException { if (originalTuple == null || cmp.compare(diskTuple, originalTuple) != 0) { - //This shouldn't happen, because we shouldn't place an anti-matter tuple - //into the primary index if that tuple is not there - throw HyracksDataException.create(ErrorCode.CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE); + // This could happen sometimes... + // Consider insert tuple A into the memory component, and then delete it immediately. + // We would have -A in the memory component, but there is no tuple A in the previous disk components. 
+ // But in this case, we can simply ignore it for the scan purpose + return false; } buildTuple(antiMatterTupleBuilder, originalTuple, cursorIndex, ANTIMATTER_TUPLE_FLAG); outputTuple.reset(antiMatterTupleBuilder.getFieldEndOffsets(), antiMatterTupleBuilder.getByteArray()); + return true; } private void buildTuple(ArrayTupleBuilder builder, ITupleReference diskTuple, int cursorIndex, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index 9e1f30d..ae2c93d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -159,4 +159,11 @@ public interface ILSMIndex extends IIndex { boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent) throws HyracksDataException; + /** + * Creates a disk component for the bulk load operation + * + * @return + * @throws HyracksDataException + */ + ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index d5edbb5..30266fb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -549,12 +549,13 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex componentBulkLoader.abort(); } - private ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { - LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); - return createDiskInvIndexComponent(componentFactory, componentFileRefs.getInsertIndexFileReference(), - componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), - true); - } + } + + @Override + public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { + LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); + return createDiskInvIndexComponent(componentFactory, componentFileRefs.getInsertIndexFileReference(), + componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), true); } protected InMemoryInvertedIndex createInMemoryInvertedIndex(IVirtualBufferCache virtualBufferCache, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 56af16d..f295f5b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -338,7 +338,8 @@ public class LSMRTree extends AbstractLSMRTree { buddyBTreeFields); } - protected ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { + @Override + public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); return createDiskComponent(componentFactory, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a7fa05bb/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index ae72884..19f0ca0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -311,12 +311,13 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { } } - private ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { - LSMComponentFileReferences relFlushFileRefs = fileManager.getRelFlushFileReference(); - return createDiskComponent(bulkLoaComponentFactory, relFlushFileRefs.getInsertIndexFileReference(), null, - null, true); - } + } + @Override + public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { + LSMComponentFileReferences relFlushFileRefs = fileManager.getRelFlushFileReference(); + return createDiskComponent(bulkLoaComponentFactory, relFlushFileRefs.getInsertIndexFileReference(), null, null, + true); } @Override
