This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f070329ca4dec5cf6a80466f15ce6fe8e3461775 Author: Peeyush Gupta <[email protected]> AuthorDate: Sun Nov 24 21:30:20 2024 -0800 [ASTERIXDB-3532][STO] Account for incoming tuple's new columns - user model changes: no - storage format changes: no - interface changes: no Details: - When calculating if a tuple can fit in page0, account for the additional space that might be required for new columns added by the new tuple. Ext-ref: MB-64337 Change-Id: I4a318ae69538daf385060df26e9a5fd1d2d494f8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19125 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ritik Raj <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../org/apache/asterix/column/ColumnManager.java | 4 +- .../operation/lsm/flush/FlushColumnMetadata.java | 47 ++--- .../lsm/flush/FlushColumnTupleWriter.java | 48 ++++- .../lsm/flush/NoWriteColumnTransformer.java | 141 ++++++++++++++ .../lsm/flush/NoWriteFlushColumnMetadata.java | 202 +++++++++++++++++++++ .../lsm/merge/MergeColumnTupleWriter.java | 7 +- .../values/writer/NoOpColumnValuesWriter.java | 124 +++++++++++++ .../column/test/bytes/AbstractBytesTest.java | 2 +- .../column/api/AbstractColumnTupleWriter.java | 8 +- .../column/impls/btree/ColumnBTreeBulkloader.java | 3 +- .../impls/btree/ColumnBTreeWriteLeafFrame.java | 2 +- 11 files changed, 553 insertions(+), 35 deletions(-) diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java index 6db8a78a15..ea85d8b774 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManager.java @@ -74,8 +74,8 @@ public final class ColumnManager implements IColumnManager { try { Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>(); IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef); - return FlushColumnMetadata.create(datasetType, metaType, primaryKeys, keySourceIndicator, writerFactory, - multiPageOpRef, metadata); + return FlushColumnMetadata.create(datasetType, metaType, primaryKeys.size(), keySourceIndicator, + writerFactory, multiPageOpRef, metadata); } catch (IOException e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java index f5146386d0..337e569072 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java @@ -70,22 +70,22 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent} * The schema here is mutable and can change according to the flushed records */ -public final class FlushColumnMetadata extends AbstractColumnMetadata { +public class FlushColumnMetadata extends AbstractColumnMetadata { private static final Logger LOGGER = LogManager.getLogger(); - private final Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels; + protected final Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels; private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef; private final IFieldNamesDictionary fieldNamesDictionary; private final ObjectSchemaNode root; private final ObjectSchemaNode metaRoot; private final IColumnValuesWriterFactory columnWriterFactory; - private final List<IColumnValuesWriter> columnWriters; + protected final List<IColumnValuesWriter> columnWriters; private final ArrayBackedValueStorage serializedMetadata; private final PathInfoSerializer pathInfoSerializer; - private final IntArrayList nullWriterIndexes; + protected final IntArrayList nullWriterIndexes; private final boolean metaContainsKeys; private boolean changed; - private int level; - private int repeated; + protected int level; + protected int repeated; public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys, List<Integer> keySourceIndicator, IColumnValuesWriterFactory columnWriterFactory, @@ -124,13 +124,13 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { serializeColumnsMetadata(); } - private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys, + public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, int numPrimaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters, IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, ArrayBackedValueStorage serializedMetadata) { - super(datasetType, metaType, primaryKeys.size()); + super(datasetType, metaType, numPrimaryKeys); this.multiPageOpRef = multiPageOpRef; this.columnWriterFactory = columnWriterFactory; this.definitionLevels = definitionLevels; @@ -226,21 +226,21 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { IntegerPointable.setInteger(serializedMetadata.getByteArray(), pointer, offset); } - public static FlushColumnMetadata create(ARecordType datasetType, ARecordType metaType, - List<List<String>> primaryKeys, List<Integer> keySourceIndicator, - IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef, - IValueReference serializedMetadata) throws HyracksDataException { + public static FlushColumnMetadata create(ARecordType datasetType, ARecordType metaType, int numPrimaryKeys, + List<Integer> keySourceIndicator, IColumnValuesWriterFactory columnWriterFactory, + Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference serializedMetadata) + throws HyracksDataException { boolean metaContainsKeys = metaType != null && keySourceIndicator.get(0) == 1; try { - return createMutableMetadata(datasetType, metaType, primaryKeys, metaContainsKeys, columnWriterFactory, + return createMutableMetadata(datasetType, metaType, numPrimaryKeys, metaContainsKeys, columnWriterFactory, multiPageOpRef, serializedMetadata); } catch (IOException e) { throw HyracksDataException.create(e); } } - private static FlushColumnMetadata createMutableMetadata(ARecordType datasetType, ARecordType metaType, - List<List<String>> primaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory, + public static FlushColumnMetadata createMutableMetadata(ARecordType datasetType, ARecordType metaType, + int numPrimaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference serializedMetadata) throws IOException { DataInput input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray(), serializedMetadata.getStartOffset(), serializedMetadata.getLength())); @@ -265,7 +265,7 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { ArrayBackedValueStorage schemaStorage = new ArrayBackedValueStorage(serializedMetadata.getLength()); schemaStorage.append(serializedMetadata); logSchema(root, metaRoot, fieldNamesDictionary); - return new FlushColumnMetadata(datasetType, metaType, primaryKeys, metaContainsKeys, columnWriterFactory, + return new FlushColumnMetadata(datasetType, metaType, numPrimaryKeys, metaContainsKeys, columnWriterFactory, multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, definitionLevels, schemaStorage); } @@ -443,7 +443,7 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { } } - private void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels, + protected void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels, AbstractSchemaNode node) throws HyracksDataException { int startIndex = node.getCounter(); if (node.isNested()) { @@ -501,7 +501,7 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { } } - private AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag childTypeTag) + protected AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag childTypeTag) throws HyracksDataException { AbstractSchemaNode createdChild; ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag); @@ -525,7 +525,7 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { return createdChild; } - private AbstractSchemaNode createChild(ATypeTag childTypeTag) throws HyracksDataException { + protected AbstractSchemaNode createChild(ATypeTag childTypeTag) throws HyracksDataException { switch (childTypeTag) { case OBJECT: return addDefinitionLevelsAndGet(new ObjectSchemaNode()); @@ -563,7 +563,7 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { } } - private void addColumn(int index, IColumnValuesWriter writer) { + protected void addColumn(int index, IColumnValuesWriter writer) { if (index == columnWriters.size()) { columnWriters.add(writer); } else { @@ -571,7 +571,7 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { } } - private AbstractSchemaNode addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) { + protected AbstractSchemaNode addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) { definitionLevels.put(nestedNode, new RunLengthIntArray()); return nestedNode; } @@ -590,4 +590,9 @@ public final class FlushColumnMetadata extends AbstractColumnMetadata { LOGGER.debug("Schema for {} has changed: {}", META_RECORD_SCHEMA, metaRecordSchema); } } + + public boolean isMetaContainsKey() { + return metaContainsKeys; + } + } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java index 65f5eb44cf..36dd6bce23 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java @@ -18,13 +18,18 @@ */ package org.apache.asterix.column.operation.lsm.flush; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.asterix.column.values.IColumnValuesWriter; +import org.apache.asterix.column.values.IColumnValuesWriterFactory; import org.apache.asterix.column.values.writer.ColumnBatchWriter; +import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory; import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter; import org.apache.asterix.om.lazy.RecordLazyVisitablePointable; import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter; @@ -34,10 +39,13 @@ import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference; public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { protected final FlushColumnMetadata columnMetadata; + protected final NoWriteFlushColumnMetadata columnMetadataWithCurrentTuple; + protected final BatchFinalizerVisitor finalizer; protected final ColumnBatchWriter writer; private final ColumnTransformer transformer; + private final NoWriteColumnTransformer transformerForCurrentTuple; private final RecordLazyVisitablePointable pointable; private final int maxNumberOfTuples; private final IColumnValuesWriter[] primaryKeyWriters; @@ -60,6 +68,19 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { for (int i = 0; i < numberOfPrimaryKeys; i++) { primaryKeyWriters[i] = columnMetadata.getWriter(i); } + + Mutable<IColumnWriteMultiPageOp> multiPageOpRef = new MutableObject<>(); + IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef); + try { + columnMetadataWithCurrentTuple = NoWriteFlushColumnMetadata.createMutableMetadata( + columnMetadata.getDatasetType(), columnMetadata.getMetaType(), + columnMetadata.getNumberOfPrimaryKeys(), columnMetadata.isMetaContainsKey(), writerFactory, + multiPageOpRef, columnMetadata.serializeColumnsMetadata()); + } catch (IOException e) { + throw new RuntimeException(e); + } + transformerForCurrentTuple = + new NoWriteColumnTransformer(columnMetadataWithCurrentTuple, columnMetadataWithCurrentTuple.getRoot()); } @Override @@ -68,8 +89,12 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { } @Override - public final int getNumberOfColumns() { - return columnMetadata.getNumberOfColumns(); + public final int getNumberOfColumns(boolean includeCurrentTupleColumns) { + if (includeCurrentTupleColumns) { + return columnMetadataWithCurrentTuple.getNumberOfColumns(); + } else { + return columnMetadata.getNumberOfColumns(); + } } @Override @@ -85,7 +110,7 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { @Override public final int getOccupiedSpace() { - int numberOfColumns = getNumberOfColumns(); + int numberOfColumns = getNumberOfColumns(true); int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE; return primaryKeysEstimatedSize + filterSize; } @@ -109,6 +134,21 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { writer.close(); } + public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) throws HyracksDataException { + // Execution can reach here in case of Load statements + // and the type of tuple in that case is PermutingFrameTupleReference + if (tuple instanceof LSMBTreeTupleReference) { + LSMBTreeTupleReference btreeTuple = (LSMBTreeTupleReference) tuple; + if (btreeTuple.isAntimatter()) { + return; + } + } + int recordFieldId = columnMetadata.getRecordFieldIndex(); + pointable.set(tuple.getFieldData(recordFieldId), tuple.getFieldStart(recordFieldId), + tuple.getFieldLength(recordFieldId)); + transformerForCurrentTuple.transform(pointable); + } + @Override public void writeTuple(ITupleReference tuple) throws HyracksDataException { //This from an in-memory component, hence the cast @@ -124,7 +164,7 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { @Override public final int flush(ByteBuffer pageZero) throws HyracksDataException { - writer.setPageZeroBuffer(pageZero, getNumberOfColumns(), columnMetadata.getNumberOfPrimaryKeys()); + writer.setPageZeroBuffer(pageZero, getNumberOfColumns(false), columnMetadata.getNumberOfPrimaryKeys()); transformer.resetStringLengths(); return finalizer.finalizeBatch(writer, columnMetadata); } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java new file mode 100644 index 0000000000..5efab0e175 --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteColumnTransformer.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.operation.lsm.flush; + +import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode; +import org.apache.asterix.column.metadata.schema.AbstractSchemaNode; +import org.apache.asterix.column.metadata.schema.ObjectSchemaNode; +import org.apache.asterix.column.metadata.schema.UnionSchemaNode; +import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode; +import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable; +import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable; +import org.apache.asterix.om.lazy.FlatLazyVisitablePointable; +import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor; +import org.apache.asterix.om.lazy.RecordLazyVisitablePointable; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; + +public class NoWriteColumnTransformer + implements ILazyVisitablePointableVisitor<AbstractSchemaNode, AbstractSchemaNode> { + + private final NoWriteFlushColumnMetadata columnMetadata; + private final ObjectSchemaNode root; + private AbstractSchemaNestedNode currentParent; + + public NoWriteColumnTransformer(NoWriteFlushColumnMetadata columnMetadata, ObjectSchemaNode root) { + this.columnMetadata = columnMetadata; + this.root = root; + } + + /** + * Transform a tuple in row format into columns + * + * @param pointable record pointable + * @return the estimated size (possibly overestimated) of the primary key(s) columns + */ + public int transform(RecordLazyVisitablePointable pointable) throws HyracksDataException { + pointable.accept(this, root); + return 0; + } + + @Override + public AbstractSchemaNode visit(RecordLazyVisitablePointable pointable, AbstractSchemaNode arg) + throws HyracksDataException { + AbstractSchemaNestedNode previousParent = currentParent; + + ObjectSchemaNode objectNode = (ObjectSchemaNode) arg; + currentParent = objectNode; + for (int i = 0; i < pointable.getNumberOfChildren(); i++) { + pointable.nextChild(); + IValueReference fieldName = pointable.getFieldName(); + ATypeTag childTypeTag = pointable.getChildTypeTag(); + if (childTypeTag != ATypeTag.MISSING) { + AbstractSchemaNode childNode = objectNode.getOrCreateChild(fieldName, childTypeTag, columnMetadata); + acceptActualNode(pointable.getChildVisitablePointable(), childNode); + } + } + + if (pointable.getNumberOfChildren() == 0) { + // Set as empty object + objectNode.setEmptyObject(columnMetadata); + } + + currentParent = previousParent; + return null; + } + + @Override + public AbstractSchemaNode visit(AbstractListLazyVisitablePointable pointable, AbstractSchemaNode arg) + throws HyracksDataException { + AbstractSchemaNestedNode previousParent = currentParent; + + AbstractCollectionSchemaNode collectionNode = (AbstractCollectionSchemaNode) arg; + currentParent = collectionNode; + + int numberOfChildren = pointable.getNumberOfChildren(); + for (int i = 0; i < numberOfChildren; i++) { + pointable.nextChild(); + ATypeTag childTypeTag = pointable.getChildTypeTag(); + AbstractSchemaNode childNode = collectionNode.getOrCreateItem(childTypeTag, columnMetadata); + acceptActualNode(pointable.getChildVisitablePointable(), childNode); + } + + // Add missing as a last element of the array to help indicate empty arrays + collectionNode.getOrCreateItem(ATypeTag.MISSING, columnMetadata); + currentParent = previousParent; + return null; + } + + @Override + public AbstractSchemaNode visit(FlatLazyVisitablePointable pointable, AbstractSchemaNode arg) + throws HyracksDataException { + return null; + } + + private void acceptActualNode(AbstractLazyVisitablePointable pointable, AbstractSchemaNode node) + throws HyracksDataException { + if (node.getTypeTag() == ATypeTag.UNION) { + AbstractSchemaNestedNode previousParent = currentParent; + + UnionSchemaNode unionNode = (UnionSchemaNode) node; + currentParent = unionNode; + + ATypeTag childTypeTag = pointable.getTypeTag(); + + if (childTypeTag == ATypeTag.NULL || childTypeTag == ATypeTag.MISSING) { + /* + * NULL and MISSING are tracked since the start to be written in the originalType (i.e., the type + * before injecting a union between the parent and the original node). + */ + AbstractSchemaNode actualNode = unionNode.getOriginalType(); + acceptActualNode(pointable, actualNode); + } else { + AbstractSchemaNode actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata); + pointable.accept(this, actualNode); + } + + currentParent = previousParent; + } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested()) { + columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node); + } else { + pointable.accept(this, node); + } + } +} \ No newline at end of file diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java new file mode 100644 index 0000000000..c988a1136f --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.operation.lsm.flush; + +import static org.apache.asterix.column.util.ColumnValuesUtil.getNormalizedTypeTag; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.column.metadata.IFieldNamesDictionary; +import org.apache.asterix.column.metadata.dictionary.AbstractFieldNamesDictionary; +import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode; +import org.apache.asterix.column.metadata.schema.AbstractSchemaNode; +import org.apache.asterix.column.metadata.schema.ObjectSchemaNode; +import org.apache.asterix.column.metadata.schema.UnionSchemaNode; +import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode; +import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode; +import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode; +import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode; +import org.apache.asterix.column.util.RunLengthIntArray; +import org.apache.asterix.column.values.IColumnValuesWriter; +import org.apache.asterix.column.values.IColumnValuesWriterFactory; +import org.apache.asterix.column.values.writer.NoOpColumnValuesWriter; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; + +/** + * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent} + * The schema here is mutable and can change according to the flushed records + */ +public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata { + + private int numColumns; + + public NoWriteFlushColumnMetadata(ARecordType datasetType, ARecordType metaType, int numPrimaryKeys, + boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory, + Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> writers, + IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot, + Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, ArrayBackedValueStorage schemaStorage) { + super(datasetType, metaType, numPrimaryKeys, metaContainsKeys, columnWriterFactory, multiPageOpRef, writers, + fieldNamesDictionary, root, metaRoot, definitionLevels, schemaStorage); + numColumns = 0; + } + + public static NoWriteFlushColumnMetadata createMutableMetadata(ARecordType datasetType, ARecordType metaType, + int numPrimaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory, + Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference serializedMetadata) throws IOException { + DataInput input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray(), + serializedMetadata.getStartOffset(), serializedMetadata.getLength())); + //Skip offsets + input.skipBytes(OFFSETS_SIZE); + + //ColumnWriter + List<IColumnValuesWriter> writers = new ArrayList<>(); + deserializeWriters(input, writers, columnWriterFactory); + + //FieldNames + IFieldNamesDictionary fieldNamesDictionary = AbstractFieldNamesDictionary.deserialize(input); + + //Schema + Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels = new HashMap<>(); + ObjectSchemaNode root = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, definitionLevels); + ObjectSchemaNode metaRoot = null; + if (metaType != null) { + metaRoot = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, definitionLevels); + } + + ArrayBackedValueStorage schemaStorage = new ArrayBackedValueStorage(serializedMetadata.getLength()); + schemaStorage.append(serializedMetadata); + return new NoWriteFlushColumnMetadata(datasetType, metaType, numPrimaryKeys, metaContainsKeys, + columnWriterFactory, multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, definitionLevels, + schemaStorage); + } + + public void close() { + } + + @Override + public void flushDefinitionLevels(int level, AbstractSchemaNestedNode parent, AbstractSchemaNode node) + throws HyracksDataException { + //NoOp + } + + @Override + protected void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels, + AbstractSchemaNode node) throws HyracksDataException { + //NoOp + } + + @Override + public void enterLevel(AbstractSchemaNestedNode node) { + //NoOp + } + + @Override + public void exitNode(AbstractSchemaNode node) { + //NoOp + } + + @Override + public void exitLevel(AbstractSchemaNestedNode node) { + //NoOp + } + + @Override + public void exitCollectionNode(AbstractCollectionSchemaNode collectionNode, int numberOfItems) { + //NoOp + } + + @Override + public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node) + throws HyracksDataException { + //NoOp + } + + @Override + protected AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag childTypeTag) + throws HyracksDataException { + AbstractSchemaNode createdChild; + ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag); + if (child != null) { + if (child.getTypeTag() == ATypeTag.NULL) { + int columnIndex = ((PrimitiveSchemaNode) child).getColumnIndex(); + nullWriterIndexes.add(columnIndex); + createdChild = createChild(normalizedTypeTag); + } else { + createdChild = addDefinitionLevelsAndGet(new UnionSchemaNode(child, createChild(normalizedTypeTag))); + } + } else { + createdChild = createChild(normalizedTypeTag); + } + return createdChild; + } + + @Override + protected AbstractSchemaNode createChild(ATypeTag childTypeTag) throws HyracksDataException { + switch (childTypeTag) { + case OBJECT: + return addDefinitionLevelsAndGet(new ObjectSchemaNode()); + case ARRAY: + return addDefinitionLevelsAndGet(new ArraySchemaNode()); + case MULTISET: + return addDefinitionLevelsAndGet(new MultisetSchemaNode()); + case NULL: + case MISSING: + case BOOLEAN: + case FLOAT: + case DOUBLE: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case STRING: + case UUID: + int columnIndex = nullWriterIndexes.isEmpty() ? columnWriters.size() : nullWriterIndexes.removeInt(0); + boolean primaryKey = columnIndex < getNumberOfPrimaryKeys(); + ATypeTag normalizedTypeTag = primaryKey ? childTypeTag : getNormalizedTypeTag(childTypeTag); + if (columnIndex == numColumns) { + numColumns++; + } + IColumnValuesWriter writer = NoOpColumnValuesWriter.INSTANCE; + addColumn(columnIndex, writer); + return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, primaryKey); + default: + throw new IllegalStateException("Unsupported type " + childTypeTag); + + } + } + + @Override + protected AbstractSchemaNode addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) { + return nestedNode; + } +} \ No newline at end of file diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java index 6ff2cd9468..fc7b85972a 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java @@ -95,7 +95,7 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter { } @Override - public int getNumberOfColumns() { + public int getNumberOfColumns(boolean includeCurrentTupleColumns) { return columnMetadata.getNumberOfColumns(); } @@ -106,7 +106,7 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter { @Override public int getOccupiedSpace() { - int numberOfColumns = getNumberOfColumns(); + int numberOfColumns = getNumberOfColumns(true); int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE; return primaryKeysEstimatedSize + filterSize; } @@ -251,4 +251,7 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter { node.put("componentIndex", componentIndex); node.put("count", count); } + + public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) throws HyracksDataException { + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java new file mode 100644 index 0000000000..e774e9c00f --- /dev/null +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.column.values.writer; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.asterix.column.util.RunLengthIntArray; +import org.apache.asterix.column.values.IColumnValuesReader; +import org.apache.asterix.column.values.IColumnValuesWriter; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; + +public class NoOpColumnValuesWriter implements IColumnValuesWriter { + public static NoOpColumnValuesWriter INSTANCE = new NoOpColumnValuesWriter(); + + @Override + public void reset() throws HyracksDataException { + + } + + @Override + public int getColumnIndex() { + return 0; + } + + @Override + public void writeValue(ATypeTag tag, IValueReference value) throws HyracksDataException { + + } + + @Override + public void writeAntiMatter(ATypeTag tag, IValueReference value) throws HyracksDataException { + + } + + @Override + public void writeLevel(int level) throws HyracksDataException { + + } + + @Override + public void writeLevels(int level, int count) throws HyracksDataException { + + } + + @Override + public RunLengthIntArray getDefinitionLevelsIntArray() { + return null; + } + + @Override + public void writeNull(int level) throws HyracksDataException { + + } + + @Override + public void writeValue(IColumnValuesReader reader) throws HyracksDataException { + + } + + @Override + public int getEstimatedSize() { + return 0; + } + + @Override + public int getEstimatedSize(int length) { + return 0; + } + + @Override + public int getAllocatedSpace() { + return 0; + } + + @Override + public int getCount() { + return 0; + } + + @Override + public long getNormalizedMinValue() { + return 0; + } + + @Override + public long getNormalizedMaxValue() { + return 0; + } + + @Override + public void flush(OutputStream out) throws HyracksDataException { + + } + + @Override + public void close() { + + } + + @Override + public void serialize(DataOutput output) throws IOException { + + } +} diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java index 55097c5d3e..8fd9f6b890 100644 --- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java +++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java @@ -193,7 +193,7 @@ public abstract class AbstractBytesTest extends TestBase { pageZero.position(HEADER_SIZE); pageZero.putInt(MEGA_LEAF_NODE_LENGTH, writer.flush(pageZero)); //Write page header - int numberOfColumn = writer.getNumberOfColumns(); + int numberOfColumn = writer.getNumberOfColumns(false); pageZero.putInt(TUPLE_COUNT_OFFSET, tupleCount); pageZero.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumn); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java index 6d26a45d4e..92b3892b68 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java @@ -49,10 +49,12 @@ public abstract class AbstractColumnTupleWriter extends AbstractTupleWriterDisab */ public abstract void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException; + public abstract void updateColumnMetadataForCurrentTuple(ITupleReference tuple) throws HyracksDataException; + /** - * @return The current number of columns + * @return The current number of columns including the current tuple */ - public abstract int getNumberOfColumns(); + public abstract int getNumberOfColumns(boolean includeCurrentTupleColumns); /** * Currently, a column offset takes 4-byte (fixed). But in the future, we can reformat the offsets. For example, @@ -61,7 +63,7 @@ public abstract class AbstractColumnTupleWriter extends AbstractTupleWriterDisab * @return the size needed to store columns' offsets */ public final int getColumnOffsetsSize() { - return Integer.BYTES * getNumberOfColumns(); + return Integer.BYTES * getNumberOfColumns(true); } /** diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java index d9aec0c63b..ee61fa6f8e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java @@ -99,7 +99,7 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I return tupleWriter.createTupleReference(); } - private boolean isFull(ITupleReference tuple) { + private boolean isFull(ITupleReference tuple) throws HyracksDataException { if (tupleCount == 0) { return false; } else if (tupleCount >= columnWriter.getMaxNumberOfTuples()) { @@ -108,6 +108,7 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I } int requiredFreeSpace = AbstractColumnBTreeLeafFrame.HEADER_SIZE; //Columns' Offsets + columnWriter.updateColumnMetadataForCurrentTuple(tuple); requiredFreeSpace += columnWriter.getColumnOffsetsSize(); //Occupied space from previous writes requiredFreeSpace += columnWriter.getOccupiedSpace(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java index c725084aef..bced7efd10 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeWriteLeafFrame.java @@ -60,7 +60,7 @@ public class ColumnBTreeWriteLeafFrame extends AbstractColumnBTreeLeafFrame { rowTupleWriter.writeTuple(maxKey, buf.array(), offset); // Write page information - int numberOfColumns = columnWriter.getNumberOfColumns(); + int numberOfColumns = columnWriter.getNumberOfColumns(false); buf.putInt(TUPLE_COUNT_OFFSET, numberOfTuples); buf.putInt(NUMBER_OF_COLUMNS_OFFSET, numberOfColumns); buf.putInt(SIZE_OF_COLUMNS_OFFSETS_OFFSET, columnWriter.getColumnOffsetsSize());
