This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fcde8ee9c [Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
fcde8ee9c is described below
commit fcde8ee9cf31d3a6ca6f4337f51f2adb715cd7e8
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Apr 12 19:52:47 2023 +0800
[Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
---
.../core/parse/MultipleTableJobConfigParser.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index b4b87cb8e..75bd54a4a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -283,18 +283,22 @@ public class MultipleTableJobConfigParser {
TableSourceFactory.class,
factoryId,
(factory) -> factory.createSource(null));
- if (fallback) {
+
+ final List<CatalogTable> catalogTables = new ArrayList<>();
+ if (!fallback) {
+ List<CatalogTable> tables =
+ CatalogTableUtil.getCatalogTables(sourceConfig,
classLoader);
+ if (!tables.isEmpty()) {
+ catalogTables.addAll(tables);
+ }
+ }
+
+ if (fallback || catalogTables.isEmpty()) {
Tuple2<CatalogTable, Action> tuple =
fallbackParser.parseSource(sourceConfig, jobConfig,
tableId, parallelism);
return new Tuple2<>(tableId, Collections.singletonList(tuple));
}
- final List<CatalogTable> catalogTables =
- CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
- if (catalogTables.isEmpty()) {
- throw new JobDefineCheckException(
- "The source needs catalog table, please configure
`catalog` or `schema` options.");
- }
if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) ==
ParsingMode.SHARDING) {
CatalogTable shardingTable = catalogTables.get(0);
catalogTables.clear();