This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch multi_kerberos
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7b297015bad32ff2fd0121a917b1b45bae5c1e60
Author: slothever <[email protected]>
AuthorDate: Tue Aug 6 01:56:23 2024 +0800

    [Improvement](kerberos) refactor ugi login for iceberg hive catalog
---
 ...HadoopAuthenticator.java => Authenticator.java} |  25 +-
 .../authentication/HadoopAuthenticator.java        |   2 +-
 .../apache/doris/datasource/ExternalCatalog.java   |  19 ++
 .../doris/datasource/hive/HMSExternalCatalog.java  |  11 +-
 .../datasource/hive/HiveMetaStoreClientHelper.java |   7 -
 .../datasource/iceberg/HiveCompatibleCatalog.java  | 181 ------------
 .../datasource/iceberg/IcebergExternalCatalog.java |  24 ++
 .../iceberg/IcebergHMSExternalCatalog.java         |   4 +-
 .../iceberg/IcebergHadoopExternalCatalog.java      |  33 ++-
 .../datasource/iceberg/IcebergMetadataCache.java   |  27 +-
 .../datasource/iceberg/IcebergMetadataOps.java     |   2 +-
 .../doris/datasource/iceberg/IcebergUtils.java     |  31 ++-
 .../doris/datasource/iceberg/dlf/DLFCatalog.java   | 118 +++++++-
 .../iceberg/hadoop/IcebergHadoopCatalog.java       | 303 +++++++++++++++++++++
 .../iceberg/hadoop/IcebergHadoopFileIO.java        |  71 +++++
 .../iceberg/hadoop/IcebergHadoopInputFile.java     | 107 ++++++++
 .../iceberg/hadoop/IcebergHadoopOutputFile.java    |  73 +++++
 .../hadoop/IcebergHadoopTableOperations.java       |  48 ++++
 .../hadoop/IcebergPositionOutputStream.java        |  62 +++++
 .../iceberg/hadoop/IcebergSeekableInputStream.java |  62 +++++
 .../datasource/iceberg/hive/HCachedClientPool.java |  83 ++++++
 .../iceberg/hive/HiveCompatibleCatalog.java        | 101 +++++++
 .../iceberg/hive/IcebergHiveCatalog.java           | 130 +++++++++
 .../IcebergHiveTableOperations.java}               |  16 +-
 .../datasource/iceberg/source/IcebergScanNode.java |   7 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  49 +++-
 .../kerberos/test_two_iceberg_kerberos.out         |   9 +
 .../kerberos/test_two_iceberg_kerberos.groovy      | 124 +++++++++
 28 files changed, 1460 insertions(+), 269 deletions(-)
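
The core of this change is that callers stop going through the static
HiveMetaStoreClientHelper.ugiDoAs(catalogId, ...) helper and instead run Hadoop-facing work
through the owning catalog's Authenticator, so each catalog can hold its own Kerberos login.
A minimal sketch of the calling pattern, using names from the diff below (illustrative only,
not part of the commit):

    // Sketch: load an Iceberg table under the catalog's own login identity.
    // `catalog` is any ExternalCatalog; `icebergCatalog` and `identifier` are assumed to exist.
    Table table;
    try {
        table = catalog.getAuthenticator().doAs(() -> icebergCatalog.loadTable(identifier));
    } catch (IOException e) {
        throw new RuntimeException(e);
    }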

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java
similarity index 56%
copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java
index c3cab5f410b..2e6c1602f0f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java
@@ -17,28 +17,17 @@
 
 package org.apache.doris.common.security.authentication;
 
-import org.apache.hadoop.security.UserGroupInformation;
-
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
-public interface HadoopAuthenticator {
-
-    UserGroupInformation getUGI() throws IOException;
+public interface Authenticator {
 
-    default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
-        try {
-            return getUGI().doAs(action);
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
+    <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException;
 
-    static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) {
-        if (config instanceof KerberosAuthenticationConfig) {
-            return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
-        } else {
-            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
-        }
+    default <T> void doAsNoReturn(Runnable action) throws IOException {
+        doAs(() -> {
+            action.run();
+            return null;
+        });
     }
 }
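
The new Authenticator interface keeps only the privileged-execution contract: doAs for actions
that return a value, plus the doAsNoReturn convenience wrapper defined above. A minimal usage
sketch, assuming `catalog` is an ExternalCatalog from later in this diff and `LOG` is any logger:

    // Sketch: run a side-effect-only action under the catalog's identity.
    Authenticator auth = catalog.getAuthenticator();
    auth.doAsNoReturn(() -> LOG.info("refreshing metadata as the catalog's login user"));
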
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index c3cab5f410b..dce3eb8e7b5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
-public interface HadoopAuthenticator {
+public interface HadoopAuthenticator extends Authenticator {
 
     UserGroupInformation getUGI() throws IOException;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 0d854a9ecdc..44ecdca733c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.security.authentication.Authenticator;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.es.EsExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
@@ -78,6 +79,7 @@ import org.jetbrains.annotations.Nullable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -163,6 +165,23 @@ public abstract class ExternalCatalog
         return conf;
     }
 
+    /**
+     * get authenticator for catalog
+     * return a dummy authenticator by default
+     */
+    public synchronized Authenticator getAuthenticator() {
+        return new Authenticator() {
+            @Override
+            public <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
+                try {
+                    return action.run();
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        };
+    }
+
     /**
      * set some default properties when creating catalog
      * @return list of database names in this catalog
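
ExternalCatalog now returns a pass-through Authenticator by default, so call sites can always
go through getAuthenticator().doAs(...) even for catalogs without Kerberos; HMSExternalCatalog
and IcebergHadoopExternalCatalog override it below with a UGI-backed HadoopAuthenticator. A
hedged sketch of the default behaviour (listRemoteDatabases() is a hypothetical helper, only
the doAs wrapping comes from this diff):

    // Sketch: with the default authenticator the action simply runs inline on the caller thread.
    List<String> dbs = externalCatalog.getAuthenticator().doAs(() -> listRemoteDatabases());
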
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 5faf1f2bb6e..1faeeca92bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -43,7 +43,6 @@ import org.apache.doris.transaction.TransactionManagerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
@@ -71,7 +70,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
     private ThreadPoolExecutor fileSystemExecutor;
-    @Getter
     private HadoopAuthenticator authenticator;
 
     private int hmsEventsBatchSizePerRpc = -1;
@@ -281,8 +279,13 @@ public class HMSExternalCatalog extends ExternalCatalog {
         return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
     }
 
-    public String getHiveVersion() {
-        return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
+    @Override
+    public synchronized HadoopAuthenticator getAuthenticator() {
+        if (authenticator == null) {
+            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
+            authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
+        }
+        return authenticator;
     }
 
     public int getHmsEventsBatchSizePerRpc() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index db6019eda97..352e00a7620 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -31,7 +31,6 @@ import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.ArrayType;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
@@ -41,7 +40,6 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopUGI;
-import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.thrift.TExprOpcode;
 
@@ -813,11 +811,6 @@ public class HiveMetaStoreClientHelper {
         return hudiSchema;
     }
 
-    public static <T> T ugiDoAs(long catalogId, PrivilegedExceptionAction<T> action) {
-        return ugiDoAs(((ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId)).getConfiguration(),
-                action);
-    }
-
     public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
         // if hive config is not ready, then use hadoop kerberos to login
         AuthenticationConfig krbConfig = AuthenticationConfig.getKerberosConfig(conf,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
deleted file mode 100644
index 6431b02308b..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
+++ /dev/null
@@ -1,181 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.iceberg.BaseMetastoreCatalog;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.ClientPool;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.io.FileIO;
-import shade.doris.hive.org.apache.thrift.TException;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
-
-    protected Configuration conf;
-    protected ClientPool<IMetaStoreClient, TException> clients;
-    protected FileIO fileIO;
-    protected String uid;
-
-    public void initialize(String name, FileIO fileIO,
-                           ClientPool<IMetaStoreClient, TException> clients) {
-        this.uid = name;
-        this.fileIO = fileIO;
-        this.clients = clients;
-    }
-
-    protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
-        String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
-        if (fileIOImpl == null) {
-            /* when use the S3FileIO, we need some custom configurations,
-             * so HadoopFileIO is used in the superclass by default
-             * we can add better implementations to derived class just like the implementation in DLFCatalog.
-             */
-            FileIO io = new HadoopFileIO(hadoopConf);
-            io.initialize(properties);
-            return io;
-        } else {
-            return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
-        }
-    }
-
-    @Override
-    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
-        return null;
-    }
-
-    @Override
-    protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
-        return tableIdentifier.namespace().levels().length == 1;
-    }
-
-    protected boolean isValidNamespace(Namespace namespace) {
-        return namespace.levels().length != 1;
-    }
-
-    @Override
-    public List<TableIdentifier> listTables(Namespace namespace) {
-        if (isValidNamespace(namespace)) {
-            throw new NoSuchTableException("Invalid namespace: %s", namespace);
-        }
-        String dbName = namespace.level(0);
-        try {
-            return clients.run(client -> client.getAllTables(dbName))
-                .stream()
-                .map(tbl -> TableIdentifier.of(dbName, tbl))
-                .collect(Collectors.toList());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) {
-        throw new UnsupportedOperationException(
-            "Cannot drop table " + tableIdentifier + " : dropTable is not 
supported");
-    }
-
-    @Override
-    public void renameTable(TableIdentifier sourceTbl, TableIdentifier 
targetTbl) {
-        throw new UnsupportedOperationException(
-            "Cannot rename table " + sourceTbl + " : renameTable is not 
supported");
-    }
-
-    @Override
-    public void createNamespace(Namespace namespace, Map<String, String> 
props) {
-        throw new UnsupportedOperationException(
-            "Cannot create namespace " + namespace + " : createNamespace is 
not supported");
-    }
-
-    @Override
-    public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
-        if (isValidNamespace(namespace) && !namespace.isEmpty()) {
-            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
-        }
-        if (!namespace.isEmpty()) {
-            return new ArrayList<>();
-        }
-        List<Namespace> namespaces = new ArrayList<>();
-        List<String> databases;
-        try {
-            databases = clients.run(client -> client.getAllDatabases());
-            for (String database : databases) {
-                namespaces.add(Namespace.of(database));
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        return namespaces;
-    }
-
-    @Override
-    public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
-        if (isValidNamespace(namespace)) {
-            throw new NoSuchTableException("Invalid namespace: %s", namespace);
-        }
-        String dbName = namespace.level(0);
-        try {
-            return clients.run(client -> client.getDatabase(dbName)).getParameters();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
-        throw new UnsupportedOperationException(
-            "Cannot drop namespace " + namespace + " : dropNamespace is not supported");
-    }
-
-    @Override
-    public boolean setProperties(Namespace namespace, Map<String, String> props) throws NoSuchNamespaceException {
-        throw new UnsupportedOperationException(
-            "Cannot set namespace properties " + namespace + " : setProperties is not supported");
-    }
-
-    @Override
-    public boolean removeProperties(Namespace namespace, Set<String> pNames) throws NoSuchNamespaceException {
-        throw new UnsupportedOperationException(
-            "Cannot remove properties " + namespace + " : removeProperties is not supported");
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 6f79afd5de5..a9029499ad8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -26,8 +26,11 @@ import org.apache.doris.transaction.TransactionManagerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -83,4 +86,25 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
         Map<String, String> properties = catalogProperty.getHadoopProperties();
         conf.set(Constants.AWS_CREDENTIALS_PROVIDER, PropertyConverter.getAWSCredentialsProviders(properties));
     }
+
+    public Table loadTable(TableIdentifier of) {
+        Table tbl = getCatalog().loadTable(of);
+        Map<String, String> extProps = getProperties();
+        initIcebergTableFileIO(tbl, extProps);
+        return tbl;
+    }
+
+    public static void initIcebergTableFileIO(Table table, Map<String, String> props) {
+        Map<String, String> ioConf = new HashMap<>();
+        table.properties().forEach((key, value) -> {
+            if (key.startsWith("io.")) {
+                ioConf.put(key, value);
+            }
+        });
+
+        // This `initialize` method will directly override the properties as a whole,
+        // so we need to merge the table's io-related properties with the doris's catalog-related properties
+        props.putAll(ioConf);
+        table.io().initialize(props);
+    }
 }
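
loadTable/initIcebergTableFileIO above re-initialize the table's FileIO so that table-level
"io." properties override the catalog-level ones, since FileIO.initialize replaces the property
map wholesale instead of merging. A small sketch of the resulting precedence (property name and
values are illustrative):

    // Sketch: catalog properties first, then the table's "io.*" entries on top.
    Map<String, String> props = new HashMap<>(catalog.getProperties()); // e.g. io.manifest.cache-enabled=false
    props.put("io.manifest.cache-enabled", "true");                     // copied from table.properties()
    table.io().initialize(props);                                       // the table-level value wins
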
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index c1475064934..05c08b0c771 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -18,11 +18,11 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.iceberg.hive.IcebergHiveCatalog;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.hive.HiveCatalog;
 
 import java.util.Map;
 
@@ -38,7 +38,7 @@ public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HMS;
-        HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
+        IcebergHiveCatalog hiveCatalog = new IcebergHiveCatalog();
         hiveCatalog.setConf(getConfiguration());
         // initialize hive catalog
         Map<String, String> catalogProperties = catalogProperty.getProperties();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
index bf9e8c2b3f0..3820b609569 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
@@ -18,19 +18,26 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopCatalog;
 import org.apache.doris.datasource.property.PropertyConverter;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
 
+import java.io.IOException;
 import java.util.Map;
 
 public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {
 
+    private HadoopAuthenticator authenticator;
+
     public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
                                         String comment) {
         super(catalogId, name, comment);
@@ -49,10 +56,19 @@ public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {
         }
     }
 
+    @Override
+    public synchronized HadoopAuthenticator getAuthenticator() {
+        if (authenticator == null) {
+            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
+            authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
+        }
+        return authenticator;
+    }
+
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HADOOP;
-        HadoopCatalog hadoopCatalog = new HadoopCatalog();
+        IcebergHadoopCatalog hadoopCatalog = new IcebergHadoopCatalog();
         Configuration conf = getConfiguration();
         initS3Param(conf);
         // initialize hadoop catalog
@@ -63,4 +79,17 @@ public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {
         hadoopCatalog.initialize(getName(), catalogProperties);
         catalog = hadoopCatalog;
     }
+
+    @Override
+    public Table loadTable(TableIdentifier of) {
+        Table tbl;
+        try {
+            tbl = getAuthenticator().doAs(() -> getCatalog().loadTable(of));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        Map<String, String> extProps = getProperties();
+        initIcebergTableFileIO(tbl, extProps);
+        return tbl;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 13bd9650978..214b9cdeeee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -23,7 +23,6 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.thrift.TIcebergMetadataParams;
@@ -41,6 +40,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.jetbrains.annotations.NotNull;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -111,21 +111,30 @@ public class IcebergMetadataCache {
     @NotNull
     private Table loadTable(IcebergMetadataCacheKey key) {
         Catalog icebergCatalog;
+        Table icebergTable;
         if (key.catalog instanceof HMSExternalCatalog) {
             HMSExternalCatalog ctg = (HMSExternalCatalog) key.catalog;
             icebergCatalog = createIcebergHiveCatalog(
-                    ctg.getHiveMetastoreUris(),
-                    ctg.getCatalogProperty().getHadoopProperties(),
-                    ctg.getProperties());
+                ctg.getHiveMetastoreUris(),
+                ctg.getCatalogProperty().getHadoopProperties(),
+                ctg.getProperties());
+            try {
+                icebergTable = ctg.getAuthenticator().doAs(() -> {
+                    Table tbl = icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName));
+                    IcebergExternalCatalog.initIcebergTableFileIO(tbl, key.catalog.getProperties());
+                    return tbl;
+                });
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return icebergTable;
         } else if (key.catalog instanceof IcebergExternalCatalog) {
-            icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog();
+            IcebergExternalCatalog ctg = ((IcebergExternalCatalog) key.catalog);
+            icebergTable = ctg.loadTable(TableIdentifier.of(key.dbName, key.tableName));
+            return icebergTable;
         } else {
             throw new RuntimeException("Only support 'hms' and 'iceberg' type 
for iceberg table");
         }
-        Table icebergTable = 
HiveMetaStoreClientHelper.ugiDoAs(key.catalog.getId(),
-                () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, 
key.tableName)));
-        initIcebergTableFileIO(icebergTable, key.catalog.getProperties());
-        return icebergTable;
     }
 
     public void invalidateCatalogCache(long catalogId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 66c57e37307..9187178c7dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -161,7 +161,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
         PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
         catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
         db.setUnInitialized(true);
-        return false;
+        return true;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index f7280f5721f..3372d9033ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -46,7 +46,6 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
@@ -69,6 +68,7 @@ import org.apache.iceberg.util.LocationUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -570,21 +570,24 @@ public class IcebergUtils {
      * Get iceberg schema from catalog and convert them to doris schema
      */
     public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
-        return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
-            org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
-            Schema schema = icebergTable.schema();
-            List<Types.NestedField> columns = schema.columns();
-            List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
-            for (Types.NestedField field : columns) {
-                tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
-                        IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
-                        schema.caseInsensitiveFindField(field.name()).fieldId()));
-            }
-            return tmpSchema;
-        });
+        try {
+            return catalog.getAuthenticator().doAs(() -> {
+                Table icebergTable = getIcebergTable(catalog, dbName, name);
+                Schema schema = icebergTable.schema();
+                List<Types.NestedField> columns = schema.columns();
+                List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+                for (Types.NestedField field : columns) {
+                    tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                            IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+                            schema.caseInsensitiveFindField(field.name()).fieldId()));
+                }
+                return tmpSchema;
+            });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
-
     /**
      * Estimate iceberg table row count.
      * Get the row count by adding all task file recordCount.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
index e9c406715c1..ddbf7ebeac5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
@@ -19,37 +19,36 @@ package org.apache.doris.datasource.iceberg.dlf;
 
 import org.apache.doris.common.credentials.CloudCredential;
 import org.apache.doris.common.util.S3Util;
-import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog;
 import org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool;
+import org.apache.doris.datasource.iceberg.hive.HiveCompatibleCatalog;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.OssProperties;
 import org.apache.doris.datasource.property.constants.S3Properties;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.aliyun.oss.Constants;
-import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.FileIO;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class DLFCatalog extends HiveCompatibleCatalog {
 
     @Override
     public void initialize(String name, Map<String, String> properties) {
-        super.initialize(name, initializeFileIO(properties, conf), new DLFCachedClientPool(this.conf, properties));
+        super.initialize(name, initializeFileIO(properties), new DLFCachedClientPool(this.conf, properties));
     }
 
-    @Override
-    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
-        String dbName = tableIdentifier.namespace().level(0);
-        String tableName = tableIdentifier.name();
-        return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName);
-    }
-
-    protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
+    protected FileIO initializeFileIO(Map<String, String> properties) {
         // read from converted properties or default by old s3 aws properties
         String endpoint = properties.getOrDefault(Constants.ENDPOINT_KEY, properties.get(S3Properties.Env.ENDPOINT));
         CloudCredential credential = new CloudCredential();
@@ -72,4 +71,99 @@ public class DLFCatalog extends HiveCompatibleCatalog {
         io.initialize(properties);
         return io;
     }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+        return null;
+    }
+
+    protected boolean isValidNamespace(Namespace namespace) {
+        return namespace.levels().length != 1;
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+        if (isValidNamespace(namespace)) {
+            throw new NoSuchTableException("Invalid namespace: %s", namespace);
+        }
+        String dbName = namespace.level(0);
+        try {
+            return clients.run(client -> client.getAllTables(dbName))
+                .stream()
+                .map(tbl -> TableIdentifier.of(dbName, tbl))
+                .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) {
+        throw new UnsupportedOperationException(
+            "Cannot drop table " + tableIdentifier + " : dropTable is not 
supported");
+    }
+
+    @Override
+    public void renameTable(TableIdentifier sourceTbl, TableIdentifier targetTbl) {
+        throw new UnsupportedOperationException(
+            "Cannot rename table " + sourceTbl + " : renameTable is not supported");
+    }
+
+    @Override
+    public void createNamespace(Namespace namespace, Map<String, String> props) {
+        throw new UnsupportedOperationException(
+            "Cannot create namespace " + namespace + " : createNamespace is not supported");
+    }
+
+    @Override
+    public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+        if (isValidNamespace(namespace) && !namespace.isEmpty()) {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+        }
+        if (!namespace.isEmpty()) {
+            return new ArrayList<>();
+        }
+        List<Namespace> namespaces = new ArrayList<>();
+        List<String> databases;
+        try {
+            databases = clients.run(client -> client.getAllDatabases());
+            for (String database : databases) {
+                namespaces.add(Namespace.of(database));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return namespaces;
+    }
+
+    @Override
+    public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+        if (isValidNamespace(namespace)) {
+            throw new NoSuchTableException("Invalid namespace: %s", namespace);
+        }
+        String dbName = namespace.level(0);
+        try {
+            return clients.run(client -> client.getDatabase(dbName)).getParameters();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+        throw new UnsupportedOperationException(
+            "Cannot drop namespace " + namespace + " : dropNamespace is not supported");
+    }
+
+    @Override
+    public boolean setProperties(Namespace namespace, Map<String, String> props) throws NoSuchNamespaceException {
+        throw new UnsupportedOperationException(
+            "Cannot set namespace properties " + namespace + " : setProperties is not supported");
+    }
+
+    @Override
+    public boolean removeProperties(Namespace namespace, Set<String> pNames) throws NoSuchNamespaceException {
+        throw new UnsupportedOperationException(
+            "Cannot remove properties " + namespace + " : removeProperties is not supported");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java
new file mode 100644
index 00000000000..13ec717d7d8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hadoop;
+
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.LockManager;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.LockManagers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class IcebergHadoopCatalog extends HadoopCatalog {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergHadoopCatalog.class);
+    private String warehouseLocation;
+    private DFSFileSystem fs;
+    private CloseableGroup closeableGroup;
+    private LockManager lockManager;
+    protected FileIO fileIO;
+    protected String uid;
+    private static final Joiner SLASH = Joiner.on("/");
+
+    public void initialize(String name, Map<String, String> properties) {
+        super.initialize(name, properties);
+        String inputWarehouseLocation = properties.get("warehouse");
+        Preconditions.checkArgument(inputWarehouseLocation != null && inputWarehouseLocation.length() > 0,
+                "Cannot initialize HadoopCatalog because warehousePath must not be null or empty");
+        this.warehouseLocation = LocationUtil.stripTrailingSlash(inputWarehouseLocation);
+        this.fs = new DFSFileSystem(properties);
+        this.fileIO = initializeFileIO(properties, getConf());
+        this.lockManager = LockManagers.from(properties);
+        this.closeableGroup = new CloseableGroup();
+        this.closeableGroup.addCloseable(this.lockManager);
+        this.closeableGroup.setSuppressCloseFailure(true);
+    }
+
+    protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
+        String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+        if (fileIOImpl == null) {
+            /* when use the S3FileIO, we need some custom configurations,
+             * so HadoopFileIO is used in the superclass by default
+             * we can add better implementations to derived class just like the implementation in DLFCatalog.
+             */
+            FileIO io;
+            try {
+                io = new IcebergHadoopFileIO(hadoopConf, this.fs.rawFileSystem());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            io.initialize(properties);
+            return io;
+        } else {
+            return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+        }
+    }
+
+    @Override
+    protected TableOperations newTableOps(TableIdentifier identifier) {
+        return new IcebergHadoopTableOperations(new Path(this.defaultWarehouseLocation(identifier)),
+                this.fileIO, getConf(), this.lockManager, this.fs);
+    }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+        String tableName = tableIdentifier.name();
+        StringBuilder sb = new StringBuilder();
+        sb.append(this.warehouseLocation).append('/');
+        String[] levels = tableIdentifier.namespace().levels();
+        for (String level : levels) {
+            sb.append(level).append('/');
+        }
+        sb.append(tableName);
+        return sb.toString();
+    }
+
+    @Override
+    public void createNamespace(Namespace namespace, Map<String, String> meta) {
+        Preconditions.checkArgument(!namespace.isEmpty(),
+                "Cannot create namespace with invalid name: %s", namespace);
+        if (!meta.isEmpty()) {
+            throw new UnsupportedOperationException("Cannot create namespace " 
+ namespace
+                    + ": metadata is not supported");
+        } else {
+            Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels()));
+            if (this.isNamespace(nsPath)) {
+                throw new AlreadyExistsException("Namespace already exists: %s", namespace);
+            } else {
+                try {
+                    this.fs.mkdirs(nsPath);
+                } catch (IOException e) {
+                    throw new RuntimeIOException(e, "Create namespace failed: 
%s", namespace);
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<Namespace> listNamespaces(Namespace namespace) {
+        Path nsPath = namespace.isEmpty()
+                ? new Path(warehouseLocation)
+                : new Path(warehouseLocation, SLASH.join(namespace.levels()));
+        if (!isNamespace(nsPath)) {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+        }
+
+        try {
+            // using the iterator listing allows for paged downloads
+            // from HDFS and prefetching from object storage.
+            List<Namespace> namespaces = new ArrayList<>();
+            RemoteIterator<FileStatus> it = fs.listStatusIterator(nsPath);
+            while (it.hasNext()) {
+                Path path = it.next().getPath();
+                if (isNamespace(path)) {
+                    namespaces.add(append(namespace, path.getName()));
+                }
+            }
+            return namespaces;
+        } catch (IOException ioe) {
+            throw new RuntimeIOException(ioe, "Failed to list namespace under: 
%s", namespace);
+        }
+    }
+
+    @Override
+    public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+        Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels()));
+        if (this.isNamespace(nsPath) && !namespace.isEmpty()) {
+            return ImmutableMap.of("location", nsPath.toString());
+        } else {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+        }
+    }
+
+    @Override
+    public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+        Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels()));
+        if (this.isNamespace(nsPath) && !namespace.isEmpty()) {
+            try {
+                if (this.fs.listStatusIterator(nsPath).hasNext()) {
+                    throw new NamespaceNotEmptyException("Namespace %s is not 
empty.", new Object[]{namespace});
+                } else {
+                    return this.fs.delete(nsPath, false);
+                }
+            } catch (IOException e) {
+                throw new RuntimeIOException(e, "Namespace delete failed: %s", 
new Object[]{namespace});
+            }
+        } else {
+            return false;
+        }
+    }
+
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+        if (!this.isValidIdentifier(identifier)) {
+            throw new NoSuchTableException("Invalid identifier: %s", 
identifier);
+        } else {
+            Path tablePath = new 
Path(this.defaultWarehouseLocation(identifier));
+            TableOperations ops = this.newTableOps(identifier);
+            TableMetadata lastMetadata = ops.current();
+            try {
+                if (lastMetadata == null) {
+                    LOG.debug("Not an iceberg table: {}", identifier);
+                    return false;
+                } else {
+                    if (purge) {
+                        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+                    }
+                    return this.fs.delete(tablePath, true);
+                }
+            } catch (IOException e) {
+                throw new RuntimeIOException(e, "Failed to delete file: %s", 
tablePath);
+            }
+        }
+    }
+
+    private Namespace append(Namespace ns, String name) {
+        String[] levels = Arrays.copyOfRange(ns.levels(), 0, ns.levels().length + 1);
+        levels[ns.levels().length] = name;
+        return Namespace.of(levels);
+    }
+
+    private boolean isNamespace(Path path) {
+        return isDirectory(path) && !isTableDir(path);
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+        Preconditions.checkArgument(namespace.levels().length >= 1,
+                "Missing database in table identifier: %s", namespace);
+        Path nsPath = new Path(this.warehouseLocation, Joiner.on("/").join(namespace.levels()));
+        Set<TableIdentifier> tblIdents = Sets.newHashSet();
+
+        try {
+            if (!isDirectory(nsPath)) {
+                throw new NoSuchNamespaceException("Namespace does not exist: 
%s", namespace);
+            }
+            RemoteIterator<FileStatus> it = this.fs.listStatusIterator(nsPath);
+            while (it.hasNext()) {
+                FileStatus status = it.next();
+                if (status.isDirectory()) {
+                    Path path = status.getPath();
+                    if (isTableDir(path)) {
+                        TableIdentifier tblIdent = TableIdentifier.of(namespace, path.getName());
+                        tblIdents.add(tblIdent);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to list tables under: %s", 
namespace);
+        }
+
+        return Lists.newArrayList(tblIdents);
+    }
+
+    private boolean isTableDir(Path path) {
+        Path metadataPath = new Path(path, "metadata");
+        PathFilter tableFilter = (filterPath) -> filterPath.getName().endsWith(".metadata.json");
+        try {
+            return fs.listStatus(metadataPath, tableFilter).length >= 1;
+        } catch (FileNotFoundException f) {
+            return false;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private boolean isDirectory(Path path) {
+        try {
+            return fs.getFileStatus(path).isDirectory();
+        } catch (FileNotFoundException e) {
+            return false;
+        } catch (IOException e) {
+            LOG.warn("Unable to list directory {}", path, e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.closeableGroup.close();
+    }
+
+    @Override
+    public void setConf(Configuration configuration) {
+        super.setConf(configuration);
+    }
+
+    @Override
+    public Configuration getConf() {
+        Configuration conf = super.getConf();
+        if (conf == null) {
+            return new HdfsConfiguration();
+        }
+        return conf;
+    }
+}
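
IcebergHadoopCatalog above keeps HadoopCatalog's on-disk layout (a namespace is a warehouse
subdirectory, and a table directory contains metadata/*.metadata.json) while routing all
filesystem access through Doris's DFSFileSystem. A minimal stand-alone initialization sketch;
the catalog name and warehouse path are placeholders, not values from the commit:

    // Sketch: construct and initialize the new catalog class directly.
    IcebergHadoopCatalog catalog = new IcebergHadoopCatalog();
    catalog.setConf(new HdfsConfiguration());
    Map<String, String> props = new HashMap<>();
    props.put("warehouse", "hdfs://ns1/user/hive/warehouse/iceberg");
    catalog.initialize("hadoop_iceberg", props);
    catalog.listNamespaces(Namespace.empty());
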
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java
new file mode 100644
index 00000000000..97912290d2f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+
+import java.io.IOException;
+
+public class IcebergHadoopFileIO extends HadoopFileIO {
+
+    private FileSystem fs;
+    private Configuration hadoopConf;
+
+    public IcebergHadoopFileIO(Configuration hadoopConf, FileSystem fs) {
+        this.hadoopConf = hadoopConf;
+        this.fs = fs;
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+        return new IcebergHadoopInputFile(this.fs, path, this.hadoopConf);
+    }
+
+    @Override
+    public InputFile newInputFile(String path, long length) {
+        return new IcebergHadoopInputFile(this.fs, path, length, this.hadoopConf);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+        return new IcebergHadoopOutputFile(this.fs, new Path(path), this.hadoopConf);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+        Path toDelete = new Path(path);
+        try {
+            fs.delete(toDelete, false);
+        } catch (IOException var5) {
+            IOException e = var5;
+            throw new RuntimeIOException(e, "Failed to delete file: %s", path);
+        }
+    }
+
+    @Override
+    public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+        // TODO
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java
new file mode 100644
index 00000000000..105d83d4084
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class IcebergHadoopInputFile implements InputFile {
+
+    private String location;
+    private final FileSystem fs;
+    private final Path path;
+    private final Configuration conf;
+    private FileStatus stat = null;
+    private Long length;
+
+    public IcebergHadoopInputFile(FileSystem fs, String location, Configuration conf) {
+        this(fs, location, null, conf);
+    }
+
+    public IcebergHadoopInputFile(FileSystem fs, Path path, Configuration conf) {
+        this.fs = fs;
+        this.path = path;
+        this.conf = conf;
+    }
+
+    public IcebergHadoopInputFile(FileSystem fs, String location, Long length, Configuration conf) {
+        this.fs = fs;
+        this.location = location;
+        this.path = new Path(location);
+        this.conf = conf;
+        this.length = length;
+    }
+
+    @Override
+    public long getLength() {
+        if (this.length == null) {
+            this.length = this.lazyStat().getLen();
+        }
+        return this.length;
+    }
+
+    public SeekableInputStream newStream() {
+        try {
+            return new IcebergSeekableInputStream(openFile());
+        } catch (FileNotFoundException e) {
+            throw new NotFoundException(e, "Failed to open input stream for 
file: %s", this.path);
+        } catch (IOException ex) {
+            throw new RuntimeIOException(ex, "Failed to open input stream for 
file: %s", this.path);
+        }
+    }
+
+    private FSDataInputStream openFile() throws IOException {
+        return fs.open(path);
+    }
+
+    @Override
+    public String location() {
+        return location;
+    }
+
+    @Override
+    public boolean exists() {
+        try {
+            return this.lazyStat() != null;
+        } catch (NotFoundException e) {
+            return false;
+        }
+    }
+
+    private FileStatus lazyStat() {
+        if (this.stat == null) {
+            try {
+                this.stat = this.fs.getFileStatus(this.path);
+            } catch (FileNotFoundException e) {
+                throw new NotFoundException(e, "File does not exist: %s", this.path);
+            } catch (IOException e) {
+                throw new RuntimeIOException(e, "Failed to get status for file: %s", this.path);
+            }
+        }
+        return this.stat;
+    }
+}
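
A minimal usage sketch (not part of this commit) of how the new InputFile implementation is read, assuming a FileSystem `fs`, a Configuration `conf` and a file `location` are already available; it positions the stream near the end of the file and fills a buffer:

import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.io.SeekableInputStream;

public class InputFileReadSketch {
    static byte[] readTail(FileSystem fs, Configuration conf, String location, int tailSize) throws Exception {
        IcebergHadoopInputFile in = new IcebergHadoopInputFile(fs, location, conf);
        long len = in.getLength();
        byte[] buf = new byte[(int) Math.min(tailSize, len)];
        try (SeekableInputStream stream = in.newStream()) {
            stream.seek(len - buf.length);   // jump to the last bytes of the file
            int off = 0;
            while (off < buf.length) {
                int n = stream.read(buf, off, buf.length - off);
                if (n < 0) {
                    break;
                }
                off += n;
            }
        }
        return buf;
    }
}
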
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java
new file mode 100644
index 00000000000..ca99f2b4590
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+import java.io.IOException;
+
+public class IcebergHadoopOutputFile implements OutputFile {
+
+    private final FileSystem fs;
+    private final Path path;
+    private final Configuration conf;
+
+    public IcebergHadoopOutputFile(FileSystem fs, Path path, Configuration hadoopConf) {
+        this.fs = fs;
+        this.path = path;
+        this.conf = hadoopConf;
+    }
+
+    @Override
+    public PositionOutputStream create() {
+        try {
+            return new IcebergPositionOutputStream(this.fs.create(this.path, false));
+        } catch (FileAlreadyExistsException e) {
+            throw new AlreadyExistsException(e, "Path already exists: %s", this.path);
+        } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to create file: %s", this.path);
+        }
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite() {
+        try {
+            return new IcebergPositionOutputStream(this.fs.create(this.path, true));
+        } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to create file: %s", this.path);
+        }
+    }
+
+    @Override
+    public String location() {
+        return this.path.toString();
+    }
+
+    @Override
+    public InputFile toInputFile() {
+        return new IcebergHadoopInputFile(this.fs, this.path, this.conf);
+    }
+}
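
A minimal usage sketch (not part of this commit) for the OutputFile side, assuming `fs`, `conf` and a target `location` are supplied by the caller; createOrOverwrite() truncates an existing file, whereas create() would raise AlreadyExistsException:

import java.nio.charset.StandardCharsets;
import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopOutputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.io.PositionOutputStream;

public class OutputFileWriteSketch {
    static void overwrite(FileSystem fs, Configuration conf, String location, String content) throws Exception {
        IcebergHadoopOutputFile out = new IcebergHadoopOutputFile(fs, new Path(location), conf);
        // overwrite semantics: an existing file at the location is replaced
        try (PositionOutputStream stream = out.createOrOverwrite()) {
            stream.write(content.getBytes(StandardCharsets.UTF_8));
        }
    }
}
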
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java
new file mode 100644
index 00000000000..187a1cdfcf4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hadoop;
+
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.LockManager;
+import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.io.FileIO;
+
+import java.io.IOException;
+
+public class IcebergHadoopTableOperations extends HadoopTableOperations {
+    private final DFSFileSystem fileSystem;
+
+    public IcebergHadoopTableOperations(Path location, FileIO fileIO, Configuration conf,
+                                        LockManager lockManager, DFSFileSystem fileSystem) {
+        super(location, fileIO, conf, lockManager);
+        this.fileSystem = fileSystem;
+    }
+
+    @Override
+    protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
+        try {
+            return fileSystem.rawFileSystem();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java
new file mode 100644
index 00000000000..7e18200fa80
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hadoop;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.iceberg.io.DelegatingOutputStream;
+import org.apache.iceberg.io.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class IcebergPositionOutputStream extends PositionOutputStream implements DelegatingOutputStream {
+
+    private final FSDataOutputStream stream;
+
+    IcebergPositionOutputStream(FSDataOutputStream stream) {
+        this.stream = stream;
+    }
+
+    public OutputStream getDelegate() {
+        return this.stream;
+    }
+
+    public long getPos() throws IOException {
+        return this.stream.getPos();
+    }
+
+    public void write(int b) throws IOException {
+        this.stream.write(b);
+    }
+
+    public void write(byte[] b) throws IOException {
+        this.stream.write(b);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        this.stream.write(b, off, len);
+    }
+
+    public void flush() throws IOException {
+        this.stream.flush();
+    }
+
+    public void close() throws IOException {
+        this.stream.close();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java
new file mode 100644
index 00000000000..501524fb497
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hadoop;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.IOException;
+
+public class IcebergSeekableInputStream extends SeekableInputStream {
+    private final FSDataInputStream stream;
+
+    public IcebergSeekableInputStream(FSDataInputStream stream) {
+        this.stream = stream;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return stream.getPos();
+    }
+
+    @Override
+    public void seek(long position) throws IOException {
+        stream.seek(position);
+    }
+
+    @Override
+    public int read() throws IOException {
+        return stream.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return stream.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return stream.read(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+        stream.close();
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java
new file mode 100644
index 00000000000..ff8313cab0f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.hive.HiveClientPool;
+import org.apache.iceberg.util.PropertyUtil;
+import shade.doris.hive.org.apache.thrift.TException;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HCachedClientPool implements ClientPool<IMetaStoreClient, TException> {
+
+    private static volatile Cache<String, HiveClientPool> clientPoolCache;
+    private static final Object clientPoolCacheLock = new Object();
+    private final String catalogName;
+    private final Configuration conf;
+    private final int clientPoolSize;
+    private final long evictionInterval;
+
+    public HCachedClientPool(String catalogName, Configuration conf, Map<String, String> properties) {
+        this.catalogName = catalogName;
+        this.conf = conf;
+        this.clientPoolSize =
+            PropertyUtil.propertyAsInt(
+                properties,
+                CatalogProperties.CLIENT_POOL_SIZE,
+                CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
+        this.evictionInterval =
+            PropertyUtil.propertyAsLong(
+                properties,
+                CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+                CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
+
+        if (clientPoolCache == null) {
+            synchronized (clientPoolCacheLock) {
+                if (clientPoolCache == null) {
+                    clientPoolCache =
+                        Caffeine.newBuilder()
+                            .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
+                            .removalListener((key, value, cause) -> ((HiveClientPool) value).close())
+                            .build();
+                }
+            }
+        }
+    }
+
+    protected HiveClientPool clientPool() {
+        return clientPoolCache.get(this.catalogName, (k) -> new HiveClientPool(this.clientPoolSize, this.conf));
+    }
+
+    @Override
+    public <R> R run(Action<R, IMetaStoreClient, TException> action) throws TException, InterruptedException {
+        return clientPool().run(action);
+    }
+
+    @Override
+    public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+            throws TException, InterruptedException {
+        return clientPool().run(action, retry);
+    }
+}
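
A minimal usage sketch (not part of this commit) of the cached client pool, assuming the catalog name, Configuration and property map are prepared elsewhere; run() checks a pooled IMetaStoreClient out of the shared HiveClientPool keyed by catalog name:

import java.util.Map;
import org.apache.doris.datasource.iceberg.hive.HCachedClientPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.Database;

public class ClientPoolSketch {
    static Database loadDatabase(String catalogName, Configuration conf,
                                 Map<String, String> properties, String dbName) throws Exception {
        HCachedClientPool pool = new HCachedClientPool(catalogName, conf, properties);
        // the client is borrowed for the duration of the action and then returned to the pool
        return pool.run(client -> client.getDatabase(dbName));
    }
}
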
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java
new file mode 100644
index 00000000000..be3ad97859f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hive;
+
+import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.FileIO;
+import shade.doris.hive.org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Map;
+
+public abstract class HiveCompatibleCatalog extends HiveCatalog {
+
+    protected Configuration conf;
+    protected ClientPool<IMetaStoreClient, TException> clients;
+    protected FileIO fileIO;
+    protected String uid;
+
+    public void initialize(String name, FileIO fileIO,
+                           ClientPool<IMetaStoreClient, TException> clients) {
+        this.uid = name;
+        this.fileIO = fileIO;
+        this.clients = clients;
+    }
+
+    protected FileIO initializeFileIO(Map<String, String> properties) {
+        String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+        if (fileIOImpl == null) {
+            /* S3FileIO needs custom configurations, so the superclass falls back to
+             * HadoopFileIO by default. Derived classes can supply a better implementation,
+             * as DLFCatalog does.
+             */
+            FileIO io;
+            try {
+                FileSystem fs = getFileSystem();
+                if (fs == null) {
+                    io = new HadoopFileIO(getConf());
+                } else {
+                    io = new IcebergHadoopFileIO(getConf(), fs);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            io.initialize(properties);
+            return io;
+        } else {
+            return CatalogUtil.loadFileIO(fileIOImpl, properties, getConf());
+        }
+    }
+
+    public FileSystem getFileSystem() throws IOException {
+        return null;
+    }
+
+    @Override
+    public TableOperations newTableOps(TableIdentifier tableIdentifier) {
+        String dbName = tableIdentifier.namespace().level(0);
+        String tableName = tableIdentifier.name();
+        return new IcebergHiveTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName);
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+        if (conf == null) {
+            return new HdfsConfiguration();
+        }
+        return conf;
+    }
+}
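
An illustrative sketch (not part of this commit) of how a derived catalog can pin a concrete FileSystem so that initializeFileIO() selects IcebergHadoopFileIO instead of the plain HadoopFileIO fallback; the warehouse URI below is a placeholder:

import java.io.IOException;
import org.apache.doris.datasource.iceberg.hive.HiveCompatibleCatalog;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PinnedFileSystemCatalog extends HiveCompatibleCatalog {
    @Override
    public FileSystem getFileSystem() throws IOException {
        // returning a non-null FileSystem routes table IO through IcebergHadoopFileIO
        return FileSystem.get(new Path("hdfs://nn:9000/warehouse").toUri(), getConf());
    }
}
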
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java
new file mode 100644
index 00000000000..6aaa3965aaf
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.hive;
+
+import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO;
+import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.LocationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import shade.doris.hive.org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergHiveCatalog extends HiveCompatibleCatalog {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergHiveCatalog.class);
+    private ClientPool<IMetaStoreClient, TException> clients;
+    private boolean listAllTables = false;
+
+    @Override
+    public void initialize(String name, Map<String, String> properties) {
+        if (this.conf == null) {
+            LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
+            this.conf = getConf();
+        }
+        if (properties.containsKey("uri")) {
+            this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get("uri"));
+        }
+        if (properties.containsKey("warehouse")) {
+            this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+                    LocationUtil.stripTrailingSlash(properties.get("warehouse")));
+        }
+
+        this.listAllTables = Boolean.parseBoolean(properties.getOrDefault("list-all-tables", "true"));
+        String fileIOImpl = properties.get("io-impl");
+        org.apache.hadoop.fs.FileSystem fs;
+        try {
+            fs = new DFSFileSystem(properties).rawFileSystem();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        FileIO fileIO = fileIOImpl == null ? new IcebergHadoopFileIO(this.conf, fs)
+                : CatalogUtil.loadFileIO(fileIOImpl, properties, this.conf);
+        this.clients = new HCachedClientPool(name, this.conf, properties);
+        super.initialize(name, fileIO, clients);
+    }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+        try {
+            Database databaseData = this.clients.run((client) ->
+                    client.getDatabase(tableIdentifier.namespace().levels()[0]));
+            if (databaseData.getLocationUri() != null) {
+                return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name());
+            }
+        } catch (TException e) {
+            throw new RuntimeException(String.format("Metastore operation failed for %s", tableIdentifier), e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted during commit", e);
+        }
+
+        String databaseLocation = this.databaseLocation(tableIdentifier.namespace().levels()[0]);
+        return String.format("%s/%s", databaseLocation, tableIdentifier.name());
+    }
+
+    private String databaseLocation(String databaseName) {
+        String warehouseLocation = this.conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+        Preconditions.checkNotNull(warehouseLocation,
+                "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+        warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation);
+        return String.format("%s/%s.db", warehouseLocation, databaseName);
+    }
+
+    @Override
+    public void createNamespace(Namespace namespace, Map<String, String> meta) {
+        super.createNamespace(namespace, meta);
+    }
+
+    @Override
+    public List<Namespace> listNamespaces(Namespace namespace) {
+        return super.listNamespaces(namespace);
+    }
+
+    @Override
+    public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+        return super.loadNamespaceMetadata(namespace);
+    }
+
+    @Override
+    public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+        return super.dropNamespace(namespace);
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+        return super.dropTable(identifier, purge);
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+        return super.listTables(namespace);
+    }
+}
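
A minimal wiring sketch (not part of this commit) for the new catalog, where the metastore URI and warehouse path are placeholders; "uri", "warehouse" and "io-impl" are the property keys read by initialize():

import java.util.HashMap;
import java.util.Map;
import org.apache.doris.datasource.iceberg.hive.IcebergHiveCatalog;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class HiveCatalogInitSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("uri", "thrift://hms-host:9083");
        props.put("warehouse", "hdfs://nn:9000/user/hive/warehouse");
        IcebergHiveCatalog catalog = new IcebergHiveCatalog();
        catalog.setConf(new HdfsConfiguration());
        // builds the FileIO and the cached metastore client pool described above
        catalog.initialize("iceberg_hive_demo", props);
    }
}
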
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java
similarity index 70%
rename from 
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java
index 2aab8e754ca..ea444e7870b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource.iceberg.dlf;
+package org.apache.doris.datasource.iceberg.hive;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -24,14 +24,14 @@ import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.io.FileIO;
 import shade.doris.hive.org.apache.thrift.TException;
 
-public class DLFTableOperations extends HiveTableOperations {
+public class IcebergHiveTableOperations extends HiveTableOperations {
 
-    public DLFTableOperations(Configuration conf,
-                              ClientPool<IMetaStoreClient, TException> metaClients,
-                              FileIO fileIO,
-                              String catalogName,
-                              String database,
-                              String table) {
+    public IcebergHiveTableOperations(Configuration conf,
+                                      ClientPool<IMetaStoreClient, TException> metaClients,
+                                      FileIO fileIO,
+                                      String catalogName,
+                                      String database,
+                                      String table) {
         super(conf, metaClients, fileIO, catalogName, database, table);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 2ca51298fe6..b822304c153 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -31,7 +31,6 @@ import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
@@ -172,7 +171,11 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     @Override
     public List<Split> getSplits() throws UserException {
-        return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), this::doGetSplits);
+        try {
+            return source.getCatalog().getAuthenticator().doAs(this::doGetSplits);
+        } catch (IOException e) {
+            throw new UserException(e);
+        }
     }
 
     private List<Split> doGetSplits() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 59fbd73bda7..4018df68fa9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.logging.log4j.LogManager;
@@ -85,13 +86,7 @@ public class DFSFileSystem extends RemoteFileSystem {
                     AuthenticationConfig authConfig = 
AuthenticationConfig.getKerberosConfig(conf);
                     authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConfig);
                     try {
-                        dfsFileSystem = authenticator.doAs(() -> {
-                            try {
-                                return FileSystem.get(new Path(remotePath).toUri(), conf);
-                            } catch (IOException e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
+                        dfsFileSystem = authenticator.doAs(() -> FileSystem.get(new Path(remotePath).toUri(), conf));
                     } catch (Exception e) {
                         throw new UserException(e);
                     }
@@ -394,7 +389,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             if 
(!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) {
                 return new Status(Status.ErrCode.COMMON_ERROR, "only allow 
rename in same file system");
             }
-            FileSystem fileSystem = nativeFileSystem(destPath);
+            FileSystem fileSystem = rawFileSystem();
             Path srcfilePath = new Path(srcPathUri.getPath());
             Path destfilePath = new Path(destPathUri.getPath());
             boolean isRenameSuccess = authenticator.doAs(() -> 
fileSystem.rename(srcfilePath, destfilePath));
@@ -481,4 +476,42 @@ public class DFSFileSystem extends RemoteFileSystem {
         }
         return Status.OK;
     }
+
+    public FileSystem rawFileSystem() throws IOException {
+        if (dfsFileSystem == null) {
+            synchronized (this) {
+                if (dfsFileSystem == null) {
+                    Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth());
+                    for (Map.Entry<String, String> propEntry : properties.entrySet()) {
+                        conf.set(propEntry.getKey(), propEntry.getValue());
+                    }
+                    AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+                    authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
+                    dfsFileSystem = authenticator.doAs(() -> FileSystem.get(conf));
+                    operations = new HDFSFileOperations(dfsFileSystem);
+                }
+            }
+        }
+        return authenticator.doAs(() -> dfsFileSystem);
+    }
+
+    public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
+        return authenticator.doAs(() -> rawFileSystem().listStatus(f, filter));
+    }
+
+    public RemoteIterator<FileStatus> listStatusIterator(Path p) throws IOException {
+        return authenticator.doAs(() -> rawFileSystem().listStatusIterator(p));
+    }
+
+    public FileStatus getFileStatus(Path f) throws IOException {
+        return authenticator.doAs(() -> rawFileSystem().getFileStatus(f));
+    }
+
+    public boolean delete(Path p, boolean recursion) throws IOException {
+        return authenticator.doAs(() -> rawFileSystem().delete(p, recursion));
+    }
+
+    public boolean mkdirs(Path p) throws IOException {
+        return authenticator.doAs(() -> rawFileSystem().mkdirs(p));
+    }
 }
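
A short illustration (not part of this commit) of why these helpers matter: every HDFS call added here runs inside authenticator.doAs(), so the catalog's own Kerberos UGI is used instead of the process-default login. The DFSFileSystem instance and path below are placeholders:

import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

public class DfsDoAsSketch {
    static long fileLength(DFSFileSystem dfs, String location) throws Exception {
        // getFileStatus() internally wraps rawFileSystem().getFileStatus(...) in doAs
        FileStatus status = dfs.getFileStatus(new Path(location));
        return status.getLen();
    }
}
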
diff --git 
a/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out 
b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out
new file mode 100644
index 00000000000..fb4f79a0b73
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !iceberg_q04 --
+1      2023-05-14
+2      2023-05-16
+
+-- !iceberg_q06 --
+1      krb1    2023-05-14
+2      krb2    2023-05-16
+3      krb3    2023-05-17
diff --git 
a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
 
b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
new file mode 100644
index 00000000000..579b3fcca25
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert;
+
+suite("test_two_iceberg_kerberos", 
"p0,external,kerberos,external_docker,external_docker_kerberos") {
+    String enabled = context.config.otherConfigs.get("enableKerberosTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        //  test iceberg hadoop catalog with kerberos
+        sql """
+            CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl_hadoop
+            PROPERTIES ( 
+                'type'='iceberg',
+                'iceberg.catalog.type'='hadoop',
+                "warehouse" = "hdfs://hadoop-master:9000/user/hive/warehouse",
+                "fs.defaultFS" = "hdfs://hadoop-master:9000",
+                "hadoop.security.authentication" = "kerberos",
+                "hadoop.kerberos.principal"="hive/[email protected]",
+                "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab",
+                "doris.krb5.debug" = "true"
+            );
+            """
+
+        sql """ SWITCH test_krb_iceberg_ctl_hadoop; """
+        sql """ CREATE DATABASE IF NOT EXISTS hadoop_test_krb_iceberg_db; """
+        sql """ USE hadoop_test_krb_iceberg_db; """
+        sql """ CREATE TABLE IF NOT EXISTS hadoop_test_krb_iceberg_tbl (id 
int, str string, dd date) engine = iceberg; """
+        sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(1, 'krb1', 
'2023-05-14') """
+        sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(2, 'krb2', 
'2023-05-16') """
+        sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(3, 'krb3', 
'2023-05-17') """
+
+        // order_qt_iceberg_q02 """ SELECT id,dd FROM 
test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= 
'2023-05-16' """
+        // order_qt_iceberg_q03 """ SELECT id,dd FROM 
other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl 
where dd <= '2023-05-16' """
+
+        // cross catalog query test
+        order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """
+        // order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """
+        order_qt_iceberg_q06 """ SELECT * FROM test_krb_iceberg_ctl_hadoop.hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl """
+        // order_qt_iceberg_q07 """ SELECT * FROM 
other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl 
"""
+
+        // sql """ DROP TABLE IF EXISTS 
test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """
+        // sql """ DROP TABLE IF EXISTS 
other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`;
 """
+        sql """ DROP TABLE IF EXISTS 
test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`.`hadoop_test_krb_iceberg_tbl`;
 """
+
+        // sql """ DROP DATABASE IF EXISTS 
test_krb_iceberg_ctl.`test_krb_iceberg_db`; """
+        // sql """ DROP DATABASE IF EXISTS 
other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """
+        sql """ DROP DATABASE IF EXISTS 
test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`; """
+
+        // sql """ DROP CATALOG test_krb_iceberg_ctl """
+        // sql """ DROP CATALOG other_test_krb_iceberg_ctl """
+        sql """ DROP CATALOG test_krb_iceberg_ctl_hadoop """
+
+        //        // test iceberg hms catalog with kerberos
+        //        sql """
+        //            CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl
+        //            PROPERTIES (
+        //                'type'='iceberg',
+        //                'iceberg.catalog.type'='hms',
+        //                "hive.metastore.uris" = "thrift://172.31.71.25:9083",
+        //                "fs.defaultFS" = "hdfs://hadoop-master:9000",
+        //                "hadoop.security.authentication" = "kerberos",
+        //                "hadoop.kerberos.min.seconds.before.relogin" = "5",
+        //                
"hadoop.kerberos.principal"="hive/[email protected]",
+        //                "hadoop.kerberos.keytab" = 
"/keytabs/hive-presto-master.keytab",
+        //                "hive.metastore.sasl.enabled" = "true",
+        //                "hive.metastore.kerberos.principal" = 
"hive/[email protected]",
+        //                
"hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse",
+        //                "doris.krb5.debug" = "true"
+        //            );
+        //        """
+        //
+        //        sql """
+        //            CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl
+        //            PROPERTIES (
+        //                'type'='iceberg',
+        //                'iceberg.catalog.type'='hms',
+        //                "hive.metastore.uris" = "thrift://172.31.71.26:9083",
+        //                "fs.defaultFS" = "hdfs://hadoop-master-2:9000",
+        //                
"hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse",
+        //                "hadoop.security.authentication" = "kerberos",
+        //                "hadoop.kerberos.min.seconds.before.relogin" = "5",
+        //                
"hadoop.kerberos.principal"="hive/[email protected]",
+        //                "hadoop.kerberos.keytab" = 
"/keytabs/other-hive-presto-master.keytab",
+        //                "hive.metastore.sasl.enabled" = "true",
+        //                "hive.metastore.kerberos.principal" = 
"hive/[email protected]",
+        //                "hadoop.security.auth_to_local" 
="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*//
+        //                                                  
RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*//
+        //                                                  DEFAULT",
+        //                "doris.krb5.debug" = "true"
+        //            );
+        //        """
+        //
+        //        sql """ SWITCH test_krb_iceberg_ctl; """
+        //        sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; 
"""
+        //        sql """ USE `test_krb_iceberg_db`; """
+        //        sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id 
int, str string, dd date) engine = iceberg; """
+        //        sql """ INSERT INTO test_krb_iceberg_tbl values(1, 'krb1', 
'2023-05-14') """
+        //        sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', 
'2023-05-16') """
+        //        sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', 
'2023-05-17') """
+        //        order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl 
"""
+        //
+        //        sql """ SWITCH other_test_krb_iceberg_ctl; """
+        //        sql """ CREATE DATABASE IF NOT EXISTS 
`other_test_krb_iceberg_db`; """
+        //        sql """ USE `other_test_krb_iceberg_db`; """
+        //        sql """ CREATE TABLE IF NOT EXISTS 
other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """
+        //        sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 
'krb1', '2023-05-14') """
+        //        sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 
'krb2', '2023-05-16') """
+        //        sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 
'krb3', '2023-05-17') """
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
