This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-3.4.2
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1292d18ad6d34053bd275feb54597d1b68d07840
Author: stiga-huang <[email protected]>
AuthorDate: Tue Jul 19 11:16:11 2022 +0800

    IMPALA-11444: Fix wrong results when reading wide rows from ORC
    
    After IMPALA-9228, ORC scanner reads rows into scratch batch where we
    perform conjuncts and runtime filters. The survived rows will be picked
    by the output row batch. We loop this until the output row batch is
    filled (1024 rows by default) or we finish reading the ORC batch (1024
    rows by default).
    
    Usually the loop will have only 1 iteration since the scratch batch
    capacity is also 1024. All rows of the current ORC batch can be
    materialized into the scratch batch. However, when reading wide rows
    that have tuple size larger than 4096 bytes, the scratch batch capacity
    will be reduced to be lower 1024, i.e. the scratch batch can store less
    than 1024 rows. In this case, we need more iterations in the loop.
    
    The bug is that we didn't commit rows to the output row batch after each
    iteration. The suvived rows will be ovewritten in the second iteration.
    
    This is fixed in a later optimization (IMPALA-9469) which is missing in
    the 3.x branch. This patch only pick the fix of it.
    
    Tests:
     - Add test on wide tables with 2K columns
    
    Change-Id: I09f1c23c817ad012587355c16f37f42d5fb41bff
    Reviewed-on: http://gerrit.cloudera.org:8080/18745
    Reviewed-by: Gabor Kaszab <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-orc-scanner.cc   | 16 +++++++++++---
 tests/query_test/test_scanners.py | 45 +++++++++++++++++++++++++++++++++++++--
 2 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index d1be3302c..43fc666cb 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -732,6 +732,13 @@ Status 
HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
       int num_tuples_transferred = TransferScratchTuples(dst_batch);
       row_id += num_tuples_transferred;
       num_to_commit += num_tuples_transferred;
+      // Commit rows before next iteration. otherwise, they will be 
overwritten.
+      // This happens when more than one iteration is needed, e.g. when the 
capacity of
+      // scratch_batch is small (due to reading wide rows).
+      VLOG_ROW << Substitute(
+          "Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
+          num_to_commit, dst_batch->num_rows());
+      RETURN_IF_ERROR(CommitRows(num_tuples_transferred, dst_batch));
     } else {
       if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, 
(void*)tuple_mem_end_);
       InitTuple(tuple_desc, template_tuple_, tuple);
@@ -746,9 +753,12 @@ Status 
HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
       }
     }
   }
-  VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 
rows)",
-      num_to_commit, dst_batch->num_rows());
-  return CommitRows(num_to_commit, dst_batch);
+  if (!do_batch_read) {
+    VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch 
($1 rows)",
+        num_to_commit, dst_batch->num_rows());
+    return CommitRows(num_to_commit, dst_batch);
+  }
+  return Status::OK();
 }
 
 Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 9cb169a33..241716c92 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -68,6 +68,11 @@ DEBUG_ACTION_DIMS = [None,
 # Trigger injected soft limit failures when scanner threads check memory limit.
 DEBUG_ACTION_DIMS.append('HDFS_SCANNER_THREAD_CHECK_SOFT_MEM_LIMIT:[email protected]')
 
+# Map from the test dimension file_format string to the SQL "STORED AS"
+# argument.
+STORED_AS_ARGS = {'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro', 
'orc': 'orc',
+    'seq': 'sequencefile', 'rc': 'rcfile'}
+
 class TestScannersAllTableFormats(ImpalaTestSuite):
   BATCH_SIZES = [0, 1, 16]
 
@@ -225,7 +230,6 @@ class TestUnmatchedSchema(ImpalaTestSuite):
     self._drop_test_table(vector)
 
 
-# Tests that scanners can read a single-column, single-row, 10MB table
 class TestWideRow(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
@@ -240,7 +244,8 @@ class TestWideRow(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(
       lambda v: v.get_value('table_format').file_format != 'hbase')
 
-  def test_wide_row(self, vector):
+  def test_single_wide_row(self, vector):
+    """Tests that scanners can read a single-column, single-row, 10MB table"""
     if vector.get_value('table_format').file_format == 'kudu':
       pytest.xfail("KUDU-666: Kudu support for large values")
 
@@ -256,6 +261,42 @@ class TestWideRow(ImpalaTestSuite):
     new_vector.get_value('exec_option')['mem_limit'] = 100 * 1024 * 1024
     self.run_test_case('QueryTest/wide-row', new_vector)
 
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  @SkipIfS3.hive
+  def test_multi_wide_rows(self, vector, unique_database):
+    """Tests that scanners can read multi rows of a wide table"""
+    if vector.get_value('table_format').file_format == 'kudu':
+      pytest.xfail("Kudu table can have a maximum of 300 columns")
+    format = STORED_AS_ARGS[vector.get_value('table_format').file_format]
+
+    create_tbl_stmt = 'create table %s.wide_tbl(col0 bigint' % unique_database
+    for i in range(1, 2000):
+      create_tbl_stmt += ',col%d bigint' % i
+    create_tbl_stmt += ') stored as %s' % format
+    self.client.execute(create_tbl_stmt)
+
+    insert_stmt = 'insert into %s.wide_tbl ' % unique_database +\
+                  'select id' + (',id' * 1999) +\
+                  ' from functional.alltypes order by id limit 1000'
+    if format in ('textfile', 'parquet', 'kudu'):
+      self.client.execute(insert_stmt)
+    else:
+      self.run_stmt_in_hive(insert_stmt)
+      self.client.execute('refresh %s.wide_tbl' % unique_database)
+
+    result = self.client.execute(
+        "select * from %s.wide_tbl where col0 = 1" % unique_database)
+    assert len(result.data) == 1
+    assert type(result.data[0]) == str
+    cols = result.data[0].split('\t')
+    assert len(cols) == 2000
+    for i in range(2000):
+      assert cols[i] == '1'
+
+
 class TestWideTable(ImpalaTestSuite):
   # TODO: expand this to more rows when we have the capability
   NUM_COLS = [250, 500, 1000]

Reply via email to