This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ca5de24d6ab035fee7089f4dcd473da37abf0d73
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Mon May 22 18:44:30 2023 +0200

    IMPALA-12153: Parquet STRUCT reader should fill position slots
    
    Before this patch the Parquet STRUCT reader didn't fill the
    position slots: collection position, file position. When users
    queried these virtual columns Impala crashed or returned
    incorrect results.
    
    The ORC scanner already worked correctly, but there were no tests
    written for it.
    
    Test:
     * e2e tests for both ORC / Parquet
    
    Change-Id: I32a808a11f4543cd404ed9f3958e9b4e971ca1f4
    Reviewed-on: http://gerrit.cloudera.org:8080/19911
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/parquet/parquet-column-readers.cc      |  12 --
 be/src/exec/parquet/parquet-column-readers.h       |   6 +
 .../exec/parquet/parquet-struct-column-reader.cc   |  15 ++
 .../queries/QueryTest/struct-positions.test        | 200 +++++++++++++++++++++
 tests/query_test/test_nested_types.py              |   4 +
 5 files changed, 225 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/parquet/parquet-column-readers.cc 
b/be/src/exec/parquet/parquet-column-readers.cc
index 7e58b1c24..07b5e92de 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -158,10 +158,6 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// It updates 'current_row_' when 'rep_level' is 0.
   inline ALWAYS_INLINE void ReadFilePositionBatched(int16_t rep_level, 
int64_t* file_pos);
 
-  /// Reads position into 'pos' and updates 'pos_current_value_' based on 
'rep_level'.
-  /// It updates 'current_row_' when 'rep_level' is 0.
-  inline ALWAYS_INLINE void ReadItemPositionBatched(int16_t rep_level, 
int64_t* pos);
-
   virtual Status CreateDictionaryDecoder(
       uint8_t* values, int size, DictDecoderBase** decoder) override {
     DCHECK(slot_desc_->type().type != TYPE_BOOLEAN)
@@ -910,14 +906,6 @@ void ScalarColumnReader<InternalType, PARQUET_TYPE, 
MATERIALIZED>::
   *file_pos = FilePositionOfCurrentRow();
 }
 
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
-void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::
-    ReadItemPositionBatched(int16_t rep_level, int64_t* pos) {
-  // Reset position counter if we are at the start of a new parent collection.
-  if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
-  *pos = pos_current_value_++;
-}
-
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
 void ScalarColumnReader<InternalType, PARQUET_TYPE, 
MATERIALIZED>::ReadPositions(
     int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT 
{
diff --git a/be/src/exec/parquet/parquet-column-readers.h 
b/be/src/exec/parquet/parquet-column-readers.h
index 7e5f21432..fa6189c1c 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -214,6 +214,12 @@ class ParquetColumnReader {
     return pos_slot_desc_ != nullptr || file_pos_slot_desc_ != nullptr;
   }
 
+  void ReadItemPositionBatched(int16_t rep_level, int64_t* pos) {
+    // Reset position counter if we are at the start of a new parent 
collection.
+    if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+    *pos = pos_current_value_++;
+  }
+
  protected:
   HdfsParquetScanner* parent_;
   const SchemaNode& node_;
diff --git a/be/src/exec/parquet/parquet-struct-column-reader.cc 
b/be/src/exec/parquet/parquet-struct-column-reader.cc
index c58420d7c..675cb377b 100644
--- a/be/src/exec/parquet/parquet-struct-column-reader.cc
+++ b/be/src/exec/parquet/parquet-struct-column-reader.cc
@@ -27,6 +27,7 @@ bool StructColumnReader::NextLevels() {
   }
   def_level_ = children_[0]->def_level();
   rep_level_ = children_[0]->rep_level();
+  if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
   return result;
 }
 
@@ -54,6 +55,7 @@ bool StructColumnReader::ReadValue(MemPool* pool, Tuple* 
tuple, bool* read_row)
 
   def_level_ = children_[0]->def_level();
   rep_level_ = children_[0]->rep_level();
+  if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
   return should_abort;
 }
 
@@ -98,6 +100,19 @@ bool StructColumnReader::ReadValueBatch(MemPool* pool, int 
max_values, int tuple
   while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * 
tuple_size);
     bool read_row = false;
+    // Fill in position slots if applicable
+    if (pos_slot_desc() != nullptr) {
+      DCHECK(file_pos_slot_desc() == nullptr);
+      ReadItemPositionBatched(rep_level_,
+          tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
+    } else if (file_pos_slot_desc() != nullptr) {
+      DCHECK(pos_slot_desc() == nullptr);
+      // It is OK to call the non-batched version because we let the child 
readers
+      // determine the LastProcessedRow() and we use the non-batched 
ReadValue() functions
+      // of the children.
+      ReadFilePositionNonBatched(
+          tuple->GetBigIntSlot(file_pos_slot_desc()->tuple_offset()));
+    }
     continue_execution = ReadValue<IN_COLLECTION>(pool, tuple, &read_row);
     if (read_row) ++val_count;
     if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) {
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/struct-positions.test 
b/testdata/workloads/functional-query/queries/QueryTest/struct-positions.test
new file mode 100644
index 000000000..c69d076ba
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/struct-positions.test
@@ -0,0 +1,200 @@
+====
+---- QUERY
+# Let's just have the positions here for reference.
+select id, file__position from complextypestbl;
+---- RESULTS
+1,0
+2,1
+3,2
+4,3
+5,4
+6,5
+7,6
+8,0
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+# Let's query the top-level struct so we can verify the following results.
+select id, file__position, nested_struct from complextypestbl;
+---- RESULTS
+1,0,'{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}'
+2,1,'{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}'
+3,2,'{"a":null,"b":null,"c":{"d":[]},"g":{}}'
+4,3,'{"a":null,"b":null,"c":{"d":null},"g":null}'
+5,4,'{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}'
+6,5,'NULL'
+7,6,'{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}'
+8,0,'{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}'
+---- TYPES
+BIGINT, BIGINT, STRING
+====
+---- QUERY
+# We only query the 'nested_struct' non-virtual column, so the STRUCT reader
+# needs to set the file positions
+select file__position, nested_struct from complextypestbl;
+---- RESULTS
+0,'{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}'
+1,'{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}'
+2,'{"a":null,"b":null,"c":{"d":[]},"g":{}}'
+3,'{"a":null,"b":null,"c":{"d":null},"g":null}'
+4,'{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}'
+5,'NULL'
+6,'{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}'
+0,'{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select file__position, nested_struct from complextypestbl
+where file__position = 2;
+---- RESULTS
+2,'{"a":null,"b":null,"c":{"d":[]},"g":{}}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select file__position, nested_struct from complextypestbl
+where nested_struct.a > 0;
+---- RESULTS
+0,'{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}'
+6,'{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select id, file__position, item from complextypestbl c, 
c.nested_struct.c.d.item;
+---- RESULTS
+1,0,'{"e":10,"f":"aaa"}'
+1,0,'{"e":-10,"f":"bbb"}'
+1,0,'{"e":11,"f":"c"}'
+2,1,'{"e":null,"f":null}'
+2,1,'{"e":10,"f":"aaa"}'
+2,1,'{"e":null,"f":null}'
+2,1,'{"e":-10,"f":"bbb"}'
+2,1,'{"e":null,"f":null}'
+2,1,'{"e":11,"f":"c"}'
+2,1,'NULL'
+7,6,'NULL'
+8,0,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT, BIGINT, STRING
+====
+---- QUERY
+select file__position, item from complextypestbl c, c.nested_struct.c.d.item;
+---- RESULTS
+0,'{"e":10,"f":"aaa"}'
+0,'{"e":-10,"f":"bbb"}'
+0,'{"e":11,"f":"c"}'
+1,'{"e":null,"f":null}'
+1,'{"e":10,"f":"aaa"}'
+1,'{"e":null,"f":null}'
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":null,"f":null}'
+1,'{"e":11,"f":"c"}'
+1,'NULL'
+6,'NULL'
+0,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select pos, item from complextypestbl c, c.nested_struct.c.d.item;
+---- RESULTS
+0,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+0,'{"e":11,"f":"c"}'
+0,'{"e":null,"f":null}'
+1,'{"e":10,"f":"aaa"}'
+2,'{"e":null,"f":null}'
+3,'{"e":-10,"f":"bbb"}'
+4,'{"e":null,"f":null}'
+0,'{"e":11,"f":"c"}'
+1,'NULL'
+0,'NULL'
+0,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select pos, item from complextypestbl c, c.nested_struct.c.d.item
+where pos = 1;
+---- RESULTS
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":10,"f":"aaa"}'
+1,'NULL'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select pos, item from complextypestbl c, c.nested_struct.c.d.item it
+where it.e < 11;
+---- RESULTS
+0,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":10,"f":"aaa"}'
+3,'{"e":-10,"f":"bbb"}'
+0,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+# Queries like above, but with a flat tuple structure, i.e. the items
+# are in the top-level (and only) tuple.
+select pos, item from complextypestbl.nested_struct.c.d.item;
+---- RESULTS
+0,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+0,'{"e":11,"f":"c"}'
+0,'{"e":null,"f":null}'
+1,'{"e":10,"f":"aaa"}'
+2,'{"e":null,"f":null}'
+3,'{"e":-10,"f":"bbb"}'
+4,'{"e":null,"f":null}'
+0,'{"e":11,"f":"c"}'
+1,'NULL'
+0,'NULL'
+0,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select pos, item from complextypestbl.nested_struct.c.d.item
+where pos = 1;
+---- RESULTS
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":10,"f":"aaa"}'
+1,'NULL'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select pos, item from complextypestbl.nested_struct.c.d.item it
+where it.e < 11;
+---- RESULTS
+0,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":10,"f":"aaa"}'
+3,'{"e":-10,"f":"bbb"}'
+0,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select file__position, pos, item from complextypestbl c, 
c.nested_struct.c.d.item;
+---- RESULTS
+0,0,'{"e":10,"f":"aaa"}'
+0,1,'{"e":-10,"f":"bbb"}'
+0,0,'{"e":11,"f":"c"}'
+1,0,'{"e":null,"f":null}'
+1,1,'{"e":10,"f":"aaa"}'
+1,2,'{"e":null,"f":null}'
+1,3,'{"e":-10,"f":"bbb"}'
+1,4,'{"e":null,"f":null}'
+1,0,'{"e":11,"f":"c"}'
+1,1,'NULL'
+6,0,'NULL'
+0,0,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT, BIGINT, STRING
+====
diff --git a/tests/query_test/test_nested_types.py 
b/tests/query_test/test_nested_types.py
index 17d924f48..ce3d060ff 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -152,6 +152,10 @@ class TestNestedStructsInSelectList(ImpalaTestSuite):
     
new_vector.get_value('exec_option')['convert_legacy_hive_parquet_utc_timestamps']
 = 1
     self.run_test_case('QueryTest/nested-struct-in-select-list', new_vector)
 
+  def test_struct_positions(self, vector):
+    """Queries where structs and (file/collection) positions are used 
together"""
+    self.run_test_case('QueryTest/struct-positions', vector)
+
 
 class TestNestedCollectionsInSelectList(ImpalaTestSuite):
   """Functional tests for nested arrays provided in the select list."""

Reply via email to