This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new e3e9308 IMPALA-9936: Only send invalidations in DDL responses to
LocalCatalog coordinators
e3e9308 is described below
commit e3e93089d4233785df19a79669bb93b0d23e6d94
Author: stiga-huang <[email protected]>
AuthorDate: Thu Sep 10 09:48:53 2020 +0800
IMPALA-9936: Only send invalidations in DDL responses to LocalCatalog
coordinators
Catalogd RPC response contains the updated catalog objects in a full
form. For instance, a RPC for adding a new partition to an HdfsTable
will return the whole HdfsTable object(metadata) containing all the
partitions. This is required by legacy coordinators where the whole
HdfsTable object is used to replace the stale object(metadata snapshot).
However, LocalCatalog coordinators just need the object names for
invalidations. It's a waste of space to send the full catalog objects to
LocalCatalog coordinators. On the other hand, there is a risk of OOM due
to hitting the Java array limit when serializing a table that has a huge
metadata footprint.
This patch refactors the catalogd RPC responses to only send back
invalidations in need. To distinguish between legacy and LocalCatalog
coordinators, a new field, want_minimal_response, is introduced in
TCatalogServiceRequestHeader which is the header for most of the
Catalogd RPC requests (e.g. TDdlExecRequest, TUpdateCatalogRequest and
TResetMetadataRequest). LocalCatalog coordinators will set this field to
true. When adding updated catalog objects to the response, catalogd will
add invalidations which only contain the object names (e.g. db name,
table name). Note that function objects are small so are ignored in this
optimization.
Tests:
- Add DCHECKs in catalog-op-executor.cc to verify the catalog objects
recieved by LocalCatalog coordinators are in minimal mode.
- Run test_ddl.py in both legacy catalog mode and local catalog mode.
Change-Id: Id45827295ddee3eb6e98a11c55f582b2aebe5f38
Reviewed-on: http://gerrit.cloudera.org:8080/16435
Reviewed-by: Quanlong Huang <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/catalog-op-executor.cc | 27 +++
be/src/service/client-request-state.cc | 4 +
common/thrift/CatalogService.thrift | 4 +
.../org/apache/impala/catalog/CatalogObject.java | 9 +-
.../impala/catalog/CatalogServiceCatalog.java | 28 ++-
fe/src/main/java/org/apache/impala/catalog/Db.java | 6 +
.../main/java/org/apache/impala/catalog/Table.java | 29 ++-
.../apache/impala/service/CatalogOpExecutor.java | 258 +++++++++++----------
.../java/org/apache/impala/service/Frontend.java | 7 +-
.../org/apache/impala/catalog/CatalogTest.java | 6 +-
10 files changed, 239 insertions(+), 139 deletions(-)
diff --git a/be/src/exec/catalog-op-executor.cc
b/be/src/exec/catalog-op-executor.cc
index 0ffd708..d00b0eb 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -66,6 +66,29 @@ static Status CatalogRpcDebugFn(int* attempt) {
Status::OK();
}
+/// Used in LocalCatalog mode to verify the catalog RPC reponse only contains
minimal
+/// catalog objects, i.e. database objects have only db names and table
objects have only
+/// db and table names.
+static void VerifyMinimalResponse(const TCatalogUpdateResult& result) {
+ for (const TCatalogObject& obj : result.updated_catalog_objects) {
+ if (obj.type == impala::TCatalogObjectType::TABLE) {
+ // Make sure the table object only contains db_name and tbl_name and
nothing else.
+ DCHECK(!obj.table.__isset.metastore_table);
+ DCHECK(!obj.table.__isset.columns);
+ DCHECK(!obj.table.__isset.clustering_columns);
+ DCHECK(!obj.table.__isset.table_stats);
+ DCHECK(!obj.table.__isset.hdfs_table);
+ DCHECK(!obj.table.__isset.kudu_table);
+ DCHECK(!obj.table.__isset.hbase_table);
+ DCHECK(!obj.table.__isset.iceberg_table);
+ DCHECK(!obj.table.__isset.data_source_table);
+ } else if (obj.type == impala::TCatalogObjectType::DATABASE) {
+ DCHECK(!obj.db.__isset.metastore_db)
+ << "Minimal database TCatalogObject should have empty metastore_db";
+ }
+ }
+}
+
Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
Status status;
DCHECK(profile_ != NULL);
@@ -88,6 +111,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest&
request) {
FLAGS_catalog_client_rpc_retry_interval_ms,
[&attempt]() { return CatalogRpcDebugFn(&attempt); },
exec_response_.get());
RETURN_IF_ERROR(rpc_status.status);
+ if (FLAGS_use_local_catalog)
VerifyMinimalResponse(exec_response_.get()->result);
catalog_update_result_.reset(
new TCatalogUpdateResult(exec_response_.get()->result));
Status status(exec_response_->result.status);
@@ -110,6 +134,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest&
request) {
FLAGS_catalog_client_rpc_retry_interval_ms,
[&attempt]() { return CatalogRpcDebugFn(&attempt); }, &response);
RETURN_IF_ERROR(rpc_status.status);
+ if (FLAGS_use_local_catalog) VerifyMinimalResponse(response.result);
catalog_update_result_.reset(new TCatalogUpdateResult(response.result));
return Status(response.result.status);
}
@@ -133,6 +158,8 @@ Status CatalogOpExecutor::ExecComputeStats(
update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl);
update_stats_req.__set_debug_action(compute_stats_request.ddl_params.debug_action);
+ update_stats_req.__set_header(TCatalogServiceRequestHeader());
+ update_stats_req.header.__set_want_minimal_response(FLAGS_use_local_catalog);
const TComputeStatsParams& compute_stats_params =
compute_stats_request.ddl_params.compute_stats_params;
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index 6a60e24..3d1e3ff 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -84,6 +84,7 @@ DECLARE_int32(krpc_port);
DECLARE_int32(catalog_service_port);
DECLARE_string(catalog_service_host);
DECLARE_int64(max_result_cache_size);
+DECLARE_bool(use_local_catalog);
namespace impala {
@@ -258,6 +259,8 @@ Status ClientRequestState::Exec() {
reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
reset_req.__set_reset_metadata_params(TResetMetadataRequest());
reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
+ reset_req.reset_metadata_params.header.__set_want_minimal_response(
+ FLAGS_use_local_catalog);
reset_req.reset_metadata_params.__set_is_refresh(true);
reset_req.reset_metadata_params.__set_table_name(
exec_request_->load_data_request.table_name);
@@ -1269,6 +1272,7 @@ Status ClientRequestState::UpdateCatalog() {
catalog_update.__set_header(TCatalogServiceRequestHeader());
catalog_update.header.__set_requesting_user(effective_user());
catalog_update.header.__set_client_ip(session()->network_address.hostname);
+ catalog_update.header.__set_want_minimal_response(FLAGS_use_local_catalog);
catalog_update.header.__set_redacted_sql_stmt(
query_ctx_.client_request.__isset.redacted_stmt ?
query_ctx_.client_request.redacted_stmt :
query_ctx_.client_request.stmt);
diff --git a/common/thrift/CatalogService.thrift
b/common/thrift/CatalogService.thrift
index 3e620af..bb0fce3 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -55,6 +55,10 @@ struct TCatalogServiceRequestHeader {
// The client IP address.
3: optional string client_ip
+
+ // Set by LocalCatalog coordinators. The response will contain minimal
catalog objects
+ // (for invalidations) instead of full catalog objects.
+ 4: optional bool want_minimal_response
}
// Returns details on the result of an operation that updates the catalog.
Information
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index 90e289c..39da7a9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -30,12 +30,15 @@ public interface CatalogObject extends HasName {
* "full" form with all information, used in catalog topic updates and DDL
responses to
* coordinators. When sending incremental update for a hdfs table, its
"descriptor" form
* is used with no partitions. Its incremental partition updates will follow
it in the
- * same topic update.
+ * same topic update. "invalidation" form means only the name will be
included. "none"
+ * form means return nothing, i.e. null.
*/
static enum ThriftObjectType {
FULL,
- DESCRIPTOR_ONLY
- };
+ DESCRIPTOR_ONLY,
+ INVALIDATION,
+ NONE
+ }
// Returns the TCatalogObject type of this Catalog object.
public TCatalogObjectType getCatalogObjectType();
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 7fee648..23473e6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2272,11 +2272,13 @@ public class CatalogServiceCatalog extends Catalog {
}
/**
- * Wrapper around {@link #reloadTable(Table, boolean, String)} which passes
false for
- * {@code refreshUpdatedPartitions} argument.
+ * Wrapper around {@link #reloadTable(Table, boolean,
CatalogObject.ThriftObjectType,
+ * String)} which passes false for {@code refreshUpdatedPartitions} argument
and ignore
+ * the result.
*/
- public TCatalogObject reloadTable(Table tbl, String reason) throws
CatalogException {
- return reloadTable(tbl, new TResetMetadataRequest(), reason);
+ public void reloadTable(Table tbl, String reason) throws CatalogException {
+ reloadTable(tbl, new TResetMetadataRequest(),
CatalogObject.ThriftObjectType.NONE,
+ reason);
}
/**
@@ -2287,9 +2289,10 @@ public class CatalogServiceCatalog extends Catalog {
* Throws a CatalogException if there is an error loading table metadata.
* If {@code refreshUpdatedParts} is true, the refresh logic detects updated
* partitions in metastore and reloads them too.
+ * if {@code wantMinimalResult} is true, returns the result in the minimal
form.
*/
public TCatalogObject reloadTable(Table tbl, TResetMetadataRequest request,
- String reason) throws CatalogException {
+ CatalogObject.ThriftObjectType resultType, String reason) throws
CatalogException {
LOG.info(String.format("Refreshing table metadata: %s",
tbl.getFullName()));
Preconditions.checkState(!(tbl instanceof IncompleteTable));
String dbName = tbl.getDb().getName();
@@ -2321,7 +2324,7 @@ public class CatalogServiceCatalog extends Catalog {
}
tbl.setCatalogVersion(newCatalogVersion);
LOG.info(String.format("Refreshed table metadata: %s",
tbl.getFullName()));
- return tbl.toTCatalogObject();
+ return tbl.toTCatalogObject(resultType);
} finally {
context.stop();
Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
@@ -2506,7 +2509,8 @@ public class CatalogServiceCatalog extends Catalog {
throw new TableNotLoadedException(dbName + "." + tblName + " is not
loaded");
}
Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
- reloadPartition(table, tPartSpec, wasPartitionRefreshed, reason);
+ reloadPartition(table, tPartSpec, wasPartitionRefreshed,
+ CatalogObject.ThriftObjectType.NONE, reason);
return wasPartitionRefreshed.getRef();
}
@@ -2837,9 +2841,9 @@ public class CatalogServiceCatalog extends Catalog {
* 'partitionSpec' in table 'tbl'. Returns the resulting table's
TCatalogObject after
* the partition metadata was reloaded.
*/
- public TCatalogObject reloadPartition(Table tbl,
- List<TPartitionKeyValue> partitionSpec,
- Reference<Boolean> wasPartitionReloaded, String reason) throws
CatalogException {
+ public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue>
partitionSpec,
+ Reference<Boolean> wasPartitionReloaded, CatalogObject.ThriftObjectType
resultType,
+ String reason) throws CatalogException {
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error reloading partition of
table %s " +
"due to lock contention", tbl.getFullName()));
@@ -2876,7 +2880,7 @@ public class CatalogServiceCatalog extends Catalog {
+ "it does not exist in metastore anymore",
hdfsTable.getFullName() + " " + partitionName));
}
- return hdfsTable.toTCatalogObject();
+ return hdfsTable.toTCatalogObject(resultType);
} catch (Exception e) {
throw new CatalogException("Error loading metadata for partition: "
+ hdfsTable.getFullName() + " " + partitionName, e);
@@ -2887,7 +2891,7 @@ public class CatalogServiceCatalog extends Catalog {
wasPartitionReloaded.setRef(true);
LOG.info(String.format("Refreshed partition metadata: %s %s",
hdfsTable.getFullName(), partitionName));
- return hdfsTable.toTCatalogObject();
+ return hdfsTable.toTCatalogObject(resultType);
} finally {
Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
tbl.getLock().unlock();
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java
b/fe/src/main/java/org/apache/impala/catalog/Db.java
index b987008..bd6353f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -465,6 +465,12 @@ public class Db extends CatalogObjectImpl implements FeDb {
catalogObject.setDb(toThrift());
}
+ public TCatalogObject toMinimalTCatalogObject() {
+ TCatalogObject min = new TCatalogObject(getCatalogObjectType(),
getCatalogVersion());
+ min.setDb(new TDatabase(getName()));
+ return min;
+ }
+
/**
* Get partial information about this DB in order to service
CatalogdMetaProvider
* running in a remote impalad.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 625b3cb..ed935f6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -522,13 +522,21 @@ public abstract class Table extends CatalogObjectImpl
implements FeTable {
private TCatalogObject toMinimalTCatalogObjectHelper() {
TCatalogObject catalogObject =
new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
- catalogObject.setTable(new TTable());
- catalogObject.getTable().setDb_name(getDb().getName());
- catalogObject.getTable().setTbl_name(getName());
+ catalogObject.setTable(new TTable(getDb().getName(), getName()));
return catalogObject;
}
/**
+ * Returns a TCatalogObject with only the table name for invalidation. For
non-hdfs
+ * tables, it's the same as toMinimalTCatalogObject(). For hdfs tables, their
+ * toMinimalTCatalogObject() will also return the partition names. So we use
this method
+ * to get a light-weight object for invalidation in LocalCatalog.
+ */
+ public TCatalogObject toInvalidationObject() {
+ return toMinimalTCatalogObjectHelper();
+ }
+
+ /**
* Override parent implementation that will finally call toThrift() which
requires
* holding the table lock. However, it's not guaranteed that caller holds
the table
* lock (IMPALA-9136). Here we use toMinimalTCatalogObjectHelper() directly
since only
@@ -545,6 +553,21 @@ public abstract class Table extends CatalogObjectImpl
implements FeTable {
}
/**
+ * Generates a TCatalogObject based on the required form. See more details
in comments
+ * of ThriftObjectType.
+ */
+ public TCatalogObject toTCatalogObject(ThriftObjectType resultType) {
+ switch (resultType) {
+ case FULL: return toTCatalogObject();
+ case DESCRIPTOR_ONLY: return toMinimalTCatalogObject();
+ case INVALIDATION: return toInvalidationObject();
+ case NONE:
+ default:
+ return null;
+ }
+ }
+
+ /**
* Return partial info about this table. This is called only on the catalogd
to
* service GetPartialCatalogObject RPCs.
*/
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 9fa0f9b..b4c1ddf 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -80,6 +80,7 @@ import org.apache.impala.authorization.AuthorizationDelta;
import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.User;
import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogObject;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.ColumnNotFoundException;
@@ -343,8 +344,13 @@ public class CatalogOpExecutor {
response.setResult(new TCatalogUpdateResult());
response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
User requestingUser = null;
+ boolean wantMinimalResult = false;
if (ddlRequest.isSetHeader()) {
- requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
+ TCatalogServiceRequestHeader header = ddlRequest.getHeader();
+ if (header.isSetRequesting_user()) {
+ requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
+ }
+ wantMinimalResult = ddlRequest.getHeader().isWant_minimal_response();
}
Optional<TTableName> tTableName = Optional.empty();
TDdlType ddl_type = ddlRequest.ddl_type;
@@ -355,53 +361,55 @@ public class CatalogOpExecutor {
TAlterDbParams alter_db_params = ddlRequest.getAlter_db_params();
tTableName = Optional.of(new TTableName(alter_db_params.db, ""));
catalogOpMetric_.increment(ddl_type, tTableName);
- alterDatabase(alter_db_params, response);
+ alterDatabase(alter_db_params, wantMinimalResult, response);
break;
case ALTER_TABLE:
TAlterTableParams alter_table_params =
ddlRequest.getAlter_table_params();
tTableName = Optional.of(alter_table_params.getTable_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- alterTable(alter_table_params, ddlRequest.getDebug_action(),
response);
+ alterTable(alter_table_params, ddlRequest.getDebug_action(),
wantMinimalResult,
+ response);
break;
case ALTER_VIEW:
TCreateOrAlterViewParams alter_view_params =
ddlRequest.getAlter_view_params();
tTableName = Optional.of(alter_view_params.getView_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- alterView(alter_view_params, response);
+ alterView(alter_view_params, wantMinimalResult, response);
break;
case CREATE_DATABASE:
TCreateDbParams create_db_params = ddlRequest.getCreate_db_params();
tTableName = Optional.of(new TTableName(create_db_params.db, ""));
catalogOpMetric_.increment(ddl_type, tTableName);
- createDatabase(create_db_params, response, syncDdl);
+ createDatabase(create_db_params, response, syncDdl,
wantMinimalResult);
break;
case CREATE_TABLE_AS_SELECT:
TCreateTableParams create_table_as_select_params =
ddlRequest.getCreate_table_params();
tTableName =
Optional.of(create_table_as_select_params.getTable_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- response.setNew_table_created(
- createTable(create_table_as_select_params, response, syncDdl));
+
response.setNew_table_created(createTable(create_table_as_select_params,
+ response, syncDdl, wantMinimalResult));
break;
case CREATE_TABLE:
TCreateTableParams create_table_params =
ddlRequest.getCreate_table_params();
tTableName = Optional.of((create_table_params.getTable_name()));
catalogOpMetric_.increment(ddl_type, tTableName);
- createTable(ddlRequest.getCreate_table_params(), response, syncDdl);
+ createTable(ddlRequest.getCreate_table_params(), response, syncDdl,
+ wantMinimalResult);
break;
case CREATE_TABLE_LIKE:
TCreateTableLikeParams create_table_like_params =
ddlRequest.getCreate_table_like_params();
tTableName = Optional.of(create_table_like_params.getTable_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- createTableLike(create_table_like_params, response, syncDdl);
+ createTableLike(create_table_like_params, response, syncDdl,
wantMinimalResult);
break;
case CREATE_VIEW:
TCreateOrAlterViewParams create_view_params =
ddlRequest.getCreate_view_params();
tTableName = Optional.of(create_view_params.getView_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- createView(create_view_params, response);
+ createView(create_view_params, wantMinimalResult, response);
break;
case CREATE_FUNCTION:
catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -419,7 +427,7 @@ public class CatalogOpExecutor {
TDropStatsParams drop_stats_params =
ddlRequest.getDrop_stats_params();
tTableName = Optional.of(drop_stats_params.getTable_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- dropStats(drop_stats_params, response);
+ dropStats(drop_stats_params, wantMinimalResult, response);
break;
case DROP_DATABASE:
TDropDbParams drop_db_params = ddlRequest.getDrop_db_params();
@@ -433,13 +441,15 @@ public class CatalogOpExecutor {
ddlRequest.getDrop_table_or_view_params();
tTableName = Optional.of(drop_table_or_view_params.getTable_name());
catalogOpMetric_.increment(ddl_type, tTableName);
+ // Dropped tables and views are already returned as minimal results,
so don't
+ // need to pass down wantMinimalResult here.
dropTableOrView(drop_table_or_view_params, response);
break;
case TRUNCATE_TABLE:
TTruncateParams truncate_params = ddlRequest.getTruncate_params();
tTableName = Optional.of(truncate_params.getTable_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- truncateTable(truncate_params, response);
+ truncateTable(truncate_params, wantMinimalResult, response);
break;
case DROP_FUNCTION:
catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -480,7 +490,7 @@ public class CatalogOpExecutor {
case COMMENT_ON:
TCommentOnParams comment_on_params =
ddlRequest.getComment_on_params();
tTableName = Optional.of(new TTableName("", ""));
- alterCommentOn(comment_on_params, response, tTableName);
+ alterCommentOn(comment_on_params, response, tTableName,
wantMinimalResult);
break;
case COPY_TESTCASE:
catalogOpMetric_.increment(ddl_type, Optional.empty());
@@ -636,7 +646,7 @@ public class CatalogOpExecutor {
* thread-safe, i.e. concurrent operations on the same table are serialized.
*/
private void alterTable(TAlterTableParams params, @Nullable String
debugAction,
- TDdlExecResponse response)
+ boolean wantMinimalResult, TDdlExecResponse response)
throws ImpalaException {
// When true, loads the file/block metadata.
boolean reloadFileMetadata = false;
@@ -670,7 +680,7 @@ public class CatalogOpExecutor {
try {
alterTableOrViewRename(tbl,
TableName.fromThrift(params.getRename_params().getNew_table_name()),
- newCatalogVersion, response);
+ newCatalogVersion, wantMinimalResult, response);
return;
} finally {
// release the version taken in the tryLock call above
@@ -683,11 +693,13 @@ public class CatalogOpExecutor {
catalog_.getLock().writeLock().unlock();
if (tbl instanceof KuduTable && altersKuduTable(params.getAlter_type()))
{
- alterKuduTable(params, response, (KuduTable) tbl, newCatalogVersion);
+ alterKuduTable(params, response, (KuduTable) tbl, newCatalogVersion,
+ wantMinimalResult);
return;
} else if (tbl instanceof IcebergTable &&
altersIcebergTable(params.getAlter_type())) {
- alterIcebergTable(params, response, (IcebergTable) tbl,
newCatalogVersion);
+ alterIcebergTable(params, response, (IcebergTable) tbl,
newCatalogVersion,
+ wantMinimalResult);
return;
}
switch (params.getAlter_type()) {
@@ -721,7 +733,7 @@ public class CatalogOpExecutor {
// only add the versions for in-flight events when we are sure
that the
// partition was really added.
catalog_.addVersionsForInflightEvents(false, tbl,
newCatalogVersion);
- addTableToCatalogUpdate(refreshedTable, response.result);
+ addTableToCatalogUpdate(refreshedTable, wantMinimalResult,
response.result);
}
reloadMetadata = false;
addSummary(response, "New partition has been added to the table.");
@@ -756,7 +768,7 @@ public class CatalogOpExecutor {
// since by the time the event is received, the partition is
already
// removed from catalog and there is nothing to compare against
during
// self-event evaluation
- addTableToCatalogUpdate(refreshedTable, response.result);
+ addTableToCatalogUpdate(refreshedTable, wantMinimalResult,
response.result);
}
addSummary(response,
"Dropped " + numUpdatedPartitions.getRef() + " partition(s).");
@@ -863,7 +875,7 @@ public class CatalogOpExecutor {
// now that HMS alter operation has succeeded, add this version to
list of
// inflight events in catalog table if event processing is enabled
catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
- addTableToCatalogUpdate(tbl, response.result);
+ addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
}
// Make sure all the modifications are done.
Preconditions.checkState(!tbl.hasInProgressModification());
@@ -892,7 +904,8 @@ public class CatalogOpExecutor {
* Executes the ALTER TABLE command for a Kudu table and reloads its
metadata.
*/
private void alterKuduTable(TAlterTableParams params, TDdlExecResponse
response,
- KuduTable tbl, long newCatalogVersion) throws ImpalaException {
+ KuduTable tbl, long newCatalogVersion, boolean wantMinimalResult)
+ throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
switch (params.getAlter_type()) {
case ADD_COLUMNS:
@@ -932,7 +945,7 @@ public class CatalogOpExecutor {
loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER KUDU
TABLE " +
params.getAlter_type().name());
- addTableToCatalogUpdate(tbl, response.result);
+ addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
}
/**
@@ -950,7 +963,8 @@ public class CatalogOpExecutor {
* Executes the ALTER TABLE command for a Iceberg table and reloads its
metadata.
*/
private void alterIcebergTable(TAlterTableParams params, TDdlExecResponse
response,
- IcebergTable tbl, long newCatalogVersion) throws ImpalaException {
+ IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult)
+ throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
switch (params.getAlter_type()) {
case ADD_COLUMNS:
@@ -982,7 +996,7 @@ public class CatalogOpExecutor {
loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg
TABLE " +
params.getAlter_type().name());
- addTableToCatalogUpdate(tbl, response.result);
+ addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
}
/**
@@ -1013,11 +1027,12 @@ public class CatalogOpExecutor {
* Serializes and adds table 'tbl' to a TCatalogUpdateResult object. Uses the
* version of the serialized table as the version of the catalog update
result.
*/
- private static void addTableToCatalogUpdate(Table tbl, TCatalogUpdateResult
result) {
+ private static void addTableToCatalogUpdate(Table tbl, boolean
wantMinimalResult,
+ TCatalogUpdateResult result) {
Preconditions.checkNotNull(tbl);
- TCatalogObject updatedCatalogObject = tbl.toTCatalogObject();
- // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back
invalidation
// TODO(IMPALA-9937): if client is a 'v1' impalad, only send back
incremental updates
+ TCatalogObject updatedCatalogObject = wantMinimalResult ?
+ tbl.toInvalidationObject() : tbl.toTCatalogObject();
result.addToUpdated_catalog_objects(updatedCatalogObject);
result.setVersion(updatedCatalogObject.getCatalog_version());
}
@@ -1045,8 +1060,8 @@ public class CatalogOpExecutor {
* if the view does not exist or if the existing metadata entry is
* a table instead of a a view.
*/
- private void alterView(TCreateOrAlterViewParams params, TDdlExecResponse
resp)
- throws ImpalaException {
+ private void alterView(TCreateOrAlterViewParams params, boolean
wantMinimalResult,
+ TDdlExecResponse resp) throws ImpalaException {
TableName tableName = TableName.fromThrift(params.getView_name());
Preconditions.checkState(tableName != null &&
tableName.isFullyQualified());
Preconditions.checkState(params.getColumns() != null &&
@@ -1084,7 +1099,7 @@ public class CatalogOpExecutor {
}
addSummary(resp, "View has been altered.");
tbl.setCatalogVersion(newCatalogVersion);
- addTableToCatalogUpdate(tbl, resp.result);
+ addTableToCatalogUpdate(tbl, wantMinimalResult, resp.result);
} finally {
UnlockWriteLockIfErronouslyLocked();
tbl.getLock().unlock();
@@ -1343,7 +1358,7 @@ public class CatalogOpExecutor {
* @param syncDdl tells if SYNC_DDL option is enabled on this DDL request.
*/
private void createDatabase(TCreateDbParams params, TDdlExecResponse resp,
- boolean syncDdl) throws ImpalaException {
+ boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
Preconditions.checkNotNull(params);
String dbName = params.getDb();
Preconditions.checkState(dbName != null && !dbName.isEmpty(),
@@ -1381,9 +1396,7 @@ public class CatalogOpExecutor {
existingDb.getLock().unlock();
}
}
- // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back
invalidation
-
resp.getResult().addToUpdated_catalog_objects(existingDb.toTCatalogObject());
- resp.getResult().setVersion(existingDb.getCatalogVersion());
+ addDbToCatalogUpdate(existingDb, wantMinimalResult, resp.result);
addSummary(resp, "Database already exists.");
return;
}
@@ -1441,9 +1454,7 @@ public class CatalogOpExecutor {
}
}
- Preconditions.checkNotNull(newDb);
- // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back
invalidation
- resp.result.addToUpdated_catalog_objects(newDb.toTCatalogObject());
+ addDbToCatalogUpdate(newDb, wantMinimalResult, resp.result);
if (authzConfig_.isEnabled()) {
authzManager_.updateDatabaseOwnerPrivilege(params.server_name,
newDb.getName(),
/* oldOwner */ null, /* oldOwnerType */ null,
@@ -1451,7 +1462,6 @@ public class CatalogOpExecutor {
resp);
}
}
- resp.result.setVersion(newDb.getCatalogVersion());
}
private void createFunction(TCreateFunctionParams params, TDdlExecResponse
resp)
@@ -1591,8 +1601,8 @@ public class CatalogOpExecutor {
* encountered as part of this operation. Acquires a lock on the modified
table
* to protect against concurrent modifications.
*/
- private void dropStats(TDropStatsParams params, TDdlExecResponse resp)
- throws ImpalaException {
+ private void dropStats(TDropStatsParams params, boolean wantMinimalResult,
+ TDdlExecResponse resp) throws ImpalaException {
Table table = getExistingTable(params.getTable_name().getDb_name(),
params.getTable_name().getTable_name(), "Load for DROP STATS");
Preconditions.checkNotNull(table);
@@ -1629,7 +1639,7 @@ public class CatalogOpExecutor {
}
loadTableMetadata(table, newCatalogVersion, /*reloadFileMetadata=*/false,
/*reloadTableSchema=*/true, /*partitionsToUpdate=*/null, "DROP
STATS");
- addTableToCatalogUpdate(table, resp.result);
+ addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
addSummary(resp, "Stats have been dropped.");
} finally {
UnlockWriteLockIfErronouslyLocked();
@@ -2095,8 +2105,8 @@ public class CatalogOpExecutor {
* concurrent table modifications.
* TODO truncate specified partitions.
*/
- private void truncateTable(TTruncateParams params, TDdlExecResponse resp)
- throws ImpalaException {
+ private void truncateTable(TTruncateParams params, boolean wantMinimalResult,
+ TDdlExecResponse resp) throws ImpalaException {
TTableName tblName = params.getTable_name();
Table table = null;
try {
@@ -2137,7 +2147,7 @@ public class CatalogOpExecutor {
Preconditions.checkState(newCatalogVersion > 0);
addSummary(resp, "Table has been truncated.");
loadTableMetadata(table, newCatalogVersion, true, true, null,
"TRUNCATE");
- addTableToCatalogUpdate(table, resp.result);
+ addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
} finally {
UnlockWriteLockIfErronouslyLocked();
if (table.getLock().isHeldByCurrentThread()) {
@@ -2374,7 +2384,7 @@ public class CatalogOpExecutor {
* otherwise.
*/
private boolean createTable(TCreateTableParams params, TDdlExecResponse
response,
- boolean syncDdl) throws ImpalaException {
+ boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
Preconditions.checkNotNull(params);
TableName tableName = TableName.fromThrift(params.getTable_name());
Preconditions.checkState(tableName != null &&
tableName.isFullyQualified());
@@ -2407,7 +2417,7 @@ public class CatalogOpExecutor {
LOG.trace("Table {} version bumped to {} because SYNC_DDL is
enabled.",
tableName, newVersion);
}
- addTableToCatalogUpdate(existingTbl, response.result);
+ addTableToCatalogUpdate(existingTbl, wantMinimalResult,
response.result);
} finally {
// Release the locks held in tryLock().
catalog_.getLock().writeLock().unlock();
@@ -2417,14 +2427,16 @@ public class CatalogOpExecutor {
}
org.apache.hadoop.hive.metastore.api.Table tbl =
createMetaStoreTable(params);
LOG.trace("Creating table {}", tableName);
- if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params,
response);
- else if (IcebergTable.isIcebergTable(tbl)) {
- return createIcebergTable(tbl, params, response);
+ if (KuduTable.isKuduTable(tbl)) {
+ return createKuduTable(tbl, params, wantMinimalResult, response);
+ } else if (IcebergTable.isIcebergTable(tbl)) {
+ return createIcebergTable(tbl, params, wantMinimalResult, response);
}
Preconditions.checkState(params.getColumns().size() > 0,
"Empty column list given as argument to Catalog.createTable");
return createTable(tbl, params.if_not_exists, params.getCache_op(),
- params.server_name, params.getPrimary_keys(),
params.getForeign_keys(), response);
+ params.server_name, params.getPrimary_keys(), params.getForeign_keys(),
+ wantMinimalResult, response);
}
/**
@@ -2520,7 +2532,8 @@ public class CatalogOpExecutor {
* table was created as part of this call, false otherwise.
*/
private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table
newTable,
- TCreateTableParams params, TDdlExecResponse response) throws
ImpalaException {
+ TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse
response)
+ throws ImpalaException {
Preconditions.checkState(KuduTable.isKuduTable(newTable));
boolean createHMSTable;
if (!KuduTable.isSynchronizedTable(newTable)) {
@@ -2554,7 +2567,7 @@ public class CatalogOpExecutor {
// Add the table to the catalog cache
Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
newTable.getTableName());
- addTableToCatalogUpdate(newTbl, response.result);
+ addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
}
} catch (Exception e) {
try {
@@ -2591,7 +2604,7 @@ public class CatalogOpExecutor {
private boolean createTable(org.apache.hadoop.hive.metastore.api.Table
newTable,
boolean if_not_exists, THdfsCachingOp cacheOp, String serverName,
List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
- TDdlExecResponse response) throws ImpalaException {
+ boolean wantMinimalResult, TDdlExecResponse response) throws
ImpalaException {
Preconditions.checkState(!KuduTable.isKuduTable(newTable));
synchronized (metastoreDdlLock_) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -2649,7 +2662,7 @@ public class CatalogOpExecutor {
}
Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
newTable.getTableName());
- addTableToCatalogUpdate(newTbl, response.result);
+ addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
if (authzConfig_.isEnabled()) {
authzManager_.updateTableOwnerPrivilege(serverName,
newTable.getDbName(),
newTable.getTableName(), /* oldOwner */ null,
@@ -2665,8 +2678,8 @@ public class CatalogOpExecutor {
* lazily load the new metadata on the next access. Re-throws any Metastore
* exceptions encountered during the create.
*/
- private void createView(TCreateOrAlterViewParams params, TDdlExecResponse
response)
- throws ImpalaException {
+ private void createView(TCreateOrAlterViewParams params, boolean
wantMinimalResult,
+ TDdlExecResponse response) throws ImpalaException {
TableName tableName = TableName.fromThrift(params.getView_name());
Preconditions.checkState(tableName != null &&
tableName.isFullyQualified());
Preconditions.checkState(params.getColumns() != null &&
@@ -2687,7 +2700,7 @@ public class CatalogOpExecutor {
setCreateViewAttributes(params, view);
LOG.trace(String.format("Creating view %s", tableName));
if (!createTable(view, params.if_not_exists, null, params.server_name,
- new ArrayList<>(), new ArrayList<>(), response)) {
+ new ArrayList<>(), new ArrayList<>(), wantMinimalResult, response)) {
addSummary(response, "View already exists.");
} else {
addSummary(response, "View has been created.");
@@ -2698,7 +2711,8 @@ public class CatalogOpExecutor {
* Creates a new Iceberg table.
*/
private boolean
createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable,
- TCreateTableParams params, TDdlExecResponse response) throws
ImpalaException {
+ TCreateTableParams params, boolean wantMinimalResult, TDdlExecResponse
response)
+ throws ImpalaException {
Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
try {
@@ -2782,7 +2796,7 @@ public class CatalogOpExecutor {
// Add the table to the catalog cache
Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
newTable.getTableName());
- addTableToCatalogUpdate(newTbl, response.result);
+ addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
}
} catch (Exception e) {
if (e instanceof AlreadyExistsException && params.if_not_exists) {
@@ -2805,7 +2819,7 @@ public class CatalogOpExecutor {
* @param syncDdl tells is SYNC_DDL is enabled for this DDL request.
*/
private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse
response,
- boolean syncDdl) throws ImpalaException {
+ boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
Preconditions.checkNotNull(params);
THdfsFileFormat fileFormat =
@@ -2842,7 +2856,7 @@ public class CatalogOpExecutor {
LOG.trace("Table {} version bumped to {} because SYNC_DDL is
enabled.",
existingTbl.getFullName(), newVersion);
}
- addTableToCatalogUpdate(existingTbl, response.result);
+ addTableToCatalogUpdate(existingTbl, wantMinimalResult,
response.result);
} finally {
// Release the locks held in tryLock().
catalog_.getLock().writeLock().unlock();
@@ -2912,7 +2926,7 @@ public class CatalogOpExecutor {
setDefaultTableCapabilities(tbl);
LOG.trace(String.format("Creating table %s LIKE %s", tblName, srcTblName));
createTable(tbl, params.if_not_exists, null, params.server_name, null,
null,
- response);
+ wantMinimalResult, response);
}
private static void setDefaultTableCapabilities(
@@ -3339,7 +3353,8 @@ public class CatalogOpExecutor {
* reloaded on the next access.
*/
private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
- long newCatalogVersion, TDdlExecResponse response) throws
ImpalaException {
+ long newCatalogVersion, boolean wantMinimalResult, TDdlExecResponse
response)
+ throws ImpalaException {
Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread()
&& catalog_.getLock().isWriteLockedByCurrentThread());
TableName tableName = oldTbl.getTableName();
@@ -3391,9 +3406,14 @@ public class CatalogOpExecutor {
newTableName.toString()));
}
catalog_.addVersionsForInflightEvents(false, result.second,
newCatalogVersion);
- // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back
invalidation
-
response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
-
response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
+ if (wantMinimalResult) {
+
response.result.addToRemoved_catalog_objects(result.first.toInvalidationObject());
+
response.result.addToUpdated_catalog_objects(result.second.toInvalidationObject());
+ } else {
+ response.result.addToRemoved_catalog_objects(
+ result.first.toMinimalTCatalogObject());
+
response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
+ }
response.result.setVersion(result.second.getCatalogVersion());
addSummary(response, "Renaming was successful.");
}
@@ -4447,36 +4467,37 @@ public class CatalogOpExecutor {
boolean isTableLoadedInCatalog = tbl.isLoaded();
tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
"Load triggered by " + cmdString);
- if (tbl != null) {
- if (isTableLoadedInCatalog) {
- if (req.isSetPartition_spec()) {
- boolean isTransactional = AcidUtils.isTransactionalTable(
- tbl.getMetaStoreTable().getParameters());
- Preconditions.checkArgument(!isTransactional);
- Reference<Boolean> wasPartitionRefreshed = new
Reference<>(false);
- // TODO if the partition was not really refreshed because the
partSpec
- // was wrong, do we still need to send back the table?
- updatedThriftTable = catalog_.reloadPartition(tbl,
- req.getPartition_spec(), wasPartitionRefreshed, cmdString);
- } else {
- // TODO IMPALA-8809: Optimisation for partitioned tables:
- // 1: Reload the whole table if schema change happened.
Identify
- // such scenario by checking Table.TBL_PROP_LAST_DDL_TIME
property.
- // Note, table level writeId is not updated by HMS for
partitioned
- // ACID tables, there is a Jira to cover this: HIVE-22062.
- // 2: If no need for a full table reload then fetch
partition level
- // writeIds and reload only the ones that changed.
- updatedThriftTable = catalog_
- .reloadTable(tbl, req, cmdString);
- }
+ CatalogObject.ThriftObjectType resultType =
+ req.header.want_minimal_response ?
+ CatalogObject.ThriftObjectType.INVALIDATION :
+ CatalogObject.ThriftObjectType.FULL;
+ if (isTableLoadedInCatalog) {
+ if (req.isSetPartition_spec()) {
+ boolean isTransactional = AcidUtils.isTransactionalTable(
+ tbl.getMetaStoreTable().getParameters());
+ Preconditions.checkArgument(!isTransactional);
+ Reference<Boolean> wasPartitionRefreshed = new
Reference<>(false);
+ // TODO if the partition was not really refreshed because the
partSpec
+ // was wrong, do we still need to send back the table?
+ updatedThriftTable = catalog_.reloadPartition(tbl,
+ req.getPartition_spec(), wasPartitionRefreshed, resultType,
cmdString);
} else {
- // Table was loaded from scratch, so it's already "refreshed".
- tbl.getLock().lock();
- try {
- updatedThriftTable = tbl.toTCatalogObject();
- } finally {
- tbl.getLock().unlock();
- }
+ // TODO IMPALA-8809: Optimisation for partitioned tables:
+ // 1: Reload the whole table if schema change happened.
Identify
+ // such scenario by checking Table.TBL_PROP_LAST_DDL_TIME
property.
+ // Note, table level writeId is not updated by HMS for
partitioned
+ // ACID tables, there is a Jira to cover this: HIVE-22062.
+ // 2: If no need for a full table reload then fetch partition
level
+ // writeIds and reload only the ones that changed.
+ updatedThriftTable = catalog_.reloadTable(tbl, req, resultType,
cmdString);
+ }
+ } else {
+ // Table was loaded from scratch, so it's already "refreshed".
+ tbl.getLock().lock();
+ try {
+ updatedThriftTable = tbl.toTCatalogObject(resultType);
+ } finally {
+ tbl.getLock().unlock();
}
}
}
@@ -4497,7 +4518,6 @@ public class CatalogOpExecutor {
if (tblWasRemoved.getRef()) {
resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
} else {
- // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back
invalidation
// TODO(IMPALA-9937): if client is a 'v1' impalad, only send back
incremental
// updates
resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
@@ -4788,7 +4808,8 @@ public class CatalogOpExecutor {
createInsertEvents(table, filesBeforeInsert,
affectedExistingPartitions,
newPartsCreated, update.is_overwrite, txnId, writeId);
}
- addTableToCatalogUpdate(table, response.result);
+ addTableToCatalogUpdate(table, update.header.want_minimal_response,
+ response.result);
} finally {
context.stop();
UnlockWriteLockIfErronouslyLocked();
@@ -5013,21 +5034,21 @@ public class CatalogOpExecutor {
}
private void alterCommentOn(TCommentOnParams params, TDdlExecResponse
response,
- Optional<TTableName> tTableName)
+ Optional<TTableName> tTableName, boolean wantMinimalResult)
throws ImpalaRuntimeException, CatalogException, InternalException {
if (params.getDb() != null) {
Preconditions.checkArgument(!params.isSetTable_name() &&
!params.isSetColumn_name());
tTableName.get().setDb_name(params.db);
catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
- alterCommentOnDb(params.getDb(), params.getComment(), response);
+ alterCommentOnDb(params.getDb(), params.getComment(), wantMinimalResult,
response);
} else if (params.getTable_name() != null) {
Preconditions.checkArgument(!params.isSetDb() &&
!params.isSetColumn_name());
tTableName.get().setDb_name(params.table_name.db_name);
tTableName.get().setTable_name(params.table_name.table_name);
catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
alterCommentOnTableOrView(TableName.fromThrift(params.getTable_name()),
- params.getComment(), response);
+ params.getComment(), wantMinimalResult, response);
} else if (params.getColumn_name() != null) {
Preconditions.checkArgument(!params.isSetDb() &&
!params.isSetTable_name());
TColumnName columnName = params.getColumn_name();
@@ -5035,13 +5056,14 @@ public class CatalogOpExecutor {
tTableName.get().setTable_name(columnName.table_name.table_name);
catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
alterCommentOnColumn(TableName.fromThrift(columnName.getTable_name()),
- columnName.getColumn_name(), params.getComment(), response);
+ columnName.getColumn_name(), params.getComment(), wantMinimalResult,
response);
} else {
throw new UnsupportedOperationException("Unsupported COMMENT ON
operation");
}
}
- private void alterCommentOnDb(String dbName, String comment,
TDdlExecResponse response)
+ private void alterCommentOnDb(String dbName, String comment, boolean
wantMinimalResult,
+ TDdlExecResponse response)
throws ImpalaRuntimeException, CatalogException, InternalException {
Db db = catalog_.getDb(dbName);
if (db == null) {
@@ -5062,7 +5084,7 @@ public class CatalogOpExecutor {
throw e;
}
Db updatedDb = catalog_.updateDb(msDb);
- addDbToCatalogUpdate(updatedDb, response.result);
+ addDbToCatalogUpdate(updatedDb, wantMinimalResult, response.result);
// now that HMS alter operation has succeeded, add this version to list
of inflight
// events in catalog database if event processing is enabled
catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
@@ -5072,8 +5094,8 @@ public class CatalogOpExecutor {
addSummary(response, "Updated database.");
}
- private void alterDatabase(TAlterDbParams params, TDdlExecResponse response)
- throws ImpalaException {
+ private void alterDatabase(TAlterDbParams params, boolean wantMinimalResult,
+ TDdlExecResponse response) throws ImpalaException {
Preconditions.checkNotNull(params);
String dbName = params.getDb();
Db db = catalog_.getDb(dbName);
@@ -5082,7 +5104,8 @@ public class CatalogOpExecutor {
}
switch (params.getAlter_type()) {
case SET_OWNER:
- alterDatabaseSetOwner(db, params.getSet_owner_params(), response);
+ alterDatabaseSetOwner(db, params.getSet_owner_params(),
wantMinimalResult,
+ response);
break;
default:
throw new UnsupportedOperationException(
@@ -5091,7 +5114,7 @@ public class CatalogOpExecutor {
}
private void alterDatabaseSetOwner(Db db, TAlterDbSetOwnerParams params,
- TDdlExecResponse response) throws ImpalaException {
+ boolean wantMinimalResult, TDdlExecResponse response) throws
ImpalaException {
Preconditions.checkNotNull(params.owner_name);
Preconditions.checkNotNull(params.owner_type);
tryLock(db, "altering the owner");
@@ -5117,7 +5140,7 @@ public class CatalogOpExecutor {
msDb.getOwnerType(), response);
}
Db updatedDb = catalog_.updateDb(msDb);
- addDbToCatalogUpdate(updatedDb, response.result);
+ addDbToCatalogUpdate(updatedDb, wantMinimalResult, response.result);
// now that HMS alter operation has succeeded, add this version to list
of inflight
// events in catalog database if event processing is enabled
catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
@@ -5141,18 +5164,19 @@ public class CatalogOpExecutor {
String.valueOf(newCatalogVersion));
}
- private void addDbToCatalogUpdate(Db db, TCatalogUpdateResult result) {
+ private void addDbToCatalogUpdate(Db db, boolean wantMinimalResult,
+ TCatalogUpdateResult result) {
Preconditions.checkNotNull(db);
- TCatalogObject updatedCatalogObject = db.toTCatalogObject();
+ TCatalogObject updatedCatalogObject = wantMinimalResult ?
+ db.toMinimalTCatalogObject() : db.toTCatalogObject();
updatedCatalogObject.setCatalog_version(updatedCatalogObject.getCatalog_version());
- // TODO(IMPALA-9936): if client is a 'v2' impalad, only send back
invalidation
result.addToUpdated_catalog_objects(updatedCatalogObject);
result.setVersion(updatedCatalogObject.getCatalog_version());
}
private void alterCommentOnTableOrView(TableName tableName, String comment,
- TDdlExecResponse response) throws CatalogException, InternalException,
- ImpalaRuntimeException {
+ boolean wantMinimalResult, TDdlExecResponse response)
+ throws CatalogException, InternalException, ImpalaRuntimeException {
Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
"Load for ALTER COMMENT");
tryLock(tbl);
@@ -5172,7 +5196,7 @@ public class CatalogOpExecutor {
}
applyAlterTable(msTbl);
loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER
COMMENT");
- addTableToCatalogUpdate(tbl, response.result);
+ addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
addSummary(response, String.format("Updated %s.", (isView) ? "view" :
"table"));
} finally {
tbl.getLock().unlock();
@@ -5180,8 +5204,8 @@ public class CatalogOpExecutor {
}
private void alterCommentOnColumn(TableName tableName, String columnName,
- String comment, TDdlExecResponse response) throws CatalogException,
- InternalException, ImpalaRuntimeException {
+ String comment, boolean wantMinimalResult, TDdlExecResponse response)
+ throws CatalogException, InternalException, ImpalaRuntimeException {
Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
"Load for ALTER COLUMN COMMENT");
tryLock(tbl);
@@ -5207,7 +5231,7 @@ public class CatalogOpExecutor {
}
loadTableMetadata(tbl, newCatalogVersion, false, true, null,
"ALTER COLUMN COMMENT");
- addTableToCatalogUpdate(tbl, response.result);
+ addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
addSummary(response, "Column has been altered.");
} finally {
tbl.getLock().unlock();
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index a8ff340..1a1a4c9 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -682,6 +682,8 @@ public class Frontend {
TClientRequest clientRequest = queryCtx.getClient_request();
header.setRedacted_sql_stmt(clientRequest.isSetRedacted_stmt() ?
clientRequest.getRedacted_stmt() : clientRequest.getStmt());
+ header.setWant_minimal_response(
+ BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
ddl.getDdl_params().setHeader(header);
ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
// forward debug_actions to the catalogd
@@ -689,11 +691,12 @@ public class Frontend {
ddl.getDdl_params()
.setDebug_action(result.getQuery_options().getDebug_action());
}
- }
- if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
+ } else if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
ddl.getReset_metadata_params().setRefresh_updated_hms_partitions(
result.getQuery_options().isRefresh_updated_hms_partitions());
+ ddl.getReset_metadata_params().getHeader().setWant_minimal_response(
+ BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
// forward debug_actions to the catalogd
if (result.getQuery_options().isSetDebug_action()) {
ddl.getReset_metadata_params()
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index 762107b..0060189 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -404,7 +404,8 @@ public class CatalogTest {
List<TPartitionKeyValue> partitionSpec = ImmutableList.of(
new TPartitionKeyValue("year", "2010"),
new TPartitionKeyValue("month", "10"));
- catalog_.reloadPartition(table, partitionSpec, new Reference<>(false),
"test");
+ catalog_.reloadPartition(table, partitionSpec, new Reference<>(false),
+ CatalogObject.ThriftObjectType.NONE, "test");
assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
// Loading or reloading an unpartitioned table with some files in it
should not make
@@ -426,7 +427,8 @@ public class CatalogTest {
partBuilder.setFileDescriptors(new ArrayList<>());
table.updatePartition(partBuilder);
stats.reset();
- catalog_.reloadPartition(table, partitionSpec, new Reference<>(false),
"test");
+ catalog_.reloadPartition(table, partitionSpec, new Reference<>(false),
+ CatalogObject.ThriftObjectType.NONE, "test");
// Should not scan the directory file-by-file, should use a single
// listLocatedStatus() to get the whole directory (partition)