This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8170ec124d8b1f64c122de5dc9ed5853883c6528
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Tue Apr 29 14:06:50 2025 +0200

    IMPALA-11672: Update 'transient_lastDdlTime' for Iceberg tables
    
    Before this change, the 'transient_lastDdlTime' table property was not
    updated for Iceberg tables. Now it is updated after DDL operations,
    including DROP PARTITION.
    
    Renaming an Iceberg table is an exception:
    Iceberg does not keep track of the table name in the metadata files,
    so there is no Iceberg transaction to change it.
    The table name is a concept that exists only in the catalog.
    If we rename the table, we only edit our catalog entry, but the metadata
    stored on the file system - the table's state - does not change.
    Therefore renaming an Iceberg table does not change the
    'transient_lastDdlTime' table property because rename is a
    catalog-level operation for Iceberg tables, and not table-level.
    
    Testing:
     - added managed and non-managed Iceberg table DDL tests to
       test_last_ddl_time_update.py
    
    Change-Id: I7e5f63b50bd37c80faf482c4baf4221be857c54b
    Reviewed-on: http://gerrit.cloudera.org:8080/22831
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   |  76 ++++++++++----
 tests/metadata/test_last_ddl_time_update.py        | 113 ++++++++++++++++-----
 2 files changed, 145 insertions(+), 44 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 195605e6e..b5acac0ad 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1182,8 +1182,11 @@ public class CatalogOpExecutor {
 
   /**
    * Execute the ALTER TABLE command according to the TAlterTableParams and 
refresh the
-   * table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This 
call is
-   * thread-safe, i.e. concurrent operations on the same table are serialized.
+   * table metadata, except for:
+   * - RENAME for Iceberg tables
+   * - RENAME, ADD PARTITION and DROP PARTITION for HDFS tables.
+   * This call is thread-safe, i.e. concurrent operations on the same table are
+   * serialized.
    */
   private void alterTable(TAlterTableParams params, @Nullable String 
debugAction,
       boolean wantMinimalResult, TDdlExecResponse response, EventSequence 
catalogTimeline)
@@ -1299,14 +1302,23 @@ public class CatalogOpExecutor {
         case DROP_PARTITION:
           TAlterTableDropPartitionParams dropPartParams =
               params.getDrop_partition_params();
-          // Drop the partition from the corresponding table. If "purge" 
option is
-          // specified partition data is purged by skipping Trash, if 
configured.
-          alterTableDropPartition(tbl, dropPartParams.getPartition_set(),
-              dropPartParams.isIf_exists(), dropPartParams.isPurge(),
-              numUpdatedPartitions, catalogTimeline, modification);
-          responseSummaryMsg =
-              "Dropped " + numUpdatedPartitions.getRef() + " partition(s).";
-          reloadMetadata = false;
+          if (tbl instanceof IcebergTable) {
+            // The partitions of Iceberg tables are not listed in HMS. DROP 
PARTITION is a
+            // metadata-only change for Iceberg tables.
+            updateHmsAfterIcebergOnlyModification(
+                (IcebergTable) tbl, catalogTimeline,modification);
+          } else {
+            // DROP PARTITION for HDFS tables is different, because partitions 
are listed
+            // in HMS.
+            // Drop the partition from the corresponding table. If "purge" 
option is
+            // specified partition data is purged by skipping Trash, if 
configured.
+            alterTableDropPartition(tbl, dropPartParams.getPartition_set(),
+                dropPartParams.isIf_exists(), dropPartParams.isPurge(),
+                numUpdatedPartitions, catalogTimeline, modification);
+            responseSummaryMsg =
+                "Dropped " + numUpdatedPartitions.getRef() + " partition(s).";
+            reloadMetadata = false;
+          }
           break;
         case RENAME_TABLE:
         case RENAME_VIEW:
@@ -1420,6 +1432,13 @@ public class CatalogOpExecutor {
               tbl, params.getSet_owner_params(), response, catalogTimeline, 
modification);
           responseSummaryMsg = "Updated table/view.";
           break;
+        // ALTER a non-managed Iceberg table.
+        case EXECUTE:
+        case SET_PARTITION_SPEC:
+          Preconditions.checkState(tbl instanceof IcebergTable);
+          updateHmsAfterIcebergOnlyModification(
+              (IcebergTable) tbl, catalogTimeline, modification);
+          break;
         default:
           throw new UnsupportedOperationException(
               "Unknown ALTER TABLE operation type: " + params.getAlter_type());
@@ -1582,7 +1601,6 @@ public class CatalogOpExecutor {
         case EXECUTE:
           Preconditions.checkState(params.isSetSet_execute_params());
           // All the EXECUTE functions operate only on Iceberg data.
-          needsToUpdateHms = false;
           TAlterTableExecuteParams setExecuteParams = 
params.getSet_execute_params();
           if (setExecuteParams.isSetExecute_rollback_params()) {
             String rollbackSummary = 
IcebergCatalogOpExecutor.alterTableExecuteRollback(
@@ -1601,7 +1619,6 @@ public class CatalogOpExecutor {
           break;
         case SET_PARTITION_SPEC:
           // Partition spec is not stored in HMS.
-          needsToUpdateHms = false;
           TAlterTableSetPartitionSpecParams setPartSpecParams =
               params.getSet_partition_spec_params();
           IcebergCatalogOpExecutor.alterTableSetPartitionSpec(tbl,
@@ -1617,8 +1634,7 @@ public class CatalogOpExecutor {
           addSummary(response, "Updated table.");
           break;
         case DROP_PARTITION:
-          // Metadata change only
-          needsToUpdateHms = false;
+          // Partitions are not stored in HMS, this is a metadata change only.
           long droppedPartitions = 
IcebergCatalogOpExecutor.alterTableDropPartition(
               iceTxn, params.getDrop_partition_params());
           addSummary(
@@ -1633,6 +1649,14 @@ public class CatalogOpExecutor {
               params.getAlter_type());
       }
       catalogTimeline.markEvent("Iceberg operations are prepared for commit");
+      // Modify "transient_lastDdlTime" table property through Iceberg.
+      // Non-managed Iceberg tables also need to update this property in HMS 
separately
+      // in 'alterTable()', regardless of whether the modification itself 
needs to update
+      // HMS or affects only Iceberg metadata stored on the file system.
+      Map<String, String> property = new HashMap<>();
+      property.put(Table.TBL_PROP_LAST_DDL_TIME,
+          Long.toString(System.currentTimeMillis() / 1000));
+      IcebergCatalogOpExecutor.setTblProperties(iceTxn, property);
       if (!needsToUpdateHms) {
         // registerInflightEvent() before committing transaction.
         modification.registerInflightEvent();
@@ -1653,8 +1677,7 @@ public class CatalogOpExecutor {
     }
 
     if (!needsToUpdateHms) {
-      // We don't need to update HMS because either it is already done by 
Iceberg's
-      // HiveCatalog, or we modified the Iceberg data which is not stored in 
HMS.
+      // We don't need to update HMS because it is already done by Iceberg's 
HiveCatalog.
       loadTableMetadata(tbl, modification.newVersionNumber(), true, true,
           "ALTER Iceberg TABLE " + params.getAlter_type().name(), debugAction,
           catalogTimeline);
@@ -1681,8 +1704,8 @@ public class CatalogOpExecutor {
 
   /**
    * Iceberg format from V2 supports row-level modifications. We set write 
modes to
-   * "merge-on-read" which is the write mode Impala will eventually
-   * support (IMPALA-11664). Unless the user specified otherwise in the table 
properties.
+   * "merge-on-read" which is the write mode Impala supports.
+   * Unless the user specified otherwise in the table properties.
    */
   private void addMergeOnReadPropertiesIfNeeded(IcebergTable tbl,
       Map<String, String> properties) {
@@ -1714,6 +1737,23 @@ public class CatalogOpExecutor {
     return true;
   }
 
+  /**
+   * Update HMS about an ALTER TABLE issued on a non-managed Iceberg table. In 
case the
+   * modification only affects Iceberg metadata, there would be no update to 
HMS, but
+   * 'transient_lastDdlTime' table property still needs to be updated in Hive 
Metastore.
+   */
+  private void updateHmsAfterIcebergOnlyModification(IcebergTable tbl,
+      EventSequence catalogTimeline, InProgressTableModification modification)
+      throws ImpalaRuntimeException {
+    // Managed Iceberg tables in HMS are updated by Iceberg's HiveCatalog 
together with
+    // committing the Iceberg transaction. HMS should not be updated directly 
in parallel
+    // with that because of potential data loss.
+    
Preconditions.checkState(!IcebergUtil.isHiveCatalog(tbl.getMetaStoreTable()));
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
+    org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable().deepCopy();
+    applyAlterAndInProgressTableModification(msTbl, catalogTimeline, 
modification);
+  }
+
   /**
    * Loads the metadata of a table 'tbl' and assigns a new catalog version.
    * 'reloadFileMetadata' and 'reloadTableSchema'
diff --git a/tests/metadata/test_last_ddl_time_update.py 
b/tests/metadata/test_last_ddl_time_update.py
index 6b202933f..a5e48c7fd 100644
--- a/tests/metadata/test_last_ddl_time_update.py
+++ b/tests/metadata/test_last_ddl_time_update.py
@@ -39,6 +39,12 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
       # to regress here.
       cls.ImpalaTestMatrix.add_constraint(lambda v: False)
 
+  class TableFormat:
+    HDFS = 1
+    KUDU = 2
+    INTEGRATED_ICEBERG = 3
+    NON_INTEGRATED_ICEBERG = 4
+
   # Convenience class to make calls to TestLastDdlTimeUpdate.run_test() 
shorter by
   # storing common arguments as members and substituting table name and HDFS 
warehouse
   # path to the query string.
@@ -76,13 +82,18 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
       """
       self.run_test(query, self.TimeState.UNCHANGED, self.TimeState.CHANGED)
 
-    def expect_ddl_time_change_on_rename(self, new_tbl_name):
+    def expect_ddl_time_change_on_rename(self, new_tbl_name, expect_change):
       """
       Checks that after an ALTER TABLE ... RENAME query transient_lastDdlTime 
is higher on
       the new table than it was on the old table.
       """
       query = "alter table %(TBL)s rename to {}".format(self.db_name + "." + 
new_tbl_name)
-      self.run_test(query, self.TimeState.CHANGED, self.TimeState.UNCHANGED, 
new_tbl_name)
+      if expect_change:
+        self.run_test(
+            query, self.TimeState.CHANGED, self.TimeState.UNCHANGED, 
new_tbl_name)
+      else:
+        self.run_test(
+            query, self.TimeState.UNCHANGED, self.TimeState.UNCHANGED, 
new_tbl_name)
 
     def expect_stat_time_set(self, query):
       """Running the query should not change transient_lastDdlTime while
@@ -160,25 +171,33 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
       statsTime = table.parameters.get(LAST_COMPUTE_STATS_TIME_KEY, "")
       return (ddlTime, statsTime)
 
-  def _create_table(self, fq_tbl_name, is_kudu):
-    if is_kudu:
-      self.execute_query("create table %s (i int primary key) "
-                         "partition by hash(i) partitions 3 stored as kudu" % 
fq_tbl_name)
-    else:
+  def _create_table(self, fq_tbl_name, format):
+    if format == self.TableFormat.HDFS:
       self.execute_query("create external table %s (i int) "
-                         "partitioned by (j int, s string)" % fq_tbl_name)
-
-  def _create_and_init_test_helper(self, unique_database, tbl_name, is_kudu):
+          "partitioned by (j int, s string)" % fq_tbl_name)
+    elif format == self.TableFormat.KUDU:
+      self.execute_query("create table %s (i int primary key) partition by "
+          "hash(i) partitions 3 stored as kudu" % fq_tbl_name)
+    elif format == self.TableFormat.INTEGRATED_ICEBERG:
+      self.execute_query("create table %s (i int) partitioned by (j int, s 
string) "
+          "stored by iceberg" % fq_tbl_name)
+    elif format == self.TableFormat.NON_INTEGRATED_ICEBERG:
+      self.execute_query("create table %s (i int) partitioned by (j int, s 
string) "
+          "stored by iceberg tblproperties('iceberg.catalog'='hadoop.tables')"
+          % fq_tbl_name)
+
+  def _create_and_init_test_helper(self, unique_database, tbl_name, 
table_format):
     helper = TestLastDdlTimeUpdate.TestHelper(self, unique_database, tbl_name)
-    self._create_table(helper.fq_tbl_name, is_kudu)
+    self._create_table(helper.fq_tbl_name, table_format)
 
     # compute statistics to fill table property impala.lastComputeStatsTime
     self.execute_query("compute stats %s" % helper.fq_tbl_name)
     return helper
 
-  def test_alter(self, vector, unique_database):
+  def test_hdfs_alter(self, vector, unique_database):
     TBL_NAME = "alter_test_tbl"
-    h = self._create_and_init_test_helper(unique_database, TBL_NAME, False)
+    h = self._create_and_init_test_helper(
+        unique_database, TBL_NAME, self.TableFormat.HDFS)
 
     # add/drop partitions
     h.expect_no_time_change("alter table %(TBL)s add partition (j=1, 
s='2012')")
@@ -214,10 +233,10 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
     # compute sampled statistics
     h.expect_stat_time_change("compute stats %(TBL)s tablesample system(20)")
 
-
-  def test_insert(self, vector, unique_database):
+  def test_hdfs_insert(self, vector, unique_database):
     TBL_NAME = "insert_test_tbl"
-    h = self._create_and_init_test_helper(unique_database, TBL_NAME, False)
+    h = self._create_and_init_test_helper(
+        unique_database, TBL_NAME, self.TableFormat.HDFS)
 
     # static partition insert
     h.expect_no_time_change("insert into %(TBL)s partition(j=1, s='2012') 
select 10")
@@ -233,28 +252,69 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
 
   def test_kudu_alter_and_insert(self, vector, unique_database):
     TBL_NAME = "kudu_test_tbl"
-    h = self._create_and_init_test_helper(unique_database, TBL_NAME, True)
+    h = self._create_and_init_test_helper(
+        unique_database, TBL_NAME, self.TableFormat.KUDU)
 
     # insert
     h.expect_no_time_change("insert into %s values (1)" % h.fq_tbl_name)
 
     self.run_common_test_cases(h)
 
+  def test_iceberg_alter_and_insert(self, vector, unique_database):
+    TBL_NAMES = ("iceberg_test_tbl", "non_integrated_iceberg_test_tbl")
+    helpers = (self._create_and_init_test_helper(
+                unique_database, TBL_NAMES[0], 
self.TableFormat.INTEGRATED_ICEBERG),
+              self._create_and_init_test_helper(
+                unique_database, TBL_NAMES[1], 
self.TableFormat.NON_INTEGRATED_ICEBERG))
+
+    for h in helpers:
+      # insert
+      h.expect_no_time_change("insert into %(TBL)s select 10, 2, '2025'")
+      h.expect_no_time_change("insert into %(TBL)s select 20, 1, '2012'")
+
+      # add, alter and drop column
+      h.expect_ddl_time_change("alter table %(TBL)s add column a boolean")
+      h.expect_ddl_time_change("alter table %(TBL)s alter column a set comment 
'bool'")
+      h.expect_ddl_time_change("alter table %(TBL)s drop column a")
+
+      # drop partition
+      h.expect_ddl_time_change("alter table %(TBL)s drop partition (j=1, 
s='2012')")
+
+      # set partition spec
+      h.expect_ddl_time_change("alter table %(TBL)s set partition spec 
(void(j))")
+
+      # expire snapshots
+      h.expect_ddl_time_change("alter table %(TBL)s execute 
expire_snapshots(now())")
+
+      self.run_common_test_cases(h)
+
   def test_rename(self, vector, unique_database):
-    # Test non-Kudu table
-    OLD_TBL_NAME = "rename_from_test_tbl"
-    NEW_TBL_NAME = "rename_to_test_tbl"
+    # Test HDFS table
+    OLD_HDFS_TBL_NAME = "hdfs_rename_from_test_tbl"
+    NEW_HDFS_TBL_NAME = "hdfs_rename_to_test_tbl"
 
-    h = self._create_and_init_test_helper(unique_database, OLD_TBL_NAME, False)
-    h.expect_ddl_time_change_on_rename(NEW_TBL_NAME)
+    h = self._create_and_init_test_helper(
+        unique_database, OLD_HDFS_TBL_NAME, self.TableFormat.HDFS)
+    h.expect_ddl_time_change_on_rename(NEW_HDFS_TBL_NAME, True)
 
     # Test Kudu table
     OLD_KUDU_TBL_NAME = "kudu_rename_from_test_tbl"
     NEW_KUDU_TBL_NAME = "kudu_rename_to_test_tbl"
-    h = self._create_and_init_test_helper(unique_database, OLD_KUDU_TBL_NAME, 
True)
-    h.expect_ddl_time_change_on_rename(NEW_KUDU_TBL_NAME)
-
-  # Tests that should behave the same with HDFS and Kudu tables.
+    h = self._create_and_init_test_helper(
+        unique_database, OLD_KUDU_TBL_NAME, self.TableFormat.KUDU)
+    h.expect_ddl_time_change_on_rename(NEW_KUDU_TBL_NAME, True)
+
+    # The name of an Iceberg table is not tracked by Iceberg in the metadata 
files.
+    # Table name is a catalog-level abstraction, therefore rename is a 
catalog-level
+    # operation which does not change 'transient_lastDdlTime' table property.
+    # Iceberg tables that use 'hadoop.tables' as catalog cannot be renamed.
+    OLD_ICEBERG_TBL_NAME = "iceberg_rename_from_test_tbl"
+    NEW_ICEBERG_TBL_NAME = "iceberg_rename_to_test_tbl"
+    h = self._create_and_init_test_helper(
+        unique_database, OLD_ICEBERG_TBL_NAME, 
self.TableFormat.INTEGRATED_ICEBERG)
+    h.expect_ddl_time_change_on_rename(NEW_ICEBERG_TBL_NAME, False)
+
+  # Tests that should behave the same with HDFS, Kudu and Iceberg tables.
   def run_common_test_cases(self, test_helper):
     h = test_helper
     # rename columns
@@ -262,6 +322,7 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
     h.expect_ddl_time_change("alter table %(TBL)s change column k i int")
     # change table property
     h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('a'='b')")
+    h.expect_ddl_time_change("alter table %(TBL)s unset tblproperties ('a')")
 
     # changing table statistics manually
     h.expect_ddl_time_change("alter table %(TBL)s set tblproperties 
('numRows'='1')")

Reply via email to