This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 20e1255fa [Improve][SourceConnector] Unify Iceberg source fields to
schema (#3959)
20e1255fa is described below
commit 20e1255fabb2f5b0813fe0c73632de6e6fac2ec3
Author: wfrong <[email protected]>
AuthorDate: Wed Feb 22 13:33:25 2023 +0800
[Improve][SourceConnector] Unify Iceberg source fields to schema (#3959)
* Unify Iceberg source fields to schema
* add compatibility adaptation
* e2e module struct nested field name change to uppercase
* e2e module select fields remove struct type
* make better code style
---------
Co-authored-by: furong.wang <[email protected]>
---
docs/en/connector-v2/source/Iceberg.md | 20 +++++++++++--
.../seatunnel/iceberg/source/IcebergSource.java | 10 +++++--
.../iceberg/source/IcebergSourceFactory.java | 4 +--
.../src/test/resources/iceberg/iceberg_source.conf | 34 ++++++++++++----------
4 files changed, 46 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector-v2/source/Iceberg.md
b/docs/en/connector-v2/source/Iceberg.md
index dcd7ab49b..0529a7e43 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -32,6 +32,7 @@ Source connector for Apache Iceberg. It can support batch and
stream mode.
| warehouse | string | yes | - |
| namespace | string | yes | - |
| table | string | yes | - |
+| schema | config | no | - |
| case_sensitive | boolean | no | false |
| start_snapshot_timestamp | long | no | - |
| start_snapshot_id | long | no | - |
@@ -69,12 +70,27 @@ The iceberg table name in the backend catalog.
### case_sensitive [boolean]
-If data columns where selected via fields(Collection), controls whether the
match to the schema will be done with case sensitivity.
+If data columns were selected via schema [config], controls whether the match
to the schema will be done with case sensitivity.
-### fields [array]
+### schema [config]
+
+#### fields [Config]
Use projection to select data columns and columns order.
+e.g.
+
+```
+schema {
+ fields {
+ f2 = "boolean"
+ f1 = "bigint"
+ f3 = "int"
+ f4 = "bigint"
+ }
+}
+```
+
### start_snapshot_id [long]
Instructs this scan to look for changes starting from a particular snapshot
(exclusive).
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
index eb5465e2e..4ff45c755 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
@@ -105,9 +105,15 @@ public class IcebergSource
columnDataTypes.toArray(new SeaTunnelDataType[0]));
CheckResult checkResult =
- CheckConfigUtil.checkAllExists(pluginConfig,
CommonConfig.KEY_FIELDS.key());
+ CheckConfigUtil.checkAtLeastOneExists(
+ pluginConfig, CommonConfig.KEY_FIELDS.key(),
SeaTunnelSchema.SCHEMA.key());
+
if (checkResult.isSuccess()) {
- SeaTunnelSchema configSchema =
SeaTunnelSchema.buildWithConfig(pluginConfig);
+ Config config =
+ pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())
+ ?
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key())
+ : pluginConfig;
+ SeaTunnelSchema configSchema =
SeaTunnelSchema.buildWithConfig(config);
SeaTunnelRowType projectedRowType =
configSchema.getSeaTunnelRowType();
for (int i = 0; i < projectedRowType.getFieldNames().length; i++) {
String fieldName = projectedRowType.getFieldName(i);
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
index e630130a4..52cd028b4 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
@@ -21,13 +21,13 @@ import
org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import com.google.auto.service.AutoService;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CASE_SENSITIVE;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_TYPE;
-import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_NAMESPACE;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_TABLE;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_URI;
@@ -55,7 +55,7 @@ public class IcebergSourceFactory implements
TableSourceFactory {
KEY_CATALOG_NAME, KEY_CATALOG_TYPE, KEY_WAREHOUSE,
KEY_NAMESPACE, KEY_TABLE)
.conditional(KEY_CATALOG_TYPE, HIVE, KEY_URI)
.optional(
- KEY_FIELDS,
+ SeaTunnelSchema.SCHEMA,
KEY_CASE_SENSITIVE,
KEY_START_SNAPSHOT_TIMESTAMP,
KEY_START_SNAPSHOT_ID,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
index 1e2372205..00ae2afad 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -26,22 +26,24 @@ env {
source {
Iceberg {
- fields {
- f2 = "boolean"
- f1 = "bigint"
- f3 = "int"
- f4 = "bigint"
- f5 = "float"
- f6 = "double"
- f7 = "date"
- f9 = "timestamp"
- f10 = "timestamp"
- f11 = "string"
- f12 = "bytes"
- f13 = "bytes"
- f14 = "decimal(19,9)"
- f15 = "array<int>"
- f16 = "map<string, int>"
+ schema {
+ fields {
+ f2 = "boolean"
+ f1 = "bigint"
+ f3 = "int"
+ f4 = "bigint"
+ f5 = "float"
+ f6 = "double"
+ f7 = "date"
+ f9 = "timestamp"
+ f10 = "timestamp"
+ f11 = "string"
+ f12 = "bytes"
+ f13 = "bytes"
+ f14 = "decimal(19,9)"
+ f15 = "array<int>"
+ f16 = "map<string, int>"
+ }
}
catalog_name = "seatunnel"
catalog_type = "hadoop"