This is an automated email from the ASF dual-hosted git repository. wyk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 37e3f5cc0a [ASTERIXDB-3137][STO] Introduce LSM write operations for columnar format 37e3f5cc0a is described below commit 37e3f5cc0a6a53bab348000481a981178cbe31cb Author: Wail Alkowaileet <wael....@gmail.com> AuthorDate: Tue Mar 14 17:33:12 2023 -0700 [ASTERIXDB-3137][STO] Introduce LSM write operations for columnar format - user mode changes: no - storage format changes: no - interface changes: yes Details: This patch adds the support for writing columnar values to LSM indexes. By write we mean LSM flush, merge, and load operations Interface changes: ITupleProjector#project() now returns ITupleReference Change-Id: Ibe494d6de4478954df8e2f3ba0941391934954c2 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17424 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Wail Alkowaileet <wael....@gmail.com> Reviewed-by: Murtadha Hubail <mhub...@apache.org> --- .../assembler/AbstractNestedValueAssembler.java | 111 ++++++++++++ .../assembler/AbstractPrimitiveValueAssembler.java | 96 +++++++++++ .../column/assembler/AbstractValueAssembler.java | 110 ++++++++++++ .../column/assembler/ArrayValueAssembler.java | 75 ++++++++ .../assembler/ArrayWithUnionValueAssembler.java | 67 +++++++ .../asterix/column/assembler/AssemblerInfo.java | 100 +++++++++++ .../asterix/column/assembler/EmptyAssembler.java | 34 +++- .../column/assembler/ObjectValueAssembler.java | 73 ++++++++ .../column/assembler/PrimitiveValueAssembler.java | 47 +++++ .../assembler/RepeatedPrimitiveValueAssembler.java | 96 +++++++++++ .../value/AbstractFixedLengthValueGetter.java | 19 +- .../column/assembler/value/BooleanValueGetter.java | 21 ++- .../column/assembler/value/DoubleValueGetter.java | 21 ++- .../column/assembler/value/IValueGetter.java | 14 +- .../assembler/value/IValueGetterFactory.java | 13 +- .../column/assembler/value/LongValueGetter.java | 
21 ++- .../column/assembler/value/MissingValueGetter.java | 27 ++- .../column/assembler/value/NullValueGetter.java | 27 ++- .../column/assembler/value/StringValueGetter.java | 31 ++-- .../column/assembler/value/UUIDValueGetter.java | 24 ++- .../column/assembler/value/ValueGetterFactory.java | 50 ++++++ .../operation/lsm/flush/BatchFinalizerVisitor.java | 115 ++++++++++++ .../operation/lsm/flush/ColumnTransformer.java | 183 ++++++++++++++++++++ .../flush/FlushColumnTupleReaderWriterFactory.java | 49 ++++++ .../lsm/flush/FlushColumnTupleWithMetaWriter.java | 49 ++++++ .../lsm/flush/FlushColumnTupleWriter.java | 121 +++++++++++++ .../load/LoadColumnTupleReaderWriterFactory.java | 37 ++++ .../operation/lsm/load/LoadColumnTupleWriter.java | 21 ++- .../operation/lsm/merge/IEndOfPageCallBack.java | 24 ++- .../lsm/merge/MergeColumnReadMetadata.java | 95 ++++++++++ .../lsm/merge/MergeColumnTupleProjector.java | 61 +++++++ .../lsm/merge/MergeColumnTupleReader.java | 40 +++++ .../merge/MergeColumnTupleReaderWriterFactory.java | 45 +++++ .../lsm/merge/MergeColumnTupleWriter.java | 192 +++++++++++++++++++++ .../lsm/merge/MergeColumnWriteMetadata.java | 115 ++++++++++++ .../tuple/AbstractAsterixColumnTupleReference.java | 140 +++++++++++++++ .../column/tuple/MergeColumnTupleReference.java | 100 +++++++++++ .../am/common/impls/DefaultTupleProjector.java | 3 +- .../storage/common/projection/ITupleProjector.java | 2 +- 39 files changed, 2364 insertions(+), 105 deletions(-) diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java new file mode 100644 index 0000000000..1a4c3ef2e7 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; + +abstract class AbstractNestedValueAssembler extends AbstractValueAssembler { + protected final ArrayBackedValueStorage storage; + + AbstractNestedValueAssembler(int level, AssemblerInfo info) { + super(level, info); + storage = new ArrayBackedValueStorage(); + } + + /** + * @return whether the nested assembler was started or not + */ + final boolean isStarted() { + return started; + } + + /** + * Add a nested value + * + * @param value contains the value and its information + */ + abstract void addValue(AbstractValueAssembler value) throws HyracksDataException; + + /** + * Add a nested {@link ATypeTag#NULL} + * + * @param value contains the value's information + */ + abstract void addNull(AbstractValueAssembler value) throws HyracksDataException; + + /** + * Add a nested {@link ATypeTag#MISSING} + */ + void addMissing() throws HyracksDataException { + //By default, we ignore missing + } + + @Override + final void addNullToAncestor(int nullLevel) throws HyracksDataException { + AbstractNestedValueAssembler parent =
getParent(); + if (nullLevel + 1 == level) { + parent.start(); + parent.addNull(this); + return; + } + parent.addNullToAncestor(nullLevel); + } + + @Override + final void addMissingToAncestor(int missingLevel) throws HyracksDataException { + AbstractNestedValueAssembler parent = getParent(); + if (missingLevel + 1 == level) { + parent.start(); + parent.addMissing(); + return; + } + parent.addMissingToAncestor(missingLevel); + } + + /** + * Recursively start the path of this assembler by starting all un-started parents (each newly started assembler is reset first) + */ + public final void start() { + if (started) { + return; + } + started = true; + reset(); + AbstractNestedValueAssembler parent = getParent(); + if (parent != null && !parent.isStarted()) { + parent.start(); + } + } + + /** + * End the assembler: if it was started, add this nested value to its parent; if it is a delegate, also end its parent + */ + public final void end() throws HyracksDataException { + if (started) { + addValueToParent(); + started = false; + } + + if (isDelegate()) { + getParent().end(); + } + } +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java new file mode 100644 index 0000000000..9f1809dfe3 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.column.assembler.value.IValueGetter; +import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; + +public abstract class AbstractPrimitiveValueAssembler extends AbstractValueAssembler { + /** + * Returned by {@link #next()} to indicate that assembly continues with the next primitive value assembler + */ + public static final int NEXT_ASSEMBLER = -1; + protected final IValueGetter primitiveValueGetter; + protected final IColumnValuesReader reader; + + AbstractPrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader, + IValueGetter primitiveValueGetter) { + super(level, info); + this.primitiveValueGetter = primitiveValueGetter; + this.reader = reader; + } + + public final void reset(AbstractBytesInputStream in, int startIndex, int numberOfTuples) + throws HyracksDataException { + reader.reset(in, numberOfTuples); + reader.skip(startIndex); + } + + @Override + public final IValueReference getValue() throws HyracksDataException { + return primitiveValueGetter.getValue(reader); + } + + @Override + void addNullToAncestor(int nullLevel) throws HyracksDataException { + AbstractNestedValueAssembler parent = getParent(); + if (nullLevel + 1 == level) { + parent.start(); + parent.addNull(this); + return; + } + parent.addNullToAncestor(nullLevel); + } + + @Override + void addMissingToAncestor(int missingLevel) throws
HyracksDataException { + AbstractNestedValueAssembler parent = getParent(); + if (missingLevel + 1 == level) { + parent.start(); + parent.addMissing(); + return; + } + parent.addMissingToAncestor(missingLevel); + } + + @Override + final void addValueToParent() throws HyracksDataException { + AbstractNestedValueAssembler parent = getParent(); + parent.start(); + parent.addValue(this); + } + + public final int getColumnIndex() { + return reader.getColumnIndex(); + } + + public final void skip(int count) throws HyracksDataException { + reader.skip(count); + } + + /** + * Move to the next primitive value assembler + * + * @return {@link #NEXT_ASSEMBLER} to continue with the next assembler, or the index of the first value of an unfinished repeated group to go back to + */ + public abstract int next() throws HyracksDataException; +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java new file mode 100644 index 0000000000..0071917074 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.VoidPointable; + +public abstract class AbstractValueAssembler { + protected static final VoidPointable NULL; + protected static final VoidPointable MISSING; + private final AbstractNestedValueAssembler parent; + private final IValueReference fieldName; + private final int fieldIndex; + private final boolean delegate; + protected final int level; + protected boolean started; + + static { + NULL = new VoidPointable(); + NULL.set(new byte[] { ATypeTag.NULL.serialize() }, 0, 1); + + MISSING = new VoidPointable(); + MISSING.set(new byte[] { ATypeTag.MISSING.serialize() }, 0, 1); + } + + protected AbstractValueAssembler(int level, AssemblerInfo info) { + this.parent = info.getParent(); + this.fieldName = info.getFieldName(); + this.fieldIndex = info.getFieldIndex(); + this.delegate = info.isDelegate(); + this.level = level; + } + + /** + * Add {@link ATypeTag#NULL} value to the ancestor at {@code nullLevel} + * + * @param nullLevel at what level the null occurred + */ + abstract void addNullToAncestor(int nullLevel) throws HyracksDataException; + + /** + * Add {@link ATypeTag#MISSING} value to the ancestor at {@code missingLevel} + * + * @param missingLevel at what level the missing occurred + */ + abstract void addMissingToAncestor(int missingLevel) throws HyracksDataException; + + /** + * Add the value of this assembler to its parent + */ + abstract void addValueToParent() throws HyracksDataException; + + /** + * @return the assembled value + */ + public abstract IValueReference getValue() throws HyracksDataException; + + /** + * Reset the assembler's state; invoked when a nested assembler is started (no-op by default) + */ + void reset() { + //NoOp + } + + /** + * @return whether this assembler is the delegate (or representative) of its siblings + */ + final boolean isDelegate()
{ + return delegate; + } + + /** + * @return parent of the assembler + */ + final AbstractNestedValueAssembler getParent() { + return parent; + } + + /** + * @return the type-tagged field name of the value of this assembler, or null if it has none + */ + final IValueReference getFieldName() { + return fieldName; + } + + /** + * @return the field index of the value of this assembler (for closed types), or -1 if it has none + */ + final int getFieldIndex() { + return fieldIndex; + } +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java new file mode 100644 index 0000000000..2352e7fc96 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.builders.ListBuilderFactory; +import org.apache.asterix.om.types.AbstractCollectionType; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; + +public class ArrayValueAssembler extends AbstractNestedValueAssembler { + private final IAsterixListBuilder listBuilder; + private final AbstractCollectionType collectionType; + private final int firstValueIndex; + + ArrayValueAssembler(int level, AssemblerInfo info, int firstValueIndex) { + super(level, info); + this.firstValueIndex = firstValueIndex; + collectionType = (AbstractCollectionType) info.getDeclaredType(); + listBuilder = new ListBuilderFactory().create(collectionType.getTypeTag()); + } + + final int getFirstValueIndex() { + return firstValueIndex; //used by RepeatedPrimitiveValueAssembler#next() to go back to an unfinished group + } + + @Override + void reset() { + listBuilder.reset(collectionType); + storage.reset(); + } + + @Override + void addValue(AbstractValueAssembler value) throws HyracksDataException { + listBuilder.addItem(value.getValue()); + } + + @Override + void addNull(AbstractValueAssembler value) throws HyracksDataException { + listBuilder.addItem(NULL); + } + + @Override + void addMissing() throws HyracksDataException { + listBuilder.addItem(MISSING); + } + + @Override + void addValueToParent() throws HyracksDataException { + storage.reset(); + listBuilder.write(storage.getDataOutput(), true); + getParent().addValue(this); + } + + @Override + public IValueReference getValue() { + return storage; + } +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java new file mode 100644 index 0000000000..dcd240bc78 --- /dev/null +++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.column.metadata.schema.AbstractSchemaNode; +import org.apache.asterix.column.metadata.schema.UnionSchemaNode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ArrayWithUnionValueAssembler extends ArrayValueAssembler { + private final int numberOfUnionChildren; + private int numberOfAddedValues; + private boolean nonMissingValueAdded; + + ArrayWithUnionValueAssembler(int level, AssemblerInfo info, int firstValueIndex, AbstractSchemaNode itemNode) { + super(level, info, firstValueIndex); + this.numberOfUnionChildren = ((UnionSchemaNode) itemNode).getChildren().size(); + } + + @Override + void reset() { + numberOfAddedValues = 0; + nonMissingValueAdded = false; + super.reset(); + } + + @Override + void addValue(AbstractValueAssembler value) throws HyracksDataException { + nonMissingValueAdded = true; + numberOfAddedValues++; + super.addValue(value); + } + + @Override + void addNull(AbstractValueAssembler value) throws
HyracksDataException { + nonMissingValueAdded = true; + numberOfAddedValues++; + super.addNull(value); + } + + @Override + void addMissing() throws HyracksDataException { + numberOfAddedValues++; + if (nonMissingValueAdded && numberOfAddedValues >= numberOfUnionChildren) { //NOTE(review): presumably every union child has reported for this item and one was non-missing, so this missing is dropped - confirm + nonMissingValueAdded = false; + numberOfAddedValues = numberOfAddedValues % numberOfUnionChildren; + } else if (numberOfAddedValues == numberOfUnionChildren) { //presumably all union children reported missing, so the item itself is missing - confirm + super.addMissing(); + numberOfAddedValues = 0; + } + } +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java new file mode 100644 index 0000000000..712e65c842 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.VoidPointable; + +public class AssemblerInfo { + private final AbstractNestedValueAssembler parent; + private final IAType declaredType; + private final boolean delegate; + private final IValueReference fieldName; + private final int fieldIndex; + + public AssemblerInfo() { + this(BuiltinType.ANY, null, false); //no parent and not a delegate + } + + public AssemblerInfo(IAType declaredType, EmptyAssembler parent) { + this(declaredType, parent, false); + } + + public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate) { + this(declaredType, parent, delegate, null, -1); + } + + public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate, + IValueReference fieldName) { + this(declaredType, parent, delegate, fieldName, -1); + } + + public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate, int fieldIndex) { + this(declaredType, parent, delegate, null, fieldIndex); + } + + public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate, + IValueReference fieldName, int fieldIndex) { + this(declaredType, parent, delegate, fieldName, fieldIndex, false); + } + + public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate, + IValueReference fieldName, int fieldIndex, boolean fieldNameTagged) { + this.parent = parent; + this.declaredType = declaredType; + this.delegate = delegate; + this.fieldName = fieldNameTagged ?
fieldName : createTaggedFieldName(fieldName); + this.fieldIndex = fieldIndex; + } + + private IValueReference createTaggedFieldName(IValueReference fieldName) { + if (fieldName == null) { + return null; + } + byte[] storage = new byte[1 + fieldName.getLength()]; + storage[0] = ATypeTag.STRING.serialize(); //prefix the untagged name with the STRING type tag + System.arraycopy(fieldName.getByteArray(), fieldName.getStartOffset(), storage, 1, fieldName.getLength()); + VoidPointable taggedFieldName = new VoidPointable(); + taggedFieldName.set(storage, 0, storage.length); + return taggedFieldName; + } + + public AbstractNestedValueAssembler getParent() { + return parent; + } + + public IAType getDeclaredType() { + return declaredType; + } + + public boolean isDelegate() { + return delegate; + } + + public IValueReference getFieldName() { + return fieldName; + } + + public int getFieldIndex() { + return fieldIndex; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java index 8ca1a82541..406a4016e4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java @@ -16,14 +16,34 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +public class EmptyAssembler extends AbstractNestedValueAssembler { -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; + EmptyAssembler() { + super(-1, new AssemblerInfo()); + } + + @Override + void addValue(AbstractValueAssembler value) throws HyracksDataException { + //noOp: values added to this parent-less assembler are discarded + } + + @Override + void addValueToParent() throws HyracksDataException { + //noOp: this assembler has no parent to add to + } + + @Override + void addNull(AbstractValueAssembler value) throws HyracksDataException { + //noOp: nulls are discarded as well + } + + @Override + public IValueReference getValue() throws HyracksDataException { + return null; + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java new file mode 100644 index 0000000000..536ce02005 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.builders.RecordBuilder; +import org.apache.asterix.om.types.ARecordType; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; + +public class ObjectValueAssembler extends AbstractNestedValueAssembler { + private final RecordBuilder recordBuilder; + private final ARecordType recordType; + + ObjectValueAssembler(int level, AssemblerInfo info) { + super(level, info); + recordBuilder = new RecordBuilder(); + recordType = (ARecordType) info.getDeclaredType(); + } + + @Override + void reset() { + recordBuilder.reset(recordType); + storage.reset(); + } + + @Override + void addValue(AbstractValueAssembler value) throws HyracksDataException { + int valueIndex = value.getFieldIndex(); + if (valueIndex >= 0) { //closed field: add by declared index; otherwise add by (tagged) field name + recordBuilder.addField(valueIndex, value.getValue()); + } else { + recordBuilder.addField(value.getFieldName(), value.getValue()); + } + } + + @Override + void addNull(AbstractValueAssembler value) throws HyracksDataException { + int valueIndex = value.getFieldIndex(); + if (valueIndex >= 0) { + recordBuilder.addField(valueIndex, NULL); + } else { + recordBuilder.addField(value.getFieldName(), NULL); + } + } + + @Override + void addValueToParent() throws HyracksDataException { + storage.reset(); + recordBuilder.write(storage.getDataOutput(), true); + getParent().addValue(this); + } + + @Override + public IValueReference getValue() { + return storage; + } +} diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java new file mode 100644 index 0000000000..9592a12da9 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.column.assembler; + +import org.apache.asterix.column.assembler.value.IValueGetter; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class PrimitiveValueAssembler extends AbstractPrimitiveValueAssembler { + + PrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader, IValueGetter primitiveValue) { + super(level, info, reader, primitiveValue); + } + + @Override + public int next() throws HyracksDataException { + if (!reader.next()) { + throw new IllegalStateException("No more values"); + } else if (reader.isNull() && (isDelegate() || reader.getLevel() + 1 == level)) { + addNullToAncestor(reader.getLevel()); + } else if (reader.isValue()) { + addValueToParent(); + } + + if (isDelegate()) { + getParent().end(); + } + //Go to the next primitive assembler (non-repeated values never jump back) + return NEXT_ASSEMBLER; + } +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java new file mode 100644 index 0000000000..8fa228f0df --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.assembler; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.column.assembler.value.IValueGetter; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +class RepeatedPrimitiveValueAssembler extends AbstractPrimitiveValueAssembler { + private final List<ArrayValueAssembler> arrays; + + RepeatedPrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader, + IValueGetter primitiveValue) { + super(level, info, reader, primitiveValue); + this.arrays = new ArrayList<>(); + } + + public void addArray(ArrayValueAssembler assembler) { + arrays.add(assembler); + } + + @Override + public int next() throws HyracksDataException { + if (!reader.next()) { + throw new IllegalStateException("No more values"); + } else if (reader.isNull() && (!arrays.isEmpty() || reader.getLevel() + 1 == level)) { + /* + * There are two cases here for where the null belongs to: + * 1- If the null is an array item, then add it + * 2- If the null is an ancestor, then we only add null if this column is the array delegate + * (i.e., !arrays.isEmpty()) + */ + addNullToAncestor(reader.getLevel()); + } else if (reader.isMissing() && reader.getLevel() + 1 == level) { + /* + * Add a missing item + */ + addMissingToAncestor(reader.getLevel()); + } else if (reader.isValue()) { + addValueToParent(); + } + + if (isDelegate()) { + getParent().end(); + } + + //Initially, go to the next primitive assembler + int nextIndex =
NEXT_ASSEMBLER; + if (!arrays.isEmpty()) { + /* + * This assembler is a delegate of a repeated group + * The delimiter index tells us that this assembler is responsible for a finished group + */ + int delimiterIndex = reader.getDelimiterIndex(); + if (delimiterIndex < arrays.size() && reader.isDelimiter()) { + //Also finish the next group + delimiterIndex++; + } + + int numberOfFinishedGroups = Math.min(delimiterIndex, arrays.size()); + for (int i = 0; i < numberOfFinishedGroups; i++) { + //I'm the delegate for this group of repeated values and the group(s) are finished + ArrayValueAssembler assembler = arrays.get(i); + assembler.end(); + } + + //Is the repeated group (determined by the delimiter index) still unfinished? + if (delimiterIndex < arrays.size()) { + //Yes, go to the first value of the unfinished repeated group + nextIndex = arrays.get(delimiterIndex).getFirstValueIndex(); + } + } + + //Either NEXT_ASSEMBLER or the first value index of the unfinished repeated group + return nextIndex; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java similarity index 58% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java index 8ca1a82541..aeef686aca 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java @@ -16,14 +16,19 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +public abstract class AbstractFixedLengthValueGetter implements IValueGetter { + protected final VoidPointable value; -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; + AbstractFixedLengthValueGetter(ATypeTag typeTag, int nonTaggedLength) { + //+1 for the type tag + byte[] storage = new byte[1 + nonTaggedLength]; + storage[0] = typeTag.serialize(); + value = new VoidPointable(); + value.set(storage, 0, storage.length); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java similarity index 56% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java index 8ca1a82541..4a776abfcb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java @@ -16,14 +16,21 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.BooleanPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +class BooleanValueGetter extends AbstractFixedLengthValueGetter { + BooleanValueGetter() { + super(ATypeTag.BOOLEAN, 1); + } -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; + @Override + public IValueReference getValue(IColumnValuesReader reader) { + BooleanPointable.setBoolean(value.getByteArray(), value.getStartOffset() + 1, reader.getBoolean()); + return value; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java similarity index 56% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java index 8ca1a82541..2e88896b7e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java @@ -16,14 +16,21 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.DoublePointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +class DoubleValueGetter extends AbstractFixedLengthValueGetter { + DoubleValueGetter() { + super(ATypeTag.DOUBLE, Double.BYTES); + } -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; + @Override + public IValueReference getValue(IColumnValuesReader reader) { + DoublePointable.setDouble(value.getByteArray(), value.getStartOffset() + 1, reader.getDouble()); + return value; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java similarity index 67% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java index 8ca1a82541..9e58ab8748 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java @@ -16,14 +16,12 @@ * specific language governing permissions and limitations * under the License. 
*/
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.data.std.api.IValueReference;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+/**
+ * Converts the current value of a column reader into a value reference.
+ */
+@FunctionalInterface
+public interface IValueGetter {
+    /**
+     * @param reader column reader whose current value is converted
+     * @return a reference to the value. The implementations in this package return a type-tagged value held
+     * in a buffer they reuse, so the content may be overwritten by the next call on the same getter.
+     */
+    IValueReference getValue(IColumnValuesReader reader);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
similarity index 67%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
index 8ca1a82541..0b58cfc4e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
@@ -16,14 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
*/
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.om.types.ATypeTag;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+/**
+ * Creates the {@link IValueGetter} to use for values of a given type.
+ */
+@FunctionalInterface
+public interface IValueGetterFactory {
+    /**
+     * @param typeTag type tag of the values the returned getter will produce
+     * @return a value getter for {@code typeTag}
+     */
+    IValueGetter createValueGetter(ATypeTag typeTag);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
similarity index 56%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
index 8ca1a82541..e76e3c9564 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
@@ -16,14 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
*/ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.LongPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +class LongValueGetter extends AbstractFixedLengthValueGetter { + LongValueGetter() { + super(ATypeTag.BIGINT, Long.BYTES); + } -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; + @Override + public IValueReference getValue(IColumnValuesReader reader) { + LongPointable.setLong(value.getByteArray(), value.getStartOffset() + 1, reader.getLong()); + return value; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java index 8ca1a82541..1ae84ee5c7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java @@ -16,14 +16,27 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +public class MissingValueGetter implements IValueGetter { + public static final IValueGetter INSTANCE = new MissingValueGetter(); + private static final VoidPointable MISSING; -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; + static { + MISSING = new VoidPointable(); + MISSING.set(new byte[] { ATypeTag.MISSING.serialize() }, 0, 1); + } + + private MissingValueGetter() { + } + + @Override + public IValueReference getValue(IColumnValuesReader reader) { + return MISSING; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java similarity index 53% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java index 8ca1a82541..e05025211f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java @@ -16,14 +16,27 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +public class NullValueGetter implements IValueGetter { + public static final IValueGetter INSTANCE = new NullValueGetter(); + private static final VoidPointable NULL; -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; + static { + NULL = new VoidPointable(); + NULL.set(new byte[] { ATypeTag.NULL.serialize() }, 0, 1); + } + + private NullValueGetter() { + } + + @Override + public IValueReference getValue(IColumnValuesReader reader) { + return NULL; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java similarity index 50% copy from hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java index 00cb0c5486..1dd1aa7169 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java @@ -16,26 +16,27 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.storage.am.common.impls; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.common.projection.ITupleProjector; +class StringValueGetter implements IValueGetter { + private final ArrayBackedValueStorage value; -class DefaultTupleProjector implements ITupleProjector { - public static final ITupleProjector INSTANCE = new DefaultTupleProjector(); - - private DefaultTupleProjector() { + public StringValueGetter() { + value = new ArrayBackedValueStorage(); } @Override - public void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException { - for (int i = 0; i < tuple.getFieldCount(); i++) { - dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i)); - tb.addFieldEndOffset(); - } + public IValueReference getValue(IColumnValuesReader reader) { + IValueReference string = reader.getBytes(); + value.setSize(1 + string.getLength()); + byte[] bytes = value.getByteArray(); + bytes[0] = ATypeTag.STRING.serialize(); + System.arraycopy(string.getByteArray(), string.getStartOffset(), bytes, 1, string.getLength()); + return value; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java similarity index 55% copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java copy 
to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java index 8ca1a82541..135ed8571b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java @@ -16,14 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.storage.common.projection; +package org.apache.asterix.column.assembler.value; -import java.io.DataOutput; -import java.io.IOException; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.api.IValueReference; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +class UUIDValueGetter extends AbstractFixedLengthValueGetter { + UUIDValueGetter() { + super(ATypeTag.UUID, 16); + } -public interface ITupleProjector { - void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; -} + @Override + public IValueReference getValue(IColumnValuesReader reader) { + IValueReference uuid = reader.getBytes(); + System.arraycopy(uuid.getByteArray(), uuid.getStartOffset(), value.getByteArray(), value.getStartOffset() + 1, + uuid.getLength()); + return value; + } +} \ No newline at end of file diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java new file mode 100644 index 0000000000..5f7fd7e096 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.assembler.value; + +import org.apache.asterix.om.types.ATypeTag; + +public class ValueGetterFactory implements IValueGetterFactory { + public static final IValueGetterFactory INSTANCE = new ValueGetterFactory(); + + private ValueGetterFactory() { + } + + @Override + public IValueGetter createValueGetter(ATypeTag typeTag) { + switch (typeTag) { + case NULL: + return NullValueGetter.INSTANCE; + case MISSING: + return MissingValueGetter.INSTANCE; + case BOOLEAN: + return new BooleanValueGetter(); + case BIGINT: + return new LongValueGetter(); + case DOUBLE: + return new DoubleValueGetter(); + case STRING: + return new StringValueGetter(); + case UUID: + return new UUIDValueGetter(); + default: + throw new UnsupportedOperationException(typeTag + " is not supported"); + } + } +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java new file mode 100644 index 0000000000..4cbe09bc9c --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java 
@@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.operation.lsm.flush; + +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode; +import org.apache.asterix.column.metadata.schema.AbstractSchemaNode; +import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor; +import org.apache.asterix.column.metadata.schema.ObjectSchemaNode; +import org.apache.asterix.column.metadata.schema.UnionSchemaNode; +import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode; +import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode; +import org.apache.asterix.column.values.IColumnBatchWriter; +import org.apache.asterix.column.values.IColumnValuesWriter; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public final class BatchFinalizerVisitor implements ISchemaNodeVisitor<Void, AbstractSchemaNestedNode> { + private final FlushColumnMetadata columnSchemaMetadata; + private final IColumnValuesWriter[] primaryKeyWriters; + private final PriorityQueue<IColumnValuesWriter> orderedColumns; + private int 
level; + + public BatchFinalizerVisitor(FlushColumnMetadata columnSchemaMetadata) { + this.columnSchemaMetadata = columnSchemaMetadata; + orderedColumns = new PriorityQueue<>(Comparator.comparingInt(x -> -x.getEstimatedSize())); + int numberOfPrimaryKeys = columnSchemaMetadata.getNumberOfPrimaryKeys(); + primaryKeyWriters = new IColumnValuesWriter[numberOfPrimaryKeys]; + for (int i = 0; i < numberOfPrimaryKeys; i++) { + primaryKeyWriters[i] = columnSchemaMetadata.getWriter(i); + } + level = -1; + } + + public int finalizeBatch(IColumnBatchWriter batchWriter, FlushColumnMetadata columnMetadata) + throws HyracksDataException { + orderedColumns.clear(); + + columnMetadata.getRoot().accept(this, null); + if (columnMetadata.getMetaRoot() != null) { + columnMetadata.getMetaRoot().accept(this, null); + } + + int allocatedSpace = batchWriter.writePrimaryKeyColumns(primaryKeyWriters); + allocatedSpace += batchWriter.writeColumns(orderedColumns); + return allocatedSpace; + } + + @Override + public Void visit(ObjectSchemaNode objectNode, AbstractSchemaNestedNode arg) throws HyracksDataException { + level++; + columnSchemaMetadata.flushDefinitionLevels(level, arg, objectNode); + List<AbstractSchemaNode> children = objectNode.getChildren(); + for (int i = 0; i < children.size(); i++) { + children.get(i).accept(this, objectNode); + } + objectNode.setCounter(0); + columnSchemaMetadata.clearDefinitionLevels(objectNode); + level--; + return null; + } + + @Override + public Void visit(AbstractCollectionSchemaNode collectionNode, AbstractSchemaNestedNode arg) + throws HyracksDataException { + level++; + columnSchemaMetadata.flushDefinitionLevels(level, arg, collectionNode); + collectionNode.getItemNode().accept(this, collectionNode); + collectionNode.setCounter(0); + columnSchemaMetadata.clearDefinitionLevels(collectionNode); + level--; + return null; + } + + @Override + public Void visit(UnionSchemaNode unionNode, AbstractSchemaNestedNode arg) throws HyracksDataException { + 
columnSchemaMetadata.flushDefinitionLevels(level, arg, unionNode); + for (AbstractSchemaNode node : unionNode.getChildren().values()) { + node.accept(this, unionNode); + } + unionNode.setCounter(0); + columnSchemaMetadata.clearDefinitionLevels(unionNode); + return null; + } + + @Override + public Void visit(PrimitiveSchemaNode primitiveNode, AbstractSchemaNestedNode arg) throws HyracksDataException { + columnSchemaMetadata.flushDefinitionLevels(level, arg, primitiveNode); + if (!primitiveNode.isPrimaryKey()) { + orderedColumns.add(columnSchemaMetadata.getWriter(primitiveNode.getColumnIndex())); + } + + //Prepare for the next batch + primitiveNode.setCounter(0); + return null; + } +} diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java new file mode 100644 index 0000000000..48cd442c09 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+/**
+ * Shreds a row-format record into columns.
+ * <p>
+ * The record is walked with the lazy visitable-pointable visitor API. Schema nodes are looked up, and created when
+ * absent, as fields and item types are encountered. Every flat value is handed to the column writer that
+ * {@link FlushColumnMetadata} returns for its primitive schema node. While doing so, the transformer keeps an
+ * estimate of the size of the primary-key columns.
+ */
+public class ColumnTransformer implements ILazyVisitablePointableVisitor<AbstractSchemaNode, AbstractSchemaNode> {
+    private final FlushColumnMetadata columnMetadata;
+    //Reusable view over a value's bytes that excludes the leading type-tag byte
+    private final VoidPointable nonTaggedValue;
+    //Schema node at which transform() starts visiting
+    private final ObjectSchemaNode root;
+    //Nested schema node currently being visited; saved and restored around each nested visit
+    private AbstractSchemaNestedNode currentParent;
+    //Estimated size of the primary-key columns; reset at the start of each transform() call
+    private int primaryKeysLength;
+
+    /**
+     * @param columnMetadata metadata that supplies the column writers, the current level and the definition levels
+     * @param root           schema node at which {@link #transform} starts visiting
+     */
+    public ColumnTransformer(FlushColumnMetadata columnMetadata, ObjectSchemaNode root) {
+        this.columnMetadata = columnMetadata;
+        this.root = root;
+        nonTaggedValue = new VoidPointable();
+    }
+
+    /**
+     * Transforms a record in row format into columns.
+     *
+     * @param pointable record pointable
+     * @return the estimated size (possibly overestimated) of the primary key(s) columns
+     */
+    public int transform(RecordLazyVisitablePointable pointable) throws HyracksDataException {
+        primaryKeysLength = 0;
+        pointable.accept(this, root);
+        return primaryKeysLength;
+    }
+
+    /**
+     * Writes only the primary-key fields of an anti-matter tuple to their columns.
+     * <p>
+     * Field {@code i} of the tuple is written to column writer {@code i}. Its first byte is read as the type tag,
+     * and the remaining bytes are passed to the writer as the untagged value.
+     *
+     * @param tuple anti-matter tuple whose leading fields are the primary keys
+     * @return the sum of the estimated sizes of the primary-key column writers after the write
+     */
+    public int writeAntiMatter(LSMBTreeTupleReference tuple) throws HyracksDataException {
+        int pkSize = 0;
+        for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+            byte[] bytes = tuple.getFieldData(i);
+            int start = tuple.getFieldStart(i);
+            ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[bytes[start]];
+            nonTaggedValue.set(bytes, start + 1, tuple.getFieldLength(i) - 1);
+            IColumnValuesWriter writer = columnMetadata.getWriter(i);
+            writer.writeAntiMatter(tag, nonTaggedValue);
+            pkSize += writer.getEstimatedSize();
+        }
+        return pkSize;
+    }
+
+    /**
+     * Visits every field of a record. MISSING fields are skipped. Every other field, NULL included, is matched to
+     * a child schema node (created if absent) and dispatched through {@link #acceptActualNode}.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public AbstractSchemaNode visit(RecordLazyVisitablePointable pointable, AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        AbstractSchemaNestedNode previousParent = currentParent;
+
+        ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+        currentParent = objectNode;
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            IValueReference fieldName = pointable.getFieldName();
+            ATypeTag childTypeTag = pointable.getChildTypeTag();
+            if (childTypeTag != ATypeTag.MISSING) {
+                //Only write actual field values (including NULL) but ignore MISSING fields
+                AbstractSchemaNode childNode = objectNode.getOrCreateChild(fieldName, childTypeTag, columnMetadata);
+                acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+            }
+        }
+
+        columnMetadata.exitNode(arg);
+        currentParent = previousParent;
+        return null;
+    }
+
+    /**
+     * Visits every item of a list value. Each item is matched to the collection's item schema node (created if
+     * absent) and dispatched through {@link #acceptActualNode}. After each item, the level captured on entering
+     * the collection is appended to the collection's definition levels. On exit, the item count is passed to
+     * {@code exitCollectionNode}.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public AbstractSchemaNode visit(AbstractListLazyVisitablePointable pointable, AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        AbstractSchemaNestedNode previousParent = currentParent;
+
+        AbstractCollectionSchemaNode collectionNode = (AbstractCollectionSchemaNode) arg;
+        RunLengthIntArray defLevels = columnMetadata.getDefinitionLevels(collectionNode);
+        //the level at which an item is missing
+        int missingLevel = columnMetadata.getLevel();
+        currentParent = collectionNode;
+
+        int numberOfChildren = pointable.getNumberOfChildren();
+        for (int i = 0; i < numberOfChildren; i++) {
+            pointable.nextChild();
+            ATypeTag childTypeTag = pointable.getChildTypeTag();
+            AbstractSchemaNode childNode = collectionNode.getOrCreateItem(childTypeTag, columnMetadata);
+            acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+            /*
+             * The array item may change (e.g., BIGINT --> UNION). Thus, new items would be considered as missing
+             */
+            defLevels.add(missingLevel);
+        }
+
+        columnMetadata.exitCollectionNode(collectionNode, numberOfChildren);
+        currentParent = previousParent;
+        return null;
+    }
+
+    /**
+     * Writes a flat value to the column of its primitive schema node.
+     * <ul>
+     * <li>MISSING writes only the current level.</li>
+     * <li>NULL writes a null at the current level.</li>
+     * <li>Any other value is written without a type tag; the tag byte is stripped first when the pointable is
+     * tagged.</li>
+     * </ul>
+     * For a primary-key node, the writer's estimated size is added to the running primary-key size.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public AbstractSchemaNode visit(FlatLazyVisitablePointable pointable, AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        ATypeTag valueTypeTag = pointable.getTypeTag();
+        PrimitiveSchemaNode node = (PrimitiveSchemaNode) arg;
+        IColumnValuesWriter writer = columnMetadata.getWriter(node.getColumnIndex());
+        if (valueTypeTag == ATypeTag.MISSING) {
+            writer.writeLevel(columnMetadata.getLevel());
+        } else if (valueTypeTag == ATypeTag.NULL) {
+            writer.writeNull(columnMetadata.getLevel());
+        } else if (pointable.isTagged()) {
+            //Remove type tag
+            nonTaggedValue.set(pointable.getByteArray(), pointable.getStartOffset() + 1, pointable.getLength() - 1);
+            writer.writeValue(pointable.getTypeTag(), nonTaggedValue);
+        } else {
+            writer.writeValue(pointable.getTypeTag(), pointable);
+        }
+        if (node.isPrimaryKey()) {
+            primaryKeysLength += writer.getEstimatedSize();
+        }
+        columnMetadata.exitNode(arg);
+        return null;
+    }
+
+    /**
+     * Dispatches a value to the schema node that stores it.
+     * <ul>
+     * <li>UNION node: the union is entered and becomes the current parent. NULL and MISSING values are visited
+     * with the union's original type; any other value is visited with the union child for the value's type
+     * (created if absent).</li>
+     * <li>NULL value for a nested node: recorded with {@code addNestedNull} instead of being visited.</li>
+     * <li>Otherwise: the value is visited with {@code node} as-is.</li>
+     * </ul>
+     */
+    private void acceptActualNode(AbstractLazyVisitablePointable pointable, AbstractSchemaNode node)
+            throws HyracksDataException {
+        if (node.getTypeTag() == ATypeTag.UNION) {
+            columnMetadata.enterNode(currentParent, node);
+            AbstractSchemaNestedNode previousParent = currentParent;
+
+            UnionSchemaNode unionNode = (UnionSchemaNode) node;
+            currentParent = unionNode;
+
+            ATypeTag childTypeTag = pointable.getTypeTag();
+            AbstractSchemaNode actualNode;
+            if (childTypeTag == ATypeTag.NULL || childTypeTag == ATypeTag.MISSING) {
+                actualNode = unionNode.getOriginalType();
+            } else {
+                actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
+            }
+            pointable.accept(this, actualNode);
+
+            currentParent = previousParent;
+            columnMetadata.exitNode(node);
+        } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested()) {
+            columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node);
+        } else {
+            pointable.accept(this, node);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000000..9b1b0a20e9
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+/**
+ * Reader/writer factory used by the flush operation.
+ */
+public class FlushColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
+    private static final long serialVersionUID = -9197679192729634493L;
+
+    public FlushColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+        super(pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    /**
+     * Creates a {@link FlushColumnTupleWithMetaWriter} when the metadata declares a meta type, and a plain
+     * {@link FlushColumnTupleWriter} otherwise.
+     */
+    @Override
+    public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+        FlushColumnMetadata metadata = (FlushColumnMetadata) columnMetadata;
+        boolean hasMeta = metadata.getMetaType() != null;
+        return hasMeta ? new FlushColumnTupleWithMetaWriter(metadata, pageSize, maxNumberOfTuples, tolerance)
+                : new FlushColumnTupleWriter(metadata, pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    /**
+     * Delegates reader creation to the immutable read metadata carried by the projection info.
+     */
+    @Override
+    public AbstractColumnTupleReader createColumnReader(IColumnProjectionInfo columnProjectionInfo) {
+        AbstractColumnImmutableReadMetadata readMetadata = (AbstractColumnImmutableReadMetadata) columnProjectionInfo;
+        return readMetadata.createTupleReader();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
new file mode 100644
index 0000000000..9c527daa0e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+/**
+ * Flush writer for tuples that also carry a meta record. The meta record is shredded by a second
+ * {@link ColumnTransformer} rooted at the meta schema; anti-matter tuples have no meta part to write.
+ */
+public class FlushColumnTupleWithMetaWriter extends FlushColumnTupleWriter {
+    private final ColumnTransformer metaColumnTransformer;
+    private final RecordLazyVisitablePointable metaPointable;
+
+    public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+            float tolerance) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+        metaColumnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot());
+        metaPointable = new TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
+    }
+
+    /**
+     * Shreds the meta record of a non-anti-matter tuple. The primary-key size estimate becomes the larger of the
+     * current estimate and the estimate reported for the meta part.
+     */
+    @Override
+    protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws HyracksDataException {
+        if (!btreeTuple.isAntimatter()) {
+            int fieldIndex = columnMetadata.getMetaRecordFieldIndex();
+            metaPointable.set(btreeTuple.getFieldData(fieldIndex), btreeTuple.getFieldStart(fieldIndex),
+                    btreeTuple.getFieldLength(fieldIndex));
+            //The primary key may not be in the meta part, so keep whichever estimate is larger
+            int metaPrimaryKeysSize = metaColumnTransformer.transform(metaPointable);
+            primaryKeysEstimatedSize = Math.max(metaPrimaryKeysSize, primaryKeysEstimatedSize);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
new file mode 100644
index 0000000000..1af043fb4e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+/**
+ * Column tuple writer used by the flush operation. Each record is shredded into columns by a
+ * {@link ColumnTransformer}, and {@link #flush(ByteBuffer)} finalizes the current batch.
+ */
+public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
+    protected final FlushColumnMetadata columnMetadata;
+    protected final BatchFinalizerVisitor finalizer;
+    protected final ColumnBatchWriter writer;
+
+    private final ColumnTransformer transformer;
+    private final RecordLazyVisitablePointable pointable;
+    private final int maxNumberOfTuples;
+
+    //Latest primary-key size estimate reported by a transformer; read by getOccupiedSpace()
+    protected int primaryKeysEstimatedSize;
+
+    public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+            float tolerance) {
+        this.columnMetadata = columnMetadata;
+        this.transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
+        this.finalizer = new BatchFinalizerVisitor(columnMetadata);
+        this.writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        this.maxNumberOfTuples = maxNumberOfTuples;
+        this.pointable = new TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
+    }
+
+    @Override
+    public final void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        columnMetadata.init(multiPageOp);
+    }
+
+    @Override
+    public final int getNumberOfColumns() {
+        return columnMetadata.getNumberOfColumns();
+    }
+
+    /**
+     * Estimates the space {@code tuple} needs as the total length of its primary-key fields. This is mostly an
+     * overestimate.
+     */
+    @Override
+    public final int bytesRequired(ITupleReference tuple) {
+        int requiredBytes = 0;
+        for (int field = 0; field < columnMetadata.getNumberOfPrimaryKeys(); field++) {
+            requiredBytes += tuple.getFieldLength(field);
+        }
+        return requiredBytes;
+    }
+
+    /**
+     * @return the primary-key size estimate plus {@code FILTER_SIZE} bytes per column
+     */
+    @Override
+    public final int getOccupiedSpace() {
+        int filtersSize = getNumberOfColumns() * AbstractColumnFilterWriter.FILTER_SIZE;
+        return primaryKeysEstimatedSize + filtersSize;
+    }
+
+    @Override
+    public final int getMaxNumberOfTuples() {
+        return maxNumberOfTuples;
+    }
+
+    @Override
+    public final void close() {
+        columnMetadata.close();
+    }
+
+    /**
+     * Writes one tuple. An anti-matter tuple contributes only its primary keys. Any other tuple has its record
+     * written, followed by its meta part (see {@link #writeMeta}).
+     */
+    @Override
+    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        //Tuples come from an in-memory component, hence the cast
+        LSMBTreeTupleReference btreeTuple = (LSMBTreeTupleReference) tuple;
+        if (btreeTuple.isAntimatter()) {
+            primaryKeysEstimatedSize = transformer.writeAntiMatter(btreeTuple);
+        } else {
+            writeRecord(tuple);
+            writeMeta(btreeTuple);
+        }
+    }
+
+    /**
+     * Points the batch writer at {@code pageZero} and finalizes the current batch.
+     *
+     * @return the value returned by {@code BatchFinalizerVisitor#finalizeBatch} (presumably the batch size in
+     * bytes; confirm)
+     */
+    @Override
+    public final int flush(ByteBuffer pageZero) throws HyracksDataException {
+        int numberOfColumns = getNumberOfColumns();
+        int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+        writer.setPageZeroBuffer(pageZero, numberOfColumns, numberOfPrimaryKeys);
+        return finalizer.finalizeBatch(writer, columnMetadata);
+    }
+
+    /**
+     * Shreds the record field of {@code tuple} (at {@code getRecordFieldIndex()}) into columns.
+     */
+    protected void writeRecord(ITupleReference tuple) throws HyracksDataException {
+        int fieldIndex = columnMetadata.getRecordFieldIndex();
+        byte[] fieldData = tuple.getFieldData(fieldIndex);
+        int fieldStart = tuple.getFieldStart(fieldIndex);
+        int fieldLength = tuple.getFieldLength(fieldIndex);
+        pointable.set(fieldData, fieldStart, fieldLength);
+        primaryKeysEstimatedSize = transformer.transform(pointable);
+    }
+
+    /**
+     * Hook for writing a tuple's meta part. This writer has no meta part, so it does nothing.
+     */
+    protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws HyracksDataException {
+        //NoOp
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000000..0c1990f479
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.load;
+
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+
+/**
+ * Reader/writer factory for the load operation. It behaves like the flush factory, except that every writer it
+ * creates is a {@link LoadColumnTupleWriter}.
+ */
+public class LoadColumnTupleReaderWriterFactory extends FlushColumnTupleReaderWriterFactory {
+    private static final long serialVersionUID = -7583574057314353873L;
+
+    public LoadColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+        super(pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    @Override
+    public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+        FlushColumnMetadata flushMetadata = (FlushColumnMetadata) columnMetadata;
+        return new LoadColumnTupleWriter(flushMetadata, pageSize, maxNumberOfTuples, tolerance);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
similarity index 56%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index 8ca1a82541..e4604da849 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -16,14 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
*/
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.operation.lsm.load;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+/**
+ * Column writer for the load operation. Every tuple is written as a record only: the anti-matter check and the
+ * meta-record handling done by {@link FlushColumnTupleWriter#writeTuple} are skipped.
+ */
+public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
+    public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize,
+            int maxNumberOfTuples, float tolerance) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    /**
+     * Shreds the record field of {@code tuple} into columns.
+     */
+    @Override
+    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        writeRecord(tuple);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
similarity index 50%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
index 8ca1a82541..93df021bb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
*/
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.operation.lsm.merge;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeRangeSearchCursor;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+/**
+ * Callback that signals {@link MergeColumnTupleWriter} that a component's page has reached its end.
+ */
+@FunctionalInterface
+public interface IEndOfPageCallBack {
+    /**
+     * Asks {@link MergeColumnTupleWriter} to finish the current "vertical" merging batch.
+     * This method is called from {@link MergeColumnTupleReference#lastTupleReached()}.
+     *
+     * @param columnTuple the merge tuple reference that reached the end of its page (presumably; confirm against
+     *                    the caller)
+     * @throws HyracksDataException if finishing the batch fails
+     * @see ColumnBTreeRangeSearchCursor#doHasNext()
+     */
+    void callEnd(MergeColumnTupleReference columnTuple) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
new file mode 100644
index 0000000000..11f3059504
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+/**
+ * Read metadata for an existing on-disk {@link ILSMDiskComponent} that takes part in a merge operation.
+ * <p>
+ * It is used only to read such a component during a merge. The schema here is immutable and cannot be changed.
+ * Every column is projected.
+ */
+public final class MergeColumnReadMetadata extends AbstractColumnImmutableReadMetadata {
+    //One reader per column, stored at the reader's column index
+    private final IColumnValuesReader[] columnReaders;
+
+    private MergeColumnReadMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IColumnValuesReader[] columnReaders, IValueReference serializedMetadata) {
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnReaders.length);
+        this.columnReaders = columnReaders;
+    }
+
+    /**
+     * Creates {@link MergeColumnReadMetadata} from serialized column metadata.
+     *
+     * @param datasetType         the dataset's record type
+     * @param metaType            the meta record type (presumably {@code null} when the dataset has no meta part;
+     *                            confirm)
+     * @param numberOfPrimaryKeys number of primary keys
+     * @param readerFactory       factory that deserializes one column reader per column from the path info
+     * @param serializedMetadata  serialized column metadata. The path-info section starts at the offset stored as an
+     *                            integer at {@code PATH_INFO_POINTER}, relative to the metadata's start offset
+     * @return {@link MergeColumnReadMetadata} holding one reader per column, placed at the reader's column index
+     * @throws IOException if the path-info section cannot be read, e.g. it ends before all readers are decoded
+     * @see FlushColumnMetadata#serializeColumnsMetadata() for more information about serialization order
+     */
+    public static MergeColumnReadMetadata create(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IColumnValuesReaderFactory readerFactory, IValueReference serializedMetadata) throws IOException {
+        byte[] bytes = serializedMetadata.getByteArray();
+        int offset = serializedMetadata.getStartOffset();
+        int length = serializedMetadata.getLength();
+
+        int pathInfoStart = offset + IntegerPointable.getInteger(bytes, offset + PATH_INFO_POINTER);
+        //Bound the stream to what remains of the serialized metadata after pathInfoStart. Passing the full
+        //metadata length here would let a malformed path info read bytes lying beyond the metadata.
+        int pathInfoLength = offset + length - pathInfoStart;
+        DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, pathInfoStart, pathInfoLength));
+        int numberOfColumns = input.readInt();
+        IColumnValuesReader[] columnReaders = new IColumnValuesReader[numberOfColumns];
+        for (int i = 0; i < numberOfColumns; i++) {
+            IColumnValuesReader columnReader = readerFactory.createValueReader(input);
+            //The order at which the path info was written is not ordered by the column index
+            columnReaders[columnReader.getColumnIndex()] = columnReader;
+        }
+
+        return new MergeColumnReadMetadata(datasetType, metaType, numberOfPrimaryKeys, columnReaders,
+                serializedMetadata);
+    }
+
+    /**
+     * @return the column readers, indexed by column index
+     */
+    public IColumnValuesReader[] getColumnReaders() {
+        return columnReaders;
+    }
+
+    /**
+     * All columns are projected, so ordinal {@code i} is column {@code i}.
+     */
+    @Override
+    public int getColumnIndex(int ordinal) {
+        return ordinal;
+    }
+
+    @Override
+    public int getNumberOfProjectedColumns() {
+        return columnReaders.length;
+    }
+
+    @Override
+    public AbstractColumnTupleReader createTupleReader() {
+        return new MergeColumnTupleReader(this);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java
new file mode 100644
index 0000000000..f03506efa3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+
+/**
+ * Column tuple projector for merge. Every projection info it creates is a {@link MergeColumnReadMetadata} built
+ * from a component's serialized column metadata. Row-level projection through {@link #project} is not supported.
+ */
+public class MergeColumnTupleProjector implements IColumnTupleProjector {
+    private final ARecordType datasetType;
+    private final ARecordType metaType;
+    private final int numberOfPrimaryKeys;
+    private final IColumnValuesReaderFactory readerFactory;
+
+    public MergeColumnTupleProjector(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IColumnValuesReaderFactory readerFactory) {
+        this.datasetType = datasetType;
+        this.metaType = metaType;
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        this.readerFactory = readerFactory;
+    }
+
+    /**
+     * Builds {@link MergeColumnReadMetadata} from {@code columnMetadata}. An {@link IOException} raised while
+     * decoding is rethrown as a {@link HyracksDataException}.
+     */
+    @Override
+    public IColumnProjectionInfo createProjectionInfo(IValueReference columnMetadata) throws HyracksDataException {
+        IColumnProjectionInfo projectionInfo;
+        try {
+            projectionInfo = MergeColumnReadMetadata.create(datasetType, metaType, numberOfPrimaryKeys,
+                    readerFactory, columnMetadata);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return projectionInfo;
+    }
+
+    /**
+     * Not supported by this projector.
+     *
+     * @throws IllegalAccessError always
+     */
+    @Override
+    public ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
+        String projectorClassName = getClass().getName();
+        throw new IllegalAccessError(projectorClassName);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java new file mode 100644 index 0000000000..4114f1076b --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+/**
+ * Tuple reader for merge. It creates {@link MergeColumnTupleReference} iterators over a component's leaf frame.
+ */
+public class MergeColumnTupleReader extends AbstractColumnTupleReader {
+    private final MergeColumnReadMetadata columnMetadata;
+
+    /**
+     * @param columnMetadata must be a {@link MergeColumnReadMetadata}; it is cast on construction
+     */
+    public MergeColumnTupleReader(AbstractColumnImmutableReadMetadata columnMetadata) {
+        MergeColumnReadMetadata mergeMetadata = (MergeColumnReadMetadata) columnMetadata;
+        this.columnMetadata = mergeMetadata;
+    }
+
+    @Override
+    public IColumnTupleIterator createTupleIterator(ColumnBTreeReadLeafFrame frame, int componentIndex,
+            IColumnReadMultiPageOp multiPageOp) {
+        MergeColumnTupleReference tupleReference =
+                new MergeColumnTupleReference(componentIndex, frame, columnMetadata, multiPageOp);
+        return tupleReference;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000000..1ac94fe4cc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+/**
+ * Reader/writer factory for the LSM merge operation. Writers are {@link MergeColumnTupleWriter}s built from a
+ * {@link MergeColumnWriteMetadata}; readers are created by the {@link AbstractColumnImmutableReadMetadata} that is
+ * passed in as the projection info.
+ */
+public class MergeColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
+    private static final long serialVersionUID = -2131401304338796428L;
+
+    public MergeColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+        super(pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    @Override
+    public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+        return new MergeColumnTupleWriter((MergeColumnWriteMetadata) columnMetadata, pageSize, maxNumberOfTuples,
+                tolerance);
+    }
+
+    @Override
+    public AbstractColumnTupleReader createColumnReader(IColumnProjectionInfo columnProjectionInfo) {
+        AbstractColumnImmutableReadMetadata readMetadata = (AbstractColumnImmutableReadMetadata) columnProjectionInfo;
+        return readMetadata.createTupleReader();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
new file mode 100644
index 0000000000..fbda6d000c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+/**
+ * Column tuple writer for the LSM merge operation.
+ * <p>
+ * Primary keys are copied from the source component's readers as each tuple is written. Non-key column values are
+ * copied lazily. {@code writtenComponents} records, in run-length form, which component every written tuple came
+ * from. Those runs are replayed against the component readers in two cases: when a component's tuple reference
+ * reports its last tuple (end-of-page callback), and when the page is flushed.
+ */
+public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
+    private final MergeColumnWriteMetadata columnMetadata;
+    private final MergeColumnTupleReference[] componentsTuples;
+    /**
+     * Run-length sequence of component indexes. A non-negative value is the index of a component whose non-key
+     * values must be copied. A negative value (see {@code toSkippedIndex}) marks tuples of a component whose non-key
+     * values must be skipped.
+     */
+    private final RunLengthIntArray writtenComponents;
+
+    private final IColumnValuesWriter[] primaryKeyWriters;
+    private final PriorityQueue<IColumnValuesWriter> orderedColumns;
+    private final ColumnBatchWriter writer;
+    private final int maxNumberOfTuples;
+    private int primaryKeysEstimatedSize;
+
+    public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+            float tolerance) {
+        this.columnMetadata = columnMetadata;
+        List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
+        this.componentsTuples = new MergeColumnTupleReference[componentsTuplesList.size()];
+        for (int i = 0; i < componentsTuplesList.size(); i++) {
+            MergeColumnTupleReference mergeTuple = (MergeColumnTupleReference) componentsTuplesList.get(i);
+            this.componentsTuples[i] = mergeTuple;
+            mergeTuple.registerEndOfPageCallBack(this::writeAllColumns);
+        }
+        this.writtenComponents = new RunLengthIntArray();
+        this.maxNumberOfTuples = maxNumberOfTuples;
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        writtenComponents.reset();
+
+        primaryKeyWriters = new IColumnValuesWriter[columnMetadata.getNumberOfPrimaryKeys()];
+        for (int i = 0; i < primaryKeyWriters.length; i++) {
+            primaryKeyWriters[i] = columnMetadata.getWriter(i);
+        }
+        // Poll order: largest estimated size first
+        orderedColumns = new PriorityQueue<>(Comparator.comparingInt(x -> -x.getEstimatedSize()));
+    }
+
+    /**
+     * Only the primary keys are accounted for: returns the sum of the lengths of the tuple's primary-key fields.
+     */
+    @Override
+    public int bytesRequired(ITupleReference tuple) {
+        int primaryKeysSize = 0;
+        for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+            primaryKeysSize += tuple.getFieldLength(i);
+        }
+
+        return primaryKeysSize;
+    }
+
+    @Override
+    public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        columnMetadata.init(multiPageOp);
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return columnMetadata.getNumberOfColumns();
+    }
+
+    @Override
+    public int getMaxNumberOfTuples() {
+        return maxNumberOfTuples;
+    }
+
+    @Override
+    public int getOccupiedSpace() {
+        int numberOfColumns = getNumberOfColumns();
+        int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE;
+        return primaryKeysEstimatedSize + filterSize;
+    }
+
+    @Override
+    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        MergeColumnTupleReference columnTuple = (MergeColumnTupleReference) tuple;
+        int componentIndex = columnTuple.getComponentIndex();
+        int skipCount = columnTuple.getAndResetSkipCount();
+        if (skipCount > 0) {
+            writtenComponents.add(toSkippedIndex(componentIndex), skipCount);
+        }
+        if (columnTuple.isAntimatter()) {
+            writtenComponents.add(toSkippedIndex(componentIndex));
+        } else {
+            writtenComponents.add(componentIndex);
+        }
+        writePrimaryKeys(columnTuple);
+    }
+
+    private void writePrimaryKeys(MergeColumnTupleReference columnTuple) throws HyracksDataException {
+        int primaryKeySize = 0;
+        for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+            IColumnValuesReader columnReader = columnTuple.getReader(i);
+            IColumnValuesWriter columnWriter = primaryKeyWriters[i];
+            columnReader.write(columnWriter, false);
+            primaryKeySize += columnWriter.getEstimatedSize();
+        }
+        primaryKeysEstimatedSize = primaryKeySize;
+    }
+
+    /**
+     * Replays the runs recorded in {@code writtenComponents}. A non-negative run copies {@code count} values of
+     * every non-key column from its component's readers. A negative run advances those readers past the values
+     * instead.
+     */
+    private void writeNonKeyColumns() throws HyracksDataException {
+        for (int i = 0; i < writtenComponents.getNumberOfBlocks(); i++) {
+            int blockValue = writtenComponents.getBlockValue(i);
+            int count = writtenComponents.getBlockSize(i);
+            if (blockValue < 0) {
+                //Skip the non-key values of anti-matter and skipped tuples
+                skipReaders(fromSkippedIndex(blockValue), count);
+                continue;
+            }
+            MergeColumnTupleReference componentTuple = componentsTuples[blockValue];
+            for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
+                IColumnValuesReader columnReader = componentTuple.getReader(j);
+                IColumnValuesWriter columnWriter = columnMetadata.getWriter(j);
+                columnReader.write(columnWriter, count);
+            }
+        }
+    }
+
+    private void skipReaders(int componentIndex, int count) throws HyracksDataException {
+        MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
+        for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
+            IColumnValuesReader columnReader = componentTuple.getReader(j);
+            columnReader.skip(count);
+        }
+    }
+
+    @Override
+    public int flush(ByteBuffer pageZero) throws HyracksDataException {
+        int numberOfColumns = columnMetadata.getNumberOfColumns();
+        int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+        if (writtenComponents.getSize() > 0) {
+            // Copy the non-key values still pending in writtenComponents
+            writeNonKeyColumns();
+            writtenComponents.reset();
+        }
+        for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
+            orderedColumns.add(columnMetadata.getWriter(i));
+        }
+        writer.setPageZeroBuffer(pageZero, numberOfColumns, numberOfPrimaryKeys);
+        int allocatedSpace = writer.writePrimaryKeyColumns(primaryKeyWriters);
+        allocatedSpace += writer.writeColumns(orderedColumns);
+        return allocatedSpace;
+    }
+
+    @Override
+    public void close() {
+        columnMetadata.close();
+    }
+
+    private void writeAllColumns(MergeColumnTupleReference columnTuple) throws HyracksDataException {
+        /*
+         * The last tuple from one of the components was reached. Since we are going to the next leaf, we will not be
+         * able to access the readers of this component's leaf after this tuple. So, we are going to write
+         * the values of all columns as recorded in writtenComponents
+         */
+        int skipCount = columnTuple.getAndResetSkipCount();
+        if (skipCount > 0) {
+            writtenComponents.add(toSkippedIndex(columnTuple.getComponentIndex()), skipCount);
+        }
+        writeNonKeyColumns();
+        writtenComponents.reset();
+    }
+
+    /**
+     * Encodes a component index as a "skip" marker for {@code writtenComponents}. The index is shifted by one before
+     * negation so that component 0 maps to -1. Negating 0 directly yields 0, which cannot be told apart from written
+     * tuples of component 0 and would be merged into the same run.
+     */
+    private static int toSkippedIndex(int componentIndex) {
+        return -(componentIndex + 1);
+    }
+
+    /**
+     * Inverse of {@code toSkippedIndex}: recovers the component index from a negative skip marker.
+     */
+    private static int fromSkippedIndex(int skippedIndex) {
+        return -skippedIndex - 1;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
new file mode 100644
index 0000000000..b0d1a01015
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+/**
+ * Column write metadata used to write a new merged {@link ILSMDiskComponent}.
+ * This is for writing a new on-disk component by merging two or more on-disk components. The final schema for this
+ * component will be the most recent schema, which belongs to the newest merged component. The schema here is
+ * immutable and cannot be changed.
+ */
+public final class MergeColumnWriteMetadata extends AbstractColumnImmutableMetadata {
+    private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+    private final List<IColumnValuesWriter> columnWriters;
+    private final List<IColumnTupleIterator> componentsTuples;
+
+    /**
+     * For LSM Merge
+     */
+    private MergeColumnWriteMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters,
+            IValueReference serializedMetadata, List<IColumnTupleIterator> componentsTuples) {
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnWriters.size());
+        this.multiPageOpRef = multiPageOpRef;
+        this.columnWriters = columnWriters;
+        this.componentsTuples = componentsTuples;
+    }
+
+    /**
+     * Set {@link IColumnWriteMultiPageOp} for {@link IColumnValuesWriter}
+     *
+     * @param multiPageOp multi-buffer allocator
+     */
+    public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        multiPageOpRef.setValue(multiPageOp);
+
+        //Reset writer for the first write
+        for (int i = 0; i < columnWriters.size(); i++) {
+            columnWriters.get(i).reset();
+        }
+    }
+
+    public Mutable<IColumnWriteMultiPageOp> getMultiPageOpRef() {
+        return multiPageOpRef;
+    }
+
+    public IColumnValuesWriter getWriter(int columnIndex) {
+        return columnWriters.get(columnIndex);
+    }
+
+    /**
+     * Clears the multi-page op reference and closes every column writer.
+     */
+    public void close() {
+        multiPageOpRef.setValue(null);
+        for (int i = 0; i < columnWriters.size(); i++) {
+            columnWriters.get(i).close();
+        }
+    }
+
+    /**
+     * Deserializes the column writers from {@code serializedMetadata} and creates the merge write metadata.
+     *
+     * @param datasetType         dataset record type
+     * @param metaType            meta record type
+     * @param numberOfPrimaryKeys number of primary-key columns
+     * @param multiPageOpRef      reference shared with the created column writers
+     * @param serializedMetadata  serialized column metadata containing the writers section
+     * @param componentsTuples    tuple iterators of the components being merged
+     * @return the merge write metadata
+     * @throws IOException if the serialized writers cannot be read
+     */
+    public static MergeColumnWriteMetadata create(ARecordType datasetType, ARecordType metaType,
+            int numberOfPrimaryKeys, Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+            IValueReference serializedMetadata, List<IColumnTupleIterator> componentsTuples) throws IOException {
+        byte[] bytes = serializedMetadata.getByteArray();
+        int offset = serializedMetadata.getStartOffset();
+        int length = serializedMetadata.getLength();
+
+        int writersOffset = offset + IntegerPointable.getInteger(bytes, offset + WRITERS_POINTER);
+        // Bound the stream by the end of the serialized metadata (offset + length). Passing 'length' as the stream
+        // length starting at writersOffset would allow reads past the metadata into unrelated bytes of the array.
+        int writersLength = offset + length - writersOffset;
+        DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, writersOffset, writersLength));
+
+        IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef);
+        List<IColumnValuesWriter> writers = new ArrayList<>();
+        FlushColumnMetadata.deserializeWriters(input, writers, writerFactory);
+
+        return new MergeColumnWriteMetadata(datasetType, metaType, numberOfPrimaryKeys, multiPageOpRef, writers,
+                serializedMetadata, componentsTuples);
+    }
+
+    public List<IColumnTupleIterator> getComponentsTuples() {
+        return componentsTuples;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
new file mode 100644
index 0000000000..df6b554111
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.tuple;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.assembler.value.ValueGetterFactory;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
+import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.AbstractColumnTupleReference;
+
+/**
+ * Base column tuple reference whose fields are the primary keys only. Primary-key columns are read through
+ * {@link ByteBufferInputStream}s. Every other projected column gets a {@link MultiByteBufferInputStream}.
+ */
+public abstract class AbstractAsterixColumnTupleReference extends AbstractColumnTupleReference {
+    private final IValueGetter[] keyValueGetters;
+    protected final ByteBufferInputStream[] primaryKeyStreams;
+    protected final IColumnValuesReader[] primaryKeyReaders;
+    protected final VoidPointable[] primaryKeys;
+    protected final AbstractBytesInputStream[] columnStreams;
+
+    protected AbstractAsterixColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
+            IColumnProjectionInfo info, IColumnReadMultiPageOp multiPageOp) {
+        super(componentIndex, frame, info, multiPageOp);
+        // Called during construction: subclass implementations cannot rely on their own fields here
+        primaryKeyReaders = getPrimaryKeyReaders(info);
+        int keyCount = primaryKeyReaders.length;
+
+        primaryKeyStreams = new ByteBufferInputStream[keyCount];
+        keyValueGetters = new IValueGetter[keyCount];
+        primaryKeys = new VoidPointable[keyCount];
+        for (int k = 0; k < keyCount; k++) {
+            IColumnValuesReader keyReader = primaryKeyReaders[k];
+            keyValueGetters[k] = ValueGetterFactory.INSTANCE.createValueGetter(keyReader.getTypeTag());
+            primaryKeyStreams[k] = new ByteBufferInputStream();
+            primaryKeys[k] = new VoidPointable();
+        }
+
+        int projectedColumnCount = info.getNumberOfProjectedColumns();
+        columnStreams = new AbstractBytesInputStream[projectedColumnCount];
+        for (int c = 0; c < projectedColumnCount; c++) {
+            boolean isKeyColumn = info.getColumnIndex(c) < keyCount;
+            if (isKeyColumn) {
+                columnStreams[c] = new ByteBufferInputStream();
+            } else {
+                columnStreams[c] = new MultiByteBufferInputStream();
+            }
+        }
+    }
+
+    /**
+     * Returns the readers of the primary-key columns. This is invoked from this class's constructor.
+     *
+     * @param info projection info passed to the constructor
+     * @return one reader per primary key
+     */
+    protected abstract IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
+
+    @Override
+    protected final void startPrimaryKey(IColumnBufferProvider provider, int startIndex, int ordinal,
+            int numberOfTuples) throws HyracksDataException {
+        ByteBufferInputStream keyStream = primaryKeyStreams[ordinal];
+        keyStream.reset(provider);
+        IColumnValuesReader keyReader = primaryKeyReaders[ordinal];
+        keyReader.reset(keyStream, numberOfTuples);
+        keyReader.skip(startIndex);
+    }
+
+    /**
+     * Advances every primary-key reader by one value and points the exposed key fields at the new values.
+     */
+    @Override
+    protected final void onNext() throws HyracksDataException {
+        for (int k = 0; k < primaryKeys.length; k++) {
+            IColumnValuesReader keyReader = primaryKeyReaders[k];
+            keyReader.next();
+            primaryKeys[k].set(keyValueGetters[k].getValue(keyReader));
+        }
+    }
+
+    @Override
+    public void lastTupleReached() throws HyracksDataException {
+        // Nothing to do by default; subclasses may override
+    }
+
+    @Override
+    public final int getFieldCount() {
+        return primaryKeys.length;
+    }
+
+    @Override
+    public final byte[] getFieldData(int fIdx) {
+        return primaryKeys[fIdx].getByteArray();
+    }
+
+    @Override
+    public final int getFieldStart(int fIdx) {
+        return primaryKeys[fIdx].getStartOffset();
+    }
+
+    @Override
+    public final int getFieldLength(int fIdx) {
+        return primaryKeys[fIdx].getLength();
+    }
+
+    @Override
+    public final int getTupleSize() {
+        return -1;
+    }
+
+    @Override
+    public final boolean isAntimatter() {
+        /*
+         * A primary key value is never missing for a live tuple, so a missing value in the first primary-key reader
+         * marks an anti-matter tuple. Checking one reader is enough, even for composite primary keys.
+         */
+        return primaryKeyReaders[0].isMissing();
+    }
+
+    /**
+     * Compares the primary keys of two tuple references, one key at a time, stopping at the first difference.
+     */
+    @Override
+    public final int compareTo(IColumnTupleIterator o) {
+        AbstractAsterixColumnTupleReference other = (AbstractAsterixColumnTupleReference) o;
+        for (int k = 0; k < primaryKeys.length; k++) {
+            int result = primaryKeyReaders[k].compareTo(other.primaryKeyReaders[k]);
+            if (result != 0) {
+                return result;
+            }
+        }
+        return 0;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
new file mode 100644
index 0000000000..c10d41550c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.tuple;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
+import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
+import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+/**
+ * Column tuple reference used by the LSM merge operation. It exposes the reader of every column through
+ * {@link #getReader(int)}. {@link #skip(int)} only accumulates a skip count; it does not move the column readers.
+ * When the last tuple is reached, the registered {@link IEndOfPageCallBack} is invoked.
+ */
+public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleReference {
+    private final IColumnValuesReader[] columnReaders;
+    private int skipCount;
+    private IEndOfPageCallBack endOfPageCallBack;
+
+    public MergeColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
+            MergeColumnReadMetadata columnMetadata, IColumnReadMultiPageOp multiPageOp) {
+        super(componentIndex, frame, columnMetadata, multiPageOp);
+        columnReaders = columnMetadata.getColumnReaders();
+    }
+
+    /**
+     * Returns a new array holding the first {@code getNumberOfPrimaryKeys()} readers of the merge read metadata.
+     * This is called from the super-constructor, so it must not touch this class's fields.
+     */
+    @Override
+    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+        MergeColumnReadMetadata mergeMetadata = (MergeColumnReadMetadata) info;
+        int keyCount = mergeMetadata.getNumberOfPrimaryKeys();
+        IColumnValuesReader[] keyReaders = new IColumnValuesReader[keyCount];
+        System.arraycopy(mergeMetadata.getColumnReaders(), 0, keyReaders, 0, keyCount);
+        return keyReaders;
+    }
+
+    @Override
+    protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples) {
+        // Move past the per-column filters in page zero
+        int filtersSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE;
+        pageZero.position(pageZero.position() + filtersSize);
+        skipCount = 0;
+        return true;
+    }
+
+    @Override
+    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+            throws HyracksDataException {
+        // Nothing to do for primary-key columns; only non-key column readers are reset here
+        if (ordinal >= primaryKeys.length) {
+            MultiByteBufferInputStream stream = (MultiByteBufferInputStream) columnStreams[ordinal];
+            stream.reset(buffersProvider);
+            IColumnValuesReader columnReader = columnReaders[ordinal];
+            columnReader.reset(stream, numberOfTuples);
+            columnReader.skip(startIndex);
+        }
+    }
+
+    @Override
+    public void skip(int count) throws HyracksDataException {
+        skipCount += count;
+    }
+
+    @Override
+    public void lastTupleReached() throws HyracksDataException {
+        endOfPageCallBack.callEnd(this);
+    }
+
+    /**
+     * @return the skip count accumulated since the last call (or the start of the page); the count is reset to 0
+     */
+    public int getAndResetSkipCount() {
+        int accumulated = skipCount;
+        skipCount = 0;
+        return accumulated;
+    }
+
+    public IColumnValuesReader getReader(int columnIndex) {
+        return columnReaders[columnIndex];
+    }
+
+    public void registerEndOfPageCallBack(IEndOfPageCallBack endOfPageCallBack) {
+        this.endOfPageCallBack = endOfPageCallBack;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
index 00cb0c5486..c63912bfa8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
@@ -32,10 +32,11 @@ class DefaultTupleProjector implements ITupleProjector {
     }
 
     @Override
-    public void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
+    public ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
         for (int i = 0; i < tuple.getFieldCount(); i++) {
             dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
             tb.addFieldEndOffset();
         }
+        return tuple;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
index 8ca1a82541..ba23e307ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
@@ -25,5 +25,5 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
 }