This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 1cffa2bc98bf851dfe2663190e1032cafc9df09d Author: Ritik Raj <[email protected]> AuthorDate: Thu Dec 12 20:29:41 2024 +0530 [ASTERIXDB-3534][STO] Fixed Reading definitionLevel for column having nulls - user model changes: no - storage format changes: no - interface changes: no Details: - In cases where a column consists of both null values and nonNull values, there can be cases where in a single batch, one leafNode is written with all nulls while the other leafNode may contains same column with different type, where the "level" of the field can be different for null and non-null field. This change of level can cause the ParquetRunLengthBitPackingHybridDecoder to read more than required for pulling out a single definitionLevel, leading to early reading of all bytes and causing out of bytes error for furthur tupleNumber. This patch accomodates for that, by creating/reusing a different decoder when a column is reset with different maxLevel. Ext-ref: MB-64486 Change-Id: Ia857e6520f5d2ae2f48a4fbbc9d647855649d145 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19193 Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Peeyush Gupta <[email protected]> Tested-by: Peeyush Gupta <[email protected]> --- .../ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp | 28 ++++++++++ .../ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp | 33 ++++++++++++ .../ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp | 22 ++++++++ .../assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm | 3 ++ .../values/reader/AbstractColumnValuesReader.java | 59 +++++++++++++++++----- 5 files changed, 132 insertions(+), 13 deletions(-) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp new file mode 100644 index 0000000000..70b73e9dd2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + CREATE DATASET ColumnDataset + PRIMARY KEY (id: String) WITH { + "storage-format": {"format": "column"} + }; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp new file mode 100644 index 0000000000..1287f920cd --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + INSERT INTO ColumnDataset [{ + "id":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb [...] + "config": {"version": {"observer":null}} + }, + { + "id":"abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb [...] + "config": {"version": {"observer":null}} + }, + { + "id":"cadadasdasdasdadasdadasdadaawdasdawdfadawfafafawfafhkaskjdfhaskjfkjhasksbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb [...] + "config": {"version": {"observer":{"hash":"aaa"}}} + }]; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp new file mode 100644 index 0000000000..67a83176df --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT config.version from ColumnDataset; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm new file mode 100644 index 0000000000..164a125fb7 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm @@ -0,0 +1,3 @@ +{ "version": { "observer": null } } +{ "version": { "observer": null } } +{ "version": { "observer": { "hash": "aaa" } } } \ No newline at end of file diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java index 0942a23ca3..576b884fb2 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java @@ -31,15 +31,24 @@ import org.apache.asterix.column.values.reader.value.AbstractValueReader; import org.apache.asterix.om.types.ATypeTag; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.parquet.bytes.BytesUtils; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap; + abstract class AbstractColumnValuesReader implements IColumnValuesReader { + private static final Logger LOGGER = LogManager.getLogger(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + protected final AbstractValueReader valueReader; protected final int columnIndex; - protected final int maxLevel; - protected final ParquetRunLengthBitPackingHybridDecoder definitionLevels; + protected int maxLevel; + protected Int2ObjectArrayMap<ParquetRunLengthBitPackingHybridDecoder> definitionLevels; + protected ParquetRunLengthBitPackingHybridDecoder currentDefinitionLevels; protected final AbstractBytesInputStream valuesStream; private final boolean primaryKey; protected int level; @@ -58,7 +67,10 @@ abstract class AbstractColumnValuesReader implements IColumnValuesReader { this.valueReader = valueReader; this.columnIndex = columnIndex; this.maxLevel = !primaryKey && valueReader.getTypeTag() == ATypeTag.MISSING ? Integer.MAX_VALUE : maxLevel; - definitionLevels = new ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel)); + definitionLevels = new Int2ObjectArrayMap<>(); + definitionLevels.put(maxLevel, + new ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel))); + currentDefinitionLevels = definitionLevels.get(maxLevel); valuesStream = primaryKey ? new ByteBufferInputStream() : new MultiByteBufferInputStream(); this.primaryKey = primaryKey; } @@ -70,15 +82,22 @@ abstract class AbstractColumnValuesReader implements IColumnValuesReader { return; } - int actualLevel = definitionLevels.readInt(); - //Check whether the level is for a null value - nullLevel = ColumnValuesUtil.isNull(nullBitMask, actualLevel); - //Clear the null bit to allow repeated value readers determine the correct delimiter for null values - level = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel); + try { + int actualLevel = currentDefinitionLevels.readInt(); + //Check whether the level is for a null value + nullLevel = ColumnValuesUtil.isNull(nullBitMask, actualLevel); + //Clear the null bit to allow repeated value readers determine the correct delimiter for null values + level = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel); - // For logging purposes only - numberOfEncounteredMissing += isMissing() ? 1 : 0; - numberOfEncounteredNull += isNull() ? 1 : 0; + // For logging purposes only + numberOfEncounteredMissing += isMissing() ? 1 : 0; + numberOfEncounteredNull += isNull() ? 1 : 0; + } catch (Exception e) { + ObjectNode infoNode = OBJECT_MAPPER.createObjectNode(); + appendReaderInformation(infoNode); + LOGGER.error("error reading nextLevel, collected info: {}", infoNode); + throw HyracksDataException.create(e); + } } abstract void resetValues(); @@ -96,16 +115,28 @@ abstract class AbstractColumnValuesReader implements IColumnValuesReader { } allMissing = false; try { - nullBitMask = ColumnValuesUtil.getNullMask(BytesUtils.readZigZagVarInt(in)); + int actualLevel = BytesUtils.readZigZagVarInt(in); + maxLevel = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel); + nullBitMask = ColumnValuesUtil.getNullMask(actualLevel); + + currentDefinitionLevels = definitionLevels.get(maxLevel); + if (currentDefinitionLevels == null) { + currentDefinitionLevels = + new ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel)); + definitionLevels.put(maxLevel, currentDefinitionLevels); + } int defLevelsSize = BytesUtils.readZigZagVarInt(in); valueCount = BytesUtils.readZigZagVarInt(in); - definitionLevels.reset(in); + currentDefinitionLevels.reset(in); valuesStream.resetAt(defLevelsSize, in); int valueLength = BytesUtils.readZigZagVarInt(valuesStream); if (valueLength > 0) { valueReader.init(valuesStream, tupleCount); } } catch (IOException e) { + ObjectNode infoNode = OBJECT_MAPPER.createObjectNode(); + appendReaderInformation(infoNode); + LOGGER.error("error while resetting reader, collected info: {}", infoNode); throw HyracksDataException.create(e); } resetValues(); @@ -222,5 +253,7 @@ abstract class AbstractColumnValuesReader implements IColumnValuesReader { node.put("nullBitMask", nullBitMask); node.put("numberOfEncounteredMissing", numberOfEncounteredMissing); node.put("numberOfEncounteredNull", numberOfEncounteredNull); + node.put("numberOfDecodersRequired", definitionLevels.size()); + node.put("maxLevelsEncountered", definitionLevels.keySet().toString()); } }
