This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a7030709 IMPALA-12621: Prevent Thread interrupt flag poisoning of JVM threads
6a7030709 is described below

commit 6a70307092b51447ce13ae6546e6f2a978066997
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Feb 24 17:40:28 2026 +0100

    IMPALA-12621: Prevent Thread interrupt flag poisoning of JVM threads
    
    JVM threads are reused across JNI invocations. This means that if a
    thread's interrupt flag is still set when a JNI call returns, the next
    JNI call on that thread might observe the interrupt (if it checks for
    it).
    
    It can be reproduced manually by adding
    Thread.currentThread().interrupt() calls at the end of our methods
    (e.g. JniFrontend.convertTable(), JniCatalog.updateCatalog()).
    
    Doing so triggers the same errors reported in IMPALA-12621,
    IMPALA-10633, IMPALA-10924, IMPALA-10540 and IMPALA-12261. All these
    issues have a stack trace similar to the following:
    
    W20260221 07:53:31.855443 1324125 DataStreamer.java:832] DataStreamer Exception
    Java exception follows:
    java.nio.channels.ClosedByInterruptException
            at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
            at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:477)
            at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
            at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:141)
            at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
            at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
            at java.io.DataOutputStream.flush(DataOutputStream.java:123)
            at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:781)
    I20260221 07:53:31.863165 1312138 jni-util.cc:321] fa4015e85de1494e:e1586d1500000000] org.apache.iceberg.exceptions.RuntimeIOException: Failed to write json to file: hdfs://localhost:20500/test-warehouse/lineitem_sixblocks_iceberg/metadata/00000-93a5f622-d41e-4ccb-87a4-eb26d9bd7a5c.metadata.json
            at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:133)
            at org.apache.iceberg.TableMetadataParser.overwrite(TableMetadataParser.java:115)
            at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:170)
            at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadataIfRequired(BaseMetastoreTableOperations.java:160)
            at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:173)
            at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
            at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:201)
            at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:75)
            at org.apache.impala.catalog.iceberg.IcebergHiveCatalog.createTable(IcebergHiveCatalog.java:74)
            at org.apache.impala.util.MigrateTableUtil.migrateToIcebergTable(MigrateTableUtil.java:99)
            at org.apache.impala.service.Frontend.convertTable(Frontend.java:1004)
            at org.apache.impala.service.JniFrontend.convertTable(JniFrontend.java:243)
    
    I.e., something interrupted the thread when it tried to create a new
    JSON file for an Iceberg table. Looking at the HDFS logs, the file
    was created successfully:
    
    org.apache.hadoop.hdfs.StateChange: DIR* completeFile: ...metadata.json
    
    There is also no thread interruption logic in the code path of
    JniFrontend.convertTable() that could explain the interruption.
    
    So the most likely explanation for the above is:
    * A JNI thread gets interrupted in a way that leaves its interrupt flag
      set after the JNI call returns
    * A later JNI call on the same thread runs Iceberg code that checks for
      interruption
    * The above exception is thrown
    
    Testing:
    * tested manually by adding Thread.currentThread().interrupt() calls
      at the end of JniCatalog/JniFrontend methods.
    
    Generated-by: Gemini Pro
    Generated-by: Claude Sonnet 4.5
    
    Change-Id: Iaec6860433431064737e994999dd57a63f223a20
    Reviewed-on: http://gerrit.cloudera.org:8080/24029
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/util/jni-util.cc                            | 26 ++++++++++++++++++++++
 be/src/util/jni-util.h                             | 11 +++++++++
 .../java/org/apache/impala/common/JniUtil.java     | 14 ++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc
index 70931bab3..9327b06ff 100644
--- a/be/src/util/jni-util.cc
+++ b/be/src/util/jni-util.cc
@@ -95,6 +95,7 @@ jclass JniUtil::internal_exc_cl_ = NULL;
 jmethodID JniUtil::get_jvm_metrics_id_ = NULL;
 jmethodID JniUtil::get_jvm_threads_id_ = NULL;
 jmethodID JniUtil::get_jmx_json_ = NULL;
+jmethodID JniUtil::clear_interrupt_status_id_ = NULL;
 jmethodID JniUtil::throwable_to_string_id_ = NULL;
 jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL;
 
@@ -236,6 +237,19 @@ Status JniUtil::Init() {
     if (env->ExceptionOccurred()) env->ExceptionDescribe();
     return Status("Failed to find JniUtil.getJMXJson method.");
   }
+
+  clear_interrupt_status_id_ =
+      env->GetStaticMethodID(jni_util_cl_, "clearInterruptStatus", "()Z");
+  if (clear_interrupt_status_id_ == NULL) {
+    if (env->ExceptionOccurred()) {
+      env->ExceptionDescribe();
+      env->ExceptionClear();
+    }
+    // We don't return error Status beause when Hive loads Impala FE stuff,
+    // it won't find JniUtil.clearInterruptStatus(). We can return error
+    // once Hive stops depending on Impala FE.
+    LOG(WARNING) << "Failed to find JniUtil.clearInterruptStatus method.";
+  }
   jvm_inited_ = true;
   return Status::OK();
 }
@@ -357,4 +371,16 @@ Status JniUtil::LoadStaticJniMethod(JNIEnv* env, const jclass& jni_class,
   RETURN_ERROR_IF_EXC(env);
   return Status::OK();
 }
+
+void JniUtil::ClearThreadInterruptStatus(JNIEnv* env) {
+  DCHECK(env != nullptr);
+  if (clear_interrupt_status_id_ == nullptr) return;
+  // Calls Thread.interrupted() which both checks and clears the interrupt 
status.
+  jboolean was_interrupted = env->CallStaticBooleanMethod(jni_util_cl_,
+      clear_interrupt_status_id_);
+  if (was_interrupted) {
+    VLOG(1) << "Cleared interrupt flag on thread " << pthread_self();
+    DCHECK(false) << "JVM thread had interrupt flag set.";
+  }
+}
 }
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 55e313a7e..1e2d27225 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -424,6 +424,10 @@ class JniUtil {
   static Status CallJniMethod(const jobject& obj, const jmethodID& method,
       const T& arg, jobject* response) WARN_UNUSED_RESULT;
 
+  /// Clears the Java thread's interrupt status to prevent interrupt flag 
poisoning
+  /// when threads are reused.
+  static void ClearThreadInterruptStatus(JNIEnv* env);
+
  private:
   // Slow-path for GetJNIEnv, used on the first call by any thread.
   static JNIEnv* GetJNIEnvSlowPath();
@@ -437,6 +441,7 @@ class JniUtil {
   static jmethodID get_jvm_metrics_id_;
   static jmethodID get_jvm_threads_id_;
   static jmethodID get_jmx_json_;
+  static jmethodID clear_interrupt_status_id_;
 
   // Thread-local cache of the JNIEnv for this thread.
   static __thread JNIEnv* tls_env_;
@@ -531,6 +536,8 @@ template<class T>
 inline Status JniCall::Call(T* result) {
   RETURN_IF_ERROR(status_);
   DCHECK((instance_ != nullptr) ^ (class_ != nullptr));
+  // Clear interrupt status of JNI thread.
+  JniUtil::ClearThreadInterruptStatus(env_);
 
   // Even if the function takes no arguments, it's OK to pass an array here.
   // The JNI API doesn't take a length and just assumes that you've passed
@@ -543,6 +550,10 @@ inline Status JniCall::Call(T* result) {
   }
   RETURN_ERROR_IF_EXC(env_);
   RETURN_IF_ERROR(ObjectToResult(ret, result));
+#ifndef NDEBUG
+    // Clear interrupt status again.
+    JniUtil::ClearThreadInterruptStatus(env_);
+#endif
   return Status::OK();
 }
 
diff --git a/fe/src/main/java/org/apache/impala/common/JniUtil.java b/fe/src/main/java/org/apache/impala/common/JniUtil.java
index b55d7ee5b..4bb425671 100644
--- a/fe/src/main/java/org/apache/impala/common/JniUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/JniUtil.java
@@ -99,6 +99,20 @@ public class JniUtil {
     return output.toString();
   }
 
+  /**
+   * Clears the interrupted status of the current thread and returns whether
+   * the thread was interrupted. This is used to prevent interrupt flag 
poisoning
+   * when threads are reused across different JNI operations.
+   */
+  public static boolean clearInterruptStatus() {
+    if (Thread.interrupted()) {
+      LOG.warn("Thread {} was interrupted. Clearing interrupt status to 
prevent " +
+          "interrupt flag poisoning.", Thread.currentThread().getId());
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Serializes input into a byte[] using the default protocol factory.
    */

Reply via email to