This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 1cffa2bc98bf851dfe2663190e1032cafc9df09d
Author: Ritik Raj <[email protected]>
AuthorDate: Thu Dec 12 20:29:41 2024 +0530

    [ASTERIXDB-3534][STO] Fixed Reading definitionLevel for column having nulls
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - When a column contains both null and non-null values, a single
      batch may have one leafNode written with all nulls while another
      leafNode contains the same column with a different type, so the
      "level" of the field can differ between the null and non-null
      entries.
      This change of level can cause the
      ParquetRunLengthBitPackingHybridDecoder
      to read more than required when pulling out a single definitionLevel,
      consuming all bytes early and causing an out-of-bytes error
      for subsequent tupleNumbers.
    
      This patch accommodates that by creating (or reusing) a
      different decoder when a column is reset with a
      different maxLevel.
    
    Ext-ref: MB-64486
    
    Change-Id: Ia857e6520f5d2ae2f48a4fbbc9d647855649d145
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19193
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Peeyush Gupta <[email protected]>
    Tested-by: Peeyush Gupta <[email protected]>
---
 .../ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp    | 28 ++++++++++
 .../ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp | 33 ++++++++++++
 .../ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp  | 22 ++++++++
 .../assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm |  3 ++
 .../values/reader/AbstractColumnValuesReader.java  | 59 +++++++++++++++++-----
 5 files changed, 132 insertions(+), 13 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp
new file mode 100644
index 0000000000..70b73e9dd2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.001.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+ CREATE DATASET ColumnDataset
+ PRIMARY KEY (id: String) WITH {
+     "storage-format": {"format": "column"}
+ };
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp
new file mode 100644
index 0000000000..1287f920cd
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.002.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ INSERT INTO ColumnDataset [{
+     
"id":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
 [...]
+     "config": {"version": {"observer":null}}
+ },
+ {
+     
"id":"abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
 [...]
+     "config": {"version": {"observer":null}}
+ },
+ {
+     
"id":"cadadasdasdasdadasdadasdadaawdasdawdfadawfafafawfafhkaskjdfhaskjfkjhasksbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
 [...]
+     "config": {"version": {"observer":{"hash":"aaa"}}}
+ }];
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp
new file mode 100644
index 0000000000..67a83176df
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT config.version from ColumnDataset;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm
new file mode 100644
index 0000000000..164a125fb7
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/assembly/ASTERIXDB-3534/ASTERIXDB-3534.003.adm
@@ -0,0 +1,3 @@
+{ "version": { "observer": null } }
+{ "version": { "observer": null } }
+{ "version": { "observer": { "hash": "aaa" } } }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
index 0942a23ca3..576b884fb2 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -31,15 +31,24 @@ import 
org.apache.asterix.column.values.reader.value.AbstractValueReader;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.parquet.bytes.BytesUtils;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
+
 abstract class AbstractColumnValuesReader implements IColumnValuesReader {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     protected final AbstractValueReader valueReader;
     protected final int columnIndex;
-    protected final int maxLevel;
-    protected final ParquetRunLengthBitPackingHybridDecoder definitionLevels;
+    protected int maxLevel;
+    protected Int2ObjectArrayMap<ParquetRunLengthBitPackingHybridDecoder> 
definitionLevels;
+    protected ParquetRunLengthBitPackingHybridDecoder currentDefinitionLevels;
     protected final AbstractBytesInputStream valuesStream;
     private final boolean primaryKey;
     protected int level;
@@ -58,7 +67,10 @@ abstract class AbstractColumnValuesReader implements 
IColumnValuesReader {
         this.valueReader = valueReader;
         this.columnIndex = columnIndex;
         this.maxLevel = !primaryKey && valueReader.getTypeTag() == 
ATypeTag.MISSING ? Integer.MAX_VALUE : maxLevel;
-        definitionLevels = new 
ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel));
+        definitionLevels = new Int2ObjectArrayMap<>();
+        definitionLevels.put(maxLevel,
+                new 
ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel)));
+        currentDefinitionLevels = definitionLevels.get(maxLevel);
         valuesStream = primaryKey ? new ByteBufferInputStream() : new 
MultiByteBufferInputStream();
         this.primaryKey = primaryKey;
     }
@@ -70,15 +82,22 @@ abstract class AbstractColumnValuesReader implements 
IColumnValuesReader {
             return;
         }
 
-        int actualLevel = definitionLevels.readInt();
-        //Check whether the level is for a null value
-        nullLevel = ColumnValuesUtil.isNull(nullBitMask, actualLevel);
-        //Clear the null bit to allow repeated value readers determine the 
correct delimiter for null values
-        level = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel);
+        try {
+            int actualLevel = currentDefinitionLevels.readInt();
+            //Check whether the level is for a null value
+            nullLevel = ColumnValuesUtil.isNull(nullBitMask, actualLevel);
+            //Clear the null bit to allow repeated value readers determine the 
correct delimiter for null values
+            level = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel);
 
-        // For logging purposes only
-        numberOfEncounteredMissing += isMissing() ? 1 : 0;
-        numberOfEncounteredNull += isNull() ? 1 : 0;
+            // For logging purposes only
+            numberOfEncounteredMissing += isMissing() ? 1 : 0;
+            numberOfEncounteredNull += isNull() ? 1 : 0;
+        } catch (Exception e) {
+            ObjectNode infoNode = OBJECT_MAPPER.createObjectNode();
+            appendReaderInformation(infoNode);
+            LOGGER.error("error reading nextLevel, collected info: {}", 
infoNode);
+            throw HyracksDataException.create(e);
+        }
     }
 
     abstract void resetValues();
@@ -96,16 +115,28 @@ abstract class AbstractColumnValuesReader implements 
IColumnValuesReader {
         }
         allMissing = false;
         try {
-            nullBitMask = 
ColumnValuesUtil.getNullMask(BytesUtils.readZigZagVarInt(in));
+            int actualLevel = BytesUtils.readZigZagVarInt(in);
+            maxLevel = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel);
+            nullBitMask = ColumnValuesUtil.getNullMask(actualLevel);
+
+            currentDefinitionLevels = definitionLevels.get(maxLevel);
+            if (currentDefinitionLevels == null) {
+                currentDefinitionLevels =
+                        new 
ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel));
+                definitionLevels.put(maxLevel, currentDefinitionLevels);
+            }
             int defLevelsSize = BytesUtils.readZigZagVarInt(in);
             valueCount = BytesUtils.readZigZagVarInt(in);
-            definitionLevels.reset(in);
+            currentDefinitionLevels.reset(in);
             valuesStream.resetAt(defLevelsSize, in);
             int valueLength = BytesUtils.readZigZagVarInt(valuesStream);
             if (valueLength > 0) {
                 valueReader.init(valuesStream, tupleCount);
             }
         } catch (IOException e) {
+            ObjectNode infoNode = OBJECT_MAPPER.createObjectNode();
+            appendReaderInformation(infoNode);
+            LOGGER.error("error while resetting reader, collected info: {}", 
infoNode);
             throw HyracksDataException.create(e);
         }
         resetValues();
@@ -222,5 +253,7 @@ abstract class AbstractColumnValuesReader implements 
IColumnValuesReader {
         node.put("nullBitMask", nullBitMask);
         node.put("numberOfEncounteredMissing", numberOfEncounteredMissing);
         node.put("numberOfEncounteredNull", numberOfEncounteredNull);
+        node.put("numberOfDecodersRequired", definitionLevels.size());
+        node.put("maxLevelsEncountered", definitionLevels.keySet().toString());
     }
 }

Reply via email to