This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1b2697cc7 [fix](iceberg) fix iceberg catalog (#16372)
b1b2697cc7 is described below
commit b1b2697cc7c832071930ed721c9dadb7be396dab
Author: slothever <[email protected]>
AuthorDate: Sun Feb 5 13:15:28 2023 +0800
[fix](iceberg) fix iceberg catalog (#16372)
1. Fix iceberg catalog access s3
2. Fix iceberg catalog partition table query
3. Fix persistence
---
be/src/vec/exec/format/generic_reader.h | 2 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 4 ++
be/src/vec/exec/format/table/iceberg_reader.h | 2 +
be/src/vec/exec/scan/scanner_scheduler.cpp | 4 +-
docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 43 ++++++++++++++++++++++
.../datasource/iceberg/IcebergExternalCatalog.java | 22 ++++++++---
.../iceberg/IcebergExternalCatalogFactory.java | 4 +-
.../iceberg/IcebergHMSExternalCatalog.java | 6 +--
.../iceberg/IcebergRestExternalCatalog.java | 29 +++++++++++----
.../org/apache/doris/persist/gson/GsonUtils.java | 2 +
10 files changed, 98 insertions(+), 20 deletions(-)
diff --git a/be/src/vec/exec/format/generic_reader.h
b/be/src/vec/exec/format/generic_reader.h
index 9f4cfd00ee..5abf064813 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -47,7 +47,7 @@ public:
/// If the underlying FileReader has filled the partition&missing columns,
/// The FileScanner does not need to fill
- bool fill_all_columns() const { return _fill_all_columns; }
+ virtual bool fill_all_columns() const { return _fill_all_columns; }
/// Tell the underlying FileReader the partition&missing columns,
/// and the FileReader determine to fill columns or not.
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index adc31e605b..0035323432 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -114,6 +114,10 @@ Status IcebergTableReader::set_fill_columns(
return _file_format_reader->set_fill_columns(partition_columns,
missing_columns);
}
+bool IcebergTableReader::fill_all_columns() const {
+ return _file_format_reader->fill_all_columns();
+};
+
Status IcebergTableReader::get_columns(
std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index 93a6963c80..f9e480f28b 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -53,6 +53,8 @@ public:
partition_columns,
const std::unordered_map<std::string, VExprContext*>&
missing_columns) override;
+ bool fill_all_columns() const override;
+
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index fe7356d0e8..32e6a74da3 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -242,7 +242,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
auto block = ctx->get_free_block(&get_free_block);
status = scanner->get_block(state, block, &eos);
- VLOG_ROW << "VOlapScanNode input rows: " << block->rows() << ", eos: "
<< eos;
+ VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " <<
eos;
// The VFileScanner for external table may try to open not exist files,
// Because FE file cache for external table may out of date.
// So, NOT_FOUND for VFileScanner is not a fail case.
@@ -250,7 +250,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
if (!status.ok() && (typeid(*scanner) !=
typeid(doris::vectorized::VFileScanner) ||
(typeid(*scanner) ==
typeid(doris::vectorized::VFileScanner) &&
!status.is<ErrorCode::NOT_FOUND>()))) {
- LOG(WARNING) << "Scan thread read VOlapScanner failed: " <<
status.to_string();
+ LOG(WARNING) << "Scan thread read VScanner failed: " <<
status.to_string();
// Add block ptr in blocks, prevent mem leak in read failed
blocks.push_back(block);
break;
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
index d27a176e96..3424eb8c29 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
@@ -35,6 +35,8 @@ under the License.
## 创建 Catalog
+### 基于Hive Metastore创建Catalog
+
和 Hive Catalog 基本一致,这里仅给出简单示例。其他示例可参阅 [Hive Catalog](./hive)。
```sql
@@ -50,6 +52,47 @@ CREATE CATALOG iceberg PROPERTIES (
);
```
+### 基于Iceberg API创建Catalog
+
+使用Iceberg API访问元数据的方式,支持Hive、REST、Glue等服务作为Iceberg的Catalog。
+
+- Hive Metastore作为元数据服务
+
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',
+ 'hadoop.username' = 'hive',
+ 'dfs.nameservices'='your-nameservice',
+ 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+ 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
+ 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
+ 'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+);
+```
+
+- REST Catalog作为元数据服务
+
+该方式需要预先提供REST服务,用户需实现获取Iceberg元数据的REST接口。
+
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://172.21.0.1:8181'
+);
+```
+
+若数据存放在S3上,properties中可以使用以下参数
+
+```sql
+"AWS_ACCESS_KEY" = "username"
+"AWS_SECRET_KEY" = "password"
+"AWS_REGION" = "region-name"
+"AWS_ENDPOINT" = "http://endpoint-uri"
+```
+
## 列类型映射
和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive) 中 **列类型映射** 一节。
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index c396ca268f..4b9f2a2e15 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -54,13 +55,12 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
public static final String ICEBERG_REST = "rest";
public static final String ICEBERG_HMS = "hms";
- protected final String icebergCatalogType;
+ protected String icebergCatalogType;
protected Catalog catalog;
protected SupportsNamespaces nsCatalog;
- public IcebergExternalCatalog(long catalogId, String name, String type) {
+ public IcebergExternalCatalog(long catalogId, String name) {
super(catalogId, name);
- this.icebergCatalogType = type;
}
@Override
@@ -152,7 +152,18 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
}
}
+ public Catalog getCatalog() {
+ makeSureInitialized();
+ return catalog;
+ }
+
+ public SupportsNamespaces getNsCatalog() {
+ makeSureInitialized();
+ return nsCatalog;
+ }
+
public String getIcebergCatalogType() {
+ makeSureInitialized();
return icebergCatalogType;
}
@@ -180,12 +191,13 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
@Override
public List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
- List<Types.NestedField> columns = getIcebergTable(dbName,
tblName).schema().columns();
+ Schema schema = getIcebergTable(dbName, tblName).schema();
+ List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema =
Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name(),
icebergTypeToDorisType(field.type()), true, null,
- true, field.doc(), true, -1));
+ true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
return tmpSchema;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
index 62ad0c8729..4f97c3cd59 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -32,9 +32,9 @@ public class IcebergExternalCatalogFactory {
}
switch (catalogType) {
case IcebergExternalCatalog.ICEBERG_REST:
- return new IcebergRestExternalCatalog(catalogId, name,
resource, catalogType, props);
+ return new IcebergRestExternalCatalog(catalogId, name,
resource, props);
case IcebergExternalCatalog.ICEBERG_HMS:
- return new IcebergHMSExternalCatalog(catalogId, name,
resource, catalogType, props);
+ return new IcebergHMSExternalCatalog(catalogId, name,
resource, props);
default:
throw new DdlException("Unknown " +
IcebergExternalCatalog.ICEBERG_CATALOG_TYPE
+ " value: " + catalogType);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index f969ae7085..97748c608b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -28,14 +28,14 @@ import java.util.Map;
public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
- public IcebergHMSExternalCatalog(long catalogId, String name, String
resource, String catalogType,
- Map<String, String> props) {
- super(catalogId, name, catalogType);
+ public IcebergHMSExternalCatalog(long catalogId, String name, String
resource, Map<String, String> props) {
+ super(catalogId, name);
catalogProperty = new CatalogProperty(resource, props);
}
@Override
protected void initLocalObjectsImpl() {
+ icebergCatalogType = ICEBERG_HMS;
HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
hiveCatalog.setConf(getConfiguration());
// initialize hive catalog
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index f3f240b661..c0343f244c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.iceberg;
+import org.apache.doris.catalog.S3Resource;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
@@ -30,24 +31,38 @@ import java.util.Map;
public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
- public IcebergRestExternalCatalog(long catalogId, String name, String
resource, String catalogType,
- Map<String, String> props) {
- super(catalogId, name, catalogType);
+ public IcebergRestExternalCatalog(long catalogId, String name, String
resource, Map<String, String> props) {
+ super(catalogId, name);
catalogProperty = new CatalogProperty(resource, props);
}
@Override
protected void initLocalObjectsImpl() {
+ icebergCatalogType = ICEBERG_REST;
Map<String, String> restProperties = new HashMap<>();
String restUri =
catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
restProperties.put(CatalogProperties.URI, restUri);
RESTCatalog restCatalog = new RESTCatalog();
- String credentials = catalogProperty.getProperties()
- .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER,
DataLakeAWSCredentialsProvider.class.getName());
- Configuration conf = getConfiguration();
- conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+ Configuration conf = replaceS3Properties(getConfiguration());
restCatalog.setConf(conf);
restCatalog.initialize(icebergCatalogType, restProperties);
catalog = restCatalog;
}
+
+ private Configuration replaceS3Properties(Configuration conf) {
+ Map<String, String> catalogProperties =
catalogProperty.getProperties();
+ String credentials = catalogProperties
+ .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER,
DataLakeAWSCredentialsProvider.class.getName());
+ conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+ String usePahStyle =
catalogProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "true");
+ // Set path style
+ conf.set(S3Resource.USE_PATH_STYLE, usePahStyle);
+ conf.set(Constants.PATH_STYLE_ACCESS, usePahStyle);
+ // Get AWS client retry limit
+ conf.set(Constants.RETRY_LIMIT,
catalogProperties.getOrDefault(Constants.RETRY_LIMIT, "1"));
+ conf.set(Constants.RETRY_THROTTLE_LIMIT,
catalogProperties.getOrDefault(Constants.RETRY_THROTTLE_LIMIT, "1"));
+ conf.set(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT,
+
catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1"));
+ return conf;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index f613d6f986..fef1ed14dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -52,6 +52,7 @@ import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.JdbcExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
@@ -172,6 +173,7 @@ public class GsonUtils {
.registerSubtype(HMSExternalCatalog.class,
HMSExternalCatalog.class.getSimpleName())
.registerSubtype(EsExternalCatalog.class,
EsExternalCatalog.class.getSimpleName())
.registerSubtype(JdbcExternalCatalog.class,
JdbcExternalCatalog.class.getSimpleName())
+ .registerSubtype(IcebergExternalCatalog.class,
IcebergExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergHMSExternalCatalog.class,
IcebergHMSExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergRestExternalCatalog.class,
IcebergRestExternalCatalog.class.getSimpleName());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]