This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 45682c132ffb6c5511e40e949a2c8d4008f17e7d Author: zhangyifan27 <[email protected]> AuthorDate: Tue Dec 12 10:17:06 2023 +0800 IMPALA-12229: Support soft-delete Kudu table Adds 'kudu_table_reserve_seconds' query option to set reserved time for deleted Impala managed Kudu tables. The default value is 0. This option can prevent users from deleting important Kudu tables by mistake. Testing: - Added e2e tests. Change-Id: I3020567bb6cfe4dd48ef17906f8de674f37217e7 Reviewed-on: http://gerrit.cloudera.org:8080/20773 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/query-options.cc | 7 +++ be/src/service/query-options.h | 4 +- common/thrift/CatalogService.thrift | 5 ++ common/thrift/ImpalaService.thrift | 5 ++ common/thrift/Query.thrift | 3 ++ .../apache/impala/service/CatalogOpExecutor.java | 33 +++++++----- .../java/org/apache/impala/service/Frontend.java | 2 + .../impala/service/KuduCatalogOpExecutor.java | 9 ++-- infra/python/deps/kudu-requirements.txt | 2 +- tests/query_test/test_kudu.py | 59 ++++++++++++++++++++++ 10 files changed, 112 insertions(+), 17 deletions(-) diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 062b9b6c2..4678a7d19 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1166,6 +1166,13 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_codegen_opt_level(enum_type); break; } + case TImpalaQueryOptions::KUDU_TABLE_RESERVE_SECONDS: { + int32_t int32_t_val = 0; + RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>( + option, value, &int32_t_val)); + query_options->__set_kudu_table_reserve_seconds(int32_t_val); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 68337d907..c420e0743 
100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // time we add or remove a query option to/from the enum TImpalaQueryOptions. #define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \ - TImpalaQueryOptions::CODEGEN_OPT_LEVEL + 1); \ + TImpalaQueryOptions::KUDU_TABLE_RESERVE_SECONDS + 1); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \ @@ -311,6 +311,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> QUERY_OPT_FN(hdfs_scanner_non_reserved_bytes, HDFS_SCANNER_NON_RESERVED_BYTES, \ TQueryOptionLevel::ADVANCED) \ QUERY_OPT_FN(codegen_opt_level, CODEGEN_OPT_LEVEL, TQueryOptionLevel::ADVANCED) \ + QUERY_OPT_FN(kudu_table_reserve_seconds, KUDU_TABLE_RESERVE_SECONDS, \ + TQueryOptionLevel::ADVANCED) \ ; /// Enforce practical limits on some query options to avoid undesired query state. diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 7d2282d3f..99d210d06 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -108,6 +108,11 @@ struct TDdlQueryOptions { // Maximum wait time on an HMS ACID lock in seconds. 3: optional i32 lock_max_wait_time_s + + // The reservation time (in seconds) for deleted Impala-managed Kudu tables. + // During this time deleted Kudu tables can be recovered by Kudu's 'recall table' API. + // See KUDU-3326 for details. + 4: optional i32 kudu_table_reserve_seconds } // Request for executing a DDL operation (CREATE, ALTER, DROP). 
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 13c6c4e34..d2fb3a046 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -873,6 +873,11 @@ enum TImpalaQueryOptions { // Select codegen optimization level from O0, O1, Os, O2, or O3. Higher levels will // overwrite existing codegen cache entries. CODEGEN_OPT_LEVEL = 167 + + // The reservation time (in seconds) for deleted impala-managed Kudu tables. + // During this time deleted Kudu tables can be recovered by Kudu's 'recall table' API. + // See KUDU-3326 for details. + KUDU_TABLE_RESERVE_SECONDS = 168 } // The summary of a DML statement. diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 39ba7f896..e6e5ded5a 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -678,6 +678,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 168: optional TCodeGenOptLevel codegen_opt_level = TCodeGenOptLevel.O2 + + // See comment in ImpalaService.thrift + 169: optional i32 kudu_table_reserve_seconds = 0; } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index f89bfd40f..75fac245a 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -520,7 +520,8 @@ public class CatalogOpExecutor { TDropDbParams drop_db_params = ddlRequest.getDrop_db_params(); tTableName = Optional.of(new TTableName(drop_db_params.getDb(), "")); catalogOpTracker_.increment(ddlRequest, tTableName); - dropDatabase(drop_db_params, response); + dropDatabase(drop_db_params, response, + ddlRequest.getQuery_options().getKudu_table_reserve_seconds()); break; case DROP_TABLE: case DROP_VIEW: @@ -531,7 +532,8 @@ public class CatalogOpExecutor { // Dropped 
tables and views are already returned as minimal results, so don't // need to pass down wantMinimalResult here. dropTableOrView(drop_table_or_view_params, response, - ddlRequest.getQuery_options().getLock_max_wait_time_s()); + ddlRequest.getQuery_options().getLock_max_wait_time_s(), + ddlRequest.getQuery_options().getKudu_table_reserve_seconds()); break; case TRUNCATE_TABLE: TTruncateParams truncate_params = ddlRequest.getTruncate_params(); @@ -2652,8 +2654,8 @@ public class CatalogOpExecutor { * internal cache. Attempts to remove the HDFS cache directives of the underlying * tables. Re-throws any HMS exceptions encountered during the drop. */ - private void dropDatabase(TDropDbParams params, TDdlExecResponse resp) - throws ImpalaException { + private void dropDatabase(TDropDbParams params, TDdlExecResponse resp, + int kudu_table_reserve_seconds) throws ImpalaException { Preconditions.checkNotNull(params); String dbName = params.getDb(); Preconditions.checkState(dbName != null && !dbName.isEmpty(), @@ -2677,7 +2679,9 @@ public class CatalogOpExecutor { getMetastoreDdlLock().lock(); try { // Remove all the Kudu tables of 'db' from the Kudu storage engine. - if (db != null && params.cascade) dropTablesFromKudu(db); + if (db != null && params.cascade) { + dropTablesFromKudu(db, kudu_table_reserve_seconds); + } // The Kudu tables in the HMS should have been dropped at this point // with the Hive Metastore integration enabled. @@ -2764,7 +2768,8 @@ public class CatalogOpExecutor { * metadata for Kudu tables cannot be loaded from HMS or if an error occurs while * trying to drop a table from Kudu. */ - private void dropTablesFromKudu(Db db) throws ImpalaException { + private void dropTablesFromKudu(Db db, int kudu_table_reserve_seconds) + throws ImpalaException { // If the table format isn't available, because the table hasn't been loaded yet, // the metadata must be fetched from the Hive Metastore. 
List<String> incompleteTableNames = Lists.newArrayList(); @@ -2793,7 +2798,8 @@ public class CatalogOpExecutor { // some reason Kudu is permanently stuck in a non-functional state, the user is // expected to ALTER TABLE to either set the table to UNMANAGED or set the format // to something else. - KuduCatalogOpExecutor.dropTable(msTable, /*if exists*/ true); + KuduCatalogOpExecutor.dropTable( + msTable, /*if exists*/ true, kudu_table_reserve_seconds); } } @@ -2837,7 +2843,7 @@ public class CatalogOpExecutor { * executing the drop operation. */ private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp, - int lockMaxWaitTime) throws ImpalaException { + int lockMaxWaitTime, int kudu_table_reserve_seconds) throws ImpalaException { TableName tableName = TableName.fromThrift(params.getTable_name()); Preconditions.checkState(tableName != null && tableName.isFullyQualified()); Preconditions.checkState(!catalog_.isBlacklistedTable(tableName) || params.if_exists, @@ -2882,7 +2888,7 @@ public class CatalogOpExecutor { } try { - dropTableOrViewInternal(params, tableName, resp); + dropTableOrViewInternal(params, tableName, resp, kudu_table_reserve_seconds); } finally { if (lockId > 0) catalog_.releaseTableLock(lockId); } @@ -2892,7 +2898,8 @@ public class CatalogOpExecutor { * Helper function for dropTableOrView(). 
*/ private void dropTableOrViewInternal(TDropTableOrViewParams params, - TableName tableName, TDdlExecResponse resp) throws ImpalaException { + TableName tableName, TDdlExecResponse resp, int kudu_table_reserve_seconds) + throws ImpalaException { TCatalogObject removedObject = new TCatalogObject(); getMetastoreDdlLock().lock(); try { @@ -2950,7 +2957,8 @@ public class CatalogOpExecutor { boolean isSynchronizedKuduTable = msTbl != null && KuduTable.isKuduTable(msTbl) && KuduTable.isSynchronizedTable(msTbl); if (isSynchronizedKuduTable) { - KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true); + KuduCatalogOpExecutor.dropTable( + msTbl, /* if exists */ true, kudu_table_reserve_seconds); } long eventId; @@ -3654,7 +3662,8 @@ public class CatalogOpExecutor { try { // Error creating the table in HMS, drop the synchronized table from Kudu. if (!KuduTable.isSynchronizedTable(newTable)) { - KuduCatalogOpExecutor.dropTable(newTable, false); + KuduCatalogOpExecutor.dropTable( + newTable, /* if exists */ false, /* kudu_table_reserve_seconds */ 0); } } catch (Exception logged) { String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME); diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 543c756a4..c0633b83a 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -885,6 +885,8 @@ public class Frontend { } ddlQueryOpts.setLock_max_wait_time_s( result.getQuery_options().lock_max_wait_time_s); + ddlQueryOpts.setKudu_table_reserve_seconds( + result.getQuery_options().kudu_table_reserve_seconds); ddl.getDdl_params().setQuery_options(ddlQueryOpts); } else if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) { ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl()); diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index 738e21fbf..f725ab0a9 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -324,10 +324,13 @@ public class KuduCatalogOpExecutor { /** * Drops the table in Kudu. If the table does not exist and 'ifExists' is false, a * TableNotFoundException is thrown. If the table exists and could not be dropped, - * an ImpalaRuntimeException is thrown. + * an ImpalaRuntimeException is thrown. If 'kudu_table_reserve_seconds' is 0, the + * table will be deleted immediately; otherwise the table will be reserved in the + * Kudu cluster for 'kudu_table_reserve_seconds' seconds. */ public static void dropTable(org.apache.hadoop.hive.metastore.api.Table msTbl, - boolean ifExists) throws ImpalaRuntimeException, TableNotFoundException { + boolean ifExists, int kudu_table_reserve_seconds) + throws ImpalaRuntimeException, TableNotFoundException { Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl)); String tableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME); String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS); @@ -341,7 +344,7 @@ public class KuduCatalogOpExecutor { // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity. // (see KUDU-1710). 
if (kudu.tableExists(tableName)) { - kudu.deleteTable(tableName); + kudu.deleteTable(tableName, kudu_table_reserve_seconds); } else if (!ifExists) { throw new TableNotFoundException(String.format( "Table '%s' does not exist in Kudu master(s) '%s'.", tableName, masterHosts)); diff --git a/infra/python/deps/kudu-requirements.txt b/infra/python/deps/kudu-requirements.txt index bb5e2c429..893e9658b 100644 --- a/infra/python/deps/kudu-requirements.txt +++ b/infra/python/deps/kudu-requirements.txt @@ -19,4 +19,4 @@ # and also depends on Cython being installed into the virtualenv, so it must be installed # after the toolchain is bootstrapped and all requirements in requirements.txt and # compiled-requirements.txt are installed into the virtualenv. -kudu-python==1.14.0 +kudu-python==1.17.0 diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index 1472bd130..ea3c3388b 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -1258,6 +1258,40 @@ class TestDropDb(KuduTestSuite): assert kudu_client.table_exists(kudu_table.name) assert not kudu_client.table_exists(managed_table_name) + @SkipIfKudu.hms_integration_enabled + def test_soft_drop_db_cascade(self, unique_cursor, kudu_client): + """Check that an attempt to drop a database will succeed but the managed Kudu tables + are not removed immediately if 'kudu_table_reserve_seconds' is greater than 0. + These Kudu tables are in 'soft_deleted' state and can be recalled during the + reservation period. 
+ """ + db_name = unique_cursor.conn.db_name + table_name_pattern = "managed_kudu_table_" + for i in range(10): + managed_table_name = table_name_pattern + str(i) + unique_cursor.execute(""" + CREATE TABLE %s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3 + STORED AS KUDU""" % managed_table_name) + kudu_tbl_name = KuduTestSuite.to_kudu_table_name(db_name, managed_table_name) + assert kudu_client.table_exists(kudu_tbl_name) + + unique_cursor.execute("set kudu_table_reserve_seconds=300") + unique_cursor.execute("USE DEFAULT") + unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name) + unique_cursor.execute("SHOW DATABASES") + assert (db_name, '') not in unique_cursor.fetchall() + + for i in range(10): + kudu_tbl_name = \ + KuduTestSuite.to_kudu_table_name(db_name, table_name_pattern + str(i)) + assert kudu_client.table_exists(kudu_tbl_name) + assert kudu_tbl_name not in kudu_client.list_tables() + assert kudu_tbl_name in kudu_client.list_soft_deleted_tables() + table = kudu_client.table(kudu_tbl_name) + kudu_client.recall_table(table.id) + assert kudu_tbl_name in kudu_client.list_tables() + assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables() + class TestImpalaKuduIntegration(KuduTestSuite): @SkipIfKudu.hms_integration_enabled def test_replace_kudu_table(self, cursor, kudu_client): @@ -1338,6 +1372,31 @@ class TestImpalaKuduIntegration(KuduTestSuite): cursor.execute("SHOW TABLES IN %s" % unique_database) assert (impala_tbl_name,) not in cursor.fetchall() + @SkipIfKudu.hms_integration_enabled + def test_soft_delete_kudu_table(self, cursor, kudu_client, unique_database): + """Check that the query option 'kudu_table_reserve_seconds' works for managed Kudu + table. If it is greater than 0, the underlying Kudu will not be deleted immediately. 
+ During the reservation period, the Kudu table can be recalled.""" + impala_tbl_name = "foo" + cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a) + PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name)) + kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name) + assert kudu_client.table_exists(kudu_tbl_name) + + cursor.execute("set kudu_table_reserve_seconds=300") + cursor.execute("DROP TABLE %s.%s" % (unique_database, impala_tbl_name)) + cursor.execute("SHOW TABLES IN %s" % unique_database) + assert (impala_tbl_name,) not in cursor.fetchall() + + assert kudu_client.table_exists(kudu_tbl_name) + assert kudu_tbl_name not in kudu_client.list_tables() + assert kudu_tbl_name in kudu_client.list_soft_deleted_tables() + + table = kudu_client.table(kudu_tbl_name) + kudu_client.recall_table(table.id) + assert kudu_tbl_name in kudu_client.list_tables() + assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables() + @SkipIfNotHdfsMinicluster.tuned_for_minicluster class TestKuduMemLimits(KuduTestSuite):
