This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fcde8ee9c [Improve][Zeta] Fallback when the CatalogTables is empty 
(#4551)
fcde8ee9c is described below

commit fcde8ee9cf31d3a6ca6f4337f51f2adb715cd7e8
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Apr 12 19:52:47 2023 +0800

    [Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
---
 .../core/parse/MultipleTableJobConfigParser.java       | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index b4b87cb8e..75bd54a4a 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -283,18 +283,22 @@ public class MultipleTableJobConfigParser {
                         TableSourceFactory.class,
                         factoryId,
                         (factory) -> factory.createSource(null));
-        if (fallback) {
+
+        final List<CatalogTable> catalogTables = new ArrayList<>();
+        if (!fallback) {
+            List<CatalogTable> tables =
+                    CatalogTableUtil.getCatalogTables(sourceConfig, 
classLoader);
+            if (!tables.isEmpty()) {
+                catalogTables.addAll(tables);
+            }
+        }
+
+        if (fallback || catalogTables.isEmpty()) {
             Tuple2<CatalogTable, Action> tuple =
                     fallbackParser.parseSource(sourceConfig, jobConfig, 
tableId, parallelism);
             return new Tuple2<>(tableId, Collections.singletonList(tuple));
         }
 
-        final List<CatalogTable> catalogTables =
-                CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
-        if (catalogTables.isEmpty()) {
-            throw new JobDefineCheckException(
-                    "The source needs catalog table, please configure 
`catalog` or `schema` options.");
-        }
         if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) == 
ParsingMode.SHARDING) {
             CatalogTable shardingTable = catalogTables.get(0);
             catalogTables.clear();

Reply via email to