This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch multi_kerberos
in repository https://gitbox.apache.org/repos/asf/doris.git
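The commit below replaces ad-hoc UGI/ugiDoAs calls with a catalog-level Authenticator: ExternalCatalog supplies a pass-through default, the HMS and Hadoop catalogs override getAuthenticator() with a Kerberos-backed HadoopAuthenticator, and Iceberg metadata loading runs inside doAs(). For orientation before the diff, here is a minimal, non-authoritative sketch of that call pattern; the wrapper class name and the db/tbl parameters are hypothetical, and only getAuthenticator(), doAs() and loadTable() come from the patch itself.

    import org.apache.doris.datasource.ExternalCatalog;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;

    import java.io.IOException;

    public class AuthenticatorUsageSketch {
        // Mirrors the pattern IcebergMetadataCache uses after this change: the
        // Iceberg catalog call runs inside doAs(), so a Kerberos-enabled catalog
        // executes it as its own principal, while a plain catalog falls back to
        // ExternalCatalog's pass-through authenticator and simply runs the action.
        public static Table loadWithCatalogIdentity(ExternalCatalog dorisCatalog,
                                                    Catalog icebergCatalog,
                                                    String db, String tbl) {
            try {
                return dorisCatalog.getAuthenticator()
                        .doAs(() -> icebergCatalog.loadTable(TableIdentifier.of(db, tbl)));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }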
commit 7b297015bad32ff2fd0121a917b1b45bae5c1e60 Author: slothever <[email protected]> AuthorDate: Tue Aug 6 01:56:23 2024 +0800 [Improvement](kerberos)refactor ugi login for iceberg hive catalog --- ...HadoopAuthenticator.java => Authenticator.java} | 25 +- .../authentication/HadoopAuthenticator.java | 2 +- .../apache/doris/datasource/ExternalCatalog.java | 19 ++ .../doris/datasource/hive/HMSExternalCatalog.java | 11 +- .../datasource/hive/HiveMetaStoreClientHelper.java | 7 - .../datasource/iceberg/HiveCompatibleCatalog.java | 181 ------------ .../datasource/iceberg/IcebergExternalCatalog.java | 24 ++ .../iceberg/IcebergHMSExternalCatalog.java | 4 +- .../iceberg/IcebergHadoopExternalCatalog.java | 33 ++- .../datasource/iceberg/IcebergMetadataCache.java | 27 +- .../datasource/iceberg/IcebergMetadataOps.java | 2 +- .../doris/datasource/iceberg/IcebergUtils.java | 31 ++- .../doris/datasource/iceberg/dlf/DLFCatalog.java | 118 +++++++- .../iceberg/hadoop/IcebergHadoopCatalog.java | 303 +++++++++++++++++++++ .../iceberg/hadoop/IcebergHadoopFileIO.java | 71 +++++ .../iceberg/hadoop/IcebergHadoopInputFile.java | 107 ++++++++ .../iceberg/hadoop/IcebergHadoopOutputFile.java | 73 +++++ .../hadoop/IcebergHadoopTableOperations.java | 48 ++++ .../hadoop/IcebergPositionOutputStream.java | 62 +++++ .../iceberg/hadoop/IcebergSeekableInputStream.java | 62 +++++ .../datasource/iceberg/hive/HCachedClientPool.java | 83 ++++++ .../iceberg/hive/HiveCompatibleCatalog.java | 101 +++++++ .../iceberg/hive/IcebergHiveCatalog.java | 130 +++++++++ .../IcebergHiveTableOperations.java} | 16 +- .../datasource/iceberg/source/IcebergScanNode.java | 7 +- .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 49 +++- .../kerberos/test_two_iceberg_kerberos.out | 9 + .../kerberos/test_two_iceberg_kerberos.groovy | 124 +++++++++ 28 files changed, 1460 insertions(+), 269 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java similarity index 56% copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java index c3cab5f410b..2e6c1602f0f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java @@ -17,28 +17,17 @@ package org.apache.doris.common.security.authentication; -import org.apache.hadoop.security.UserGroupInformation; - import java.io.IOException; import java.security.PrivilegedExceptionAction; -public interface HadoopAuthenticator { - - UserGroupInformation getUGI() throws IOException; +public interface Authenticator { - default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException { - try { - return getUGI().doAs(action); - } catch (InterruptedException e) { - throw new IOException(e); - } - } + <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException; - static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) { - if (config instanceof KerberosAuthenticationConfig) { - return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config); - } else { - return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config); - } + default <T> void doAsNoReturn(Runnable action) throws IOException { + doAs(() 
-> { + action.run(); + return null; + }); } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java index c3cab5f410b..dce3eb8e7b5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java @@ -22,7 +22,7 @@ import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; import java.security.PrivilegedExceptionAction; -public interface HadoopAuthenticator { +public interface HadoopAuthenticator extends Authenticator { UserGroupInformation getUGI() throws IOException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 0d854a9ecdc..44ecdca733c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -37,6 +37,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.security.authentication.Authenticator; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.es.EsExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalCatalog; @@ -78,6 +79,7 @@ import org.jetbrains.annotations.Nullable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -163,6 +165,23 @@ public abstract class ExternalCatalog return conf; } + /** + * get authenticator for catalog + * return a dummy authenticator by default + */ + public synchronized Authenticator getAuthenticator() { + return new Authenticator() { + @Override + public <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException { + try { + return action.run(); + } catch (Exception e) { + throw new IOException(e); + } + } + }; + } + /** * set some default properties when creating catalog * @return list of database names in this catalog diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 5faf1f2bb6e..1faeeca92bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -43,7 +43,6 @@ import org.apache.doris.transaction.TransactionManagerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import lombok.Getter; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.logging.log4j.LogManager; @@ -71,7 +70,6 @@ public class HMSExternalCatalog extends ExternalCatalog { private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16; private ThreadPoolExecutor fileSystemExecutor; - @Getter private HadoopAuthenticator authenticator; private int hmsEventsBatchSizePerRpc = -1; @@ -281,8 +279,13 @@ public class HMSExternalCatalog extends ExternalCatalog { return 
catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); } - public String getHiveVersion() { - return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); + @Override + public synchronized HadoopAuthenticator getAuthenticator() { + if (authenticator == null) { + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); + } + return authenticator; } public int getHmsEventsBatchSizePerRpc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index db6019eda97..352e00a7620 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -31,7 +31,6 @@ import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.ArrayType; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; @@ -41,7 +40,6 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.DdlException; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopUGI; -import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.thrift.TExprOpcode; @@ -813,11 +811,6 @@ public class HiveMetaStoreClientHelper { return hudiSchema; } - public static <T> T ugiDoAs(long catalogId, PrivilegedExceptionAction<T> action) { - return ugiDoAs(((ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId)).getConfiguration(), - action); - } - public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) { // if hive config is not ready, then use hadoop kerberos to login AuthenticationConfig krbConfig = AuthenticationConfig.getKerberosConfig(conf, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java deleted file mode 100644 index 6431b02308b..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java +++ /dev/null @@ -1,181 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.datasource.iceberg; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.iceberg.BaseMetastoreCatalog; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.ClientPool; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.NamespaceNotEmptyException; -import org.apache.iceberg.exceptions.NoSuchNamespaceException; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.hadoop.HadoopFileIO; -import org.apache.iceberg.io.FileIO; -import shade.doris.hive.org.apache.thrift.TException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable { - - protected Configuration conf; - protected ClientPool<IMetaStoreClient, TException> clients; - protected FileIO fileIO; - protected String uid; - - public void initialize(String name, FileIO fileIO, - ClientPool<IMetaStoreClient, TException> clients) { - this.uid = name; - this.fileIO = fileIO; - this.clients = clients; - } - - protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) { - String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); - if (fileIOImpl == null) { - /* when use the S3FileIO, we need some custom configurations, - * so HadoopFileIO is used in the superclass by default - * we can add better implementations to derived class just like the implementation in DLFCatalog. 
- */ - FileIO io = new HadoopFileIO(hadoopConf); - io.initialize(properties); - return io; - } else { - return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); - } - } - - @Override - protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { - return null; - } - - @Override - protected boolean isValidIdentifier(TableIdentifier tableIdentifier) { - return tableIdentifier.namespace().levels().length == 1; - } - - protected boolean isValidNamespace(Namespace namespace) { - return namespace.levels().length != 1; - } - - @Override - public List<TableIdentifier> listTables(Namespace namespace) { - if (isValidNamespace(namespace)) { - throw new NoSuchTableException("Invalid namespace: %s", namespace); - } - String dbName = namespace.level(0); - try { - return clients.run(client -> client.getAllTables(dbName)) - .stream() - .map(tbl -> TableIdentifier.of(dbName, tbl)) - .collect(Collectors.toList()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { - throw new UnsupportedOperationException( - "Cannot drop table " + tableIdentifier + " : dropTable is not supported"); - } - - @Override - public void renameTable(TableIdentifier sourceTbl, TableIdentifier targetTbl) { - throw new UnsupportedOperationException( - "Cannot rename table " + sourceTbl + " : renameTable is not supported"); - } - - @Override - public void createNamespace(Namespace namespace, Map<String, String> props) { - throw new UnsupportedOperationException( - "Cannot create namespace " + namespace + " : createNamespace is not supported"); - } - - @Override - public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { - if (isValidNamespace(namespace) && !namespace.isEmpty()) { - throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); - } - if (!namespace.isEmpty()) { - return new ArrayList<>(); - } - List<Namespace> namespaces = new ArrayList<>(); - List<String> databases; - try { - databases = clients.run(client -> client.getAllDatabases()); - for (String database : databases) { - namespaces.add(Namespace.of(database)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return namespaces; - } - - @Override - public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { - if (isValidNamespace(namespace)) { - throw new NoSuchTableException("Invalid namespace: %s", namespace); - } - String dbName = namespace.level(0); - try { - return clients.run(client -> client.getDatabase(dbName)).getParameters(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { - throw new UnsupportedOperationException( - "Cannot drop namespace " + namespace + " : dropNamespace is not supported"); - } - - @Override - public boolean setProperties(Namespace namespace, Map<String, String> props) throws NoSuchNamespaceException { - throw new UnsupportedOperationException( - "Cannot set namespace properties " + namespace + " : setProperties is not supported"); - } - - @Override - public boolean removeProperties(Namespace namespace, Set<String> pNames) throws NoSuchNamespaceException { - throw new UnsupportedOperationException( - "Cannot remove properties " + namespace + " : removeProperties is not supported"); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - 
@Override - public Configuration getConf() { - return conf; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 6f79afd5de5..a9029499ad8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -26,8 +26,11 @@ import org.apache.doris.transaction.TransactionManagerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,4 +86,25 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { Map<String, String> properties = catalogProperty.getHadoopProperties(); conf.set(Constants.AWS_CREDENTIALS_PROVIDER, PropertyConverter.getAWSCredentialsProviders(properties)); } + + public Table loadTable(TableIdentifier of) { + Table tbl = getCatalog().loadTable(of); + Map<String, String> extProps = getProperties(); + initIcebergTableFileIO(tbl, extProps); + return tbl; + } + + public static void initIcebergTableFileIO(Table table, Map<String, String> props) { + Map<String, String> ioConf = new HashMap<>(); + table.properties().forEach((key, value) -> { + if (key.startsWith("io.")) { + ioConf.put(key, value); + } + }); + + // This `initialize` method will directly override the properties as a whole, + // so we need to merge the table's io-related properties with the doris's catalog-related properties + props.putAll(ioConf); + table.io().initialize(props); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index c1475064934..05c08b0c771 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -18,11 +18,11 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.iceberg.hive.IcebergHiveCatalog; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.hive.HiveCatalog; import java.util.Map; @@ -38,7 +38,7 @@ public class IcebergHMSExternalCatalog extends IcebergExternalCatalog { @Override protected void initCatalog() { icebergCatalogType = ICEBERG_HMS; - HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + IcebergHiveCatalog hiveCatalog = new IcebergHiveCatalog(); hiveCatalog.setConf(getConfiguration()); // initialize hive catalog Map<String, String> catalogProperties = catalogProperty.getProperties(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java index bf9e8c2b3f0..3820b609569 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java @@ -18,19 +18,26 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.catalog.HdfsResource; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopCatalog; import org.apache.doris.datasource.property.PropertyConverter; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import java.io.IOException; import java.util.Map; public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog { + private HadoopAuthenticator authenticator; + public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, String comment) { super(catalogId, name, comment); @@ -49,10 +56,19 @@ public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog { } } + @Override + public synchronized HadoopAuthenticator getAuthenticator() { + if (authenticator == null) { + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); + } + return authenticator; + } + @Override protected void initCatalog() { icebergCatalogType = ICEBERG_HADOOP; - HadoopCatalog hadoopCatalog = new HadoopCatalog(); + IcebergHadoopCatalog hadoopCatalog = new IcebergHadoopCatalog(); Configuration conf = getConfiguration(); initS3Param(conf); // initialize hadoop catalog @@ -63,4 +79,17 @@ public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog { hadoopCatalog.initialize(getName(), catalogProperties); catalog = hadoopCatalog; } + + @Override + public Table loadTable(TableIdentifier of) { + Table tbl; + try { + tbl = getAuthenticator().doAs(() -> getCatalog().loadTable(of)); + } catch (IOException e) { + throw new RuntimeException(e); + } + Map<String, String> extProps = getProperties(); + initIcebergTableFileIO(tbl, extProps); + return tbl; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index 13bd9650978..214b9cdeeee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -23,7 +23,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.hive.HMSExternalCatalog; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.thrift.TIcebergMetadataParams; @@ -41,6 +40,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveCatalog; import org.jetbrains.annotations.NotNull; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -111,21 +111,30 @@ public class 
IcebergMetadataCache { @NotNull private Table loadTable(IcebergMetadataCacheKey key) { Catalog icebergCatalog; + Table icebergTable; if (key.catalog instanceof HMSExternalCatalog) { HMSExternalCatalog ctg = (HMSExternalCatalog) key.catalog; icebergCatalog = createIcebergHiveCatalog( - ctg.getHiveMetastoreUris(), - ctg.getCatalogProperty().getHadoopProperties(), - ctg.getProperties()); + ctg.getHiveMetastoreUris(), + ctg.getCatalogProperty().getHadoopProperties(), + ctg.getProperties()); + try { + icebergTable = ctg.getAuthenticator().doAs(() -> { + Table tbl = icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName)); + IcebergExternalCatalog.initIcebergTableFileIO(tbl, key.catalog.getProperties()); + return tbl; + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + return icebergTable; } else if (key.catalog instanceof IcebergExternalCatalog) { - icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog(); + IcebergExternalCatalog ctg = ((IcebergExternalCatalog) key.catalog); + icebergTable = ctg.loadTable(TableIdentifier.of(key.dbName, key.tableName)); + return icebergTable; } else { throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table"); } - Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(key.catalog.getId(), - () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName))); - initIcebergTableFileIO(icebergTable, key.catalog.getProperties()); - return icebergTable; } public void invalidateCatalogCache(long catalogId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 66c57e37307..9187178c7dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -161,7 +161,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps { PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema); catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties); db.setUnInitialized(true); - return false; + return true; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index f7280f5721f..3372d9033ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -46,7 +46,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.info.SimpleTableInfo; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.thrift.TExprOpcode; @@ -69,6 +68,7 @@ import org.apache.iceberg.util.LocationUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -570,21 +570,24 @@ public class IcebergUtils { * Get iceberg schema from catalog and convert them to doris schema */ public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) { - return 
HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> { - org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name); - Schema schema = icebergTable.schema(); - List<Types.NestedField> columns = schema.columns(); - List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); - for (Types.NestedField field : columns) { - tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), - IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, - schema.caseInsensitiveFindField(field.name()).fieldId())); - } - return tmpSchema; - }); + try { + return catalog.getAuthenticator().doAs(() -> { + Table icebergTable = getIcebergTable(catalog, dbName, name); + Schema schema = icebergTable.schema(); + List<Types.NestedField> columns = schema.columns(); + List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + for (Types.NestedField field : columns) { + tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), + IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, + schema.caseInsensitiveFindField(field.name()).fieldId())); + } + return tmpSchema; + }); + } catch (IOException e) { + throw new RuntimeException(e); + } } - /** * Estimate iceberg table row count. * Get the row count by adding all task file recordCount. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java index e9c406715c1..ddbf7ebeac5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java @@ -19,37 +19,36 @@ package org.apache.doris.datasource.iceberg.dlf; import org.apache.doris.common.credentials.CloudCredential; import org.apache.doris.common.util.S3Util; -import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog; import org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool; +import org.apache.doris.datasource.iceberg.hive.HiveCompatibleCatalog; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.OssProperties; import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.aliyun.oss.Constants; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class DLFCatalog extends HiveCompatibleCatalog { @Override public void initialize(String name, Map<String, String> properties) { - super.initialize(name, initializeFileIO(properties, conf), new DLFCachedClientPool(this.conf, properties)); + super.initialize(name, initializeFileIO(properties), new DLFCachedClientPool(this.conf, properties)); } - @Override - protected TableOperations newTableOps(TableIdentifier tableIdentifier) { - String dbName = tableIdentifier.namespace().level(0); - String tableName = tableIdentifier.name(); - return new 
DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName); - } - - protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) { + protected FileIO initializeFileIO(Map<String, String> properties) { // read from converted properties or default by old s3 aws properties String endpoint = properties.getOrDefault(Constants.ENDPOINT_KEY, properties.get(S3Properties.Env.ENDPOINT)); CloudCredential credential = new CloudCredential(); @@ -72,4 +71,99 @@ public class DLFCatalog extends HiveCompatibleCatalog { io.initialize(properties); return io; } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + return null; + } + + protected boolean isValidNamespace(Namespace namespace) { + return namespace.levels().length != 1; + } + + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + if (isValidNamespace(namespace)) { + throw new NoSuchTableException("Invalid namespace: %s", namespace); + } + String dbName = namespace.level(0); + try { + return clients.run(client -> client.getAllTables(dbName)) + .stream() + .map(tbl -> TableIdentifier.of(dbName, tbl)) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { + throw new UnsupportedOperationException( + "Cannot drop table " + tableIdentifier + " : dropTable is not supported"); + } + + @Override + public void renameTable(TableIdentifier sourceTbl, TableIdentifier targetTbl) { + throw new UnsupportedOperationException( + "Cannot rename table " + sourceTbl + " : renameTable is not supported"); + } + + @Override + public void createNamespace(Namespace namespace, Map<String, String> props) { + throw new UnsupportedOperationException( + "Cannot create namespace " + namespace + " : createNamespace is not supported"); + } + + @Override + public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + if (isValidNamespace(namespace) && !namespace.isEmpty()) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + if (!namespace.isEmpty()) { + return new ArrayList<>(); + } + List<Namespace> namespaces = new ArrayList<>(); + List<String> databases; + try { + databases = clients.run(client -> client.getAllDatabases()); + for (String database : databases) { + namespaces.add(Namespace.of(database)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return namespaces; + } + + @Override + public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + if (isValidNamespace(namespace)) { + throw new NoSuchTableException("Invalid namespace: %s", namespace); + } + String dbName = namespace.level(0); + try { + return clients.run(client -> client.getDatabase(dbName)).getParameters(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + throw new UnsupportedOperationException( + "Cannot drop namespace " + namespace + " : dropNamespace is not supported"); + } + + @Override + public boolean setProperties(Namespace namespace, Map<String, String> props) throws NoSuchNamespaceException { + throw new UnsupportedOperationException( + "Cannot set namespace properties " + namespace + " : setProperties is not supported"); + } + + @Override + public boolean removeProperties(Namespace namespace, 
Set<String> pNames) throws NoSuchNamespaceException { + throw new UnsupportedOperationException( + "Cannot remove properties " + namespace + " : removeProperties is not supported"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java new file mode 100644 index 00000000000..13ec717d7d8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java @@ -0,0 +1,303 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.LockManager; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.LockManagers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class IcebergHadoopCatalog extends HadoopCatalog { + private static final Logger LOG = LoggerFactory.getLogger(IcebergHadoopCatalog.class); + private String warehouseLocation; + private DFSFileSystem fs; + private CloseableGroup closeableGroup; + private LockManager lockManager; + protected FileIO fileIO; + protected String uid; + private static final Joiner SLASH = Joiner.on("/"); + + public void initialize(String name, Map<String, 
String> properties) { + super.initialize(name, properties); + String inputWarehouseLocation = properties.get("warehouse"); + Preconditions.checkArgument(inputWarehouseLocation != null && inputWarehouseLocation.length() > 0, + "Cannot initialize HadoopCatalog because warehousePath must not be null or empty"); + this.warehouseLocation = LocationUtil.stripTrailingSlash(inputWarehouseLocation); + this.fs = new DFSFileSystem(properties); + this.fileIO = initializeFileIO(properties, getConf()); + this.lockManager = LockManagers.from(properties); + this.closeableGroup = new CloseableGroup(); + this.closeableGroup.addCloseable(this.lockManager); + this.closeableGroup.setSuppressCloseFailure(true); + } + + protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + /* when use the S3FileIO, we need some custom configurations, + * so HadoopFileIO is used in the superclass by default + * we can add better implementations to derived class just like the implementation in DLFCatalog. + */ + FileIO io; + try { + io = new IcebergHadoopFileIO(hadoopConf, this.fs.rawFileSystem()); + } catch (IOException e) { + throw new RuntimeException(e); + } + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier identifier) { + return new IcebergHadoopTableOperations(new Path(this.defaultWarehouseLocation(identifier)), + this.fileIO, getConf(), this.lockManager, this.fs); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + String tableName = tableIdentifier.name(); + StringBuilder sb = new StringBuilder(); + sb.append(this.warehouseLocation).append('/'); + String[] levels = tableIdentifier.namespace().levels(); + for (String level : levels) { + sb.append(level).append('/'); + } + sb.append(tableName); + return sb.toString(); + } + + @Override + public void createNamespace(Namespace namespace, Map<String, String> meta) { + Preconditions.checkArgument(!namespace.isEmpty(), + "Cannot create namespace with invalid name: %s", namespace); + if (!meta.isEmpty()) { + throw new UnsupportedOperationException("Cannot create namespace " + namespace + + ": metadata is not supported"); + } else { + Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels())); + if (this.isNamespace(nsPath)) { + throw new AlreadyExistsException("Namespace already exists: %s", namespace); + } else { + try { + this.fs.mkdirs(nsPath); + } catch (IOException e) { + throw new RuntimeIOException(e, "Create namespace failed: %s", namespace); + } + } + } + } + + @Override + public List<Namespace> listNamespaces(Namespace namespace) { + Path nsPath = namespace.isEmpty() + ? new Path(warehouseLocation) + : new Path(warehouseLocation, SLASH.join(namespace.levels())); + if (!isNamespace(nsPath)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + try { + // using the iterator listing allows for paged downloads + // from HDFS and prefetching from object storage. 
+ List<Namespace> namespaces = new ArrayList<>(); + RemoteIterator<FileStatus> it = fs.listStatusIterator(nsPath); + while (it.hasNext()) { + Path path = it.next().getPath(); + if (isNamespace(path)) { + namespaces.add(append(namespace, path.getName())); + } + } + return namespaces; + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Failed to list namespace under: %s", namespace); + } + } + + @Override + public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels())); + if (this.isNamespace(nsPath) && !namespace.isEmpty()) { + return ImmutableMap.of("location", nsPath.toString()); + } else { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels())); + if (this.isNamespace(nsPath) && !namespace.isEmpty()) { + try { + if (this.fs.listStatusIterator(nsPath).hasNext()) { + throw new NamespaceNotEmptyException("Namespace %s is not empty.", new Object[]{namespace}); + } else { + return this.fs.delete(nsPath, false); + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Namespace delete failed: %s", new Object[]{namespace}); + } + } else { + return false; + } + } + + public boolean dropTable(TableIdentifier identifier, boolean purge) { + if (!this.isValidIdentifier(identifier)) { + throw new NoSuchTableException("Invalid identifier: %s", identifier); + } else { + Path tablePath = new Path(this.defaultWarehouseLocation(identifier)); + TableOperations ops = this.newTableOps(identifier); + TableMetadata lastMetadata = ops.current(); + try { + if (lastMetadata == null) { + LOG.debug("Not an iceberg table: {}", identifier); + return false; + } else { + if (purge) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } + return this.fs.delete(tablePath, true); + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to delete file: %s", tablePath); + } + } + } + + private Namespace append(Namespace ns, String name) { + String[] levels = Arrays.copyOfRange(ns.levels(), 0, ns.levels().length + 1); + levels[ns.levels().length] = name; + return Namespace.of(levels); + } + + private boolean isNamespace(Path path) { + return isDirectory(path) && !isTableDir(path); + } + + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + Preconditions.checkArgument(namespace.levels().length >= 1, + "Missing database in table identifier: %s", namespace); + Path nsPath = new Path(this.warehouseLocation, Joiner.on("/").join(namespace.levels())); + Set<TableIdentifier> tblIdents = Sets.newHashSet(); + + try { + if (!isDirectory(nsPath)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + RemoteIterator<FileStatus> it = this.fs.listStatusIterator(nsPath); + while (it.hasNext()) { + FileStatus status = it.next(); + if (status.isDirectory()) { + Path path = status.getPath(); + if (isTableDir(path)) { + TableIdentifier tblIdent = TableIdentifier.of(namespace, path.getName()); + tblIdents.add(tblIdent); + } + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to list tables under: %s", namespace); + } + + return Lists.newArrayList(tblIdents); + } + + private boolean isTableDir(Path path) { + Path metadataPath = new Path(path, "metadata"); + PathFilter tableFilter = 
(filterPath) -> filterPath.getName().endsWith(".metadata.json"); + try { + return fs.listStatus(metadataPath, tableFilter).length >= 1; + } catch (FileNotFoundException f) { + return false; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private boolean isDirectory(Path path) { + try { + return fs.getFileStatus(path).isDirectory(); + } catch (FileNotFoundException e) { + return false; + } catch (IOException e) { + LOG.warn("Unable to list directory {}", path, e); + throw new UncheckedIOException(e); + } + } + + @Override + public void close() throws IOException { + this.closeableGroup.close(); + } + + @Override + public void setConf(Configuration configuration) { + super.setConf(configuration); + } + + @Override + public Configuration getConf() { + Configuration conf = super.getConf(); + if (conf == null) { + return new HdfsConfiguration(); + } + return conf; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java new file mode 100644 index 00000000000..97912290d2f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; + +import java.io.IOException; + +public class IcebergHadoopFileIO extends HadoopFileIO { + + private FileSystem fs; + private Configuration hadoopConf; + + public IcebergHadoopFileIO(Configuration hadoopConf, FileSystem fs) { + this.hadoopConf = hadoopConf; + this.fs = fs; + } + + @Override + public InputFile newInputFile(String path) { + return new IcebergHadoopInputFile(this.fs, path, this.hadoopConf); + } + + @Override + public InputFile newInputFile(String path, long length) { + return new IcebergHadoopInputFile(this.fs, path, length, this.hadoopConf); + } + + @Override + public OutputFile newOutputFile(String path) { + return new IcebergHadoopOutputFile(this.fs, new Path(path), this.hadoopConf); + } + + @Override + public void deleteFile(String path) { + Path toDelete = new Path(path); + try { + fs.delete(toDelete, false); + } catch (IOException var5) { + IOException e = var5; + throw new RuntimeIOException(e, "Failed to delete file: %s", path); + } + } + + @Override + public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException { + // TODO + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java new file mode 100644 index 00000000000..105d83d4084 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.FileNotFoundException; +import java.io.IOException; + +public class IcebergHadoopInputFile implements InputFile { + + private String location; + private final FileSystem fs; + private final Path path; + private final Configuration conf; + private FileStatus stat = null; + private Long length; + + public IcebergHadoopInputFile(FileSystem fs, String location, Configuration conf) { + this(fs, location, null, conf); + } + + public IcebergHadoopInputFile(FileSystem fs, Path path, Configuration conf) { + this.fs = fs; + this.path = path; + this.conf = conf; + } + + public IcebergHadoopInputFile(FileSystem fs, String location, Long length, Configuration conf) { + this.fs = fs; + this.location = location; + this.path = new Path(location); + this.conf = conf; + this.length = length; + } + + @Override + public long getLength() { + if (this.length == null) { + this.length = this.lazyStat().getLen(); + } + return this.length; + } + + public SeekableInputStream newStream() { + try { + return new IcebergSeekableInputStream(openFile()); + } catch (FileNotFoundException e) { + throw new NotFoundException(e, "Failed to open input stream for file: %s", this.path); + } catch (IOException ex) { + throw new RuntimeIOException(ex, "Failed to open input stream for file: %s", this.path); + } + } + + private FSDataInputStream openFile() throws IOException { + return fs.open(path); + } + + @Override + public String location() { + return location; + } + + public boolean exists() { + try { + return this.lazyStat() != null; + } catch (NotFoundException e) { + return false; + } + } + + private FileStatus lazyStat() { + if (this.stat == null) { + try { + this.stat = this.fs.getFileStatus(this.path); + } catch (FileNotFoundException e) { + throw new NotFoundException(e, "File does not exist: %s", this.path); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to get status for file: %s", this.path); + } + } + return this.stat; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java new file mode 100644 index 00000000000..ca99f2b4590 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; + +import java.io.IOException; + +public class IcebergHadoopOutputFile implements OutputFile { + + private final FileSystem fs; + private final Path path; + private final Configuration conf; + + public IcebergHadoopOutputFile(FileSystem fs, Path path, Configuration hadoopConf) { + this.fs = fs; + this.path = path; + this.conf = hadoopConf; + } + + @Override + public PositionOutputStream create() { + try { + return new IcebergPositionOutputStream(this.fs.create(this.path, false)); + } catch (FileAlreadyExistsException e) { + throw new AlreadyExistsException(e, "Path already exists: %s", this.path); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create file: %s", this.path); + } + } + + @Override + public PositionOutputStream createOrOverwrite() { + try { + return new IcebergPositionOutputStream(this.fs.create(this.path, true)); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create file: %s", this.path); + } + } + + @Override + public String location() { + return this.path.toString(); + } + + @Override + public InputFile toInputFile() { + return new IcebergHadoopInputFile(this.fs, this.path, this.conf); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java new file mode 100644 index 00000000000..187a1cdfcf4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.LockManager; +import org.apache.iceberg.hadoop.HadoopTableOperations; +import org.apache.iceberg.io.FileIO; + +import java.io.IOException; + +public class IcebergHadoopTableOperations extends HadoopTableOperations { + private final DFSFileSystem fileSystem; + + public IcebergHadoopTableOperations(Path location, FileIO fileIO, Configuration conf, + LockManager lockManager, DFSFileSystem fileSystem) { + super(location, fileIO, conf, lockManager); + this.fileSystem = fileSystem; + } + + @Override + protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { + try { + return fileSystem.rawFileSystem(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java new file mode 100644 index 00000000000..7e18200fa80 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.iceberg.io.DelegatingOutputStream; +import org.apache.iceberg.io.PositionOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +public class IcebergPositionOutputStream extends PositionOutputStream implements DelegatingOutputStream { + + private final FSDataOutputStream stream; + + IcebergPositionOutputStream(FSDataOutputStream stream) { + this.stream = stream; + } + + public OutputStream getDelegate() { + return this.stream; + } + + public long getPos() throws IOException { + return this.stream.getPos(); + } + + public void write(int b) throws IOException { + this.stream.write(b); + } + + public void write(byte[] b) throws IOException { + this.stream.write(b); + } + + public void write(byte[] b, int off, int len) throws IOException { + this.stream.write(b, off, len); + } + + public void flush() throws IOException { + this.stream.flush(); + } + + public void close() throws IOException { + this.stream.close(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java new file mode 100644 index 00000000000..501524fb497 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.IOException; + +public class IcebergSeekableInputStream extends SeekableInputStream { + private final FSDataInputStream stream; + + public IcebergSeekableInputStream(FSDataInputStream stream) { + this.stream = stream; + } + + @Override + public long getPos() throws IOException { + return stream.getPos(); + } + + @Override + public void seek(long position) throws IOException { + stream.seek(position); + } + + @Override + public int read() throws IOException { + return stream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return stream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return stream.read(b, off, len); + } + + @Override + public void close() throws IOException { + stream.close(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java new file mode 100644 index 00000000000..ff8313cab0f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hive; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.hive.HiveClientPool; +import org.apache.iceberg.util.PropertyUtil; +import shade.doris.hive.org.apache.thrift.TException; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class HCachedClientPool implements ClientPool<IMetaStoreClient, TException> { + + private static volatile Cache<String, HiveClientPool> clientPoolCache; + private static final Object clientPoolCacheLock = new Object(); + private final String catalogName; + private final Configuration conf; + private final int clientPoolSize; + private final long evictionInterval; + + public HCachedClientPool(String catalogName, Configuration conf, Map<String, String> properties) { + this.catalogName = catalogName; + this.conf = conf; + this.clientPoolSize = + PropertyUtil.propertyAsInt( + properties, + CatalogProperties.CLIENT_POOL_SIZE, + CatalogProperties.CLIENT_POOL_SIZE_DEFAULT); + this.evictionInterval = + PropertyUtil.propertyAsLong( + properties, + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT); + + if (clientPoolCache == null) { + synchronized (clientPoolCacheLock) { + if (clientPoolCache == null) { + clientPoolCache = + Caffeine.newBuilder() + .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) + .removalListener((key, value, cause) -> ((HiveClientPool) value).close()) + .build(); + } + } + } + } + + protected HiveClientPool clientPool() { + return clientPoolCache.get(this.catalogName, (k) -> new HiveClientPool(this.clientPoolSize, this.conf)); + } + + @Override + public <R> R run(Action<R, IMetaStoreClient, TException> action) throws TException, InterruptedException { + return clientPool().run(action); + } + + @Override + public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry) + throws TException, InterruptedException { + return clientPool().run(action, retry); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java new file mode 100644 index 00000000000..be3ad97859f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hive; + +import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.io.FileIO; +import shade.doris.hive.org.apache.thrift.TException; + +import java.io.IOException; +import java.util.Map; + +public abstract class HiveCompatibleCatalog extends HiveCatalog { + + protected Configuration conf; + protected ClientPool<IMetaStoreClient, TException> clients; + protected FileIO fileIO; + protected String uid; + + public void initialize(String name, FileIO fileIO, + ClientPool<IMetaStoreClient, TException> clients) { + this.uid = name; + this.fileIO = fileIO; + this.clients = clients; + } + + protected FileIO initializeFileIO(Map<String, String> properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + /* When S3FileIO is needed, some custom configuration is required, + * so this superclass falls back to HadoopFileIO by default. + * Derived classes can provide a better implementation, as DLFCatalog does. + */ + FileIO io; + try { + FileSystem fs = getFileSystem(); + if (fs == null) { + io = new HadoopFileIO(getConf()); + } else { + io = new IcebergHadoopFileIO(getConf(), fs); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, getConf()); + } + } + + public FileSystem getFileSystem() throws IOException { + return null; + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new IcebergHiveTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + if (conf == null) { + return new HdfsConfiguration(); + } + return conf; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java new file mode 100644 index 00000000000..6aaa3965aaf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hive; + +import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.LocationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import shade.doris.hive.org.apache.thrift.TException; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class IcebergHiveCatalog extends HiveCompatibleCatalog { + private static final Logger LOG = LoggerFactory.getLogger(IcebergHiveCatalog.class); + private ClientPool<IMetaStoreClient, TException> clients; + private boolean listAllTables = false; + + public void initialize(String name, Map<String, String> properties) { + if (this.conf == null) { + LOG.warn("No Hadoop Configuration was set, using the default environment Configuration"); + this.conf = getConf(); + } + if (properties.containsKey("uri")) { + this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get("uri")); + } + if (properties.containsKey("warehouse")) { + this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + LocationUtil.stripTrailingSlash(properties.get("warehouse"))); + } + + this.listAllTables = Boolean.parseBoolean(properties.getOrDefault("list-all-tables", "true")); + String fileIOImpl = properties.get("io-impl"); + org.apache.hadoop.fs.FileSystem fs; + try { + fs = new DFSFileSystem(properties).rawFileSystem(); + } catch (IOException e) { + throw new RuntimeException(e); + } + FileIO fileIO = fileIOImpl == null ? 
new IcebergHadoopFileIO(this.conf, fs) + : CatalogUtil.loadFileIO(fileIOImpl, properties, this.conf); + this.clients = new HCachedClientPool(name, this.conf, properties); + super.initialize(name, fileIO, clients); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + try { + Database databaseData = this.clients.run((client) -> + client.getDatabase(tableIdentifier.namespace().levels()[0])); + if (databaseData.getLocationUri() != null) { + return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name()); + } + } catch (TException e) { + throw new RuntimeException(String.format("Metastore operation failed for %s", tableIdentifier), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + } + + String databaseLocation = this.databaseLocation(tableIdentifier.namespace().levels()[0]); + return String.format("%s/%s", databaseLocation, tableIdentifier.name()); + } + + private String databaseLocation(String databaseName) { + String warehouseLocation = this.conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname); + Preconditions.checkNotNull(warehouseLocation, + "Warehouse location is not set: hive.metastore.warehouse.dir=null"); + warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation); + return String.format("%s/%s.db", warehouseLocation, databaseName); + } + + @Override + public void createNamespace(Namespace namespace, Map<String, String> meta) { + super.createNamespace(namespace, meta); + } + + @Override + public List<Namespace> listNamespaces(Namespace namespace) { + return super.listNamespaces(namespace); + } + + @Override + public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + return super.loadNamespaceMetadata(namespace); + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + return super.dropNamespace(namespace); + } + + public boolean dropTable(TableIdentifier identifier, boolean purge) { + return super.dropTable(identifier, purge); + } + + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + return super.listTables(namespace); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java similarity index 70% rename from fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java rename to fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java index 2aab8e754ca..ea444e7870b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.datasource.iceberg.dlf; +package org.apache.doris.datasource.iceberg.hive; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -24,14 +24,14 @@ import org.apache.iceberg.hive.HiveTableOperations; import org.apache.iceberg.io.FileIO; import shade.doris.hive.org.apache.thrift.TException; -public class DLFTableOperations extends HiveTableOperations { +public class IcebergHiveTableOperations extends HiveTableOperations { - public DLFTableOperations(Configuration conf, - ClientPool<IMetaStoreClient, TException> metaClients, - FileIO fileIO, - String catalogName, - String database, - String table) { + public IcebergHiveTableOperations(Configuration conf, + ClientPool<IMetaStoreClient, TException> metaClients, + FileIO fileIO, + String catalogName, + String database, + String table) { super(conf, metaClients, fileIO, catalogName, database, table); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 2ca51298fe6..b822304c153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -31,7 +31,6 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HMSExternalTable; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergUtils; @@ -172,7 +171,11 @@ public class IcebergScanNode extends FileQueryScanNode { @Override public List<Split> getSplits() throws UserException { - return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), this::doGetSplits); + try { + return source.getCatalog().getAuthenticator().doAs(this::doGetSplits); + } catch (IOException e) { + throw new UserException(e); + } } private List<Split> doGetSplits() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 59fbd73bda7..4018df68fa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; @@ -85,13 +86,7 @@ public class DFSFileSystem extends RemoteFileSystem { AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); try { - dfsFileSystem = authenticator.doAs(() -> { - try { - return FileSystem.get(new Path(remotePath).toUri(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + dfsFileSystem = authenticator.doAs(() -> FileSystem.get(new Path(remotePath).toUri(), conf)); } catch (Exception e) { 
throw new UserException(e); } @@ -394,7 +389,7 @@ public class DFSFileSystem extends RemoteFileSystem { if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system"); } - FileSystem fileSystem = nativeFileSystem(destPath); + FileSystem fileSystem = rawFileSystem(); Path srcfilePath = new Path(srcPathUri.getPath()); Path destfilePath = new Path(destPathUri.getPath()); boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath)); @@ -481,4 +476,42 @@ public class DFSFileSystem extends RemoteFileSystem { } return Status.OK; } + + public FileSystem rawFileSystem() throws IOException { + if (dfsFileSystem == null) { + synchronized (this) { + if (dfsFileSystem == null) { + Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth()); + for (Map.Entry<String, String> propEntry : properties.entrySet()) { + conf.set(propEntry.getKey(), propEntry.getValue()); + } + AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); + dfsFileSystem = authenticator.doAs(() -> FileSystem.get(conf)); + operations = new HDFSFileOperations(dfsFileSystem); + } + } + } + return authenticator.doAs(() -> dfsFileSystem); + } + + public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException { + return authenticator.doAs(() -> rawFileSystem().listStatus(f, filter)); + } + + public RemoteIterator<FileStatus> listStatusIterator(Path p) throws IOException { + return authenticator.doAs(() -> rawFileSystem().listStatusIterator(p)); + } + + public FileStatus getFileStatus(Path f) throws IOException { + return authenticator.doAs(() -> rawFileSystem().getFileStatus(f)); + } + + public boolean delete(Path p, boolean recursion) throws IOException { + return authenticator.doAs(() -> rawFileSystem().delete(p, recursion)); + } + + public boolean mkdirs(Path p) throws IOException { + return authenticator.doAs(() -> rawFileSystem().mkdirs(p)); + } } diff --git a/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out new file mode 100644 index 00000000000..fb4f79a0b73 --- /dev/null +++ b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !iceberg_q04 -- +1 2023-05-14 +2 2023-05-16 + +-- !iceberg_q06 -- +1 krb1 2023-05-14 +2 krb2 2023-05-16 +3 krb3 2023-05-17 diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy new file mode 100644 index 00000000000..579b3fcca25 --- /dev/null +++ b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert; + +suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") { + String enabled = context.config.otherConfigs.get("enableKerberosTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // test iceberg hadoop catalog with kerberos + sql """ + CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl_hadoop + PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='hadoop', + "warehouse" = "hdfs://hadoop-master:9000/user/hive/warehouse", + "fs.defaultFS" = "hdfs://hadoop-master:9000", + "hadoop.security.authentication" = "kerberos", + "hadoop.kerberos.principal"="hive/[email protected]", + "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab", + "doris.krb5.debug" = "true" + ); + """ + + sql """ SWITCH test_krb_iceberg_ctl_hadoop; """ + sql """ CREATE DATABASE IF NOT EXISTS hadoop_test_krb_iceberg_db; """ + sql """ USE hadoop_test_krb_iceberg_db; """ + sql """ CREATE TABLE IF NOT EXISTS hadoop_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + + // order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """ + // order_qt_iceberg_q03 """ SELECT id,dd FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl where dd <= '2023-05-16' """ + + // cross catalog query test + order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """ + // order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """ + order_qt_iceberg_q06 """ SELECT * FROM test_krb_iceberg_ctl_hadoop.hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl """ + // order_qt_iceberg_q07 """ SELECT * FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl """ + + // sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """ + // sql """ DROP TABLE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`; """ + sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`.`hadoop_test_krb_iceberg_tbl`; """ + + // sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """ + // sql """ DROP DATABASE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """ + sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`; """ + + // sql """ DROP CATALOG test_krb_iceberg_ctl """ + // sql """ DROP CATALOG other_test_krb_iceberg_ctl """ + sql """ DROP CATALOG test_krb_iceberg_ctl_hadoop """ + + // // test iceberg hms catalog with kerberos + // sql """ + // CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl + // PROPERTIES ( + // 'type'='iceberg', + // 'iceberg.catalog.type'='hms', + // 
"hive.metastore.uris" = "thrift://172.31.71.25:9083", + // "fs.defaultFS" = "hdfs://hadoop-master:9000", + // "hadoop.security.authentication" = "kerberos", + // "hadoop.kerberos.min.seconds.before.relogin" = "5", + // "hadoop.kerberos.principal"="hive/[email protected]", + // "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab", + // "hive.metastore.sasl.enabled" = "true", + // "hive.metastore.kerberos.principal" = "hive/[email protected]", + // "hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse", + // "doris.krb5.debug" = "true" + // ); + // """ + // + // sql """ + // CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl + // PROPERTIES ( + // 'type'='iceberg', + // 'iceberg.catalog.type'='hms', + // "hive.metastore.uris" = "thrift://172.31.71.26:9083", + // "fs.defaultFS" = "hdfs://hadoop-master-2:9000", + // "hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse", + // "hadoop.security.authentication" = "kerberos", + // "hadoop.kerberos.min.seconds.before.relogin" = "5", + // "hadoop.kerberos.principal"="hive/[email protected]", + // "hadoop.kerberos.keytab" = "/keytabs/other-hive-presto-master.keytab", + // "hive.metastore.sasl.enabled" = "true", + // "hive.metastore.kerberos.principal" = "hive/[email protected]", + // "hadoop.security.auth_to_local" ="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*// + // RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*// + // DEFAULT", + // "doris.krb5.debug" = "true" + // ); + // """ + // + // sql """ SWITCH test_krb_iceberg_ctl; """ + // sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; """ + // sql """ USE `test_krb_iceberg_db`; """ + // sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + // sql """ INSERT INTO test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + // sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + // sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + // order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl """ + // + // sql """ SWITCH other_test_krb_iceberg_ctl; """ + // sql """ CREATE DATABASE IF NOT EXISTS `other_test_krb_iceberg_db`; """ + // sql """ USE `other_test_krb_iceberg_db`; """ + // sql """ CREATE TABLE IF NOT EXISTS other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
