This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new f58b3204cab branch-2.1: [Fix](Iceberg-hadoop-catalog)Fix
Kerberos-authenticated HadoopCatalog insert failures due to missing kerberos
credentials #51245 (#51337)
f58b3204cab is described below
commit f58b3204cab8a7a35869c5047ead796d3d756960
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 20 14:12:25 2025 +0800
branch-2.1: [Fix](Iceberg-hadoop-catalog)Fix Kerberos-authenticated
HadoopCatalog insert failures due to missing kerberos credentials #51245
(#51337)
Cherry-picked from #51245
---------
Co-authored-by: Calvin Kirs <[email protected]>
---
.../authentication/PreExecutionAuthenticator.java | 48 ++++-------
.../datasource/iceberg/IcebergTransaction.java | 16 +++-
.../test_iceberg_hadoop_catalog_kerberos.groovy | 98 ++++++++++++++++++++++
3 files changed, 124 insertions(+), 38 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
index 0d7cf60c6f7..93967247330 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
@@ -19,7 +19,6 @@ package org.apache.doris.common.security.authentication;
import org.apache.hadoop.conf.Configuration;
-import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
/**
@@ -69,14 +68,26 @@ public class PreExecutionAuthenticator {
public <T> T execute(Callable<T> task) throws Exception {
if (hadoopAuthenticator != null) {
// Adapts Callable to PrivilegedExceptionAction for use with
Hadoop authentication
- PrivilegedExceptionAction<T> action = new
CallableToPrivilegedExceptionActionAdapter<>(task);
- return hadoopAuthenticator.doAs(action);
+ return hadoopAuthenticator.doAs(task::call);
} else {
// Executes the task directly if no authentication is needed
return task.call();
}
}
+ public void execute(Runnable task) throws Exception {
+ if (hadoopAuthenticator != null) {
+ // Adapts Runnable to PrivilegedExceptionAction for use with
Hadoop authentication
+ hadoopAuthenticator.doAs(() -> {
+ task.run();
+ return null;
+ });
+ } else {
+ // Executes the task directly if no authentication is needed
+ task.run();
+ }
+ }
+
/**
* Retrieves the current HadoopAuthenticator.
* <p>This allows checking if a HadoopAuthenticator is configured or
@@ -97,35 +108,4 @@ public class PreExecutionAuthenticator {
public void setHadoopAuthenticator(HadoopAuthenticator
hadoopAuthenticator) {
this.hadoopAuthenticator = hadoopAuthenticator;
}
-
- /**
- * Adapter class to convert a Callable into a PrivilegedExceptionAction.
- * <p>This is necessary to run the task within a privileged context,
- * particularly for Hadoop operations with Kerberos.
- *
- * @param <T> The type of result returned by the action
- */
- public class CallableToPrivilegedExceptionActionAdapter<T> implements
PrivilegedExceptionAction<T> {
- private final Callable<T> callable;
-
- /**
- * Constructs an adapter that wraps a Callable into a
PrivilegedExceptionAction.
- *
- * @param callable The Callable to be adapted
- */
- public CallableToPrivilegedExceptionActionAdapter(Callable<T>
callable) {
- this.callable = callable;
- }
-
- /**
- * Executes the wrapped Callable as a PrivilegedExceptionAction.
- *
- * @return The result of the callable's call method
- * @throws Exception If an exception occurs during callable execution
- */
- @Override
- public T run() throws Exception {
- return callable.call();
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index d0cca11b0af..e36db86022e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -73,10 +73,18 @@ public class IcebergTransaction implements Transaction {
}
}
- public void beginInsert(SimpleTableInfo tableInfo) {
- this.tableInfo = tableInfo;
- this.table = getNativeTable(tableInfo);
- this.transaction = table.newTransaction();
+ public void beginInsert(SimpleTableInfo tableInfo) throws UserException {
+ try {
+ ops.getPreExecutionAuthenticator().execute(() -> {
+ // create and start the iceberg transaction
+ this.tableInfo = tableInfo;
+ this.table = getNativeTable(tableInfo);
+ this.transaction = table.newTransaction();
+ });
+ } catch (Exception e) {
+ throw new UserException("Failed to begin insert for iceberg table
" + tableInfo, e);
+ }
+
}
public void finishInsert(SimpleTableInfo tableInfo,
Optional<InsertCommandContext> insertCtx) {
diff --git
a/regression-test/suites/external_table_p0/kerberos/test_iceberg_hadoop_catalog_kerberos.groovy
b/regression-test/suites/external_table_p0/kerberos/test_iceberg_hadoop_catalog_kerberos.groovy
new file mode 100644
index 00000000000..61b567e1d83
--- /dev/null
+++
b/regression-test/suites/external_table_p0/kerberos/test_iceberg_hadoop_catalog_kerberos.groovy
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_hadoop_catalog_kerberos",
"p0,external,kerberos,external_docker,external_docker_kerberos") {
+ String enabled = context.config.otherConfigs.get("enableKerberosTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ return
+ }
+ def String catalog_name = "iceberg_hadoop_catalog_kerberos_test"
+ def database_name = "test_iceberg_hadoop_db"
+ def String test_tbl_name="iceberg_test_table"
+ def keytab_root_dir = "/keytabs"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ sql """
+ drop catalog if exists ${catalog_name}
+ """
+ sql """
+ CREATE CATALOG IF NOT EXISTS ${catalog_name}
+ PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type' = 'hadoop',
+ 'warehouse' = 'hdfs://${externalEnvIp}:8520/tmp/iceberg/catalog',
+ "hadoop.security.authentication" = "kerberos",
+ "hadoop.security.auth_to_local" =
"RULE:[2:\$1@\$0](.*@LABS.TERADATA.COM)s/@.*//
+
RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*//
+
RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*//
+ DEFAULT",
+ "hadoop.kerberos.principal" =
"hive/[email protected]",
+ "hadoop.kerberos.min.seconds.before.relogin" = "5",
+ "hadoop.kerberos.keytab.login.autorenewal.enabled" = "false",
+ "hadoop.kerberos.keytab" =
"${keytab_root_dir}/hive-presto-master.keytab",
+ "fs.defaultFS" = "hdfs://${externalEnvIp}:8520"
+ );
+ """
+
+ sql """ switch ${catalog_name} """
+ sql """ drop database if exists ${database_name} """
+ sql """ create database if not exists ${database_name} """
+ def database = sql """ show databases like '%${database_name}' """
+ assert database.size() == 1
+ sql """ use ${database_name} """
+ sql """ drop table if exists ${test_tbl_name}"""
+ sql """
+ CREATE TABLE ${test_tbl_name} (
+ `ts` DATETIME COMMENT 'ts',
+ `col1` BOOLEAN COMMENT 'col1',
+ `col2` INT COMMENT 'col2',
+ `col3` BIGINT COMMENT 'col3',
+ `col4` FLOAT COMMENT 'col4',
+ `col5` DOUBLE COMMENT 'col5',
+ `col6` DECIMAL(9,4) COMMENT 'col6',
+ `col7` STRING COMMENT 'col7',
+ `col8` DATE COMMENT 'col8',
+ `col9` DATETIME COMMENT 'col9',
+ `pt1` STRING COMMENT 'pt1',
+ `pt2` STRING COMMENT 'pt2'
+ ) ENGINE=iceberg
+ PARTITION BY LIST (DAY(ts), pt1, pt2) ()
+ PROPERTIES (
+ 'write-format'='orc',
+ 'compression-codec'='zlib'
+ );
+ """
+ def table = sql """ show tables like '%${test_tbl_name}' """
+ assert table.size() == 1
+ sql """
+ insert into ${test_tbl_name} values (
+ '2024-05-26 12:34:56',
+ true,
+ 123,
+ 1234567890123,
+ 12.34,
+ 56.789,
+ 12345.6789,
+ 'example text',
+ '2024-05-26',
+ '2024-05-26 14:00:00',
+ 'partition_val1',
+ 'partition_val2'
+ );
+ """
+ def dataResult = sql """select count(1) from ${test_tbl_name} """
+ assert dataResult.get(0).get(0) == 1
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]