This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0a6a79e0f9d branch-3.0: [feat](catalog)Support Pre-Execution Authentication for HMS Type Iceberg Catalog Operations.  #43445 (#44127)
0a6a79e0f9d is described below

commit 0a6a79e0f9d77e8d6f089fd418720c985acca8b8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 18 18:53:02 2024 +0800

    branch-3.0: [feat](catalog)Support Pre-Execution Authentication for HMS Type Iceberg Catalog Operations.  #43445 (#44127)
    
    Cherry-picked from #43445
    
    Co-authored-by: Calvin Kirs <[email protected]>
---
 .../authentication/PreExecutionAuthenticator.java  | 116 +++++++++++++++++++++
 .../datasource/iceberg/IcebergExternalCatalog.java |   4 +
 .../iceberg/IcebergHMSExternalCatalog.java         |   7 ++
 .../datasource/iceberg/IcebergMetadataOps.java     |  64 +++++++++++-
 .../datasource/iceberg/IcebergTransaction.java     |  23 ++--
 .../datasource/iceberg/IcebergTransactionTest.java |   1 +
 6 files changed, 204 insertions(+), 11 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
new file mode 100644
index 00000000000..6260833b7db
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+
+/**
+ * PreExecutionAuthenticator is a utility class that ensures specified tasks
+ * are executed with necessary authentication, particularly useful for systems
+ * like Hadoop that require Kerberos-based pre-execution authentication.
+ *
+ * <p>If a HadoopAuthenticator is provided, this class will execute tasks
+ * within a privileged context using Hadoop's authentication mechanisms
+ * (such as Kerberos). Otherwise, it will execute tasks normally.
+ */
+public class PreExecutionAuthenticator {
+
+    private HadoopAuthenticator hadoopAuthenticator;
+
+    /**
+     * Default constructor for PreExecutionAuthenticator.
+     * This allows setting the HadoopAuthenticator at a later point if needed.
+     */
+    public PreExecutionAuthenticator() {
+    }
+
+    /**
+     * Executes the specified task with necessary authentication.
+     * <p>If a HadoopAuthenticator is set, the task will be executed within a
+     * privileged context using the doAs method. If no authenticator is present,
+     * the task will be executed directly.
+     *
+     * @param task The task to execute, represented as a Callable
+     * @param <T>  The type of the result returned by the task
+     * @return The result of the executed task
+     * @throws Exception If an exception occurs during task execution
+     */
+    public <T> T execute(Callable<T> task) throws Exception {
+        if (hadoopAuthenticator != null) {
+            // Adapts Callable to PrivilegedExceptionAction for use with Hadoop authentication
+            PrivilegedExceptionAction<T> action = new 
CallableToPrivilegedExceptionActionAdapter<>(task);
+            return hadoopAuthenticator.doAs(action);
+        } else {
+            // Executes the task directly if no authentication is needed
+            return task.call();
+        }
+    }
+
+    /**
+     * Retrieves the current HadoopAuthenticator.
+     * <p>This allows checking if a HadoopAuthenticator is configured or
+     * changing it at runtime.
+     *
+     * @return The current HadoopAuthenticator instance, or null if none is set
+     */
+    public HadoopAuthenticator getHadoopAuthenticator() {
+        return hadoopAuthenticator;
+    }
+
+    /**
+     * Sets the HadoopAuthenticator, enabling pre-execution authentication
+     * for tasks requiring privileged access.
+     *
+     * @param hadoopAuthenticator An instance of HadoopAuthenticator to be used
+     */
+    public void setHadoopAuthenticator(HadoopAuthenticator 
hadoopAuthenticator) {
+        this.hadoopAuthenticator = hadoopAuthenticator;
+    }
+
+    /**
+     * Adapter class to convert a Callable into a PrivilegedExceptionAction.
+     * <p>This is necessary to run the task within a privileged context,
+     * particularly for Hadoop operations with Kerberos.
+     *
+     * @param <T> The type of result returned by the action
+     */
+    public class CallableToPrivilegedExceptionActionAdapter<T> implements 
PrivilegedExceptionAction<T> {
+        private final Callable<T> callable;
+
+        /**
+         * Constructs an adapter that wraps a Callable into a PrivilegedExceptionAction.
+         *
+         * @param callable The Callable to be adapted
+         */
+        public CallableToPrivilegedExceptionActionAdapter(Callable<T> 
callable) {
+            this.callable = callable;
+        }
+
+        /**
+         * Executes the wrapped Callable as a PrivilegedExceptionAction.
+         *
+         * @return The result of the callable's call method
+         * @throws Exception If an exception occurs during callable execution
+         */
+        @Override
+        public T run() throws Exception {
+            return callable.call();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 6f79afd5de5..d8dfd1c128f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
@@ -42,6 +43,8 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     protected String icebergCatalogType;
     protected Catalog catalog;
 
+    protected PreExecutionAuthenticator preExecutionAuthenticator;
+
     public IcebergExternalCatalog(long catalogId, String name, String comment) 
{
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
     }
@@ -51,6 +54,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
 
     @Override
     protected void initLocalObjectsImpl() {
+        preExecutionAuthenticator = new PreExecutionAuthenticator();
         initCatalog();
         IcebergMetadataOps ops = 
ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
         transactionManager = 
TransactionManagerFactory.createIcebergTransactionManager(ops);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index 51d39357b81..c5a99c157ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.PropertyConverter;
 
@@ -35,6 +37,11 @@ public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HMS;
         catalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
+        if (preExecutionAuthenticator.getHadoopAuthenticator() == null) {
+            AuthenticationConfig config = 
AuthenticationConfig.getKerberosConfig(getConfiguration());
+            HadoopAuthenticator authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(config);
+            preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
+        }
     }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 7fa7b5e84e8..87aaca90b95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.datasource.DorisTypeVisitor;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
@@ -53,11 +54,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
     protected Catalog catalog;
     protected IcebergExternalCatalog dorisCatalog;
     protected SupportsNamespaces nsCatalog;
+    private PreExecutionAuthenticator preExecutionAuthenticator;
 
     public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog 
catalog) {
         this.dorisCatalog = dorisCatalog;
         this.catalog = catalog;
         nsCatalog = (SupportsNamespaces) catalog;
+        this.preExecutionAuthenticator = 
dorisCatalog.preExecutionAuthenticator;
+
     }
 
     public Catalog getCatalog() {
@@ -82,9 +86,13 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
     }
 
     public List<String> listDatabaseNames() {
-        return nsCatalog.listNamespaces().stream()
-                .map(e -> e.toString())
-                .collect(Collectors.toList());
+        try {
+            return preExecutionAuthenticator.execute(() -> 
nsCatalog.listNamespaces().stream()
+                   .map(Namespace::toString)
+                   .collect(Collectors.toList()));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list database names, error 
message is: " + e.getMessage());
+        }
     }
 
 
@@ -96,6 +104,19 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
 
     @Override
     public void createDb(CreateDbStmt stmt) throws DdlException {
+        try {
+            preExecutionAuthenticator.execute(() -> {
+                performCreateDb(stmt);
+                return null;
+
+            });
+        } catch (Exception e) {
+            throw new DdlException("Failed to create database: "
+                    + stmt.getFullDbName() + " ,error message is: " + 
e.getMessage());
+        }
+    }
+
+    private void performCreateDb(CreateDbStmt stmt) throws DdlException {
         SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
         String dbName = stmt.getFullDbName();
         Map<String, String> properties = stmt.getProperties();
@@ -110,7 +131,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
         String icebergCatalogType = dorisCatalog.getIcebergCatalogType();
         if (!properties.isEmpty() && 
!IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
             throw new DdlException(
-                "Not supported: create database with properties for iceberg 
catalog type: " + icebergCatalogType);
+                    "Not supported: create database with properties for 
iceberg catalog type: " + icebergCatalogType);
         }
         nsCatalog.createNamespace(Namespace.of(dbName), properties);
         dorisCatalog.onRefreshCache(true);
@@ -118,6 +139,17 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
 
     @Override
     public void dropDb(DropDbStmt stmt) throws DdlException {
+        try {
+            preExecutionAuthenticator.execute(() -> {
+                preformDropDb(stmt);
+                return null;
+            });
+        } catch (Exception e) {
+            throw new DdlException("Failed to drop database: " + 
stmt.getDbName() + " ,error message is: ", e);
+        }
+    }
+
+    private void preformDropDb(DropDbStmt stmt) throws DdlException {
         String dbName = stmt.getDbName();
         if (!databaseExist(dbName)) {
             if (stmt.isSetIfExists()) {
@@ -134,6 +166,15 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
 
     @Override
     public boolean createTable(CreateTableStmt stmt) throws UserException {
+        try {
+            preExecutionAuthenticator.execute(() -> performCreateTable(stmt));
+        } catch (Exception e) {
+            throw new DdlException("Failed to create table: " + 
stmt.getTableName() + " ,error message is:", e);
+        }
+        return false;
+    }
+
+    public boolean performCreateTable(CreateTableStmt stmt) throws 
UserException {
         String dbName = stmt.getDbName();
         ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
         if (db == null) {
@@ -166,6 +207,17 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
 
     @Override
     public void dropTable(DropTableStmt stmt) throws DdlException {
+        try {
+            preExecutionAuthenticator.execute(() -> {
+                performDropTable(stmt);
+                return null;
+            });
+        } catch (Exception e) {
+            throw new DdlException("Failed to drop table: " + 
stmt.getTableName() + " ,error message is:", e);
+        }
+    }
+
+    private void performDropTable(DropTableStmt stmt) throws DdlException {
         String dbName = stmt.getDbName();
         String tableName = stmt.getTableName();
         ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
@@ -194,4 +246,8 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
     public void truncateTable(String dbName, String tblName, List<String> 
partitions) {
         throw new UnsupportedOperationException("Truncate Iceberg table is not 
supported.");
     }
+
+    public PreExecutionAuthenticator getPreExecutionAuthenticator() {
+        return preExecutionAuthenticator;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index c198b58b2a9..685915025d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -82,14 +82,23 @@ public class IcebergTransaction implements Transaction {
         if (LOG.isDebugEnabled()) {
             LOG.info("iceberg table {} insert table finished!", tableInfo);
         }
-
-        //create and start the iceberg transaction
-        TUpdateMode updateMode = TUpdateMode.APPEND;
-        if (insertCtx.isPresent()) {
-            updateMode = ((BaseExternalTableInsertCommandContext) 
insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
-                    : TUpdateMode.APPEND;
+        try {
+            ops.getPreExecutionAuthenticator().execute(() -> {
+                //create and start the iceberg transaction
+                TUpdateMode updateMode = TUpdateMode.APPEND;
+                if (insertCtx.isPresent()) {
+                    updateMode = ((BaseExternalTableInsertCommandContext) 
insertCtx.get()).isOverwrite()
+                            ? TUpdateMode.OVERWRITE
+                            : TUpdateMode.APPEND;
+                }
+                updateManifestAfterInsert(updateMode);
+                return null;
+            });
+        } catch (Exception e) {
+            LOG.warn("Failed to finish insert for iceberg table {}.", 
tableInfo, e);
+            throw new RuntimeException(e);
         }
-        updateManifestAfterInsert(updateMode);
+
     }
 
     private void updateManifestAfterInsert(TUpdateMode updateMode) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index 66c3ea19710..79f7d5b5ad6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -87,6 +87,7 @@ public class IcebergTransactionTest {
         hadoopCatalog.setConf(new Configuration());
         hadoopCatalog.initialize("df", props);
         this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", 
"", Maps.newHashMap(), "");
+        externalCatalog.initLocalObjectsImpl();
         new MockUp<IcebergHMSExternalCatalog>() {
             @Mock
             public Catalog getCatalog() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to