This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ff8bb33b9 IMPALA-12870: Tag query id for Java pool threads
ff8bb33b9 is described below

commit ff8bb33b91cd689bfaf2a51a19d0f615edd33d54
Author: stiga-huang <[email protected]>
AuthorDate: Fri Oct 17 19:40:13 2025 +0800

    IMPALA-12870: Tag query id for Java pool threads
    
    Logs from Java threads running in an ExecutorService are missing the
    query id, which is stored in the C++ thread-local ThreadDebugInfo
    variable. This patch adds JNI calls that let Java threads manage the
    ThreadDebugInfo variable. Two thread pools are changed so far:
     - MissingTable loading pool in StmtMetadataLoader.parallelTableLoad().
     - Table loading pool in TableLoadingMgr.
    
    The MissingTable loading pool only lives within the parallelTableLoad()
    method, so we initialize ThreadDebugInfo with the query id at the
    beginning of each thread and delete it at the end of the thread. Note
    that a thread might be reused to load different tables, but they all
    belong to the same query.
    
    The table loading pool is a long-running pool in catalogd that is never
    shut down. Its threads are used to load tables triggered by different
    queries. We initialize ThreadDebugInfo as described above, update it
    when a thread starts loading a table for a different query id, and
    reset it when the loading is done. The query id is passed down from the
    catalogd RPC request headers.
    
    Tests:
     - Added e2e test to verify the logs.
     - Ran existing CORE tests.
    
    Change-Id: I83cca55edc72de35f5e8c5422efc104e6aa894c1
    Reviewed-on: http://gerrit.cloudera.org:8080/23558
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/fe-support.cc                       | 63 +++++++++++++-
 .../apache/impala/analysis/StmtMetadataLoader.java |  8 +-
 .../impala/catalog/CatalogServiceCatalog.java      | 15 ++--
 .../org/apache/impala/catalog/TableLoadingMgr.java | 23 ++++--
 .../metastore/CatalogMetastoreServiceHandler.java  | 10 +--
 .../apache/impala/common/TaggedThreadFactory.java  | 96 ++++++++++++++++++++++
 .../apache/impala/service/CatalogOpExecutor.java   | 71 +++++++++-------
 .../java/org/apache/impala/service/FeSupport.java  | 15 ++++
 .../authorization/AuthorizationStmtTest.java       |  5 ++
 .../authorization/ranger/RangerAuditLogTest.java   |  4 +
 tests/custom_cluster/test_observability.py         | 38 ++++++++-
 11 files changed, 289 insertions(+), 59 deletions(-)

diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 560716cb8..3398c7bf1 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -591,8 +591,7 @@ 
Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject(
   // Populate the request header.
   if (!request.__isset.header) {
     ThreadDebugInfo* tdi = GetThreadDebugInfo();
-    // TODO: After IMPALA-14447, query ids might be missing in some threads. 
This will
-    // be addressed in IMPALA-12870.
+    // Requests from WebUI don't have ThreadDebugInfo and query ids.
     if (tdi != nullptr) {
       request.__set_header(TCatalogServiceRequestHeader());
       request.header.__set_query_id(tdi->GetQueryId());
@@ -807,6 +806,50 @@ 
Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents(JNIEnv* env,
   return result_bytes;
 }
 
+// Creates a ThreadDebugInfo for the calling Java thread, tags it with the query id
+// deserialized from 'thrift_query_id' and hands its address back to Java. The object is
+// expected to become the thread's ThreadDebugInfo (as later seen by GetThreadDebugInfo()).
+// Java owns the returned object and must release it with NativeDeleteThreadDebugInfo().
+// If deserialization fails, a Java exception of JniUtil::internal_exc_class() is raised
+// and 0 is returned.
+extern "C" JNIEXPORT jlong JNICALL
+Java_org_apache_impala_service_FeSupport_NativeInitThreadDebugInfo(JNIEnv* env,
+    jclass fe_support_class, jbyteArray thrift_query_id) {
+  TUniqueId id;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_query_id, &id), env,
+      JniUtil::internal_exc_class(), 0);
+  auto* debug_info = new ThreadDebugInfo();
+  debug_info->SetQueryId(id);
+  return reinterpret_cast<jlong>(debug_info);
+}
+
+// Re-tags the calling thread's existing ThreadDebugInfo with the query id deserialized
+// from 'thrift_query_id'. This lets long-lived pool threads switch between queries.
+// A deserialization failure raises a Java exception. A missing ThreadDebugInfo trips a
+// DCHECK in debug builds and is otherwise only logged as a warning.
+extern "C" JNIEXPORT void JNICALL
+Java_org_apache_impala_service_FeSupport_NativeUpdateThreadDebugInfo(JNIEnv* env,
+    jclass fe_support_class, jbyteArray thrift_query_id) {
+  TUniqueId id;
+  THROW_IF_ERROR(DeserializeThriftMsg(env, thrift_query_id, &id), env,
+      JniUtil::internal_exc_class());
+  ThreadDebugInfo* const debug_info = GetThreadDebugInfo();
+  DCHECK(debug_info != nullptr);
+  if (debug_info != nullptr) {
+    debug_info->SetQueryId(id);
+    return;
+  }
+  LOG(WARNING) << "ThreadDebugInfo is null. Not tagging query id " << PrintId(id);
+}
+
+// Clears the query id of the calling thread's ThreadDebugInfo, if the thread has one,
+// by setting it back to the all-zero id that means "no query id set".
+extern "C" JNIEXPORT void JNICALL
+Java_org_apache_impala_service_FeSupport_NativeResetThreadDebugInfo(JNIEnv* env,
+    jclass fe_support_class) {
+  if (ThreadDebugInfo* debug_info = GetThreadDebugInfo()) {
+    // A default-constructed TUniqueId is the zero id.
+    static const TUniqueId ZERO_UNIQUE_ID;
+    debug_info->SetQueryId(ZERO_UNIQUE_ID);
+  }
+}
+
+// Frees a ThreadDebugInfo whose address was returned by NativeInitThreadDebugInfo().
+// A 'tdi' of 0 means nothing was allocated and is ignored.
+// NOTE(review): the object backs a thread-local, so this presumably has to run on the
+// owning thread; TaggedThreadFactory calls it from that thread's finally block.
+extern "C" JNIEXPORT void JNICALL
+Java_org_apache_impala_service_FeSupport_NativeDeleteThreadDebugInfo(JNIEnv* env,
+    jclass fe_support_class, jlong tdi) {
+  if (tdi == 0) return;
+  // Inverse of the reinterpret_cast<jlong> done in NativeInitThreadDebugInfo().
+  delete reinterpret_cast<ThreadDebugInfo*>(tdi);
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -902,6 +945,22 @@ static JNINativeMethod native_methods[] = {
     const_cast<char*>("NativeWaitForHmsEvents"), const_cast<char*>("([B[B)[B"),
     (void*)::Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents
   },
+  {
+    const_cast<char*>("NativeInitThreadDebugInfo"), const_cast<char*>("([B)J"),
+    (void*)::Java_org_apache_impala_service_FeSupport_NativeInitThreadDebugInfo
+  },
+  {
+    const_cast<char*>("NativeUpdateThreadDebugInfo"), 
const_cast<char*>("([B)V"),
+    
(void*)::Java_org_apache_impala_service_FeSupport_NativeUpdateThreadDebugInfo
+  },
+  {
+    const_cast<char*>("NativeResetThreadDebugInfo"), const_cast<char*>("()V"),
+    
(void*)::Java_org_apache_impala_service_FeSupport_NativeResetThreadDebugInfo
+  },
+  {
+    const_cast<char*>("NativeDeleteThreadDebugInfo"), 
const_cast<char*>("(J)V"),
+    
(void*)::Java_org_apache_impala_service_FeSupport_NativeDeleteThreadDebugInfo
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index 7ef9290c4..e13695367 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -52,6 +52,7 @@ import org.apache.impala.catalog.local.LocalCatalogException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.TaggedThreadFactory;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.Frontend;
@@ -66,7 +67,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Loads all table and view metadata relevant for a single SQL statement and 
returns the
@@ -370,10 +370,8 @@ public class StmtMetadataLoader {
     List<Pair<TableName, FeTable>> tables = new ArrayList<>();
     ExecutorService executorService = null;
     try {
-      executorService = Executors.newFixedThreadPool(maxThreads,
-          new ThreadFactoryBuilder()
-              .setNameFormat("MissingTableLoaderThread-" + queryIdStr + "-%d")
-              .build());
+      executorService = Executors.newFixedThreadPool(maxThreads, new 
TaggedThreadFactory(
+          queryId_, "MissingTableLoaderThread-" + queryIdStr + "-%d"));
       // Transform tbls to a list of tasks.
       List<Callable<Pair<TableName, FeTable>>> tasks =
           tbls.stream()
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 02e8d3eb7..b797c4ede 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -920,8 +920,9 @@ public class CatalogServiceCatalog extends Catalog {
           .toString(), request.valid_write_ids);
     }
     long tableId = request.getTable_id();
+    // This is only used in the legacy catalog mode. No query ids are sent 
here.
     Table table = getOrLoadTable(tableName.db_name, tableName.table_name,
-        "needed to fetch partition stats", writeIdList, tableId,
+        "needed to fetch partition stats", /*queryId*/null, writeIdList, 
tableId,
         NoOpEventSequence.INSTANCE);
 
     // Table could be null if it does not exist anymore.
@@ -2801,7 +2802,7 @@ public class CatalogServiceCatalog extends Catalog {
 
   public @Nullable Table getOrLoadTable(String dbName, String tblName, String 
reason,
       ValidWriteIdList validWriteIdList) throws CatalogException {
-    return getOrLoadTable(dbName, tblName, reason, validWriteIdList,
+    return getOrLoadTable(dbName, tblName, reason, null, validWriteIdList,
         TABLE_ID_UNAVAILABLE, NoOpEventSequence.INSTANCE);
   }
 
@@ -2815,8 +2816,8 @@ public class CatalogServiceCatalog extends Catalog {
    * (not yet loaded table) will be returned.
    */
   public @Nullable Table getOrLoadTable(String dbName, String tblName, String 
reason,
-      ValidWriteIdList validWriteIdList, long tableId, EventSequence 
catalogTimeline)
-      throws CatalogException {
+      @Nullable TUniqueId queryId, ValidWriteIdList validWriteIdList, long 
tableId,
+      EventSequence catalogTimeline) throws CatalogException {
     TTableName tableName = new TTableName(dbName.toLowerCase(), 
tblName.toLowerCase());
     Table tbl;
     TableLoadingMgr.LoadRequest loadReq = null;
@@ -2889,7 +2890,7 @@ public class CatalogServiceCatalog extends Catalog {
         previousCatalogVersion = tbl.getCatalogVersion();
         LOG.trace("Loading full table {}", tbl.getFullName());
         loadReq = tableLoadingMgr_.loadAsync(tableName, 
tbl.getCreateEventId(), reason,
-            catalogTimeline);
+            queryId, catalogTimeline);
       }
     } finally {
       versionLock_.readLock().unlock();
@@ -4371,9 +4372,11 @@ public class CatalogServiceCatalog extends Catalog {
               dbName + "." + tblName, req.table_info_selector.valid_write_ids);
           tableId = req.table_info_selector.getTable_id();
         }
+        TUniqueId queryId = req.isSetHeader() && req.header.isSetQuery_id() ?
+            req.header.query_id : null;
         table = getOrLoadTable(
             objectDesc.getTable().getDb_name(), 
objectDesc.getTable().getTbl_name(),
-            tableLoadReason, writeIdList, tableId, NoOpEventSequence.INSTANCE);
+            tableLoadReason, queryId, writeIdList, tableId, 
NoOpEventSequence.INSTANCE);
       } catch (DatabaseNotFoundException e) {
         return createGetPartialCatalogObjectError(req, 
CatalogLookupStatus.DB_NOT_FOUND);
       }
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java 
b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
index 08de02566..5999b2349 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
@@ -31,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.TaggedThreadFactory;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.NoOpEventSequence;
@@ -167,7 +169,7 @@ public class TableLoadingMgr {
     tblLoader_ = new TableLoader(catalog_);
     numLoadingThreads_ = numLoadingThreads;
     tblLoadingPool_ = Executors.newFixedThreadPool(numLoadingThreads_,
-        new 
ThreadFactoryBuilder().setNameFormat("TableLoadingThread-%d").build());
+        new TaggedThreadFactory("TableLoadingThread-%d"));
 
     // Start the background table loading submitter threads.
     startTableLoadingSubmitterThreads();
@@ -236,7 +238,7 @@ public class TableLoadingMgr {
    * loads of the same table.
    */
   public LoadRequest loadAsync(final TTableName tblName, final long 
createdEventId,
-      final String reason, final EventSequence catalogTimeline)
+      final String reason, final TUniqueId queryId, final EventSequence 
catalogTimeline)
       throws DatabaseNotFoundException {
     final Db parentDb = catalog_.getDb(tblName.getDb_name());
     if (parentDb == null) {
@@ -244,13 +246,16 @@ public class TableLoadingMgr {
           "Database '" + tblName.getDb_name() + "' was not found.");
     }
 
-    FutureTask<Table> tableLoadTask = new FutureTask<Table>(new 
Callable<Table>() {
-        @Override
-        public Table call() throws Exception {
-          catalogTimeline.markEvent("Start loading table");
-          return tblLoader_.load(parentDb, tblName.table_name, createdEventId, 
reason,
-              catalogTimeline);
-        }});
+    FutureTask<Table> tableLoadTask = new FutureTask<>(() -> {
+      try {
+        TaggedThreadFactory.updateQueryId(queryId);
+        catalogTimeline.markEvent("Start loading table");
+        return tblLoader_.load(parentDb, tblName.table_name, createdEventId, 
reason,
+            catalogTimeline);
+      } finally {
+        TaggedThreadFactory.resetQueryId();
+      }
+    });
 
     FutureTask<Table> existingValue = loadingTables_.putIfAbsent(tblName, 
tableLoadTask);
     if (existingValue == null) {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
 
b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
index cc2b141f5..dcdd0f551 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -873,9 +873,9 @@ public class CatalogMetastoreServiceHandler extends 
MetastoreServiceHandler {
       String destDb = MetaStoreUtils.parseDbName(destDbWithCatalog, 
serverConf_)[1];
       EventSequence catalogTimeline = NoOpEventSequence.INSTANCE;
       srcTbl = catalogOpExecutor_.getExistingTable(sourceDb, sourceTbl, 
apiName,
-          catalogTimeline);
+          /*queryId*/null, catalogTimeline);
       destinationTbl = catalogOpExecutor_.getExistingTable(destDb, destTbl, 
apiName,
-          catalogTimeline);
+          /*queryId*/null, catalogTimeline);
 
       if (!catalog_.tryWriteLock(
           new org.apache.impala.catalog.Table[] {srcTbl, destinationTbl})) {
@@ -920,9 +920,9 @@ public class CatalogMetastoreServiceHandler extends 
MetastoreServiceHandler {
       String destDb = MetaStoreUtils.parseDbName(destDbWithCatalog, 
serverConf_)[1];
       EventSequence catalogTimeline = NoOpEventSequence.INSTANCE;
       srcTbl = catalogOpExecutor_.getExistingTable(sourceDb, sourceTbl, 
apiName,
-          catalogTimeline);
+          /*queryId*/null, catalogTimeline);
       destinationTbl = catalogOpExecutor_.getExistingTable(destDb, destTbl, 
apiName,
-          catalogTimeline);
+          /*queryId*/null, catalogTimeline);
 
       if (!catalog_.tryWriteLock(
           new org.apache.impala.catalog.Table[] {srcTbl, destinationTbl})) {
@@ -1216,7 +1216,7 @@ public class CatalogMetastoreServiceHandler extends 
MetastoreServiceHandler {
     org.apache.impala.catalog.Table tbl = null;
     try {
       String dbName = MetaStoreUtils.parseDbName(dbNameWithCatalog, 
serverConf_)[1];
-      tbl = catalogOpExecutor_.getExistingTable(dbName, tblName, apiName,
+      tbl = catalogOpExecutor_.getExistingTable(dbName, tblName, apiName, 
/*queryId*/null,
           NoOpEventSequence.INSTANCE);
     } catch (Exception e) {
       rethrowException(e, apiName);
diff --git a/fe/src/main/java/org/apache/impala/common/TaggedThreadFactory.java 
b/fe/src/main/java/org/apache/impala/common/TaggedThreadFactory.java
new file mode 100644
index 000000000..544310719
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/TaggedThreadFactory.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.util.TUniqueIdUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * ThreadFactory to create threads that can be tagged with query ids which 
will appear in
+ * their logs. The query id is stored in a thread-local ThreadDebugInfo 
variable in the
+ * Backend. GLog retrieves the query id from it and prepend it to the logs. 
See more
+ * details in common/thread-debug-info.h and MessageListener() in 
common/logging.cc.
+ */
+public class TaggedThreadFactory implements ThreadFactory {
+  private final static Logger LOG = 
LoggerFactory.getLogger(TaggedThreadFactory.class);
+  private final static TUniqueId ZERO_QUERY_ID = new TUniqueId();
+  private final static AtomicInteger poolNumber = new AtomicInteger(0);
+  private final String nameFormat_;
+  private byte[] thriftQueryId_;
+
+  /**
+   * Initialize threads without query ids. They can be updated later.
+   */
+  public TaggedThreadFactory(String nameFormat) {
+    this(ZERO_QUERY_ID, nameFormat);
+  }
+
+  /**
+   * Initialize threads with a given query id.
+   */
+  public TaggedThreadFactory(TUniqueId queryId, String nameFormat) {
+    nameFormat_ = nameFormat;
+    if (queryId == null) queryId = ZERO_QUERY_ID;
+    try {
+      thriftQueryId_ = new TSerializer().serialize(queryId);
+    } catch (TException e) {
+      LOG.error("Failed to serialize query id {}", 
TUniqueIdUtil.PrintId(queryId));
+    }
+  }
+
+  @Override
+  public Thread newThread(@NotNull Runnable r) {
+    Runnable initializerRunnable = () -> {
+      long ptr = 0;
+      try {
+        ptr = FeSupport.NativeInitThreadDebugInfo(thriftQueryId_);
+        r.run();
+      } catch (Throwable e) {
+        LOG.error("Pool thread exception", e);
+      } finally {
+        // The thread-local ThreadDebugInfo variable is owned by the Java 
thread so we
+        // should delete it at the end.
+        FeSupport.NativeDeleteThreadDebugInfo(ptr);
+      }
+    };
+    return new Thread(initializerRunnable, String.format(
+        nameFormat_, poolNumber.getAndIncrement()));
+  }
+
+  public static void updateQueryId(TUniqueId queryId) {
+    if (queryId == null) return;
+    try {
+      FeSupport.NativeUpdateThreadDebugInfo(new 
TSerializer().serialize(queryId));
+    } catch (TException e) {
+      LOG.error("Failed to update query id {}", 
TUniqueIdUtil.PrintId(queryId));
+    }
+  }
+
+  public static void resetQueryId() {
+    FeSupport.NativeResetThreadDebugInfo();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index fb5640359..163cca702 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -479,13 +479,14 @@ public class CatalogOpExecutor {
           tTableName = Optional.of(alter_table_params.getTable_name());
           catalogOpTracker_.increment(ddlRequest, tTableName);
           alterTable(alter_table_params, debugAction, wantMinimalResult, 
response,
-              catalogTimeline);
+              catalogTimeline, queryId);
           break;
         case ALTER_VIEW:
           TCreateOrAlterViewParams alter_view_params = 
ddlRequest.getAlter_view_params();
           tTableName = Optional.of(alter_view_params.getView_name());
           catalogOpTracker_.increment(ddlRequest, tTableName);
-          alterView(alter_view_params, wantMinimalResult, response, 
catalogTimeline);
+          alterView(alter_view_params, wantMinimalResult, response, 
catalogTimeline,
+              queryId);
           break;
         case CREATE_DATABASE:
           TCreateDbParams create_db_params = ddlRequest.getCreate_db_params();
@@ -515,7 +516,7 @@ public class CatalogOpExecutor {
           tTableName = Optional.of(create_table_like_params.getTable_name());
           catalogOpTracker_.increment(ddlRequest, tTableName);
           createTableLike(create_table_like_params, response, catalogTimeline, 
syncDdl,
-              wantMinimalResult, debugAction);
+              wantMinimalResult, debugAction, queryId);
           break;
         case CREATE_VIEW:
           TCreateOrAlterViewParams create_view_params =
@@ -547,7 +548,7 @@ public class CatalogOpExecutor {
           tTableName = Optional.of(drop_stats_params.getTable_name());
           catalogOpTracker_.increment(ddlRequest, tTableName);
           dropStats(drop_stats_params, wantMinimalResult, response, 
catalogTimeline,
-              ddlRequest.getQuery_options().getDebug_action());
+              ddlRequest.getQuery_options().getDebug_action(), queryId);
           break;
         case DROP_DATABASE:
           TDropDbParams drop_db_params = ddlRequest.getDrop_db_params();
@@ -568,7 +569,7 @@ public class CatalogOpExecutor {
           dropTableOrView(drop_table_or_view_params, response,
               ddlRequest.getQuery_options().getLock_max_wait_time_s(),
               ddlRequest.getQuery_options().getKudu_table_reserve_seconds(),
-              catalogTimeline);
+              catalogTimeline, queryId);
           break;
         case TRUNCATE_TABLE:
           TTruncateParams truncate_params = ddlRequest.getTruncate_params();
@@ -576,7 +577,7 @@ public class CatalogOpExecutor {
           catalogOpTracker_.increment(ddlRequest, tTableName);
           truncateTable(truncate_params, wantMinimalResult, response,
               ddlRequest.getQuery_options().getLock_max_wait_time_s(), 
catalogTimeline,
-              ddlRequest.getQuery_options().getDebug_action());
+              ddlRequest.getQuery_options().getDebug_action(), queryId);
           break;
         case DROP_FUNCTION:
           TDropFunctionParams drop_func_params = 
ddlRequest.getDrop_fn_params();
@@ -1209,8 +1210,8 @@ public class CatalogOpExecutor {
    * serialized.
    */
   private void alterTable(TAlterTableParams params, @Nullable String 
debugAction,
-      boolean wantMinimalResult, TDdlExecResponse response, EventSequence 
catalogTimeline)
-      throws ImpalaException {
+      boolean wantMinimalResult, TDdlExecResponse response, EventSequence 
catalogTimeline,
+      TUniqueId queryId) throws ImpalaException {
     // When true, loads the file/block metadata.
     boolean reloadFileMetadata = false;
     // When true, loads the table schema and the column stats from the Hive 
Metastore.
@@ -1220,7 +1221,7 @@ public class CatalogOpExecutor {
 
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
-        "Load for ALTER TABLE", catalogTimeline);
+        "Load for ALTER TABLE", queryId, catalogTimeline);
     if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
         || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
       TableName newTableName = TableName.fromThrift(
@@ -1888,14 +1889,15 @@ public class CatalogOpExecutor {
    * a table instead of a a view.
    */
    private void alterView(TCreateOrAlterViewParams params, boolean 
wantMinimalResult,
-       TDdlExecResponse resp, EventSequence catalogTimeline) throws 
ImpalaException {
+       TDdlExecResponse resp, EventSequence catalogTimeline, TUniqueId queryId)
+       throws ImpalaException {
     TableName tableName = TableName.fromThrift(params.getView_name());
     Preconditions.checkState(tableName != null && 
tableName.isFullyQualified());
     Preconditions.checkState(params.getColumns() != null &&
         params.getColumns().size() > 0,
           "Null or empty column list given as argument to 
DdlExecutor.alterView");
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
-        "Load for ALTER VIEW", catalogTimeline);
+        "Load for ALTER VIEW", queryId, catalogTimeline);
     Preconditions.checkState(tbl instanceof View, "Expected view: %s",
         tableName);
     tryWriteLock(tbl, catalogTimeline);
@@ -2788,10 +2790,11 @@ public class CatalogOpExecutor {
    * to protect against concurrent modifications.
    */
   private void dropStats(TDropStatsParams params, boolean wantMinimalResult,
-      TDdlExecResponse resp, EventSequence catalogTimeline, @Nullable String 
debugAction)
-      throws ImpalaException {
+      TDdlExecResponse resp, EventSequence catalogTimeline, @Nullable String 
debugAction,
+      TUniqueId queryId) throws ImpalaException {
     Table table = getExistingTable(params.getTable_name().getDb_name(),
-        params.getTable_name().getTable_name(), "Load for DROP STATS", 
catalogTimeline);
+        params.getTable_name().getTable_name(), "Load for DROP STATS", queryId,
+        catalogTimeline);
     Preconditions.checkNotNull(table);
     // There is no transactional HMS API to drop stats at the moment 
(HIVE-22104).
     Preconditions.checkState(!AcidUtils.isTransactionalTable(table));
@@ -3170,8 +3173,8 @@ public class CatalogOpExecutor {
    * executing the drop operation.
    */
   private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse 
resp,
-      int lockMaxWaitTime, int kudu_table_reserve_seconds, EventSequence 
catalogTimeline)
-      throws ImpalaException {
+      int lockMaxWaitTime, int kudu_table_reserve_seconds, EventSequence 
catalogTimeline,
+      TUniqueId queryId) throws ImpalaException {
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && 
tableName.isFullyQualified());
     Preconditions.checkState(!catalog_.isBlacklistedTable(tableName) || 
params.if_exists,
@@ -3197,7 +3200,7 @@ public class CatalogOpExecutor {
       // we pass null validWriteIdList here since we don't really care what 
version of
       // table is loaded, eventually its going to be dropped below.
       catalog_.getOrLoadTable(params.getTable_name().db_name,
-          params.getTable_name().table_name, "Load for DROP TABLE/VIEW", null,
+          params.getTable_name().table_name, "Load for DROP TABLE/VIEW", 
queryId, null,
           TABLE_ID_UNAVAILABLE, catalogTimeline);
       catalogTimeline.markEvent("Loaded catalog table");
     } catch (CatalogException e) {
@@ -3435,12 +3438,12 @@ public class CatalogOpExecutor {
    */
   private void truncateTable(TTruncateParams params, boolean wantMinimalResult,
       TDdlExecResponse resp, int lockMaxWaitTime, EventSequence 
catalogTimeline,
-      @Nullable String debugAction) throws ImpalaException {
+      @Nullable String debugAction, TUniqueId queryId) throws ImpalaException {
     TTableName tblName = params.getTable_name();
     Table table = null;
     try {
       table = getExistingTable(tblName.getDb_name(), tblName.getTable_name(),
-          "Load for TRUNCATE TABLE", catalogTimeline);
+          "Load for TRUNCATE TABLE", queryId, catalogTimeline);
     } catch (TableNotFoundException e) {
       if (params.if_exists) {
         addSummary(resp, "Table does not exist.");
@@ -4503,7 +4506,7 @@ public class CatalogOpExecutor {
    */
   private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse 
response,
       EventSequence catalogTimeline, boolean syncDdl, boolean 
wantMinimalResult,
-      @Nullable String debugAction) throws ImpalaException {
+      @Nullable String debugAction, TUniqueId queryId) throws ImpalaException {
     Preconditions.checkNotNull(params);
     THdfsFileFormat fileFormat =
         params.isSetFile_format() ? params.getFile_format() : null;
@@ -4548,7 +4551,7 @@ public class CatalogOpExecutor {
       return;
     }
     Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl(),
-        "Load source for CREATE TABLE LIKE", catalogTimeline);
+        "Load source for CREATE TABLE LIKE", queryId, catalogTimeline);
     org.apache.hadoop.hive.metastore.api.Table tbl =
         srcTable.getMetaStoreTable().deepCopy();
     tbl.setDbName(tblName.getDb());
@@ -7367,8 +7370,10 @@ public class CatalogOpExecutor {
           // If the table is not loaded, no need to perform refresh after the 
initial
           // metadata load.
           boolean isTableLoadedInCatalog = tbl.isLoaded();
+          TUniqueId queryId = req.isSetHeader() && 
req.getHeader().isSetQuery_id() ?
+              req.header.query_id : null;
           tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
-              "Load triggered by " + cmdString, catalogTimeline);
+              "Load triggered by " + cmdString, queryId, catalogTimeline);
           CatalogObject.ThriftObjectType resultType =
               req.header.want_minimal_response ?
                   CatalogObject.ThriftObjectType.INVALIDATION :
@@ -7587,9 +7592,11 @@ public class CatalogOpExecutor {
       throws ImpalaException {
     EventSequence catalogTimeline = new EventSequence(CATALOG_TIMELINE_NAME);
     TUpdateCatalogResponse response = new TUpdateCatalogResponse();
+    TUniqueId queryId = update.isSetHeader() && update.header.isSetQuery_id() ?
+        update.header.query_id : null;
     // Only update metastore for Hdfs tables.
     Table table = getExistingTable(update.getDb_name(), 
update.getTarget_table(),
-        "Load for INSERT", catalogTimeline);
+        "Load for INSERT", queryId, catalogTimeline);
     if (!(table instanceof FeFsTable)) {
       throw new InternalException("Unexpected table type: " +
           update.getTarget_table());
@@ -8223,10 +8230,10 @@ public class CatalogOpExecutor {
    * know when a table has been dropped and re-created with the same name.
    */
   public Table getExistingTable(String dbName, String tblName, String reason,
-      EventSequence catalogTimeline) throws CatalogException {
+      TUniqueId queryId, EventSequence catalogTimeline) throws 
CatalogException {
     // passing null validWriteIdList makes sure that we return the table if it 
is
     // already loaded.
-    Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason, null,
+    Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason, queryId, null,
         TABLE_ID_UNAVAILABLE, catalogTimeline);
     if (tbl == null) {
       throw new TableNotFoundException("Table not found: " + dbName + "." + 
tblName);
@@ -8257,6 +8264,8 @@ public class CatalogOpExecutor {
       throws ImpalaRuntimeException, CatalogException, InternalException {
     Preconditions.checkState(tTableName.isPresent());
     TCommentOnParams params = ddlRequest.getComment_on_params();
+    TUniqueId queryId = ddlRequest.isSetHeader() && 
ddlRequest.header.isSetQuery_id() ?
+        ddlRequest.header.query_id : null;
     if (params.getDb() != null) {
       Preconditions.checkArgument(!params.isSetTable_name() &&
           !params.isSetColumn_name());
@@ -8271,7 +8280,7 @@ public class CatalogOpExecutor {
       catalogOpTracker_.increment(ddlRequest, tTableName);
       alterCommentOnTableOrView(TableName.fromThrift(params.getTable_name()),
           params.getComment(), wantMinimalResult, response, catalogTimeline,
-          ddlRequest.getQuery_options().getDebug_action());
+          ddlRequest.getQuery_options().getDebug_action(), queryId);
     } else if (params.getColumn_name() != null) {
       Preconditions.checkArgument(!params.isSetDb() && 
!params.isSetTable_name());
       TColumnName columnName = params.getColumn_name();
@@ -8280,7 +8289,7 @@ public class CatalogOpExecutor {
       catalogOpTracker_.increment(ddlRequest, tTableName);
       alterCommentOnColumn(TableName.fromThrift(columnName.getTable_name()),
           columnName.getColumn_name(), params.getComment(), wantMinimalResult, 
response,
-          catalogTimeline, ddlRequest.getQuery_options().getDebug_action());
+          catalogTimeline, ddlRequest.getQuery_options().getDebug_action(), 
queryId);
     } else {
       throw new UnsupportedOperationException("Unsupported COMMENT ON 
operation");
     }
@@ -8402,10 +8411,10 @@ public class CatalogOpExecutor {
 
   private void alterCommentOnTableOrView(TableName tableName, String comment,
       boolean wantMinimalResult, TDdlExecResponse response, EventSequence 
catalogTimeline,
-      @Nullable String debugAction)
+      @Nullable String debugAction, TUniqueId queryId)
       throws CatalogException, InternalException, ImpalaRuntimeException {
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
-        "Load for ALTER COMMENT", catalogTimeline);
+        "Load for ALTER COMMENT", queryId, catalogTimeline);
     tryWriteLock(tbl, catalogTimeline);
     try {
       InProgressTableModification modification =
@@ -8434,10 +8443,10 @@ public class CatalogOpExecutor {
 
   private void alterCommentOnColumn(TableName tableName, String columnName,
       String comment, boolean wantMinimalResult, TDdlExecResponse response,
-      EventSequence catalogTimeline, @Nullable String debugAction)
+      EventSequence catalogTimeline, @Nullable String debugAction, TUniqueId 
queryId)
       throws CatalogException, InternalException, ImpalaRuntimeException {
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
-        "Load for ALTER COLUMN COMMENT", catalogTimeline);
+        "Load for ALTER COLUMN COMMENT", queryId, catalogTimeline);
     tryWriteLock(tbl, catalogTimeline);
     try {
       InProgressTableModification modification =
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 59bba2542..ee8314e63 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -160,6 +160,21 @@ public class FeSupport {
   public native static byte[] NativeWaitForHmsEvents(byte[] thriftReq,
       byte[] thriftQueryOptions);
 
+  // Initialize ThreadDebugInfo for the current thread. Should only be used 
once for a
+  // thread.
+  public native static long NativeInitThreadDebugInfo(byte[] thriftQueryId);
+
+  // Update the ThreadDebugInfo to track a new query id. So logs of the 
current thread
+  // can be tagged with that query id.
+  public native static void NativeUpdateThreadDebugInfo(byte[] thriftQueryId);
+
+  // Reset the ThreadDebugInfo to not tag on any query ids.
+  public native static void NativeResetThreadDebugInfo();
+
+  // Delete the ThreadDebugInfo created in Backend. Should only be used once 
for a thread
+  // that has initialized ThreadDebugInfo.
+  public native static void NativeDeleteThreadDebugInfo(long ptr);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index 1991e7133..766a4a43e 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -29,6 +29,7 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TPrivilegeLevel;
 import org.apache.impala.thrift.TQueryOptions;
@@ -66,6 +67,10 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
     super(authzProvider);
   }
 
+  static {
+    FeSupport.loadLibrary();
+  }
+
   @BeforeClass
   public static void setUp() {
     RuntimeEnv.INSTANCE.setTestEnv(true);
diff --git a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
index 8c7e60cc5..587f37fe0 100644
--- a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
@@ -26,6 +26,7 @@ import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.authorization.AuthorizationProvider;
 import org.apache.impala.authorization.AuthorizationTestBase;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TPrivilegeLevel;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
@@ -44,6 +45,9 @@ import static org.junit.Assert.assertTrue;
 
 public class RangerAuditLogTest extends AuthorizationTestBase {
   private static RangerAuthorizationCheckerSpy authzChecker_ = null;
+  static {
+    FeSupport.loadLibrary();
+  }
 
   private static class RangerAuthorizationCheckerSpy extends 
RangerAuthorizationChecker {
     private AuthorizationContext authzCtx_;
diff --git a/tests/custom_cluster/test_observability.py b/tests/custom_cluster/test_observability.py
index 0a620f0ff..14554c056 100644
--- a/tests/custom_cluster/test_observability.py
+++ b/tests/custom_cluster/test_observability.py
@@ -35,7 +35,7 @@ class TestObservability(CustomClusterTestSuite):
         "select {0}.gc(int_col) from functional.alltypes limit 1000".format(
             unique_database)).runtime_profile
 
-    gc_count_regex = "GcCount:.*\((.*)\)"
+    gc_count_regex = r"GcCount:.*\((.*)\)"
     gc_count_match = re.search(gc_count_regex, profile)
     assert gc_count_match, profile
     assert int(gc_count_match.group(1)) > 0, profile
@@ -44,3 +44,39 @@ class TestObservability(CustomClusterTestSuite):
     gc_time_millis_match = re.search(gc_time_millis_regex, profile)
     assert gc_time_millis_match, profile
     assert parse_duration_string_ms(gc_time_millis_match.group(1)) > 0
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--catalog_topic_mode=minimal",
+      impalad_args="--use_local_catalog=true",
+      disable_log_buffering=True)
+  def test_query_id_in_logs(self, unique_database):
+    res = self.execute_query("create table %s.tbl (i int)" % unique_database)
+    self.assert_catalogd_log_contains(
+        "INFO", "{}] execDdl request: CREATE_TABLE {}.tbl issued by"
+        .format(res.query_id, unique_database))
+
+    res = self.execute_query("explain select * from %s.tbl" % unique_database)
+    self.assert_catalogd_log_contains(
+        "INFO", r"{}] Loading metadata for: {}.tbl \(needed by coordinator\)"
+        .format(res.query_id, unique_database))
+
+    res = self.execute_query(
+        "create table %s.tbl2 as select * from functional.alltypes" % 
unique_database)
+    self.assert_catalogd_log_contains(
+        "INFO", "%s] Loading metadata for table: functional.alltypes" % 
res.query_id)
+    self.assert_catalogd_log_contains(
+        "INFO", "%s] Remaining items in queue: 0. Loads in progress: 1" % 
res.query_id,
+        expected_count=-1)
+    self.assert_catalogd_log_contains(
+        "INFO", r"{}] Loading metadata for: functional.alltypes \(needed by 
coordinator\)"
+        .format(res.query_id))
+    self.assert_catalogd_log_contains(
+        "INFO", "{}] execDdl request: CREATE_TABLE_AS_SELECT {}.tbl2 issued by"
+        .format(res.query_id, unique_database))
+    self.assert_catalogd_log_contains(
+        "INFO", "{}] updateCatalog request: Update catalog for {}.tbl2"
+        .format(res.query_id, unique_database))
+    self.assert_catalogd_log_contains(
+        "INFO", r"{}] Loading metadata for: {}.tbl2 \(Load for INSERT\)"
+        .format(res.query_id, unique_database))


Reply via email to