This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 85ebfe220aa6fe654b860d5f3fe1d8902940de0f Author: Vihang Karajgaonkar <vihan...@apache.org> AuthorDate: Wed Jul 7 14:58:07 2021 -0700 IMPALA-10490: Fix illegalStateException in drop stats If a table was created and incremental stats were computed on the table while events processing is turned off, then after the events processing is turned on a drop stats or truncate table command on such a table fails with IllegalStateException. This happens because the catalog service identifiers are not found in the table properties while partition for the table are being altered. This patch adds the catalog service identifiers for drop stats, truncate and comment on statement code paths to fix the error. Testing: 1. Added more statements to the test_self_events test to cover the newly added logic. 2. Manually tested the following scenario which was previously failing a. starting catalogd without events processing. b. create partitioned table and compute incremental stats on it. c. restart catalogd with events processing turned on. d. issue drop stats command. Change-Id: Iaa0b4043879370c52049d22acb49c847b0be1c68 Reviewed-on: http://gerrit.cloudera.org:8080/17659 Reviewed-by: Vihang Karajgaonkar <vih...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../org/apache/impala/service/CatalogOpExecutor.java | 12 ++++++++++++ tests/custom_cluster/test_event_processing.py | 17 ++++++++++++++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 1fc20fc..3b77cc9 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -2099,6 +2099,8 @@ public class CatalogOpExecutor { try { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); + addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(), + newCatalogVersion); if (params.getPartition_set() == null) { // TODO: Report the number of updated partitions/columns to the user? // TODO: bulk alter the partitions. @@ -2124,6 +2126,7 @@ public class CatalogOpExecutor { } loadTableMetadata(table, newCatalogVersion, /*reloadFileMetadata=*/false, /*reloadTableSchema=*/true, /*partitionsToUpdate=*/null, "DROP STATS"); + catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion); addTableToCatalogUpdate(table, wantMinimalResult, resp.result); addSummary(resp, "Stats have been dropped."); } finally { @@ -2667,6 +2670,7 @@ public class CatalogOpExecutor { Preconditions.checkState(newCatalogVersion > 0); addSummary(resp, "Table has been truncated."); loadTableMetadata(table, newCatalogVersion, true, true, null, "TRUNCATE"); + catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion); addTableToCatalogUpdate(table, wantMinimalResult, resp.result); } finally { UnlockWriteLockIfErronouslyLocked(); @@ -2710,6 +2714,8 @@ public class CatalogOpExecutor { table.getFullName(), sw.elapsed(TimeUnit.MILLISECONDS)); newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); + addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(), + newCatalogVersion); TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient, table.getMetaStoreTable(), txn.getId()); HdfsTable hdfsTable = (HdfsTable) table; @@ -2810,6 +2816,8 @@ public class CatalogOpExecutor { Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread()); long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); + addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(), + newCatalogVersion); HdfsTable hdfsTable = (HdfsTable) table; boolean isTableBeingReplicated = false; Stopwatch sw = Stopwatch.createStarted(); @@ -6334,6 +6342,7 @@ public class CatalogOpExecutor { } applyAlterTable(msTbl); loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER COMMENT"); + catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion); addTableToCatalogUpdate(tbl, wantMinimalResult, response.result); addSummary(response, String.format("Updated %s.", (isView) ? "view" : "table")); } finally { @@ -6350,6 +6359,8 @@ public class CatalogOpExecutor { try { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); + addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(), + newCatalogVersion); if (tbl instanceof KuduTable) { TColumn new_col = new TColumn(columnName, tbl.getColumn(columnName).getType().toThrift()); @@ -6369,6 +6380,7 @@ public class CatalogOpExecutor { } loadTableMetadata(tbl, newCatalogVersion, false, true, null, "ALTER COLUMN COMMENT"); + catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion); addTableToCatalogUpdate(tbl, wantMinimalResult, response.result); addSummary(response, "Column has been altered."); } finally { diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py index c9679e6..e5ed34d 100644 --- a/tests/custom_cluster/test_event_processing.py +++ b/tests/custom_cluster/test_event_processing.py @@ -659,7 +659,10 @@ class TestEventProcessing(CustomClusterTestSuite): "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name), "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name), "alter table {0}.{1} ALTER COLUMN C1 set comment 'c1 comment'".format(db_name, - tbl_name), + tbl_name), + "comment on table {0}.{1} IS 'table level comment'".format(db_name, tbl_name), + "comment on column {0}.{1}.C1 IS 'column level comment'".format(db_name, + tbl_name), "alter table {0}.{1} ADD COLUMNS (c2 int, c3 string)".format(db_name, tbl_name), "alter table {0}.{1} DROP COLUMN c1".format(db_name, tbl_name), "alter table {0}.{1} DROP COLUMN c2".format(db_name, tbl_name), @@ -670,6 +673,8 @@ class TestEventProcessing(CustomClusterTestSuite): "alter view {0}.{1} set owner role `test-view-role`".format(db_name, view_name), # compute stats will generates ALTER_PARTITION "compute stats {0}.{1}".format(db_name, tbl_name), + "compute incremental stats {0}.{1}".format(db_name, tbl_name), + "drop stats {0}.{1}".format(db_name, tbl_name), # insert into a existing partition; generates INSERT self-event "insert into table {0}.{1} partition " "(year, month) select * from functional.alltypessmall where year=2009 " @@ -677,8 +682,14 @@ class TestEventProcessing(CustomClusterTestSuite): # insert overwrite query from Impala also generates a INSERT self-event "insert overwrite table {0}.{1} partition " "(year, month) select * from functional.alltypessmall where year=2009 " - "and month=1".format(db_name, tbl_name)], - # Queries which will not increment the events-skipped counter + "and month=1".format(db_name, tbl_name), + # events processor doesn't process delete column stats events currently, + # however, in case of incremental stats, there could be alter table and + # alter partition events which should be ignored. Hence we run compute stats + # before to make sure that the truncate table command generated alter events + # are ignored. + "compute incremental stats {0}.{1}".format(db_name, tbl_name), + "truncate table {0}.{1}".format(db_name, tbl_name)], False: [ "create table {0}.{1} like functional.alltypessmall " "stored as parquet".format(db_name, tbl_name),