This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0b2f6b7f35ab8de31d27631d2b693915848c99a5
Author: Daniel Becker <[email protected]>
AuthorDate: Mon Jul 25 16:22:27 2022 +0200

    IMPALA-11345: Parquet Bloom filtering failure if column is added to the
    schema
    
    If a new column was added to an existing table with existing data and
    Parquet Bloom filtering was turned ON, queries with an equality
    conjunct on the new column failed.
    
    This was because the old Parquet data files did not have the new column
    in their schema, so no column could be found for the conjunct. This was
    treated as an error and the query failed.
    
    After this patch this situation is no longer treated as an error and the
    conjunct is simply disregarded for Bloom filtering in the files that
    lack the new column.
    
    Testing:
     - added the test
       TestParquetBloomFilter::test_parquet_bloom_filtering_schema_change in
       tests/query_test/test_parquet_bloom_filter.py that checks that a
       query as described above does not fail.
    
    Merge conflicts:
     - hdfs-parquet-scanner.cc removes usage of NeedDataInFile().
    
    Change-Id: Ief3e6b6358d3dff3abe5beeda752033a7e8e16a6
    Reviewed-on: http://gerrit.cloudera.org:8080/18779
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/18888
    Tested-by: Quanlong Huang <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc   | 72 ++++++++++++++++++++++++---
 tests/query_test/test_parquet_bloom_filter.py | 21 ++++++++
 2 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc 
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index d8058c875..04b1fddaa 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1924,14 +1924,40 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t 
offset, uint8_t* buffer, uint64
   return Status::OK();
 }
 
+void LogMissingFields(google::LogSeverity log_level, const std::string& 
text_before,
+    const std::string& text_after, const std::unordered_set<std::string>& 
paths) {
+  stringstream s;
+  s << text_before;
+  s << "[";
+  size_t i = 0;
+  for (const std::string& path : paths) {
+    s << path;
+    if (i + 1 < paths.size()) {
+      s << ", ";
+    }
+    i++;
+  }
+
+  s << "]. ";
+  s << text_after;
+  VLOG(log_level) << s.str();
+}
+
 // Create a map from column index to EQ conjuncts for Bloom filtering.
 Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
   // EQ conjuncts are represented as a LE and a GE conjunct with the same
   // value. This map is used to pair them to form EQ conjuncts.
-  // The value is a vector because there may be multiple GE or LE conjuncts on 
a column.
+  // The value is a set because there may be multiple GE or LE conjuncts on a 
column.
   unordered_map<int, std::unordered_set<std::pair<std::string, const 
Literal*>>>
       conjunct_halves;
 
+  // Slot paths for which no data is found in the file. It is expected for 
example if it
+  // is a partition column and unexpected for example if the column was added 
to the table
+  // schema after the current file was written and therefore the current file 
does
+  // not have the column.
+  std::unordered_set<std::string> unexpected_missing_fields;
+  std::unordered_set<std::string> expected_missing_fields;
+
   for (ScalarExprEvaluator* eval : stats_conjunct_evals_) {
     const ScalarExpr& expr = eval->root();
     const string& function_name = expr.function_name();
@@ -1983,13 +2009,24 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() 
{
       }
 
       if (missing_field) {
-        if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
-
-        return Status(Substitute(
-            "Unable to find SchemaNode for path '$0' in the schema of file 
'$1'.",
-            PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()), 
filename()));
+        if (!file_metadata_utils_.IsValuePartitionCol(slot_desc)) {
+          // If a column is added to the schema of an existing table, the 
schemas of the
+          // old parquet data files do not contain the new column: see 
IMPALA-11345. This
+          // is not an error, we simply disregard this column in Bloom 
filtering in this
+          // scanner.
+          unexpected_missing_fields.emplace(
+              PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()));
+        } else {
+          // If the data is not expected to be in the file, we disregard the 
conjuncts for
+          // the purposes of Bloom filtering.
+          expected_missing_fields.emplace(
+              PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()));
+        }
+        continue;
       }
 
+      DCHECK(node != nullptr);
+
       if (!IsParquetBloomFilterSupported(node->element->type, 
child_slot_ref->type())) {
         continue;
       }
@@ -2029,6 +2066,29 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
     }
   }
 
+  // Log expected and unexpected missing fields.
+  if (!unexpected_missing_fields.empty()) {
+    LogMissingFields(google::WARNING,
+        Substitute(
+          "Unable to find SchemaNode for the following paths in the schema of "
+          "file '$0': ",
+          filename()),
+        "This may be because the column may have been added to the table 
schema after "
+        "writing this file. Disregarding conjuncts on this path for the 
purpose of "
+        "Parquet Bloom filtering in this file.",
+        unexpected_missing_fields);
+  }
+
+  if (!expected_missing_fields.empty()) {
+    LogMissingFields(google::INFO,
+        Substitute(
+          "Data for the following paths is not expected to be present in file 
'$0': ",
+          filename()),
+        "Disregarding conjuncts on this path for the purpose of Parquet Bloom 
filtering "
+        "in this file.",
+        expected_missing_fields);
+  }
+
   return Status::OK();
 }
 
diff --git a/tests/query_test/test_parquet_bloom_filter.py 
b/tests/query_test/test_parquet_bloom_filter.py
index eda93999b..11af6d28d 100644
--- a/tests/query_test/test_parquet_bloom_filter.py
+++ b/tests/query_test/test_parquet_bloom_filter.py
@@ -95,6 +95,27 @@ class TestParquetBloomFilter(ImpalaTestSuite):
     vector.get_value('exec_option')['parquet_bloom_filtering'] = False
     self.run_test_case('QueryTest/parquet-bloom-filter-disabled', vector, 
unique_database)
 
+  def test_parquet_bloom_filtering_schema_change(self, vector, 
unique_database):
+    """ Regression test for IMPALA-11345. Tests that the query does not fail 
when a new
+    column is added to the table schema but the old Parquet files do not 
contain it and
+    therefore no column is found for a conjunct while preparing Bloom 
filtering. """
+    vector.get_value('exec_option')['parquet_bloom_filtering'] = True
+
+    tbl_name = 'changed_schema'
+
+    stmts = [
+      'create table {db}.{tbl} (id INT) stored as parquet',
+      'insert into {db}.{tbl} values (1),(2),(3)',
+      'alter table {db}.{tbl} add columns (name STRING)',
+      'insert into {db}.{tbl} values (4, "James")',
+      'select * from {db}.{tbl} where name in ("Lily")'
+    ]
+
+    for stmt in stmts:
+      self.execute_query_expect_success(self.client,
+          stmt.format(db=str(unique_database), tbl=tbl_name),
+          vector.get_value('exec_option'))
+
   def test_write_parquet_bloom_filter(self, vector, unique_database, tmpdir):
     # Get Bloom filters from the first row group of file PARQUET_TEST_FILE.
     reference_col_to_bloom_filter = self._get_first_row_group_bloom_filters(

Reply via email to