This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-3.4.2 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1292d18ad6d34053bd275feb54597d1b68d07840 Author: stiga-huang <[email protected]> AuthorDate: Tue Jul 19 11:16:11 2022 +0800 IMPALA-11444: Fix wrong results when reading wide rows from ORC After IMPALA-9228, ORC scanner reads rows into scratch batch where we perform conjuncts and runtime filters. The survived rows will be picked by the output row batch. We loop this until the output row batch is filled (1024 rows by default) or we finish reading the ORC batch (1024 rows by default). Usually the loop will have only 1 iteration since the scratch batch capacity is also 1024. All rows of the current ORC batch can be materialized into the scratch batch. However, when reading wide rows that have tuple size larger than 4096 bytes, the scratch batch capacity will be reduced to be lower 1024, i.e. the scratch batch can store less than 1024 rows. In this case, we need more iterations in the loop. The bug is that we didn't commit rows to the output row batch after each iteration. The suvived rows will be ovewritten in the second iteration. This is fixed in a later optimization (IMPALA-9469) which is missing in the 3.x branch. This patch only pick the fix of it. Tests: - Add test on wide tables with 2K columns Change-Id: I09f1c23c817ad012587355c16f37f42d5fb41bff Reviewed-on: http://gerrit.cloudera.org:8080/18745 Reviewed-by: Gabor Kaszab <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-orc-scanner.cc | 16 +++++++++++--- tests/query_test/test_scanners.py | 45 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc index d1be3302c..43fc666cb 100644 --- a/be/src/exec/hdfs-orc-scanner.cc +++ b/be/src/exec/hdfs-orc-scanner.cc @@ -732,6 +732,13 @@ Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader, int num_tuples_transferred = TransferScratchTuples(dst_batch); row_id += num_tuples_transferred; num_to_commit += num_tuples_transferred; + // Commit rows before next iteration. otherwise, they will be overwritten. + // This happens when more than one iteration is needed, e.g. when the capacity of + // scratch_batch is small (due to reading wide rows). + VLOG_ROW << Substitute( + "Transfer $0 rows from scratch batch to dst_batch ($1 rows)", + num_to_commit, dst_batch->num_rows()); + RETURN_IF_ERROR(CommitRows(num_tuples_transferred, dst_batch)); } else { if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, (void*)tuple_mem_end_); InitTuple(tuple_desc, template_tuple_, tuple); @@ -746,9 +753,12 @@ Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader, } } } - VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)", - num_to_commit, dst_batch->num_rows()); - return CommitRows(num_to_commit, dst_batch); + if (!do_batch_read) { + VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)", + num_to_commit, dst_batch->num_rows()); + return CommitRows(num_to_commit, dst_batch); + } + return Status::OK(); } Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) { diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 9cb169a33..241716c92 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -68,6 +68,11 @@ DEBUG_ACTION_DIMS = [None, # Trigger injected soft limit failures when scanner threads check memory limit. DEBUG_ACTION_DIMS.append('HDFS_SCANNER_THREAD_CHECK_SOFT_MEM_LIMIT:[email protected]') +# Map from the test dimension file_format string to the SQL "STORED AS" +# argument. +STORED_AS_ARGS = {'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro', 'orc': 'orc', + 'seq': 'sequencefile', 'rc': 'rcfile'} + class TestScannersAllTableFormats(ImpalaTestSuite): BATCH_SIZES = [0, 1, 16] @@ -225,7 +230,6 @@ class TestUnmatchedSchema(ImpalaTestSuite): self._drop_test_table(vector) -# Tests that scanners can read a single-column, single-row, 10MB table class TestWideRow(ImpalaTestSuite): @classmethod def get_workload(cls): @@ -240,7 +244,8 @@ class TestWideRow(ImpalaTestSuite): cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('table_format').file_format != 'hbase') - def test_wide_row(self, vector): + def test_single_wide_row(self, vector): + """Tests that scanners can read a single-column, single-row, 10MB table""" if vector.get_value('table_format').file_format == 'kudu': pytest.xfail("KUDU-666: Kudu support for large values") @@ -256,6 +261,42 @@ class TestWideRow(ImpalaTestSuite): new_vector.get_value('exec_option')['mem_limit'] = 100 * 1024 * 1024 self.run_test_case('QueryTest/wide-row', new_vector) + @SkipIfABFS.hive + @SkipIfADLS.hive + @SkipIfIsilon.hive + @SkipIfLocal.hive + @SkipIfS3.hive + def test_multi_wide_rows(self, vector, unique_database): + """Tests that scanners can read multi rows of a wide table""" + if vector.get_value('table_format').file_format == 'kudu': + pytest.xfail("Kudu table can have a maximum of 300 columns") + format = STORED_AS_ARGS[vector.get_value('table_format').file_format] + + create_tbl_stmt = 'create table %s.wide_tbl(col0 bigint' % unique_database + for i in range(1, 2000): + create_tbl_stmt += ',col%d bigint' % i + create_tbl_stmt += ') stored as %s' % format + self.client.execute(create_tbl_stmt) + + insert_stmt = 'insert into %s.wide_tbl ' % unique_database +\ + 'select id' + (',id' * 1999) +\ + ' from functional.alltypes order by id limit 1000' + if format in ('textfile', 'parquet', 'kudu'): + self.client.execute(insert_stmt) + else: + self.run_stmt_in_hive(insert_stmt) + self.client.execute('refresh %s.wide_tbl' % unique_database) + + result = self.client.execute( + "select * from %s.wide_tbl where col0 = 1" % unique_database) + assert len(result.data) == 1 + assert type(result.data[0]) == str + cols = result.data[0].split('\t') + assert len(cols) == 2000 + for i in range(2000): + assert cols[i] == '1' + + class TestWideTable(ImpalaTestSuite): # TODO: expand this to more rows when we have the capability NUM_COLS = [250, 500, 1000]
