This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4eafaf87b5f129dc4fbe9f285599492122e12e1c
Author: Michael Smith <[email protected]>
AuthorDate: Wed Sep 11 13:59:28 2024 -0700

    IMPALA-13322: Fix alter on SystemTables
    
    Fixes SystemTable.load to handle incremental loads - such as during
    alter - and correctly update the metastore Table so subsequent
    operations see the correct version of the table schema.
    
    Updates QueryScanner to allow unknown columns and log a warning. This is
    primarily needed when another coordinator has been upgraded and modified
    table metadata with new columns that this coordinator doesn't recognize.
    
    Adds TestQueryLive::test_alter to test that altering the table works and
    the table is usable afterwards, including attempting to query unknown
    columns as we might encounter them during upgrades.
    
    Change-Id: I9a59e58c086e659941e0db8a2b893ac6dcc5143a
    Reviewed-on: http://gerrit.cloudera.org:8080/21792
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/system-table-scanner.cc                | 12 ++++--
 be/src/exec/system-table-scanner.h                 | 10 +++--
 .../org/apache/impala/catalog/SystemTable.java     | 28 +++++++++----
 tests/custom_cluster/test_query_live.py            | 46 ++++++++++++++++++++--
 4 files changed, 78 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/system-table-scanner.cc b/be/src/exec/system-table-scanner.cc
index 417a839d7..c4af35f42 100644
--- a/be/src/exec/system-table-scanner.cc
+++ b/be/src/exec/system-table-scanner.cc
@@ -55,7 +55,7 @@ Status SystemTableScanner::CreateScanner(RuntimeState* state, RuntimeProfile* pr
     TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>* scanner) {
   switch (table_name) {
     case TSystemTableName::IMPALA_QUERY_LIVE:
-      *scanner = make_unique<QueryScanner>(state, profile);
+      *scanner = make_unique<QueryScanner>(state, profile, table_name);
       break;
     default:
       return Status(ErrorMsg(TErrorCode::NOT_IMPLEMENTED_ERROR,
@@ -112,8 +112,9 @@ static void WriteDecimalSlot(
   DCHECK(!overflow);
 }
 
-QueryScanner::QueryScanner(RuntimeState* state, RuntimeProfile* profile)
-    : SystemTableScanner(state, profile),
+QueryScanner::QueryScanner(RuntimeState* state, RuntimeProfile* profile,
+    TSystemTableName::type table_name)
+    : SystemTableScanner(state, profile, table_name),
       active_query_collection_timer_(ADD_TIMER(profile_, "ActiveQueryCollectionTime")),
       pending_query_collection_timer_(ADD_TIMER(profile_, "PendingQueryCollectionTime"))
       {}
@@ -370,7 +371,10 @@ Status QueryScanner::MaterializeNextTuple(
         }
         break;
       default:
-        DCHECK(false) << "Unknown column position " << slot_desc->col_pos();
+        LOG(WARNING) << "Unknown column (position " << slot_desc->col_pos() << ") added"
+            " to table " << table_name_ << "; check if a coordinator was upgraded";
+        tuple->SetNull(slot_desc->null_indicator_offset());
+        break;
     }
   }
 
diff --git a/be/src/exec/system-table-scanner.h b/be/src/exec/system-table-scanner.h
index 92fbd6a78..9afacde40 100644
--- a/be/src/exec/system-table-scanner.h
+++ b/be/src/exec/system-table-scanner.h
@@ -42,8 +42,9 @@ class SystemTableScanner {
   bool eos() const noexcept { return eos_; }
 
  protected:
-  SystemTableScanner(RuntimeState* state, RuntimeProfile* profile)
-      : state_(state), profile_(profile), eos_(false) {}
+  SystemTableScanner(RuntimeState* state, RuntimeProfile* profile,
+      TSystemTableName::type table_name)
+      : state_(state), profile_(profile), table_name_(table_name), eos_(false) {}
 
   /// Write a string value to a STRING slot, allocating memory from 'pool'. Returns
   /// an error if memory cannot be allocated without exceeding a memory limit.
@@ -54,13 +55,16 @@ class SystemTableScanner {
 
   RuntimeProfile* const profile_;
 
+  const TSystemTableName::type table_name_;
+
   /// if true, nothing left to return in getNext() in SystemTableScanNode
   bool eos_;
 };
 
 class QueryScanner : public SystemTableScanner {
  public:
-  QueryScanner(RuntimeState* state, RuntimeProfile* profile);
+  QueryScanner(RuntimeState* state, RuntimeProfile* profile,
+      TSystemTableName::type table_name);
 
   /// Start scan, load list of query IDs into active_query_ids_.
   virtual Status Open();
diff --git a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
index b8622e688..180dc9fac 100644
--- a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
@@ -37,7 +37,6 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.TResultRowBuilder;
-import com.google.common.base.Preconditions;
 
 /**
  * Represents a system table reflecting backend internal state.
@@ -99,12 +98,27 @@ public final class SystemTable extends Table implements FeSystemTable {
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl, String reason,
       EventSequence catalogTimeline) throws TableLoadingException {
-    int pos = colsByPos_.size();
-    // Should be no partition columns.
-    Preconditions.checkState(pos == 0);
-    for (FieldSchema s: msTbl.getSd().getCols()) {
-      Type type = FeCatalogUtils.parseColumnType(s, getName());
-      addColumn(new Column(s.getName(), type, s.getComment(), pos++));
+    if (msTbl.getPartitionKeysSize() > 0) {
+      throw new TableLoadingException(
+          "System table cannot contain clustering columns: " + name_);
+    }
+
+    Table.LOADING_TABLES.incrementAndGet();
+    try {
+      // Reload all columns.
+      clearColumns();
+      numClusteringCols_ = 0;
+      int pos = 0;
+      for (FieldSchema s: msTbl.getSd().getCols()) {
+        addColumn(new Column(s.getName(), parseColumnType(s), s.getComment(), pos++));
+      }
+
+      // Ensure table metadata points to the latest version.
+      setMetaStoreTable(msTbl);
+      refreshLastUsedTime();
+    } finally {
+      // Ensure this is decremented in case an exception is thrown.
+      Table.LOADING_TABLES.decrementAndGet();
     }
   }
 
diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py
index 3a2595cd0..2df6036ae 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -42,13 +42,16 @@ class TestQueryLive(CustomClusterTestSuite):
 
   def assert_describe_extended(self):
     describe_ext_result = self.execute_query('describe extended sys.impala_query_live')
-    assert len(describe_ext_result.data) == 80
+    # Alter can add additional event fields. Filter them out.
+    describe_ext_data = [
+        line for line in describe_ext_result.data if 'impala.events.catalog' not in line]
+    assert len(describe_ext_data) == 80
     system_table_re = re.compile(r'__IMPALA_SYSTEM_TABLE\s+true')
-    assert list(filter(system_table_re.search, describe_ext_result.data))
+    assert list(filter(system_table_re.search, describe_ext_data))
     external_re = re.compile(r'EXTERNAL\s+TRUE')
-    assert list(filter(external_re.search, describe_ext_result.data))
+    assert list(filter(external_re.search, describe_ext_data))
     external_table_re = re.compile(r'Table Type:\s+EXTERNAL_TABLE')
-    assert list(filter(external_table_re.search, describe_ext_result.data))
+    assert list(filter(external_table_re.search, describe_ext_data))
 
   def assert_impalads(self, profile, present=[0, 1, 2], absent=[]):
     for port_idx in present:
@@ -207,6 +210,41 @@ class TestQueryLive(CustomClusterTestSuite):
     assert_query('sys.impala_query_live', self.client, 'test_query_live',
                  result.runtime_profile)
 
+  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+                                                 "--cluster_id=test_query_live",
+                                    catalogd_args="--enable_workload_mgmt")
+  def test_alter(self):
+    """Asserts alter works on query live table."""
+    column_desc = 'test_alter\tstring\t'
+
+    add_column = self.execute_query(
+        'alter table sys.impala_query_live add columns(test_alter string)')
+    assert add_column.data == ['New column(s) have been added to the table.']
+
+    try:
+      describe_column = self.execute_query('describe sys.impala_query_live')
+      assert len(describe_column.data) == 50
+      assert column_desc in describe_column.data
+
+      select_column = self.execute_query(
+          'select test_alter from sys.impala_query_live limit 1')
+      assert select_column.data == ['NULL']
+      self.assert_impalad_log_contains('WARNING', r'Unknown column \(position 49\)'
+          + ' added to table IMPALA_QUERY_LIVE; check if a coordinator was upgraded')
+    finally:
+      # Ensure new column is dropped in case of test failure
+      drop_column = self.execute_query(
+          'alter table sys.impala_query_live drop test_alter')
+
+    assert drop_column.data == ['Column has been dropped.']
+
+    describe_column2 = self.execute_query('describe sys.impala_query_live')
+    assert len(describe_column2.data) == 49
+    assert column_desc not in describe_column2.data
+
+    select_column2 = self.execute_query('select * from sys.impala_query_live')
+    assert len(select_column2.data) > 1
+
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt",

Reply via email to