This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 20e1255fa [Improve][SourceConnector] Unify Iceberg source fields to schema (#3959)
20e1255fa is described below

commit 20e1255fabb2f5b0813fe0c73632de6e6fac2ec3
Author: wfrong <[email protected]>
AuthorDate: Wed Feb 22 13:33:25 2023 +0800

    [Improve][SourceConnector] Unify Iceberg source fields to schema (#3959)
    
    * Unify Iceberg source fields to schema
    
    * add compatibility adaptation
    
    * Change nested struct field names to uppercase in the e2e module
    
    * Remove the struct type from the selected fields in the e2e module
    
    * Improve code style
    
    ---------
    
    Co-authored-by: furong.wang <[email protected]>
---
 docs/en/connector-v2/source/Iceberg.md             | 20 +++++++++++--
 .../seatunnel/iceberg/source/IcebergSource.java    | 10 +++++--
 .../iceberg/source/IcebergSourceFactory.java       |  4 +--
 .../src/test/resources/iceberg/iceberg_source.conf | 34 ++++++++++++----------
 4 files changed, 46 insertions(+), 22 deletions(-)

diff --git a/docs/en/connector-v2/source/Iceberg.md 
b/docs/en/connector-v2/source/Iceberg.md
index dcd7ab49b..0529a7e43 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -32,6 +32,7 @@ Source connector for Apache Iceberg. It can support batch and 
stream mode.
 | warehouse                | string  | yes      | -                    |
 | namespace                | string  | yes      | -                    |
 | table                    | string  | yes      | -                    |
+| schema                   | config  | no       | -                    |
 | case_sensitive           | boolean | no       | false                |
 | start_snapshot_timestamp | long    | no       | -                    |
 | start_snapshot_id        | long    | no       | -                    |
@@ -69,12 +70,27 @@ The iceberg table name in the backend catalog.
 
 ### case_sensitive [boolean]
 
-If data columns where selected via fields(Collection), controls whether the 
match to the schema will be done with case sensitivity.
+If data columns were selected via schema [config], controls whether the match to the schema is done with case sensitivity.
 
-### fields [array]
+### schema [config]
+
+#### fields [config]
 
 Use projection to select data columns and columns order.
 
+e.g.
+
+```
+schema {
+    fields {
+      f2 = "boolean"
+      f1 = "bigint"
+      f3 = "int"
+      f4 = "bigint"
+    }
+}
+```
+
 ### start_snapshot_id [long]
 
 Instructs this scan to look for changes starting from a particular snapshot 
(exclusive).
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
index eb5465e2e..4ff45c755 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
@@ -105,9 +105,15 @@ public class IcebergSource
                         columnDataTypes.toArray(new SeaTunnelDataType[0]));
 
         CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(pluginConfig, 
CommonConfig.KEY_FIELDS.key());
+                CheckConfigUtil.checkAtLeastOneExists(
+                        pluginConfig, CommonConfig.KEY_FIELDS.key(), 
SeaTunnelSchema.SCHEMA.key());
+
         if (checkResult.isSuccess()) {
-            SeaTunnelSchema configSchema = 
SeaTunnelSchema.buildWithConfig(pluginConfig);
+            Config config =
+                    pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())
+                            ? 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key())
+                            : pluginConfig;
+            SeaTunnelSchema configSchema = 
SeaTunnelSchema.buildWithConfig(config);
             SeaTunnelRowType projectedRowType = 
configSchema.getSeaTunnelRowType();
             for (int i = 0; i < projectedRowType.getFieldNames().length; i++) {
                 String fieldName = projectedRowType.getFieldName(i);
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
index e630130a4..52cd028b4 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java
@@ -21,13 +21,13 @@ import 
org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 
 import com.google.auto.service.AutoService;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CASE_SENSITIVE;
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CATALOG_TYPE;
-import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_NAMESPACE;
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_URI;
@@ -55,7 +55,7 @@ public class IcebergSourceFactory implements 
TableSourceFactory {
                         KEY_CATALOG_NAME, KEY_CATALOG_TYPE, KEY_WAREHOUSE, 
KEY_NAMESPACE, KEY_TABLE)
                 .conditional(KEY_CATALOG_TYPE, HIVE, KEY_URI)
                 .optional(
-                        KEY_FIELDS,
+                        SeaTunnelSchema.SCHEMA,
                         KEY_CASE_SENSITIVE,
                         KEY_START_SNAPSHOT_TIMESTAMP,
                         KEY_START_SNAPSHOT_ID,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
index 1e2372205..00ae2afad 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -26,22 +26,24 @@ env {
 
 source {
   Iceberg {
-    fields {
-      f2 = "boolean"
-      f1 = "bigint"
-      f3 = "int"
-      f4 = "bigint"
-      f5 = "float"
-      f6 = "double"
-      f7 = "date"
-      f9 = "timestamp"
-      f10 = "timestamp"
-      f11 = "string"
-      f12 = "bytes"
-      f13 = "bytes"
-      f14 = "decimal(19,9)"
-      f15 = "array<int>"
-      f16 = "map<string, int>"
+    schema {
+        fields {
+            f2 = "boolean"
+            f1 = "bigint"
+            f3 = "int"
+            f4 = "bigint"
+            f5 = "float"
+            f6 = "double"
+            f7 = "date"
+            f9 = "timestamp"
+            f10 = "timestamp"
+            f11 = "string"
+            f12 = "bytes"
+            f13 = "bytes"
+            f14 = "decimal(19,9)"
+            f15 = "array<int>"
+            f16 = "map<string, int>"
+        }
     }
     catalog_name = "seatunnel"
     catalog_type = "hadoop"

Reply via email to