This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 40d9e19e1d [feature-wip](multi-catalog) support iceberg union catalog,
and add h… (#16082)
40d9e19e1d is described below
commit 40d9e19e1d1efe12a0853c99177d8e672c9993c4
Author: slothever <[email protected]>
AuthorDate: Wed Feb 1 22:59:42 2023 +0800
[feature-wip](multi-catalog) support iceberg union catalog, and add h…
(#16082)
support iceberg unified catalog framework, and add hms and rest catalog for
the framework
---
.../java/org/apache/doris/catalog/TableIf.java | 6 +-
.../catalog/external/IcebergExternalDatabase.java | 180 ++++++++++++++++++
.../catalog/external/IcebergExternalTable.java | 66 +++++++
.../apache/doris/datasource/CatalogFactory.java | 4 +
.../apache/doris/datasource/ExternalCatalog.java | 9 +
.../apache/doris/datasource/InitCatalogLog.java | 3 +-
.../DataLakeAWSCredentialsProvider.java | 59 ++++++
.../datasource/iceberg/IcebergExternalCatalog.java | 210 +++++++++++++++++++++
.../iceberg/IcebergExternalCatalogFactory.java | 43 +++++
.../iceberg/IcebergHMSExternalCatalog.java | 50 +++++
.../iceberg/IcebergRestExternalCatalog.java | 53 ++++++
.../apache/doris/planner/SingleNodePlanner.java | 1 +
.../planner/external/ExternalFileScanNode.java | 38 +++-
.../doris/planner/external/QueryScanProvider.java | 2 +
.../planner/external/iceberg/IcebergApiSource.java | 119 ++++++++++++
.../{ => iceberg}/IcebergDeleteFileFilter.java | 11 +-
.../planner/external/iceberg/IcebergHMSSource.java | 85 +++++++++
.../{ => iceberg}/IcebergScanProvider.java | 101 +++++++---
.../planner/external/iceberg/IcebergSource.java | 44 +++++
.../external/{ => iceberg}/IcebergSplit.java | 3 +-
20 files changed, 1054 insertions(+), 33 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index df013b7cc7..7a71d4f4e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -134,7 +134,8 @@ public interface TableIf {
*/
enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH,
HIVE, ICEBERG, HUDI, JDBC,
- TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE,
MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE;
+ TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE,
MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
+ ICEBERG_EXTERNAL_TABLE;
public String toEngineName() {
switch (this) {
@@ -167,6 +168,8 @@ public interface TableIf {
return "hms";
case ES_EXTERNAL_TABLE:
return "es";
+ case ICEBERG_EXTERNAL_TABLE:
+ return "iceberg";
default:
return null;
}
@@ -193,6 +196,7 @@ public interface TableIf {
case TABLE_VALUED_FUNCTION:
case HMS_EXTERNAL_TABLE:
case ES_EXTERNAL_TABLE:
+ case ICEBERG_EXTERNAL_TABLE:
return "EXTERNAL TABLE";
default:
return null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
new file mode 100644
index 0000000000..9b110c9dcf
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class IcebergExternalDatabase extends
ExternalDatabase<IcebergExternalTable> implements GsonPostProcessable {
+
+ private static final Logger LOG =
LogManager.getLogger(IcebergExternalDatabase.class);
+ // Cache of table name to table id.
+ private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
+ @SerializedName(value = "idToTbl")
+ private Map<Long, IcebergExternalTable> idToTbl = Maps.newConcurrentMap();
+
+ public IcebergExternalDatabase(ExternalCatalog extCatalog, Long id, String
name) {
+ super(extCatalog, id, name);
+ }
+
+ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+ Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+ Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
+ for (int i = 0; i < log.getRefreshCount(); i++) {
+ IcebergExternalTable table =
getTableForReplay(log.getRefreshTableIds().get(i));
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ for (int i = 0; i < log.getCreateCount(); i++) {
+ IcebergExternalTable table = new
IcebergExternalTable(log.getCreateTableIds().get(i),
+ log.getCreateTableNames().get(i), name,
(IcebergExternalCatalog) catalog);
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ tableNameToId = tmpTableNameToId;
+ idToTbl = tmpIdToTbl;
+ initialized = true;
+ }
+
+ public void setTableExtCatalog(ExternalCatalog extCatalog) {
+ for (IcebergExternalTable table : idToTbl.values()) {
+ table.setCatalog(extCatalog);
+ }
+ }
+
+ @Override
+ protected void init() {
+ InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
+ initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
+ initDatabaseLog.setCatalogId(extCatalog.getId());
+ initDatabaseLog.setDbId(id);
+ List<String> tableNames = extCatalog.listTableNames(null, name);
+ if (tableNames != null) {
+ Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+ Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newHashMap();
+ for (String tableName : tableNames) {
+ long tblId;
+ if (tableNameToId != null &&
tableNameToId.containsKey(tableName)) {
+ tblId = tableNameToId.get(tableName);
+ tmpTableNameToId.put(tableName, tblId);
+ IcebergExternalTable table = idToTbl.get(tblId);
+ tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addRefreshTable(tblId);
+ } else {
+ tblId = Env.getCurrentEnv().getNextId();
+ tmpTableNameToId.put(tableName, tblId);
+ IcebergExternalTable table = new
IcebergExternalTable(tblId, tableName, name,
+ (IcebergExternalCatalog) extCatalog);
+ tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addCreateTable(tblId, tableName);
+ }
+ }
+ tableNameToId = tmpTableNameToId;
+ idToTbl = tmpIdToTbl;
+ }
+ initialized = true;
+ Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
+ }
+
+ @Override
+ public List<IcebergExternalTable> getTables() {
+ makeSureInitialized();
+ return Lists.newArrayList(idToTbl.values());
+ }
+
+ @Override
+ public List<IcebergExternalTable> getTablesOnIdOrder() {
+ // Sort the name instead, because the id may change.
+ return
getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList());
+ }
+
+ @Override
+ public Set<String> getTableNamesWithLock() {
+ makeSureInitialized();
+ return Sets.newHashSet(tableNameToId.keySet());
+ }
+
+ @Override
+ public IcebergExternalTable getTableNullable(String tableName) {
+ makeSureInitialized();
+ if (!tableNameToId.containsKey(tableName)) {
+ return null;
+ }
+ return idToTbl.get(tableNameToId.get(tableName));
+ }
+
+ @Override
+ public IcebergExternalTable getTableNullable(long tableId) {
+ makeSureInitialized();
+ return idToTbl.get(tableId);
+ }
+
+ @Override
+ public IcebergExternalTable getTableForReplay(long tableId) {
+ return idToTbl.get(tableId);
+ }
+
+ @Override
+ public void gsonPostProcess() throws IOException {
+ tableNameToId = Maps.newConcurrentMap();
+ for (IcebergExternalTable tbl : idToTbl.values()) {
+ tableNameToId.put(tbl.getName(), tbl.getId());
+ }
+ rwLock = new ReentrantReadWriteLock(true);
+ }
+
+ @Override
+ public void dropTable(String tableName) {
+ LOG.debug("drop table [{}]", tableName);
+ makeSureInitialized();
+ Long tableId = tableNameToId.remove(tableName);
+ if (tableId == null) {
+ LOG.warn("drop table [{}] failed", tableName);
+ }
+ idToTbl.remove(tableId);
+ }
+
+ @Override
+ public void createTable(String tableName, long tableId) {
+ LOG.debug("create table [{}]", tableName);
+ makeSureInitialized();
+ tableNameToId.put(tableName, tableId);
+ IcebergExternalTable table = new IcebergExternalTable(tableId,
tableName, name,
+ (IcebergExternalCatalog) extCatalog);
+ idToTbl.put(tableId, table);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
new file mode 100644
index 0000000000..389ba8dc55
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.thrift.THiveTable;
+import org.apache.doris.thrift.TIcebergTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class IcebergExternalTable extends ExternalTable {
+
+ IcebergExternalCatalog icebergCatalog;
+
+ public IcebergExternalTable(long id, String name, String dbName,
IcebergExternalCatalog catalog) {
+ super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE);
+ icebergCatalog = catalog;
+ }
+
+ public String getIcebergCatalogType() {
+ return icebergCatalog.getIcebergCatalogType();
+ }
+
+ protected synchronized void makeSureInitialized() {
+ if (!objectCreated) {
+ objectCreated = true;
+ }
+ }
+
+ @Override
+ public TTableDescriptor toThrift() {
+ List<Column> schema = getFullSchema();
+ if (icebergCatalog.getIcebergCatalogType().equals("hms")) {
+ THiveTable tHiveTable = new THiveTable(dbName, name, new
HashMap<>());
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.HIVE_TABLE, schema.size(), 0,
+ getName(), dbName);
+ tTableDescriptor.setHiveTable(tHiveTable);
+ return tTableDescriptor;
+ } else {
+ TIcebergTable icebergTable = new TIcebergTable(dbName, name, new
HashMap<>());
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.ICEBERG_TABLE,
+ schema.size(), 0, getName(), dbName);
+ tTableDescriptor.setIcebergTable(icebergTable);
+ return tTableDescriptor;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 87c1212c1c..3f9be036ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
import org.apache.parquet.Strings;
@@ -102,6 +103,9 @@ public class CatalogFactory {
case "jdbc":
catalog = new JdbcExternalCatalog(catalogId, name, resource,
props);
break;
+ case "iceberg":
+ catalog =
IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props);
+ break;
default:
throw new DdlException("Unknown catalog type: " + catalogType);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 3a4a711e1c..200999c6db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.io.Text;
@@ -280,6 +281,14 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
tmpIdToDb.put(db.getId(), db);
}
break;
+ case ICEBERG:
+ for (int i = 0; i < log.getCreateCount(); i++) {
+ IcebergExternalDatabase db = new IcebergExternalDatabase(
+ this, log.getCreateDbIds().get(i),
log.getCreateDbNames().get(i));
+ tmpDbNameToId.put(db.getFullName(), db.getId());
+ tmpIdToDb.put(db.getId(), db);
+ }
+ break;
default:
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index c7ed1696b2..524e2d9d3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -32,10 +32,11 @@ import java.util.List;
@Data
public class InitCatalogLog implements Writable {
- enum Type {
+ public enum Type {
HMS,
ES,
JDBC,
+ ICEBERG,
UNKNOWN;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java
new file mode 100644
index 0000000000..9901b9c668
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.credentials;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
+
+public class DataLakeAWSCredentialsProvider implements AWSCredentialsProvider
{
+
+ private final Configuration conf;
+
+ public DataLakeAWSCredentialsProvider(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public AWSCredentials getCredentials() {
+ String accessKey = StringUtils.trim(conf.get(Constants.ACCESS_KEY));
+ String secretKey = StringUtils.trim(conf.get(Constants.SECRET_KEY));
+ String sessionToken =
StringUtils.trim(conf.get(Constants.SESSION_TOKEN));
+ if (!StringUtils.isNullOrEmpty(accessKey) &&
!StringUtils.isNullOrEmpty(secretKey)) {
+ return (StringUtils.isNullOrEmpty(sessionToken) ? new
BasicAWSCredentials(accessKey,
+ secretKey) : new BasicSessionCredentials(accessKey, secretKey,
sessionToken));
+ } else {
+ throw new SdkClientException(
+ "Unable to load AWS credentials from hive conf
(fs.s3a.access.key and fs.s3a.secret.key)");
+ }
+ }
+
+ @Override
+ public void refresh() {
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
new file mode 100644
index 0000000000..53e5b57459
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.IcebergExternalDatabase;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
 * Base class for Iceberg external catalogs (HMS-backed, REST-backed, ...).
 *
 * <p>Subclasses create the concrete {@link Catalog} in their
 * {@code initLocalObjectsImpl()}; this class handles database discovery,
 * edit-log publication of the discovered databases, and Iceberg-to-Doris
 * schema/type mapping.
 */
public abstract class IcebergExternalCatalog extends ExternalCatalog {

    private static final Logger LOG = LogManager.getLogger(IcebergExternalCatalog.class);
    // Property key selecting the concrete catalog flavor.
    public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
    public static final String ICEBERG_REST = "rest";
    public static final String ICEBERG_HMS = "hms";
    // Flavor string ("hms" / "rest"), fixed at construction time.
    protected final String icebergCatalogType;
    // Concrete Iceberg catalog created by the subclass.
    protected Catalog catalog;
    // Same catalog, viewed through its namespace-listing interface.
    protected SupportsNamespaces nsCatalog;

    /**
     * @param catalogId catalog id assigned by Env
     * @param name      catalog name
     * @param type      flavor string, one of {@link #ICEBERG_REST}/{@link #ICEBERG_HMS}
     */
    public IcebergExternalCatalog(long catalogId, String name, String type) {
        super(catalogId, name);
        this.icebergCatalogType = type;
    }

    /**
     * Discovers remote databases, reusing ids for previously seen ones and
     * allocating new ids otherwise, then logs the result for replay on
     * followers. Assumes the subclass has already created {@link #catalog}.
     */
    @Override
    protected void init() {
        nsCatalog = (SupportsNamespaces) catalog;
        Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
        Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
        InitCatalogLog initCatalogLog = new InitCatalogLog();
        initCatalogLog.setCatalogId(id);
        initCatalogLog.setType(InitCatalogLog.Type.ICEBERG);
        List<String> allDatabaseNames = listDatabaseNames();
        for (String dbName : allDatabaseNames) {
            long dbId;
            if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
                // Known database: keep its id and force lazy re-init.
                dbId = dbNameToId.get(dbName);
                tmpDbNameToId.put(dbName, dbId);
                ExternalDatabase db = idToDb.get(dbId);
                db.setUnInitialized(invalidCacheInInit);
                tmpIdToDb.put(dbId, db);
                initCatalogLog.addRefreshDb(dbId);
            } else {
                // New database: allocate an id and record the creation.
                dbId = Env.getCurrentEnv().getNextId();
                tmpDbNameToId.put(dbName, dbId);
                IcebergExternalDatabase db = new IcebergExternalDatabase(this, dbId, dbName);
                tmpIdToDb.put(dbId, db);
                initCatalogLog.addCreateDb(dbId, dbName);
            }
        }
        dbNameToId = tmpDbNameToId;
        idToDb = tmpIdToDb;
        Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
    }

    /** Builds a Hadoop configuration from the catalog's Hadoop properties. */
    protected Configuration getConfiguration() {
        Configuration conf = new HdfsConfiguration();
        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
        return conf;
    }

    /**
     * Maps an Iceberg type to a Doris type. Lists map to ArrayType; MAP and
     * STRUCT are currently unsupported and map to Type.UNSUPPORTED.
     *
     * @throws IllegalArgumentException for type ids not handled here
     */
    protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
        if (type.isPrimitiveType()) {
            return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
        }
        switch (type.typeId()) {
            case LIST:
                Types.ListType list = (Types.ListType) type;
                return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
            case MAP:
            case STRUCT:
                return Type.UNSUPPORTED;
            default:
                throw new IllegalArgumentException("Cannot transform unknown type: " + type);
        }
    }

    /**
     * Maps an Iceberg primitive type to a Doris scalar type.
     * BINARY and UUID are widened to STRING; TIME is unsupported.
     */
    private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
        switch (primitive.typeId()) {
            case BOOLEAN:
                return Type.BOOLEAN;
            case INTEGER:
                return Type.INT;
            case LONG:
                return Type.BIGINT;
            case FLOAT:
                return Type.FLOAT;
            case DOUBLE:
                return Type.DOUBLE;
            case STRING:
            case BINARY:
            case UUID:
                return Type.STRING;
            case FIXED:
                // NOTE(review): fixed-length binary mapped to CHAR(len) — the
                // length is preserved but the byte/char semantics differ; confirm.
                Types.FixedType fixed = (Types.FixedType) primitive;
                return ScalarType.createCharType(fixed.length());
            case DECIMAL:
                Types.DecimalType decimal = (Types.DecimalType) primitive;
                return ScalarType.createDecimalType(decimal.precision(), decimal.scale());
            case DATE:
                return ScalarType.createDateV2Type();
            case TIMESTAMP:
                return ScalarType.createDatetimeV2Type(0);
            case TIME:
                return Type.UNSUPPORTED;
            default:
                throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
        }
    }

    /** Returns the flavor string ("hms" / "rest"). */
    public String getIcebergCatalogType() {
        return icebergCatalogType;
    }

    /**
     * Lists remote namespace names, validating each against Doris database
     * naming rules; an invalid name aborts listing with a runtime exception.
     */
    protected List<String> listDatabaseNames() {
        return nsCatalog.listNamespaces().stream()
                .map(e -> {
                    String dbName = e.toString();
                    try {
                        FeNameFormat.checkDbName(dbName);
                    } catch (AnalysisException ex) {
                        Util.logAndThrowRuntimeException(LOG,
                                String.format("Not a supported namespace name format: %s", dbName), ex);
                    }
                    return dbName;
                })
                .collect(Collectors.toList());
    }

    /** Returns the cached database names (triggers lazy init). */
    @Override
    public List<String> listDatabaseNames(SessionContext ctx) {
        makeSureInitialized();
        return new ArrayList<>(dbNameToId.keySet());
    }

    /** Loads the remote table and converts its Iceberg schema to Doris columns. */
    @Override
    public List<Column> getSchema(String dbName, String tblName) {
        makeSureInitialized();
        List<Types.NestedField> columns = getIcebergTable(dbName, tblName).schema().columns();
        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
        for (Types.NestedField field : columns) {
            tmpSchema.add(new Column(field.name(),
                    icebergTypeToDorisType(field.type()), true, null,
                    true, null, field.doc(), true, null, -1));
        }
        return tmpSchema;
    }

    @Override
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
        makeSureInitialized();
        return catalog.tableExists(TableIdentifier.of(dbName, tblName));
    }

    @Override
    public List<String> listTableNames(SessionContext ctx, String dbName) {
        makeSureInitialized();
        List<TableIdentifier> tableIdentifiers = catalog.listTables(Namespace.of(dbName));
        return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
    }

    /** Loads the Iceberg table object from the remote catalog. */
    public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) {
        makeSureInitialized();
        return catalog.loadTable(TableIdentifier.of(dbName, tblName));
    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
new file mode 100644
index 0000000000..62ad0c8729
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.CatalogIf;
+
+import java.util.Map;
+
+public class IcebergExternalCatalogFactory {
+
+ public static CatalogIf createCatalog(long catalogId, String name, String
resource, Map<String, String> props)
+ throws DdlException {
+ String catalogType =
props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
+ if (catalogType == null) {
+ throw new DdlException("Missing " +
IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " property");
+ }
+ switch (catalogType) {
+ case IcebergExternalCatalog.ICEBERG_REST:
+ return new IcebergRestExternalCatalog(catalogId, name,
resource, catalogType, props);
+ case IcebergExternalCatalog.ICEBERG_HMS:
+ return new IcebergHMSExternalCatalog(catalogId, name,
resource, catalogType, props);
+ default:
+ throw new DdlException("Unknown " +
IcebergExternalCatalog.ICEBERG_CATALOG_TYPE
+ + " value: " + catalogType);
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
new file mode 100644
index 0000000000..f969ae7085
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.HMSResource;
+import org.apache.doris.datasource.CatalogProperty;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.hive.HiveCatalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
+
+ public IcebergHMSExternalCatalog(long catalogId, String name, String
resource, String catalogType,
+ Map<String, String> props) {
+ super(catalogId, name, catalogType);
+ catalogProperty = new CatalogProperty(resource, props);
+ }
+
+ @Override
+ protected void initLocalObjectsImpl() {
+ HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
+ hiveCatalog.setConf(getConfiguration());
+ // initialize hive catalog
+ Map<String, String> catalogProperties = new HashMap<>();
+ String metastoreUris =
catalogProperty.getOrDefault(HMSResource.HIVE_METASTORE_URIS, "");
+
+ catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUris);
+ catalogProperties.put(CatalogProperties.URI, metastoreUris);
+ hiveCatalog.initialize(icebergCatalogType, catalogProperties);
+ catalog = hiveCatalog;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
new file mode 100644
index 0000000000..f3f240b661
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.rest.RESTCatalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
+
+ public IcebergRestExternalCatalog(long catalogId, String name, String
resource, String catalogType,
+ Map<String, String> props) {
+ super(catalogId, name, catalogType);
+ catalogProperty = new CatalogProperty(resource, props);
+ }
+
+ @Override
+ protected void initLocalObjectsImpl() {
+ Map<String, String> restProperties = new HashMap<>();
+ String restUri =
catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
+ restProperties.put(CatalogProperties.URI, restUri);
+ RESTCatalog restCatalog = new RESTCatalog();
+ String credentials = catalogProperty.getProperties()
+ .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER,
DataLakeAWSCredentialsProvider.class.getName());
+ Configuration conf = getConfiguration();
+ conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+ restCatalog.setConf(conf);
+ restCatalog.initialize(icebergCatalogType, restProperties);
+ catalog = restCatalog;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 199e0c623a..ae286c7974 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1940,6 +1940,7 @@ public class SingleNodePlanner {
scanNode = ((TableValuedFunctionRef)
tblRef).getScanNode(ctx.getNextNodeId());
break;
case HMS_EXTERNAL_TABLE:
+ case ICEBERG_EXTERNAL_TABLE:
scanNode = new ExternalFileScanNode(ctx.getNextNodeId(),
tblRef.getDesc());
break;
case ES_EXTERNAL_TABLE:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index de007632c9..7f5cbddb87 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -38,11 +38,17 @@ import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.iceberg.IcebergApiSource;
+import org.apache.doris.planner.external.iceberg.IcebergHMSSource;
+import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
+import org.apache.doris.planner.external.iceberg.IcebergSource;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
@@ -175,6 +181,9 @@ public class ExternalFileScanNode extends ExternalScanNode {
} else if (this.desc.getTable() instanceof FunctionGenTable) {
FunctionGenTable table = (FunctionGenTable)
this.desc.getTable();
initFunctionGenTable(table,
(ExternalFileTableValuedFunction) table.getTvf());
+ } else if (this.desc.getTable() instanceof
IcebergExternalTable) {
+ IcebergExternalTable table = (IcebergExternalTable)
this.desc.getTable();
+ initIcebergExternalTable(table);
}
break;
case LOAD:
@@ -211,6 +220,9 @@ public class ExternalFileScanNode extends ExternalScanNode {
} else if (this.desc.getTable() instanceof FunctionGenTable) {
FunctionGenTable table = (FunctionGenTable)
this.desc.getTable();
initFunctionGenTable(table,
(ExternalFileTableValuedFunction) table.getTvf());
+ } else if (this.desc.getTable() instanceof
IcebergExternalTable) {
+ IcebergExternalTable table = (IcebergExternalTable)
this.desc.getTable();
+ initIcebergExternalTable(table);
}
break;
default:
@@ -244,7 +256,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
scanProvider = new HudiScanProvider(hmsTable, desc,
columnNameToRange);
break;
case ICEBERG:
- scanProvider = new IcebergScanProvider(hmsTable, analyzer,
desc, columnNameToRange);
+ IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc,
columnNameToRange);
+ scanProvider = new IcebergScanProvider(hmsSource, analyzer);
break;
case HIVE:
scanProvider = new HiveScanProvider(hmsTable, desc,
columnNameToRange);
@@ -255,6 +268,29 @@ public class ExternalFileScanNode extends ExternalScanNode
{
this.scanProviders.add(scanProvider);
}
+    // Builds and registers the scan provider for a table that belongs to an
+    // Iceberg external catalog (HMS- or REST-backed). External views cannot
+    // be queried, so they are rejected up front.
+    private void initIcebergExternalTable(IcebergExternalTable icebergTable)
throws UserException {
+        Preconditions.checkNotNull(icebergTable);
+        if (icebergTable.isView()) {
+            throw new AnalysisException(
+                    String.format("Querying external view '%s.%s' is not
supported", icebergTable.getDbName(),
+                            icebergTable.getName()));
+        }
+
+        FileScanProviderIf scanProvider;
+        String catalogType = icebergTable.getIcebergCatalogType();
+        switch (catalogType) {
+            case IcebergExternalCatalog.ICEBERG_HMS:
+            case IcebergExternalCatalog.ICEBERG_REST:
+                // Both HMS and REST catalogs expose table metadata through
+                // the unified Iceberg API, so they share IcebergApiSource.
+                IcebergSource icebergSource = new IcebergApiSource(
+                        icebergTable, desc, columnNameToRange);
+                scanProvider = new IcebergScanProvider(icebergSource,
analyzer);
+                break;
+            default:
+                throw new UserException("Unknown iceberg catalog type: " +
catalogType);
+        }
+        this.scanProviders.add(scanProvider);
+    }
+
private void initFunctionGenTable(FunctionGenTable table,
ExternalFileTableValuedFunction tvf) {
Preconditions.checkNotNull(table);
FileScanProviderIf scanProvider = new TVFScanProvider(table, desc,
tvf);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index b332819700..d894dd1fff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
+import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileAttributes;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
new file mode 100644
index 0000000000..35b45282c5
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.TFileScanSlotInfo;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * IcebergSource that reads table metadata through the native Iceberg API.
+ * Works for any Iceberg catalog implementation (Hive metastore, REST,
+ * Glue, ...), since the {@link Table} handle is obtained from the owning
+ * {@link IcebergExternalCatalog}.
+ */
+public class IcebergApiSource implements IcebergSource {
+
+    // Doris-side table wrapper for the external Iceberg table.
+    private final IcebergExternalTable icebergExtTable;
+    // The native Iceberg table handle, resolved once at construction time.
+    private final Table originTable;
+
+    private final TupleDescriptor desc;
+
+    // NOTE(review): columnNameToRange is accepted but never used here —
+    // confirm whether predicate ranges should be applied for API sources.
+    public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) {
+        this.icebergExtTable = table;
+        this.originTable = ((IcebergExternalCatalog)
icebergExtTable.getCatalog())
+                .getIcebergTable(icebergExtTable.getDbName(),
icebergExtTable.getName());
+        this.desc = desc;
+    }
+
+    @Override
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    @Override
+    public String getFileFormat() {
+        // Falls back to Iceberg's default file format when the table does
+        // not declare one explicitly.
+        return originTable.properties()
+                .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    }
+
+    @Override
+    public Table getIcebergTable() throws MetaNotFoundException {
+        return originTable;
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return icebergExtTable;
+    }
+
+    @Override
+    public ExternalFileScanNode.ParamCreateContext createContext() throws
UserException {
+        ExternalFileScanNode.ParamCreateContext context = new
ExternalFileScanNode.ParamCreateContext();
+        context.params = new TFileScanRangeParams();
+        context.destTupleDescriptor = desc;
+        context.params.setDestTupleId(desc.getId().asInt());
+        context.fileGroup = new BrokerFileGroup(icebergExtTable.getId(),
originTable.location(), getFileFormat());
+
+        // Unlike Hive, iceberg/hudi tables keep partition column values
+        // inside the data files, so partition columns are not extracted
+        // from the file path; they are excluded from the file-column count
+        // below and marked as non-file slots.
+        List<String> partitionKeys = originTable.spec().fields().stream()
+                .map(PartitionField::name).collect(Collectors.toList());
+        List<Column> columns = icebergExtTable.getBaseSchema(false);
+        context.params.setNumOfColumnsFromFile(columns.size() -
partitionKeys.size());
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (!slot.isMaterialized()) {
+                continue;
+            }
+            TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
+            slotInfo.setSlotId(slot.getId().asInt());
+
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
+            context.params.addToRequiredSlots(slotInfo);
+        }
+        return context;
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        // Iceberg data files carry their own format metadata; no extra
+        // attributes are needed.
+        return new TFileAttributes();
+    }
+
+    @Override
+    public ExternalCatalog getCatalog() {
+        return icebergExtTable.getCatalog();
+    }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java
similarity index 92%
rename from
fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java
rename to
fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java
index 9add306c46..e1e94a8329 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.iceberg;
import lombok.Data;
@@ -64,7 +64,6 @@ public class IcebergDeleteFileFilter {
}
}
- @Data
static class EqualityDelete extends IcebergDeleteFileFilter {
private List<Integer> fieldIds;
@@ -72,5 +71,13 @@ public class IcebergDeleteFileFilter {
super(deleteFilePath);
this.fieldIds = fieldIds;
}
+
+ public List<Integer> getFieldIds() {
+ return fieldIds;
+ }
+
+ public void setFieldIds(List<Integer> fieldIds) {
+ this.fieldIds = fieldIds;
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
new file mode 100644
index 0000000000..747d7fd6f6
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.planner.external.HiveScanProvider;
+import org.apache.doris.thrift.TFileAttributes;
+
+import org.apache.iceberg.TableProperties;
+
+import java.util.Map;
+
+/**
+ * IcebergSource for Iceberg tables registered in a Hive metastore and
+ * accessed through an HMS external catalog. Most work is delegated to an
+ * internal {@link HiveScanProvider}.
+ */
+public class IcebergHMSSource implements IcebergSource {
+
+    private final HMSExternalTable hmsTable;
+    // Delegate used for context creation and file attributes.
+    private final HiveScanProvider hiveScanProvider;
+
+    private final TupleDescriptor desc;
+
+    public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
+                            Map<String, ColumnRange> columnNameToRange) {
+        this.hiveScanProvider = new HiveScanProvider(hmsTable, desc,
columnNameToRange);
+        this.hmsTable = hmsTable;
+        this.desc = desc;
+    }
+
+    @Override
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    @Override
+    public String getFileFormat() throws DdlException, MetaNotFoundException {
+        // The format is stored as an HMS table parameter; fall back to
+        // Iceberg's default file format when absent.
+        return hiveScanProvider.getRemoteHiveTable().getParameters()
+                .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    }
+
+    // NOTE(review): implements IcebergSource.getIcebergTable() but is
+    // missing the @Override annotation — consider adding it.
+    public org.apache.iceberg.Table getIcebergTable() throws
MetaNotFoundException {
+        return HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
+    }
+
+    @Override
+    public ExternalFileScanNode.ParamCreateContext createContext() throws
UserException {
+        return hiveScanProvider.createContext(null);
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return hiveScanProvider.getTargetTable();
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return hiveScanProvider.getFileAttributes();
+    }
+
+    @Override
+    public ExternalCatalog getCatalog() {
+        return hmsTable.getCatalog();
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
similarity index 75%
rename from
fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
rename to
fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
index 24dc94fc0e..94e744725a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
@@ -15,22 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.iceberg;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TableSnapshot;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.external.iceberg.util.IcebergUtils;
-import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.planner.external.QueryScanProvider;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
@@ -43,7 +46,7 @@ import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
@@ -52,23 +55,23 @@ import org.apache.iceberg.types.Conversions;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.stream.Collectors;
/**
* A file scan provider for iceberg.
*/
-public class IcebergScanProvider extends HiveScanProvider {
+public class IcebergScanProvider extends QueryScanProvider {
private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
private final Analyzer analyzer;
+ private final IcebergSource icebergSource;
- public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer,
TupleDescriptor desc,
- Map<String, ColumnRange> columnNameToRange) {
- super(hmsTable, desc, columnNameToRange);
+ public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer)
{
+ this.icebergSource = icebergSource;
this.analyzer = analyzer;
}
@@ -111,34 +114,43 @@ public class IcebergScanProvider extends HiveScanProvider
{
}
@Override
- public TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException {
- TFileFormatType type;
-
- String icebergFormat = getRemoteHiveTable().getParameters()
- .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
- if (icebergFormat.equalsIgnoreCase("parquet")) {
- type = TFileFormatType.FORMAT_PARQUET;
- } else if (icebergFormat.equalsIgnoreCase("orc")) {
- type = TFileFormatType.FORMAT_ORC;
- } else {
- throw new DdlException(String.format("Unsupported format name: %s
for iceberg table.", icebergFormat));
+ public TFileType getLocationType() throws DdlException,
MetaNotFoundException {
+ String location = icebergSource.getIcebergTable().location();
+ if (location != null && !location.isEmpty()) {
+ if (location.startsWith(FeConstants.FS_PREFIX_S3)
+ || location.startsWith(FeConstants.FS_PREFIX_S3A)
+ || location.startsWith(FeConstants.FS_PREFIX_S3N)
+ || location.startsWith(FeConstants.FS_PREFIX_BOS)
+ || location.startsWith(FeConstants.FS_PREFIX_COS)
+ || location.startsWith(FeConstants.FS_PREFIX_OSS)
+ || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
+ return TFileType.FILE_S3;
+ } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+ return TFileType.FILE_HDFS;
+ } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+ return TFileType.FILE_LOCAL;
+ } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
+ return TFileType.FILE_BROKER;
+ } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
+ return TFileType.FILE_BROKER;
+ }
}
- return type;
+ throw new DdlException("Unknown file location " + location
+ + " for hms table " + icebergSource.getIcebergTable().name());
}
@Override
public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
List<Expression> expressions = new ArrayList<>();
- org.apache.iceberg.Table table =
HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
+ org.apache.iceberg.Table table = icebergSource.getIcebergTable();
for (Expr conjunct : exprs) {
Expression expression =
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
if (expression != null) {
expressions.add(expression);
}
}
-
TableScan scan = table.newScan();
- TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot();
+ TableSnapshot tableSnapshot =
icebergSource.getDesc().getRef().getTableSnapshot();
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
@@ -220,6 +232,41 @@ public class IcebergScanProvider extends HiveScanProvider {
@Override
public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
- return Collections.emptyList();
+ return
icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException {
+ TFileFormatType type;
+ String icebergFormat = icebergSource.getFileFormat();
+ if (icebergFormat.equalsIgnoreCase("parquet")) {
+ type = TFileFormatType.FORMAT_PARQUET;
+ } else if (icebergFormat.equalsIgnoreCase("orc")) {
+ type = TFileFormatType.FORMAT_ORC;
+ } else {
+ throw new DdlException(String.format("Unsupported format name: %s
for iceberg table.", icebergFormat));
+ }
+ return type;
+ }
+
+ @Override
+ public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
+ return icebergSource.getCatalog().getProperties();
+ }
+
+ @Override
+ public ExternalFileScanNode.ParamCreateContext createContext(Analyzer
analyzer) throws UserException {
+ return icebergSource.createContext();
+ }
+
+ @Override
+ public TableIf getTargetTable() {
+ return icebergSource.getTargetTable();
+ }
+
+ @Override
+ public TFileAttributes getFileAttributes() throws UserException {
+ return icebergSource.getFileAttributes();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
new file mode 100644
index 0000000000..ab17c6a448
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.thrift.TFileAttributes;
+
+/**
+ * Abstraction over the different ways an Iceberg table's metadata can be
+ * obtained (native Iceberg API, Hive metastore, ...). Implementations supply
+ * everything {@code IcebergScanProvider} needs to plan a scan.
+ */
+public interface IcebergSource {
+
+    /** Tuple descriptor describing the slots the scan must produce. */
+    TupleDescriptor getDesc();
+
+    /** Resolves the native Iceberg table handle. */
+    org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException;
+
+    /** Creates the scan-parameter context used to build file scan ranges. */
+    ExternalFileScanNode.ParamCreateContext createContext() throws
UserException;
+
+    /** The Doris-side table object this source scans. */
+    TableIf getTargetTable();
+
+    /** File-level attributes (e.g. text parsing options) for the scan. */
+    TFileAttributes getFileAttributes() throws UserException;
+
+    /** The external catalog owning the table. */
+    ExternalCatalog getCatalog();
+
+    /** Data file format name (e.g. "parquet", "orc") of the table. */
+    String getFileFormat() throws DdlException, MetaNotFoundException;
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
similarity index 92%
rename from
fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
rename to
fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index b9607a7f00..a82c99b04a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.iceberg;
import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.planner.external.HiveSplit;
import lombok.Data;
import org.apache.hadoop.fs.Path;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]