This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new e3e9308  IMPALA-9936: Only send invalidations in DDL responses to 
LocalCatalog coordinators
e3e9308 is described below

commit e3e93089d4233785df19a79669bb93b0d23e6d94
Author: stiga-huang <[email protected]>
AuthorDate: Thu Sep 10 09:48:53 2020 +0800

    IMPALA-9936: Only send invalidations in DDL responses to LocalCatalog 
coordinators
    
    A catalogd RPC response contains the updated catalog objects in their
    full form. For instance, an RPC for adding a new partition to an HdfsTable
    will return the whole HdfsTable object (metadata) containing all the
    partitions. This is required by legacy coordinators where the whole
    HdfsTable object is used to replace the stale object (metadata snapshot).
    However, LocalCatalog coordinators just need the object names for
    invalidations. It's a waste of space to send the full catalog objects to
    LocalCatalog coordinators. Moreover, there is a risk of OOM due
    to hitting the Java array limit when serializing a table that has a huge
    metadata footprint.
    
    This patch refactors the catalogd RPC responses to only send back
    the invalidations that are needed. To distinguish between legacy and LocalCatalog
    coordinators, a new field, want_minimal_response, is introduced in
    TCatalogServiceRequestHeader which is the header for most of the
    Catalogd RPC requests (e.g. TDdlExecRequest, TUpdateCatalogRequest and
    TResetMetadataRequest). LocalCatalog coordinators will set this field to
    true. When adding updated catalog objects to the response, catalogd will
    add invalidations which only contain the object names (e.g. db name,
    table name). Note that function objects are small so are ignored in this
    optimization.
    
    Tests:
     - Add DCHECKs in catalog-op-executor.cc to verify the catalog objects
       received by LocalCatalog coordinators are in minimal mode.
     - Run test_ddl.py in both legacy catalog mode and local catalog mode.
    
    Change-Id: Id45827295ddee3eb6e98a11c55f582b2aebe5f38
    Reviewed-on: http://gerrit.cloudera.org:8080/16435
    Reviewed-by: Quanlong Huang <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/catalog-op-executor.cc                 |  27 +++
 be/src/service/client-request-state.cc             |   4 +
 common/thrift/CatalogService.thrift                |   4 +
 .../org/apache/impala/catalog/CatalogObject.java   |   9 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  28 ++-
 fe/src/main/java/org/apache/impala/catalog/Db.java |   6 +
 .../main/java/org/apache/impala/catalog/Table.java |  29 ++-
 .../apache/impala/service/CatalogOpExecutor.java   | 258 +++++++++++----------
 .../java/org/apache/impala/service/Frontend.java   |   7 +-
 .../org/apache/impala/catalog/CatalogTest.java     |   6 +-
 10 files changed, 239 insertions(+), 139 deletions(-)

diff --git a/be/src/exec/catalog-op-executor.cc 
b/be/src/exec/catalog-op-executor.cc
index 0ffd708..d00b0eb 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -66,6 +66,29 @@ static Status CatalogRpcDebugFn(int* attempt) {
       Status::OK();
 }
 
+/// Used in LocalCatalog mode to verify the catalog RPC response only contains 
minimal
+/// catalog objects, i.e. database objects have only db names and table 
objects have only
+/// db and table names.
+static void VerifyMinimalResponse(const TCatalogUpdateResult& result) {
+  for (const TCatalogObject& obj : result.updated_catalog_objects) {
+    if (obj.type == impala::TCatalogObjectType::TABLE) {
+      // Make sure the table object only contains db_name and tbl_name and 
nothing else.
+      DCHECK(!obj.table.__isset.metastore_table);
+      DCHECK(!obj.table.__isset.columns);
+      DCHECK(!obj.table.__isset.clustering_columns);
+      DCHECK(!obj.table.__isset.table_stats);
+      DCHECK(!obj.table.__isset.hdfs_table);
+      DCHECK(!obj.table.__isset.kudu_table);
+      DCHECK(!obj.table.__isset.hbase_table);
+      DCHECK(!obj.table.__isset.iceberg_table);
+      DCHECK(!obj.table.__isset.data_source_table);
+    } else if (obj.type == impala::TCatalogObjectType::DATABASE) {
+      DCHECK(!obj.db.__isset.metastore_db)
+          << "Minimal database TCatalogObject should have empty metastore_db";
+    }
+  }
+}
+
 Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
   Status status;
   DCHECK(profile_ != NULL);
@@ -88,6 +111,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& 
request) {
               FLAGS_catalog_client_rpc_retry_interval_ms,
               [&attempt]() { return CatalogRpcDebugFn(&attempt); }, 
exec_response_.get());
       RETURN_IF_ERROR(rpc_status.status);
+      if (FLAGS_use_local_catalog) 
VerifyMinimalResponse(exec_response_.get()->result);
       catalog_update_result_.reset(
           new TCatalogUpdateResult(exec_response_.get()->result));
       Status status(exec_response_->result.status);
@@ -110,6 +134,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& 
request) {
               FLAGS_catalog_client_rpc_retry_interval_ms,
               [&attempt]() { return CatalogRpcDebugFn(&attempt); }, &response);
       RETURN_IF_ERROR(rpc_status.status);
+      if (FLAGS_use_local_catalog) VerifyMinimalResponse(response.result);
       catalog_update_result_.reset(new TCatalogUpdateResult(response.result));
       return Status(response.result.status);
     }
@@ -133,6 +158,8 @@ Status CatalogOpExecutor::ExecComputeStats(
   update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
   update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl);
   
update_stats_req.__set_debug_action(compute_stats_request.ddl_params.debug_action);
+  update_stats_req.__set_header(TCatalogServiceRequestHeader());
+  update_stats_req.header.__set_want_minimal_response(FLAGS_use_local_catalog);
 
   const TComputeStatsParams& compute_stats_params =
       compute_stats_request.ddl_params.compute_stats_params;
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 6a60e24..3d1e3ff 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -84,6 +84,7 @@ DECLARE_int32(krpc_port);
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
 DECLARE_int64(max_result_cache_size);
+DECLARE_bool(use_local_catalog);
 
 namespace impala {
 
@@ -258,6 +259,8 @@ Status ClientRequestState::Exec() {
       reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
       reset_req.__set_reset_metadata_params(TResetMetadataRequest());
       
reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
+      reset_req.reset_metadata_params.header.__set_want_minimal_response(
+          FLAGS_use_local_catalog);
       reset_req.reset_metadata_params.__set_is_refresh(true);
       reset_req.reset_metadata_params.__set_table_name(
           exec_request_->load_data_request.table_name);
@@ -1269,6 +1272,7 @@ Status ClientRequestState::UpdateCatalog() {
     catalog_update.__set_header(TCatalogServiceRequestHeader());
     catalog_update.header.__set_requesting_user(effective_user());
     catalog_update.header.__set_client_ip(session()->network_address.hostname);
+    catalog_update.header.__set_want_minimal_response(FLAGS_use_local_catalog);
     catalog_update.header.__set_redacted_sql_stmt(
         query_ctx_.client_request.__isset.redacted_stmt ?
             query_ctx_.client_request.redacted_stmt : 
query_ctx_.client_request.stmt);
diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index 3e620af..bb0fce3 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -55,6 +55,10 @@ struct TCatalogServiceRequestHeader {
 
   // The client IP address.
   3: optional string client_ip
+
+  // Set by LocalCatalog coordinators. The response will contain minimal 
catalog objects
+  // (for invalidations) instead of full catalog objects.
+  4: optional bool want_minimal_response
 }
 
 // Returns details on the result of an operation that updates the catalog. 
Information
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index 90e289c..39da7a9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -30,12 +30,15 @@ public interface CatalogObject extends HasName {
    * "full" form with all information, used in catalog topic updates and DDL 
responses to
    * coordinators. When sending incremental update for a hdfs table, its 
"descriptor" form
    * is used with no partitions. Its incremental partition updates will follow 
it in the
-   * same topic update.
+   * same topic update. "invalidation" form means only the name will be 
included. "none"
+   * form means return nothing, i.e. null.
    */
   static enum ThriftObjectType {
     FULL,
-    DESCRIPTOR_ONLY
-  };
+    DESCRIPTOR_ONLY,
+    INVALIDATION,
+    NONE
+  }
 
   // Returns the TCatalogObject type of this Catalog object.
   public TCatalogObjectType getCatalogObjectType();
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 7fee648..23473e6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2272,11 +2272,13 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Wrapper around {@link #reloadTable(Table, boolean, String)} which passes 
false for
-   * {@code refreshUpdatedPartitions} argument.
+   * Wrapper around {@link #reloadTable(Table, boolean, 
CatalogObject.ThriftObjectType,
+   * String)} which passes false for {@code refreshUpdatedPartitions} argument 
and ignores
+   * the result.
    */
-  public TCatalogObject reloadTable(Table tbl, String reason) throws 
CatalogException {
-    return reloadTable(tbl, new TResetMetadataRequest(), reason);
+  public void reloadTable(Table tbl, String reason) throws CatalogException {
+    reloadTable(tbl, new TResetMetadataRequest(), 
CatalogObject.ThriftObjectType.NONE,
+        reason);
   }
 
   /**
@@ -2287,9 +2289,10 @@ public class CatalogServiceCatalog extends Catalog {
    * Throws a CatalogException if there is an error loading table metadata.
    * If {@code refreshUpdatedParts} is true, the refresh logic detects updated
    * partitions in metastore and reloads them too.
+   * The result is returned in the form given by {@code resultType} (null for NONE).
    */
   public TCatalogObject reloadTable(Table tbl, TResetMetadataRequest request,
-      String reason) throws CatalogException {
+      CatalogObject.ThriftObjectType resultType, String reason) throws 
CatalogException {
     LOG.info(String.format("Refreshing table metadata: %s", 
tbl.getFullName()));
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
@@ -2321,7 +2324,7 @@ public class CatalogServiceCatalog extends Catalog {
       }
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", 
tbl.getFullName()));
-      return tbl.toTCatalogObject();
+      return tbl.toTCatalogObject(resultType);
     } finally {
       context.stop();
       Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
@@ -2506,7 +2509,8 @@ public class CatalogServiceCatalog extends Catalog {
       throw new TableNotLoadedException(dbName + "." + tblName + " is not 
loaded");
     }
     Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
-    reloadPartition(table, tPartSpec, wasPartitionRefreshed, reason);
+    reloadPartition(table, tPartSpec, wasPartitionRefreshed,
+        CatalogObject.ThriftObjectType.NONE, reason);
     return wasPartitionRefreshed.getRef();
   }
 
@@ -2837,9 +2841,9 @@ public class CatalogServiceCatalog extends Catalog {
    * 'partitionSpec' in table 'tbl'. Returns the resulting table's 
TCatalogObject after
    * the partition metadata was reloaded.
    */
-  public TCatalogObject reloadPartition(Table tbl,
-      List<TPartitionKeyValue> partitionSpec,
-      Reference<Boolean> wasPartitionReloaded, String reason) throws 
CatalogException {
+  public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue> 
partitionSpec,
+      Reference<Boolean> wasPartitionReloaded, CatalogObject.ThriftObjectType 
resultType,
+      String reason) throws CatalogException {
     if (!tryLockTable(tbl)) {
       throw new CatalogException(String.format("Error reloading partition of 
table %s " +
           "due to lock contention", tbl.getFullName()));
@@ -2876,7 +2880,7 @@ public class CatalogServiceCatalog extends Catalog {
                     + "it does not exist in metastore anymore",
                 hdfsTable.getFullName() + " " + partitionName));
           }
-          return hdfsTable.toTCatalogObject();
+          return hdfsTable.toTCatalogObject(resultType);
         } catch (Exception e) {
           throw new CatalogException("Error loading metadata for partition: "
               + hdfsTable.getFullName() + " " + partitionName, e);
@@ -2887,7 +2891,7 @@ public class CatalogServiceCatalog extends Catalog {
       wasPartitionReloaded.setRef(true);
       LOG.info(String.format("Refreshed partition metadata: %s %s",
           hdfsTable.getFullName(), partitionName));
-      return hdfsTable.toTCatalogObject();
+      return hdfsTable.toTCatalogObject(resultType);
     } finally {
       Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java 
b/fe/src/main/java/org/apache/impala/catalog/Db.java
index b987008..bd6353f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -465,6 +465,12 @@ public class Db extends CatalogObjectImpl implements FeDb {
     catalogObject.setDb(toThrift());
   }
 
+  public TCatalogObject toMinimalTCatalogObject() {
+    TCatalogObject min = new TCatalogObject(getCatalogObjectType(), 
getCatalogVersion());
+    min.setDb(new TDatabase(getName()));
+    return min;
+  }
+
   /**
    * Get partial information about this DB in order to service 
CatalogdMetaProvider
    * running in a remote impalad.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java 
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 625b3cb..ed935f6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -522,13 +522,21 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   private TCatalogObject toMinimalTCatalogObjectHelper() {
     TCatalogObject catalogObject =
         new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
-    catalogObject.setTable(new TTable());
-    catalogObject.getTable().setDb_name(getDb().getName());
-    catalogObject.getTable().setTbl_name(getName());
+    catalogObject.setTable(new TTable(getDb().getName(), getName()));
     return catalogObject;
   }
 
   /**
+   * Returns a TCatalogObject with only the table name for invalidation. For 
non-hdfs
+   * tables, it's the same as toMinimalTCatalogObject(). For hdfs tables, their
+   * toMinimalTCatalogObject() will also return the partition names. So we use 
this method
+   * to get a light-weight object for invalidation in LocalCatalog.
+   */
+  public TCatalogObject toInvalidationObject() {
+    return toMinimalTCatalogObjectHelper();
+  }
+
+  /**
    * Override parent implementation that will finally call toThrift() which 
requires
    * holding the table lock. However, it's not guaranteed that caller holds 
the table
    * lock (IMPALA-9136). Here we use toMinimalTCatalogObjectHelper() directly 
since only
@@ -545,6 +553,21 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   }
 
   /**
+   * Generates a TCatalogObject based on the required form. See more details 
in comments
+   * of ThriftObjectType.
+   */
+  public TCatalogObject toTCatalogObject(ThriftObjectType resultType) {
+    switch (resultType) {
+      case FULL: return toTCatalogObject();
+      case DESCRIPTOR_ONLY: return toMinimalTCatalogObject();
+      case INVALIDATION: return toInvalidationObject();
+      case NONE:
+      default:
+        return null;
+    }
+  }
+
+  /**
    * Return partial info about this table. This is called only on the catalogd 
to
    * service GetPartialCatalogObject RPCs.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 9fa0f9b..b4c1ddf 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -80,6 +80,7 @@ import org.apache.impala.authorization.AuthorizationDelta;
 import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogObject;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
@@ -343,8 +344,13 @@ public class CatalogOpExecutor {
     response.setResult(new TCatalogUpdateResult());
     response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
     User requestingUser = null;
+    boolean wantMinimalResult = false;
     if (ddlRequest.isSetHeader()) {
-      requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
+      TCatalogServiceRequestHeader header = ddlRequest.getHeader();
+      if (header.isSetRequesting_user()) {
+        requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
+      }
+      wantMinimalResult = ddlRequest.getHeader().isWant_minimal_response();
     }
     Optional<TTableName> tTableName = Optional.empty();
     TDdlType ddl_type = ddlRequest.ddl_type;
@@ -355,53 +361,55 @@ public class CatalogOpExecutor {
           TAlterDbParams alter_db_params = ddlRequest.getAlter_db_params();
           tTableName = Optional.of(new TTableName(alter_db_params.db, ""));
           catalogOpMetric_.increment(ddl_type, tTableName);
-          alterDatabase(alter_db_params, response);
+          alterDatabase(alter_db_params, wantMinimalResult, response);
           break;
         case ALTER_TABLE:
           TAlterTableParams alter_table_params = 
ddlRequest.getAlter_table_params();
           tTableName = Optional.of(alter_table_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          alterTable(alter_table_params, ddlRequest.getDebug_action(), 
response);
+          alterTable(alter_table_params, ddlRequest.getDebug_action(), 
wantMinimalResult,
+              response);
           break;
         case ALTER_VIEW:
           TCreateOrAlterViewParams alter_view_params = 
ddlRequest.getAlter_view_params();
           tTableName = Optional.of(alter_view_params.getView_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          alterView(alter_view_params, response);
+          alterView(alter_view_params, wantMinimalResult, response);
           break;
         case CREATE_DATABASE:
           TCreateDbParams create_db_params = ddlRequest.getCreate_db_params();
           tTableName = Optional.of(new TTableName(create_db_params.db, ""));
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createDatabase(create_db_params, response, syncDdl);
+          createDatabase(create_db_params, response, syncDdl, 
wantMinimalResult);
           break;
         case CREATE_TABLE_AS_SELECT:
           TCreateTableParams create_table_as_select_params =
               ddlRequest.getCreate_table_params();
           tTableName = 
Optional.of(create_table_as_select_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          response.setNew_table_created(
-              createTable(create_table_as_select_params, response, syncDdl));
+          
response.setNew_table_created(createTable(create_table_as_select_params,
+              response, syncDdl, wantMinimalResult));
           break;
         case CREATE_TABLE:
           TCreateTableParams create_table_params = 
ddlRequest.getCreate_table_params();
           tTableName = Optional.of((create_table_params.getTable_name()));
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createTable(ddlRequest.getCreate_table_params(), response, syncDdl);
+          createTable(ddlRequest.getCreate_table_params(), response, syncDdl,
+              wantMinimalResult);
           break;
         case CREATE_TABLE_LIKE:
           TCreateTableLikeParams create_table_like_params =
               ddlRequest.getCreate_table_like_params();
           tTableName = Optional.of(create_table_like_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createTableLike(create_table_like_params, response, syncDdl);
+          createTableLike(create_table_like_params, response, syncDdl, 
wantMinimalResult);
           break;
         case CREATE_VIEW:
           TCreateOrAlterViewParams create_view_params =
               ddlRequest.getCreate_view_params();
           tTableName = Optional.of(create_view_params.getView_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          createView(create_view_params, response);
+          createView(create_view_params, wantMinimalResult, response);
           break;
         case CREATE_FUNCTION:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -419,7 +427,7 @@ public class CatalogOpExecutor {
           TDropStatsParams drop_stats_params = 
ddlRequest.getDrop_stats_params();
           tTableName = Optional.of(drop_stats_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          dropStats(drop_stats_params, response);
+          dropStats(drop_stats_params, wantMinimalResult, response);
           break;
         case DROP_DATABASE:
           TDropDbParams drop_db_params = ddlRequest.getDrop_db_params();
@@ -433,13 +441,15 @@ public class CatalogOpExecutor {
               ddlRequest.getDrop_table_or_view_params();
           tTableName = Optional.of(drop_table_or_view_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
+          // Dropped tables and views are already returned as minimal results, 
so don't
+          // need to pass down wantMinimalResult here.
           dropTableOrView(drop_table_or_view_params, response);
           break;
         case TRUNCATE_TABLE:
           TTruncateParams truncate_params = ddlRequest.getTruncate_params();
           tTableName = Optional.of(truncate_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          truncateTable(truncate_params, response);
+          truncateTable(truncate_params, wantMinimalResult, response);
           break;
         case DROP_FUNCTION:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -480,7 +490,7 @@ public class CatalogOpExecutor {
         case COMMENT_ON:
           TCommentOnParams comment_on_params = 
ddlRequest.getComment_on_params();
           tTableName = Optional.of(new TTableName("", ""));
-          alterCommentOn(comment_on_params, response, tTableName);
+          alterCommentOn(comment_on_params, response, tTableName, 
wantMinimalResult);
           break;
         case COPY_TESTCASE:
           catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -636,7 +646,7 @@ public class CatalogOpExecutor {
    * thread-safe, i.e. concurrent operations on the same table are serialized.
    */
   private void alterTable(TAlterTableParams params, @Nullable String 
debugAction,
-      TDdlExecResponse response)
+      boolean wantMinimalResult, TDdlExecResponse response)
       throws ImpalaException {
     // When true, loads the file/block metadata.
     boolean reloadFileMetadata = false;
@@ -670,7 +680,7 @@ public class CatalogOpExecutor {
         try {
           alterTableOrViewRename(tbl,
               
TableName.fromThrift(params.getRename_params().getNew_table_name()),
-              newCatalogVersion, response);
+              newCatalogVersion, wantMinimalResult, response);
           return;
         } finally {
           // release the version taken in the tryLock call above
@@ -683,11 +693,13 @@ public class CatalogOpExecutor {
       catalog_.getLock().writeLock().unlock();
 
       if (tbl instanceof KuduTable && altersKuduTable(params.getAlter_type())) 
{
-        alterKuduTable(params, response, (KuduTable) tbl, newCatalogVersion);
+        alterKuduTable(params, response, (KuduTable) tbl, newCatalogVersion,
+            wantMinimalResult);
         return;
       } else if (tbl instanceof IcebergTable &&
           altersIcebergTable(params.getAlter_type())) {
-        alterIcebergTable(params, response, (IcebergTable) tbl, 
newCatalogVersion);
+        alterIcebergTable(params, response, (IcebergTable) tbl, 
newCatalogVersion,
+            wantMinimalResult);
         return;
       }
       switch (params.getAlter_type()) {
@@ -721,7 +733,7 @@ public class CatalogOpExecutor {
             // only add the versions for in-flight events when we are sure 
that the
             // partition was really added.
             catalog_.addVersionsForInflightEvents(false, tbl, 
newCatalogVersion);
-            addTableToCatalogUpdate(refreshedTable, response.result);
+            addTableToCatalogUpdate(refreshedTable, wantMinimalResult, 
response.result);
           }
           reloadMetadata = false;
           addSummary(response, "New partition has been added to the table.");
@@ -756,7 +768,7 @@ public class CatalogOpExecutor {
             // since by the time the event is received, the partition is 
already
             // removed from catalog and there is nothing to compare against 
during
             // self-event evaluation
-            addTableToCatalogUpdate(refreshedTable, response.result);
+            addTableToCatalogUpdate(refreshedTable, wantMinimalResult, 
response.result);
           }
           addSummary(response,
               "Dropped " + numUpdatedPartitions.getRef() + " partition(s).");
@@ -863,7 +875,7 @@ public class CatalogOpExecutor {
         // now that HMS alter operation has succeeded, add this version to 
list of
         // inflight events in catalog table if event processing is enabled
         catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
-        addTableToCatalogUpdate(tbl, response.result);
+        addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       }
       // Make sure all the modifications are done.
       Preconditions.checkState(!tbl.hasInProgressModification());
@@ -892,7 +904,8 @@ public class CatalogOpExecutor {
    * Executes the ALTER TABLE command for a Kudu table and reloads its 
metadata.
    */
   private void alterKuduTable(TAlterTableParams params, TDdlExecResponse 
response,
-      KuduTable tbl, long newCatalogVersion) throws ImpalaException {
+      KuduTable tbl, long newCatalogVersion, boolean wantMinimalResult)
+      throws ImpalaException {
     Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     switch (params.getAlter_type()) {
       case ADD_COLUMNS:
@@ -932,7 +945,7 @@ public class CatalogOpExecutor {
 
     loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER KUDU 
TABLE " +
         params.getAlter_type().name());
-    addTableToCatalogUpdate(tbl, response.result);
+    addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
   }
 
   /**
@@ -950,7 +963,8 @@ public class CatalogOpExecutor {
    * Executes the ALTER TABLE command for a Iceberg table and reloads its 
metadata.
    */
   private void alterIcebergTable(TAlterTableParams params, TDdlExecResponse 
response,
-      IcebergTable tbl, long newCatalogVersion) throws ImpalaException {
+      IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult)
+      throws ImpalaException {
     Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     switch (params.getAlter_type()) {
       case ADD_COLUMNS:
@@ -982,7 +996,7 @@ public class CatalogOpExecutor {
 
     loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg 
TABLE " +
         params.getAlter_type().name());
-    addTableToCatalogUpdate(tbl, response.result);
+    addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
   }
 
   /**
@@ -1013,11 +1027,12 @@ public class CatalogOpExecutor {
    * Serializes and adds table 'tbl' to a TCatalogUpdateResult object. Uses the
    * version of the serialized table as the version of the catalog update 
result.
    */
-  private static void addTableToCatalogUpdate(Table tbl, TCatalogUpdateResult 
result) {
+  private static void addTableToCatalogUpdate(Table tbl, boolean 
wantMinimalResult,
+      TCatalogUpdateResult result) {
     Preconditions.checkNotNull(tbl);
-    TCatalogObject updatedCatalogObject = tbl.toTCatalogObject();
-    // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back 
invalidation
     // TODO(IMPALA-9937): if client is a 'v1' impalad, only send back 
incremental updates
+    TCatalogObject updatedCatalogObject = wantMinimalResult ?
+        tbl.toInvalidationObject() : tbl.toTCatalogObject();
     result.addToUpdated_catalog_objects(updatedCatalogObject);
     result.setVersion(updatedCatalogObject.getCatalog_version());
   }
@@ -1045,8 +1060,8 @@ public class CatalogOpExecutor {
    * if the view does not exist or if the existing metadata entry is
    * a table instead of a a view.
    */
-   private void alterView(TCreateOrAlterViewParams params, TDdlExecResponse 
resp)
-      throws ImpalaException {
+   private void alterView(TCreateOrAlterViewParams params, boolean 
wantMinimalResult,
+       TDdlExecResponse resp) throws ImpalaException {
     TableName tableName = TableName.fromThrift(params.getView_name());
     Preconditions.checkState(tableName != null && 
tableName.isFullyQualified());
     Preconditions.checkState(params.getColumns() != null &&
@@ -1084,7 +1099,7 @@ public class CatalogOpExecutor {
       }
       addSummary(resp, "View has been altered.");
       tbl.setCatalogVersion(newCatalogVersion);
-      addTableToCatalogUpdate(tbl, resp.result);
+      addTableToCatalogUpdate(tbl, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
       tbl.getLock().unlock();
@@ -1343,7 +1358,7 @@ public class CatalogOpExecutor {
    * @param  syncDdl tells if SYNC_DDL option is enabled on this DDL request.
    */
   private void createDatabase(TCreateDbParams params, TDdlExecResponse resp,
-      boolean syncDdl) throws ImpalaException {
+      boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
     Preconditions.checkNotNull(params);
     String dbName = params.getDb();
     Preconditions.checkState(dbName != null && !dbName.isEmpty(),
@@ -1381,9 +1396,7 @@ public class CatalogOpExecutor {
           existingDb.getLock().unlock();
         }
       }
-      // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back 
invalidation
-      
resp.getResult().addToUpdated_catalog_objects(existingDb.toTCatalogObject());
-      resp.getResult().setVersion(existingDb.getCatalogVersion());
+      addDbToCatalogUpdate(existingDb, wantMinimalResult, resp.result);
       addSummary(resp, "Database already exists.");
       return;
     }
@@ -1441,9 +1454,7 @@ public class CatalogOpExecutor {
         }
       }
 
-      Preconditions.checkNotNull(newDb);
-      // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back 
invalidation
-      resp.result.addToUpdated_catalog_objects(newDb.toTCatalogObject());
+      addDbToCatalogUpdate(newDb, wantMinimalResult, resp.result);
       if (authzConfig_.isEnabled()) {
         authzManager_.updateDatabaseOwnerPrivilege(params.server_name, 
newDb.getName(),
             /* oldOwner */ null, /* oldOwnerType */ null,
@@ -1451,7 +1462,6 @@ public class CatalogOpExecutor {
             resp);
       }
     }
-    resp.result.setVersion(newDb.getCatalogVersion());
   }
 
   private void createFunction(TCreateFunctionParams params, TDdlExecResponse 
resp)
@@ -1591,8 +1601,8 @@ public class CatalogOpExecutor {
    * encountered as part of this operation. Acquires a lock on the modified 
table
    * to protect against concurrent modifications.
    */
-  private void dropStats(TDropStatsParams params, TDdlExecResponse resp)
-      throws ImpalaException {
+  private void dropStats(TDropStatsParams params, boolean wantMinimalResult,
+      TDdlExecResponse resp) throws ImpalaException {
     Table table = getExistingTable(params.getTable_name().getDb_name(),
         params.getTable_name().getTable_name(), "Load for DROP STATS");
     Preconditions.checkNotNull(table);
@@ -1629,7 +1639,7 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(table, newCatalogVersion, /*reloadFileMetadata=*/false,
           /*reloadTableSchema=*/true, /*partitionsToUpdate=*/null, "DROP 
STATS");
-      addTableToCatalogUpdate(table, resp.result);
+      addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
       addSummary(resp, "Stats have been dropped.");
     } finally {
       UnlockWriteLockIfErronouslyLocked();
@@ -2095,8 +2105,8 @@ public class CatalogOpExecutor {
    * concurrent table modifications.
    * TODO truncate specified partitions.
    */
-  private void truncateTable(TTruncateParams params, TDdlExecResponse resp)
-      throws ImpalaException {
+  private void truncateTable(TTruncateParams params, boolean wantMinimalResult,
+      TDdlExecResponse resp) throws ImpalaException {
     TTableName tblName = params.getTable_name();
     Table table = null;
     try {
@@ -2137,7 +2147,7 @@ public class CatalogOpExecutor {
       Preconditions.checkState(newCatalogVersion > 0);
       addSummary(resp, "Table has been truncated.");
       loadTableMetadata(table, newCatalogVersion, true, true, null, 
"TRUNCATE");
-      addTableToCatalogUpdate(table, resp.result);
+      addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
       if (table.getLock().isHeldByCurrentThread()) {
@@ -2374,7 +2384,7 @@ public class CatalogOpExecutor {
    * otherwise.
    */
   private boolean createTable(TCreateTableParams params, TDdlExecResponse 
response,
-      boolean syncDdl) throws ImpalaException {
+      boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && 
tableName.isFullyQualified());
@@ -2407,7 +2417,7 @@ public class CatalogOpExecutor {
           LOG.trace("Table {} version bumped to {} because SYNC_DDL is 
enabled.",
               tableName, newVersion);
         }
-        addTableToCatalogUpdate(existingTbl, response.result);
+        addTableToCatalogUpdate(existingTbl, wantMinimalResult, 
response.result);
       } finally {
         // Release the locks held in tryLock().
         catalog_.getLock().writeLock().unlock();
@@ -2417,14 +2427,16 @@ public class CatalogOpExecutor {
     }
     org.apache.hadoop.hive.metastore.api.Table tbl = 
createMetaStoreTable(params);
     LOG.trace("Creating table {}", tableName);
-    if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, 
response);
-    else if (IcebergTable.isIcebergTable(tbl)) {
-      return createIcebergTable(tbl, params, response);
+    if (KuduTable.isKuduTable(tbl)) {
+      return createKuduTable(tbl, params, wantMinimalResult, response);
+    } else if (IcebergTable.isIcebergTable(tbl)) {
+      return createIcebergTable(tbl, params, wantMinimalResult, response);
     }
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
     return createTable(tbl, params.if_not_exists, params.getCache_op(),
-        params.server_name, params.getPrimary_keys(), 
params.getForeign_keys(), response);
+        params.server_name, params.getPrimary_keys(), params.getForeign_keys(),
+        wantMinimalResult, response);
   }
 
   /**
@@ -2520,7 +2532,8 @@ public class CatalogOpExecutor {
    * table was created as part of this call, false otherwise.
    */
   private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table 
newTable,
-      TCreateTableParams params, TDdlExecResponse response) throws 
ImpalaException {
+      TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse 
response)
+      throws ImpalaException {
     Preconditions.checkState(KuduTable.isKuduTable(newTable));
     boolean createHMSTable;
     if (!KuduTable.isSynchronizedTable(newTable)) {
@@ -2554,7 +2567,7 @@ public class CatalogOpExecutor {
         // Add the table to the catalog cache
         Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
             newTable.getTableName());
-        addTableToCatalogUpdate(newTbl, response.result);
+        addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
       }
     } catch (Exception e) {
       try {
@@ -2591,7 +2604,7 @@ public class CatalogOpExecutor {
   private boolean createTable(org.apache.hadoop.hive.metastore.api.Table 
newTable,
       boolean if_not_exists, THdfsCachingOp cacheOp, String serverName,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
-      TDdlExecResponse response) throws ImpalaException {
+      boolean wantMinimalResult, TDdlExecResponse response) throws 
ImpalaException {
     Preconditions.checkState(!KuduTable.isKuduTable(newTable));
     synchronized (metastoreDdlLock_) {
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -2649,7 +2662,7 @@ public class CatalogOpExecutor {
       }
       Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
           newTable.getTableName());
-      addTableToCatalogUpdate(newTbl, response.result);
+      addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
       if (authzConfig_.isEnabled()) {
         authzManager_.updateTableOwnerPrivilege(serverName, 
newTable.getDbName(),
             newTable.getTableName(), /* oldOwner */ null,
@@ -2665,8 +2678,8 @@ public class CatalogOpExecutor {
    * lazily load the new metadata on the next access. Re-throws any Metastore
    * exceptions encountered during the create.
    */
-  private void createView(TCreateOrAlterViewParams params, TDdlExecResponse 
response)
-      throws ImpalaException {
+  private void createView(TCreateOrAlterViewParams params, boolean 
wantMinimalResult,
+      TDdlExecResponse response) throws ImpalaException {
     TableName tableName = TableName.fromThrift(params.getView_name());
     Preconditions.checkState(tableName != null && 
tableName.isFullyQualified());
     Preconditions.checkState(params.getColumns() != null &&
@@ -2687,7 +2700,7 @@ public class CatalogOpExecutor {
     setCreateViewAttributes(params, view);
     LOG.trace(String.format("Creating view %s", tableName));
     if (!createTable(view, params.if_not_exists, null, params.server_name,
-        new ArrayList<>(), new ArrayList<>(), response)) {
+        new ArrayList<>(), new ArrayList<>(), wantMinimalResult, response)) {
       addSummary(response, "View already exists.");
     } else {
       addSummary(response, "View has been created.");
@@ -2698,7 +2711,8 @@ public class CatalogOpExecutor {
    * Creates a new Iceberg table.
    */
   private boolean 
createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      TCreateTableParams params, TDdlExecResponse response) throws 
ImpalaException {
+      TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse 
response)
+      throws ImpalaException {
     Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
 
     try {
@@ -2782,7 +2796,7 @@ public class CatalogOpExecutor {
         // Add the table to the catalog cache
         Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
             newTable.getTableName());
-        addTableToCatalogUpdate(newTbl, response.result);
+        addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
       }
     } catch (Exception e) {
       if (e instanceof AlreadyExistsException && params.if_not_exists) {
@@ -2805,7 +2819,7 @@ public class CatalogOpExecutor {
   * @param  syncDdl tells if SYNC_DDL is enabled for this DDL request.
    */
   private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse 
response,
-      boolean syncDdl) throws ImpalaException {
+      boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
     Preconditions.checkNotNull(params);
 
     THdfsFileFormat fileFormat =
@@ -2842,7 +2856,7 @@ public class CatalogOpExecutor {
           LOG.trace("Table {} version bumped to {} because SYNC_DDL is 
enabled.",
               existingTbl.getFullName(), newVersion);
         }
-        addTableToCatalogUpdate(existingTbl, response.result);
+        addTableToCatalogUpdate(existingTbl, wantMinimalResult, 
response.result);
       } finally {
         // Release the locks held in tryLock().
         catalog_.getLock().writeLock().unlock();
@@ -2912,7 +2926,7 @@ public class CatalogOpExecutor {
     setDefaultTableCapabilities(tbl);
     LOG.trace(String.format("Creating table %s LIKE %s", tblName, srcTblName));
     createTable(tbl, params.if_not_exists, null, params.server_name, null, 
null,
-        response);
+        wantMinimalResult, response);
   }
 
   private static void setDefaultTableCapabilities(
@@ -3339,7 +3353,8 @@ public class CatalogOpExecutor {
    * reloaded on the next access.
    */
   private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
-      long newCatalogVersion, TDdlExecResponse response) throws 
ImpalaException {
+      long newCatalogVersion, boolean wantMinimalResult, TDdlExecResponse 
response)
+      throws ImpalaException {
     Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread()
         && catalog_.getLock().isWriteLockedByCurrentThread());
     TableName tableName = oldTbl.getTableName();
@@ -3391,9 +3406,14 @@ public class CatalogOpExecutor {
           newTableName.toString()));
     }
     catalog_.addVersionsForInflightEvents(false, result.second, 
newCatalogVersion);
-    // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back 
invalidation
-    
response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
-    
response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
+    if (wantMinimalResult) {
+      
response.result.addToRemoved_catalog_objects(result.first.toInvalidationObject());
+      
response.result.addToUpdated_catalog_objects(result.second.toInvalidationObject());
+    } else {
+      response.result.addToRemoved_catalog_objects(
+          result.first.toMinimalTCatalogObject());
+      
response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
+    }
     response.result.setVersion(result.second.getCatalogVersion());
     addSummary(response, "Renaming was successful.");
   }
@@ -4447,36 +4467,37 @@ public class CatalogOpExecutor {
           boolean isTableLoadedInCatalog = tbl.isLoaded();
           tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
               "Load triggered by " + cmdString);
-          if (tbl != null) {
-            if (isTableLoadedInCatalog) {
-              if (req.isSetPartition_spec()) {
-                boolean isTransactional = AcidUtils.isTransactionalTable(
-                    tbl.getMetaStoreTable().getParameters());
-                Preconditions.checkArgument(!isTransactional);
-                Reference<Boolean> wasPartitionRefreshed = new 
Reference<>(false);
-                // TODO if the partition was not really refreshed because the 
partSpec
-                // was wrong, do we still need to send back the table?
-                updatedThriftTable = catalog_.reloadPartition(tbl,
-                    req.getPartition_spec(), wasPartitionRefreshed, cmdString);
-              } else {
-                // TODO IMPALA-8809: Optimisation for partitioned tables:
-                //   1: Reload the whole table if schema change happened. 
Identify
-                //     such scenario by checking Table.TBL_PROP_LAST_DDL_TIME 
property.
-                //     Note, table level writeId is not updated by HMS for 
partitioned
-                //     ACID tables, there is a Jira to cover this: HIVE-22062.
-                //   2: If no need for a full table reload then fetch 
partition level
-                //     writeIds and reload only the ones that changed.
-                updatedThriftTable = catalog_
-                    .reloadTable(tbl, req, cmdString);
-              }
+          CatalogObject.ThriftObjectType resultType =
+              req.header.want_minimal_response ?
+                  CatalogObject.ThriftObjectType.INVALIDATION :
+                  CatalogObject.ThriftObjectType.FULL;
+          if (isTableLoadedInCatalog) {
+            if (req.isSetPartition_spec()) {
+              boolean isTransactional = AcidUtils.isTransactionalTable(
+                  tbl.getMetaStoreTable().getParameters());
+              Preconditions.checkArgument(!isTransactional);
+              Reference<Boolean> wasPartitionRefreshed = new 
Reference<>(false);
+              // TODO if the partition was not really refreshed because the 
partSpec
+              // was wrong, do we still need to send back the table?
+              updatedThriftTable = catalog_.reloadPartition(tbl,
+                  req.getPartition_spec(), wasPartitionRefreshed, resultType, 
cmdString);
             } else {
-              // Table was loaded from scratch, so it's already "refreshed".
-              tbl.getLock().lock();
-              try {
-                updatedThriftTable = tbl.toTCatalogObject();
-              } finally {
-                tbl.getLock().unlock();
-              }
+              // TODO IMPALA-8809: Optimisation for partitioned tables:
+              //   1: Reload the whole table if schema change happened. 
Identify
+              //     such scenario by checking Table.TBL_PROP_LAST_DDL_TIME 
property.
+              //     Note, table level writeId is not updated by HMS for 
partitioned
+              //     ACID tables, there is a Jira to cover this: HIVE-22062.
+              //   2: If no need for a full table reload then fetch partition 
level
+              //     writeIds and reload only the ones that changed.
+              updatedThriftTable = catalog_.reloadTable(tbl, req, resultType, 
cmdString);
+            }
+          } else {
+            // Table was loaded from scratch, so it's already "refreshed".
+            tbl.getLock().lock();
+            try {
+              updatedThriftTable = tbl.toTCatalogObject(resultType);
+            } finally {
+              tbl.getLock().unlock();
             }
           }
         }
@@ -4497,7 +4518,6 @@ public class CatalogOpExecutor {
       if (tblWasRemoved.getRef()) {
         resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
       } else {
-        // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back 
invalidation
         // TODO(IMPALA-9937): if client is a 'v1' impalad, only send back 
incremental
         //  updates
         resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
@@ -4788,7 +4808,8 @@ public class CatalogOpExecutor {
         createInsertEvents(table, filesBeforeInsert, 
affectedExistingPartitions,
             newPartsCreated, update.is_overwrite, txnId, writeId);
       }
-      addTableToCatalogUpdate(table, response.result);
+      addTableToCatalogUpdate(table, update.header.want_minimal_response,
+          response.result);
     } finally {
       context.stop();
       UnlockWriteLockIfErronouslyLocked();
@@ -5013,21 +5034,21 @@ public class CatalogOpExecutor {
   }
 
   private void alterCommentOn(TCommentOnParams params, TDdlExecResponse 
response,
-      Optional<TTableName> tTableName)
+      Optional<TTableName> tTableName, boolean wantMinimalResult)
       throws ImpalaRuntimeException, CatalogException, InternalException {
     if (params.getDb() != null) {
       Preconditions.checkArgument(!params.isSetTable_name() &&
           !params.isSetColumn_name());
       tTableName.get().setDb_name(params.db);
       catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
-      alterCommentOnDb(params.getDb(), params.getComment(), response);
+      alterCommentOnDb(params.getDb(), params.getComment(), wantMinimalResult, 
response);
     } else if (params.getTable_name() != null) {
       Preconditions.checkArgument(!params.isSetDb() && 
!params.isSetColumn_name());
       tTableName.get().setDb_name(params.table_name.db_name);
       tTableName.get().setTable_name(params.table_name.table_name);
       catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
       alterCommentOnTableOrView(TableName.fromThrift(params.getTable_name()),
-          params.getComment(), response);
+          params.getComment(), wantMinimalResult, response);
     } else if (params.getColumn_name() != null) {
       Preconditions.checkArgument(!params.isSetDb() && 
!params.isSetTable_name());
       TColumnName columnName = params.getColumn_name();
@@ -5035,13 +5056,14 @@ public class CatalogOpExecutor {
       tTableName.get().setTable_name(columnName.table_name.table_name);
       catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
       alterCommentOnColumn(TableName.fromThrift(columnName.getTable_name()),
-          columnName.getColumn_name(), params.getComment(), response);
+          columnName.getColumn_name(), params.getComment(), wantMinimalResult, 
response);
     } else {
       throw new UnsupportedOperationException("Unsupported COMMENT ON 
operation");
     }
   }
 
-  private void alterCommentOnDb(String dbName, String comment, 
TDdlExecResponse response)
+  private void alterCommentOnDb(String dbName, String comment, boolean 
wantMinimalResult,
+      TDdlExecResponse response)
       throws ImpalaRuntimeException, CatalogException, InternalException {
     Db db = catalog_.getDb(dbName);
     if (db == null) {
@@ -5062,7 +5084,7 @@ public class CatalogOpExecutor {
         throw e;
       }
       Db updatedDb = catalog_.updateDb(msDb);
-      addDbToCatalogUpdate(updatedDb, response.result);
+      addDbToCatalogUpdate(updatedDb, wantMinimalResult, response.result);
       // now that HMS alter operation has succeeded, add this version to list 
of inflight
       // events in catalog database if event processing is enabled
       catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
@@ -5072,8 +5094,8 @@ public class CatalogOpExecutor {
     addSummary(response, "Updated database.");
   }
 
-  private void alterDatabase(TAlterDbParams params, TDdlExecResponse response)
-      throws ImpalaException {
+  private void alterDatabase(TAlterDbParams params, boolean wantMinimalResult,
+      TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkNotNull(params);
     String dbName = params.getDb();
     Db db = catalog_.getDb(dbName);
@@ -5082,7 +5104,8 @@ public class CatalogOpExecutor {
     }
     switch (params.getAlter_type()) {
       case SET_OWNER:
-        alterDatabaseSetOwner(db, params.getSet_owner_params(), response);
+        alterDatabaseSetOwner(db, params.getSet_owner_params(), 
wantMinimalResult,
+            response);
         break;
       default:
         throw new UnsupportedOperationException(
@@ -5091,7 +5114,7 @@ public class CatalogOpExecutor {
   }
 
   private void alterDatabaseSetOwner(Db db, TAlterDbSetOwnerParams params,
-      TDdlExecResponse response) throws ImpalaException {
+      boolean wantMinimalResult, TDdlExecResponse response) throws 
ImpalaException {
     Preconditions.checkNotNull(params.owner_name);
     Preconditions.checkNotNull(params.owner_type);
     tryLock(db, "altering the owner");
@@ -5117,7 +5140,7 @@ public class CatalogOpExecutor {
             msDb.getOwnerType(), response);
       }
       Db updatedDb = catalog_.updateDb(msDb);
-      addDbToCatalogUpdate(updatedDb, response.result);
+      addDbToCatalogUpdate(updatedDb, wantMinimalResult, response.result);
       // now that HMS alter operation has succeeded, add this version to list 
of inflight
       // events in catalog database if event processing is enabled
       catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
@@ -5141,18 +5164,19 @@ public class CatalogOpExecutor {
         String.valueOf(newCatalogVersion));
   }
 
-  private void addDbToCatalogUpdate(Db db, TCatalogUpdateResult result) {
+  private void addDbToCatalogUpdate(Db db, boolean wantMinimalResult,
+      TCatalogUpdateResult result) {
     Preconditions.checkNotNull(db);
-    TCatalogObject updatedCatalogObject = db.toTCatalogObject();
+    TCatalogObject updatedCatalogObject = wantMinimalResult ?
+        db.toMinimalTCatalogObject() : db.toTCatalogObject();
     
updatedCatalogObject.setCatalog_version(updatedCatalogObject.getCatalog_version());
-    // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back 
invalidation
     result.addToUpdated_catalog_objects(updatedCatalogObject);
     result.setVersion(updatedCatalogObject.getCatalog_version());
   }
 
   private void alterCommentOnTableOrView(TableName tableName, String comment,
-      TDdlExecResponse response) throws CatalogException, InternalException,
-      ImpalaRuntimeException {
+      boolean wantMinimalResult, TDdlExecResponse response)
+      throws CatalogException, InternalException, ImpalaRuntimeException {
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
         "Load for ALTER COMMENT");
     tryLock(tbl);
@@ -5172,7 +5196,7 @@ public class CatalogOpExecutor {
       }
       applyAlterTable(msTbl);
       loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER 
COMMENT");
-      addTableToCatalogUpdate(tbl, response.result);
+      addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, String.format("Updated %s.", (isView) ? "view" : 
"table"));
     } finally {
       tbl.getLock().unlock();
@@ -5180,8 +5204,8 @@ public class CatalogOpExecutor {
   }
 
   private void alterCommentOnColumn(TableName tableName, String columnName,
-      String comment, TDdlExecResponse response) throws CatalogException,
-      InternalException, ImpalaRuntimeException {
+      String comment, boolean wantMinimalResult, TDdlExecResponse response)
+      throws CatalogException, InternalException, ImpalaRuntimeException {
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
         "Load for ALTER COLUMN COMMENT");
     tryLock(tbl);
@@ -5207,7 +5231,7 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(tbl, newCatalogVersion, false, true, null,
           "ALTER COLUMN COMMENT");
-      addTableToCatalogUpdate(tbl, response.result);
+      addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, "Column has been altered.");
     } finally {
       tbl.getLock().unlock();
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index a8ff340..1a1a4c9 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -682,6 +682,8 @@ public class Frontend {
       TClientRequest clientRequest = queryCtx.getClient_request();
       header.setRedacted_sql_stmt(clientRequest.isSetRedacted_stmt() ?
           clientRequest.getRedacted_stmt() : clientRequest.getStmt());
+      header.setWant_minimal_response(
+          BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
       ddl.getDdl_params().setHeader(header);
       ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
       // forward debug_actions to the catalogd
@@ -689,11 +691,12 @@ public class Frontend {
         ddl.getDdl_params()
             .setDebug_action(result.getQuery_options().getDebug_action());
       }
-    }
-    if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
+    } else if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
       ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
       ddl.getReset_metadata_params().setRefresh_updated_hms_partitions(
           result.getQuery_options().isRefresh_updated_hms_partitions());
+      ddl.getReset_metadata_params().getHeader().setWant_minimal_response(
+          BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
       // forward debug_actions to the catalogd
       if (result.getQuery_options().isSetDebug_action()) {
         ddl.getReset_metadata_params()
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java 
b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index 762107b..0060189 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -404,7 +404,8 @@ public class CatalogTest {
     List<TPartitionKeyValue> partitionSpec = ImmutableList.of(
         new TPartitionKeyValue("year", "2010"),
         new TPartitionKeyValue("month", "10"));
-    catalog_.reloadPartition(table, partitionSpec, new Reference<>(false), 
"test");
+    catalog_.reloadPartition(table, partitionSpec, new Reference<>(false),
+        CatalogObject.ThriftObjectType.NONE, "test");
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
 
     // Loading or reloading an unpartitioned table with some files in it 
should not make
@@ -426,7 +427,8 @@ public class CatalogTest {
     partBuilder.setFileDescriptors(new ArrayList<>());
     table.updatePartition(partBuilder);
     stats.reset();
-    catalog_.reloadPartition(table, partitionSpec, new Reference<>(false), 
"test");
+    catalog_.reloadPartition(table, partitionSpec, new Reference<>(false),
+        CatalogObject.ThriftObjectType.NONE, "test");
 
     // Should not scan the directory file-by-file, should use a single
     // listLocatedStatus() to get the whole directory (partition)

Reply via email to