This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fa34ac98b4 [Improve][API] Check catalog table fields name legal before
send to downstream (#7358)
fa34ac98b4 is described below
commit fa34ac98b42cd2bf52837efa27382cfe24c655cd
Author: Jia Fan <[email protected]>
AuthorDate: Mon Aug 12 13:35:43 2024 +0800
[Improve][API] Check catalog table fields name legal before send to
downstream (#7358)
* [Improve][API] Check catalog table fields name legal before send to
downstream
* update
---
.github/workflows/backend.yml | 38 +++++++++++++
.../api/table/factory/TableFactoryContext.java | 28 ++++++++++
.../api/table/factory/TableSinkFactoryContext.java | 8 ++-
.../factory/TableTransformFactoryContext.java | 1 +
.../api/table/catalog/CatalogTableTest.java | 62 ++++++++++++++++++++++
5 files changed, 136 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 88a2d59e3f..81222695a3 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -553,6 +553,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run seatunnel zeta integration test
if: needs.changes.outputs.api == 'true'
run: |
@@ -609,6 +611,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run transform-v2 integration test (part-1)
if: needs.changes.outputs.api == 'true'
run: |
@@ -633,6 +637,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run transform-v2 integration test (part-2)
if: needs.changes.outputs.api == 'true'
run: |
@@ -657,6 +663,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-1)
if: needs.changes.outputs.api == 'true'
run: |
@@ -684,6 +692,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-2)
if: needs.changes.outputs.api == 'true'
run: |
@@ -711,6 +721,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-3)
if: needs.changes.outputs.api == 'true'
run: |
@@ -738,6 +750,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
@@ -765,6 +779,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-5)
if: needs.changes.outputs.api == 'true'
run: |
@@ -792,6 +808,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-6)
if: needs.changes.outputs.api == 'true'
run: |
@@ -819,6 +837,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
@@ -898,6 +918,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-3)
if: needs.changes.outputs.api == 'true'
run: |
@@ -922,6 +944,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
@@ -946,6 +970,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-5)
if: needs.changes.outputs.api == 'true'
run: |
@@ -996,6 +1022,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run jdbc connectors integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
@@ -1020,6 +1048,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run kudu connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl
:connector-kudu-e2e -am -Pci
@@ -1043,6 +1073,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run amazonsqs connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl
:connector-amazonsqs-e2e -am -Pci
@@ -1066,6 +1098,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run kafka connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl
:connector-kafka-e2e -am -Pci
@@ -1089,6 +1123,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run rocket connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl
:connector-rocketmq-e2e -am -Pci
@@ -1139,6 +1175,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run oracle cdc connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl
:connector-cdc-oracle-e2e -am -Pci
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index 10436da09b..5664e48b4e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -18,9 +18,16 @@
package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import org.apache.commons.lang3.StringUtils;
import lombok.Getter;
+import java.util.ArrayList;
+import java.util.List;
+
@Getter
public abstract class TableFactoryContext {
@@ -31,4 +38,25 @@ public abstract class TableFactoryContext {
this.options = options;
this.classLoader = classLoader;
}
+
+ protected static void checkCatalogTableIllegal(List<CatalogTable>
catalogTables) {
+ for (CatalogTable catalogTable : catalogTables) {
+ List<String> alreadyChecked = new ArrayList<>();
+ for (String fieldName :
catalogTable.getTableSchema().getFieldNames()) {
+ if (StringUtils.isBlank(fieldName)) {
+ throw new SeaTunnelException(
+ String.format(
+ "Table %s field name cannot be empty",
+
catalogTable.getTablePath().getFullName()));
+ }
+ if (alreadyChecked.contains(fieldName)) {
+ throw new SeaTunnelException(
+ String.format(
+ "Table %s field %s duplicate",
+ catalogTable.getTablePath().getFullName(),
fieldName));
+ }
+ alreadyChecked.add(fieldName);
+ }
+ }
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
index 9565bad6a0..3e0eb24cd5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
@@ -21,18 +21,24 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.TablePlaceholder;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import java.util.Collection;
+import java.util.Collections;
@Getter
public class TableSinkFactoryContext extends TableFactoryContext {
private final CatalogTable catalogTable;
- protected TableSinkFactoryContext(
+ @VisibleForTesting
+ public TableSinkFactoryContext(
CatalogTable catalogTable, ReadonlyConfig options, ClassLoader
classLoader) {
super(options, classLoader);
+ if (catalogTable != null) {
+ checkCatalogTableIllegal(Collections.singletonList(catalogTable));
+ }
this.catalogTable = catalogTable;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
index bf8176c7a8..8e274a8e5e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
@@ -32,6 +32,7 @@ public class TableTransformFactoryContext extends TableFactoryContext {
public TableTransformFactoryContext(
List<CatalogTable> catalogTables, ReadonlyConfig options,
ClassLoader classLoader) {
super(options, classLoader);
+ checkCatalogTableIllegal(catalogTables);
this.catalogTables = catalogTables;
}
}
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
index d3c7692b60..0ed7045605 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
@@ -18,7 +18,11 @@
package org.apache.seatunnel.api.table.catalog;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -89,4 +93,62 @@ public class CatalogTableTest {
});
Assertions.assertEquals(result,
exception.getParamsValueAs("tableUnsupportedTypes"));
}
+
+ @Test
+ public void testCatalogTableWithIllegalFieldNames() {
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ " ", BasicType.STRING_TYPE,
1L, true, null, ""))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+ SeaTunnelException exception =
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () ->
+ new TableTransformFactoryContext(
+
Collections.singletonList(catalogTable), null, null));
+ SeaTunnelException exception2 =
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () -> new TableSinkFactoryContext(catalogTable, null,
null));
+ Assertions.assertEquals(
+ "Table database.table field name cannot be empty",
exception.getMessage());
+ Assertions.assertEquals(
+ "Table database.table field name cannot be empty",
exception2.getMessage());
+
+ CatalogTable catalogTable2 =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "name1",
BasicType.STRING_TYPE, 1L, true, null, ""))
+ .column(
+ PhysicalColumn.of(
+ "name1",
BasicType.STRING_TYPE, 1L, true, null, ""))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+ SeaTunnelException exception3 =
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () ->
+ new TableTransformFactoryContext(
+
Collections.singletonList(catalogTable2), null, null));
+ SeaTunnelException exception4 =
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () -> new TableSinkFactoryContext(catalogTable2, null,
null));
+ Assertions.assertEquals(
+ "Table database.table field name1 duplicate",
exception3.getMessage());
+ Assertions.assertEquals(
+ "Table database.table field name1 duplicate",
exception4.getMessage());
+ }
}