This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7eb0547c2 [feature][DAG] Support for mixing Factory and Plugin SPI
(#4359)
7eb0547c2 is described below
commit 7eb0547c25188c0bf206bfa8294fc54f8188cacf
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Mar 21 09:56:01 2023 +0800
[feature][DAG] Support for mixing Factory and Plugin SPI (#4359)
* [feature][api] support RowType convert to CatalogTable
* [feature][DAG] support check the correctness of the DAG
* [feature][DAG] Support for mixing Factory and Plugin SPI
---
.../apache/seatunnel/api/common/CommonOptions.java | 6 +-
.../apache/seatunnel/api/env/EnvCommonOptions.java | 6 -
.../SourceOptions.java} | 28 +-
.../api/table/catalog/CatalogTableUtil.java | 27 +-
.../api/table/factory/TableSinkFactory.java | 3 +-
.../api/table/factory/TableSourceFactory.java | 3 +-
.../api/table/factory/TableTransformFactory.java | 3 +-
.../batch_fakesource_to_file_complex.conf | 4 +-
.../streaming_fakesource_to_file_complex.conf | 4 +-
.../engine/client/LogicalDagGeneratorTest.java | 6 +-
....java => MultipleTableJobConfigParserTest.java} | 36 +-
.../batch_fakesource_to_file_complex.conf | 4 +-
.../src/test/resources/client_test.conf | 4 +-
.../engine/core/parse/ConfigParserUtil.java | 290 ++++++++++
.../engine/core/parse/ConnectorInstanceLoader.java | 21 -
.../engine/core/parse/JobConfigParser.java | 591 +++++----------------
.../core/parse/MultipleTableJobConfigParser.java | 451 ++++++++++++----
.../apache/seatunnel/engine/server/TestUtils.java | 4 +-
.../server/master/JobHistoryServiceTest.java | 6 +-
.../engine/server/master/JobMetricsTest.java | 41 +-
.../src/test/resources/fake_to_console.conf | 4 +-
21 files changed, 870 insertions(+), 672 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
index fff669a5a..5b6f404cc 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.api.common;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.util.List;
+
public interface CommonOptions {
Option<String> FACTORY_ID =
Options.key("factory")
@@ -47,9 +49,9 @@ public interface CommonOptions {
+ "The data set (dataStream/dataset)
registered here can be directly accessed by other plugins "
+ "by specifying source_table_name .");
- Option<String> SOURCE_TABLE_NAME =
+ Option<List<String>> SOURCE_TABLE_NAME =
Options.key("source_table_name")
- .stringType()
+ .listType()
.noDefaultValue()
.withDescription(
"When source_table_name is not specified, "
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index eca9fda24..de3226b1c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -44,12 +44,6 @@ public interface EnvCommonOptions {
.noDefaultValue()
.withDescription("The job mode of this job, support Batch
and Stream");
- Option<ParsingMode> DAG_PARSING_MODE =
- Options.key("dag-parsing.mode")
- .enumType(ParsingMode.class)
- .defaultValue(ParsingMode.SINGLENESS)
- .withDescription("Whether to enable parsing support for
multi-table jobs");
-
Option<Long> CHECKPOINT_INTERVAL =
Options.key("checkpoint.interval")
.longType()
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceOptions.java
similarity index 54%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceOptions.java
index 235392b67..85a8b31a8 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceOptions.java
@@ -15,24 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.source;
-import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.env.ParsingMode;
-/**
- * This is an SPI interface, used to create {@link
- * org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need
to have it own
- * implementation.
- */
-public interface TableTransformFactory extends Factory {
-
- /**
- * We will never use this method now. So gave a default implement and
return null.
- *
- * @param context TableFactoryContext
- * @return
- */
- default <T> TableTransform<T> createTransform(TableFactoryContext context)
{
- throw new UnsupportedOperationException("unsupported now");
- }
+public interface SourceOptions {
+ Option<ParsingMode> DAG_PARSING_MODE =
+ Options.key("dag-parsing.mode")
+ .enumType(ParsingMode.class)
+ .defaultValue(ParsingMode.SINGLENESS)
+ .withDescription("Whether to enable parsing support for
multi-table source");
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 00603e3f5..a6fa269ab 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -76,6 +76,23 @@ public class CatalogTableUtil implements Serializable {
this.catalogTable = catalogTable;
}
+ @Deprecated
+ public static CatalogTable getCatalogTable(String tableName,
SeaTunnelRowType rowType) {
+ TableSchema.Builder schemaBuilder = TableSchema.builder();
+ for (int i = 0; i < rowType.getTotalFields(); i++) {
+ PhysicalColumn column =
+ PhysicalColumn.of(
+ rowType.getFieldName(i), rowType.getFieldType(i),
0, true, null, null);
+ schemaBuilder.column(column);
+ }
+ return CatalogTable.of(
+ TableIdentifier.of("schema", "default", tableName),
+ schemaBuilder.build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "It is converted from RowType and only has column
information.");
+ }
+
public static List<CatalogTable> getCatalogTables(Config config,
ClassLoader classLoader) {
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
Map<String, String> catalogOptions =
@@ -103,11 +120,13 @@ public class CatalogTableUtil implements Serializable {
catalogConfig,
classLoader,
factoryId);
- if (!optionalCatalog.isPresent()) {
- return Collections.emptyList();
- }
+ return optionalCatalog
+ .map(catalog -> getCatalogTables(catalogConfig, catalog))
+ .orElse(Collections.emptyList());
+ }
- Catalog catalog = optionalCatalog.get();
+ public static List<CatalogTable> getCatalogTables(
+ ReadonlyConfig catalogConfig, Catalog catalog) {
// Get the list of specified tables
List<String> tableNames =
catalogConfig.get(CatalogOptions.TABLE_NAMES);
List<CatalogTable> catalogTables = new ArrayList<>();
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 4fb2981f8..f0015fa58 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -38,6 +38,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT,
AggregatedCommitInfoT
*/
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createSink(
TableFactoryContext context) {
- throw new UnsupportedOperationException("unsupported now");
+ throw new UnsupportedOperationException(
+ "The Factory has not been implemented and the deprecated
Plugin will be used.");
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 1b600fd45..30f70efde 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -36,7 +36,8 @@ public interface TableSourceFactory extends Factory {
*/
default <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableFactoryContext
context) {
- throw new UnsupportedOperationException("unsupported now");
+ throw new UnsupportedOperationException(
+ "The Factory has not been implemented and the deprecated
Plugin will be used.");
}
/**
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
index 235392b67..33caf328d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
@@ -33,6 +33,7 @@ public interface TableTransformFactory extends Factory {
* @return
*/
default <T> TableTransform<T> createTransform(TableFactoryContext context)
{
- throw new UnsupportedOperationException("unsupported now");
+ throw new UnsupportedOperationException(
+ "The Factory has not been implemented and the deprecated
Plugin will be used.");
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index 5bbd38ada..3d6ca5054 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -80,7 +80,7 @@ source {
array.size = 10
bytes.length = 10
string.length = 10
- result_table_name = "fake"
+ result_table_name = "fake2"
schema = {
fields {
c_map = "map<string, array<int>>"
@@ -136,6 +136,6 @@ sink {
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
- source_table_name="fake"
+ source_table_name=["fake","fake2"]
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 96daf1365..15eb0c429 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -79,7 +79,7 @@ source {
array.size = 10
bytes.length = 10
string.length = 10
- result_table_name = "fake"
+ result_table_name = "fake2"
parallelism = 1
schema = {
fields {
@@ -136,6 +136,6 @@ sink {
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
- source_table_name="fake"
+ source_table_name=["fake","fake2"]
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index b7a2b0383..7b9ea063c 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -49,14 +49,14 @@ public class LogicalDagGeneratorTest {
IdGenerator idGenerator = new IdGenerator();
ImmutablePair<List<Action>, Set<URL>> immutablePair =
- new JobConfigParser(filePath, idGenerator, jobConfig).parse();
+ new MultipleTableJobConfigParser(filePath, idGenerator,
jobConfig).parse();
LogicalDagGenerator logicalDagGenerator =
new LogicalDagGenerator(immutablePair.getLeft(), jobConfig,
idGenerator);
LogicalDag logicalDag = logicalDagGenerator.generate();
JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
String result =
-
"{\"vertices\":[{\"id\":2,\"name\":\"Source[0]-FakeSource-fake(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Source[1]-FakeSource-fake(id=3)\",\"parallelism\":3},{\"id\":1,\"name\":\"Sink[0]-LocalFile-fake(id=1)\",\"parallelism\":6}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}";
+
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource-fake(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[0]-FakeSource-fake2(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile-default-identifier(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"},{\"inputVertex\":\"Source[0]-FakeSource-fake2\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"}]}";
Assertions.assertEquals(result, logicalDagJson.toString());
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
similarity index 74%
rename from
seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
rename to
seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index e9e59cf55..a806d0a96 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -31,10 +31,11 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.net.URL;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
-public class JobConfigParserTest {
+public class MultipleTableJobConfigParserTest {
@SuppressWarnings("checkstyle:MagicNumber")
@Test
@@ -43,15 +44,15 @@ public class JobConfigParserTest {
String filePath =
TestUtils.getResource("/batch_fakesource_to_file.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setJobContext(new JobContext());
- JobConfigParser jobConfigParser =
- new JobConfigParser(filePath, new IdGenerator(), jobConfig);
+ MultipleTableJobConfigParser jobConfigParser =
+ new MultipleTableJobConfigParser(filePath, new IdGenerator(),
jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile-default",
actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile-default-identifier",
actions.get(0).getName());
Assertions.assertEquals(1, actions.get(0).getUpstream().size());
Assertions.assertEquals(
- "Source[0]-FakeSource-default",
actions.get(0).getUpstream().get(0).getName());
+ "Source[0]-FakeSource-fake",
actions.get(0).getUpstream().get(0).getName());
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(0).getParallelism());
Assertions.assertEquals(3, actions.get(0).getParallelism());
@@ -64,21 +65,28 @@ public class JobConfigParserTest {
String filePath =
TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setJobContext(new JobContext());
- JobConfigParser jobConfigParser =
- new JobConfigParser(filePath, new IdGenerator(), jobConfig);
+ MultipleTableJobConfigParser jobConfigParser =
+ new MultipleTableJobConfigParser(filePath, new IdGenerator(),
jobConfig);
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile-fake",
actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile-default-identifier",
actions.get(0).getName());
Assertions.assertEquals(2, actions.get(0).getUpstream().size());
- Assertions.assertEquals(
- "Source[0]-FakeSource-fake",
actions.get(0).getUpstream().get(0).getName());
- Assertions.assertEquals(
- "Source[1]-FakeSource-fake",
actions.get(0).getUpstream().get(1).getName());
+
+ String[] expected = {"Source[0]-FakeSource-fake",
"Source[0]-FakeSource-fake2"};
+ String[] actual = {
+ actions.get(0).getUpstream().get(0).getName(),
+ actions.get(0).getUpstream().get(1).getName()
+ };
+
+ Arrays.sort(expected);
+ Arrays.sort(actual);
+
+ Assertions.assertArrayEquals(expected, actual);
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(0).getParallelism());
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(1).getParallelism());
- Assertions.assertEquals(6, actions.get(0).getParallelism());
+ Assertions.assertEquals(3, actions.get(0).getParallelism());
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
index fa79239e0..f4acfb7a5 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -40,7 +40,7 @@ source {
}
FakeSource {
- result_table_name = "fake"
+ result_table_name = "fake2"
schema = {
fields {
name = "string"
@@ -68,6 +68,6 @@ sink {
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
- source_table_name="fake"
+ source_table_name=["fake","fake2"]
}
}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index 4e345cf96..92e159c2a 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -40,7 +40,7 @@ source {
}
FakeSource {
- result_table_name = "fake"
+ result_table_name = "fake2"
schema = {
fields {
name = "string"
@@ -68,6 +68,6 @@ sink {
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
- source_table_name="fake"
+ source_table_name="fake,fake2"
}
}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
new file mode 100644
index 000000000..3ffdc9586
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.parse;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import scala.Tuple2;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.seatunnel.api.common.CommonOptions.FACTORY_ID;
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.api.common.CommonOptions.SOURCE_TABLE_NAME;
+import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+
+@Slf4j
+public final class ConfigParserUtil {
+ private ConfigParserUtil() {}
+
+ public static <T extends Factory> Set<URL> getFactoryUrls(
+ ReadonlyConfig readonlyConfig,
+ ClassLoader classLoader,
+ Class<T> factoryClass,
+ String factoryId) {
+ Set<URL> factoryUrls = new HashSet<>();
+ URL factoryUrl =
+ FactoryUtil.getFactoryUrl(
+ FactoryUtil.discoverFactory(classLoader, factoryClass,
factoryId));
+ factoryUrls.add(factoryUrl);
+ getCatalogFactoryUrl(readonlyConfig,
classLoader).ifPresent(factoryUrls::add);
+ return factoryUrls;
+ }
+
+ private static Optional<URL> getCatalogFactoryUrl(
+ ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
+ Map<String, String> catalogOptions =
+
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new
HashMap<>());
+ // TODO: fallback key
+ String factoryId =
+ catalogOptions.getOrDefault(FACTORY_ID.key(),
readonlyConfig.get(PLUGIN_NAME));
+ Optional<CatalogFactory> optionalFactory =
+ FactoryUtil.discoverOptionalFactory(classLoader,
CatalogFactory.class, factoryId);
+ return optionalFactory.map(FactoryUtil::getFactoryUrl);
+ }
+
+ public static void checkGraph(
+ List<? extends Config> sources,
+ List<? extends Config> transforms,
+ List<? extends Config> sinks) {
+ log.debug("Check whether this config file can generate DAG:");
+ if (CollectionUtils.isEmpty(sources) ||
CollectionUtils.isEmpty(sinks)) {
+ throw new JobDefineCheckException("Source And Sink can not be
null");
+ }
+ if (isSimpleGraph(sources, transforms, sinks)) {
+ checkSimpleGraph(sources, transforms, sinks);
+ return;
+ }
+ checkComplexGraph(sources, transforms, sinks);
+ }
+
+ private static boolean isSimpleGraph(
+ List<? extends Config> sources,
+ List<? extends Config> transforms,
+ List<? extends Config> sinks) {
+ return sources.size() == 1
+ && sinks.size() == 1
+ && (CollectionUtils.isEmpty(transforms) || transforms.size()
== 1);
+ }
+
+ private static void checkSimpleGraph(
+ List<? extends Config> sources,
+ List<? extends Config> transforms,
+ List<? extends Config> sinks) {
+ log.debug("This is a simple DAG.");
+ ReadonlyConfig source = ReadonlyConfig.fromConfig(sources.get(0));
+ ReadonlyConfig sink = ReadonlyConfig.fromConfig(sinks.get(0));
+ if (transforms.size() == 0) {
+ checkEdge(source, sink);
+ } else {
+ ReadonlyConfig transform =
ReadonlyConfig.fromConfig(transforms.get(0));
+ checkEdge(source, transform);
+ checkEdge(transform, sink);
+ }
+ }
+
+ @Deprecated
+ private static void checkEdge(ReadonlyConfig leftConfig, ReadonlyConfig
rightConfig) {
+ String tableId = getTableId(leftConfig);
+ String inputTableId = getInputIds(rightConfig).get(0);
+ if (tableId.equals(inputTableId)) {
+ return;
+ }
+
+ // Compatible with previous issues
+ log.info(
+ String.format(
+ "Currently, incorrect configuration of %s and %s
options don't affect job running. In the future we will ban incorrect
configurations.",
+ SOURCE_TABLE_NAME.key(), RESULT_TABLE_NAME.key()));
+ if (DEFAULT_ID.equals(tableId)) {
+ log.warn(
+ String.format(
+ "This configuration is not recommended."
+ + "A source/transform(%s) is not
configured with '%s' option, but subsequent transform/sink(%s) is configured
with '%s' option value of '%s'.",
+ getFactoryId(leftConfig),
+ RESULT_TABLE_NAME.key(),
+ getFactoryId(rightConfig),
+ SOURCE_TABLE_NAME.key(),
+ inputTableId));
+ return;
+ }
+ if (DEFAULT_ID.equals(inputTableId)) {
+ log.warn(
+ String.format(
+ "This configuration is not recommended."
+ + " A source/transform(%s) is configured
with '%s' option value of '%s', but subsequent transform/sink(%s) is not
configured with '%s' option.",
+ getFactoryId(leftConfig),
+ RESULT_TABLE_NAME.key(),
+ tableId,
+ getFactoryId(rightConfig),
+ SOURCE_TABLE_NAME.key()));
+ return;
+ }
+ log.error(
+ String.format(
+ "The '%s' option configured in [%s] is incorrect, and
the source/transform[%s] is not found.",
+ SOURCE_TABLE_NAME.key(), getFactoryId(rightConfig),
inputTableId));
+ }
+
+ private static void checkComplexGraph(
+ List<? extends Config> sources,
+ List<? extends Config> transforms,
+ List<? extends Config> sinks) {
+ log.debug("Start checking the correctness of the complex DAG: ");
+ log.debug(
+ String.format(
+ "Phase 1: Check whether '%s' option is configured.",
+ RESULT_TABLE_NAME.key()));
+ checkExistTableId(sources);
+ checkExistTableId(transforms);
+ log.debug(
+ String.format(
+ "Phase 2: Check whether '%s' option is configured.",
+ SOURCE_TABLE_NAME.key()));
+ checkExistInputTableId(transforms);
+ checkExistInputTableId(sinks);
+
+ log.debug("Phase 3: Generate virtual vertices.");
+ Map<String, Tuple2<Config, VertexStatus>> vertexStatusMap = new
HashMap<>();
+ fillVirtualVertices(sources, vertexStatusMap);
+ fillVirtualVertices(transforms, vertexStatusMap);
+ log.debug("Phase 4: Check if a non-existent vertex is used.");
+ checkInputId(transforms, vertexStatusMap);
+ checkInputId(sinks, vertexStatusMap);
+ log.debug("Phase 5: Check if there are unused vertex.");
+ checkLinked(vertexStatusMap);
+ }
+
+ private static void fillVirtualVertices(
+ List<? extends Config> configs,
+ Map<String, Tuple2<Config, VertexStatus>> vertexStatusMap) {
+ for (Config config : configs) {
+ vertexStatusMap.compute(
+ config.getString(RESULT_TABLE_NAME.key()),
+ (id, old) -> {
+ if (old != null) {
+ throw new JobDefineCheckException(
+ String.format(
+ "The value of the '%s' option of
the (%s and %s) plugins is both '%s', and they must be different.",
+ RESULT_TABLE_NAME.key(),
+
config.getString(PLUGIN_NAME.key()),
+
old._1().getString(PLUGIN_NAME.key()),
+ id));
+ }
+ return new Tuple2<>(config, VertexStatus.CREATED);
+ });
+ }
+ }
+
+ private static void checkInputId(
+ List<? extends Config> configs,
+ Map<String, Tuple2<Config, VertexStatus>> vertexStatusMap) {
+ for (Config config : configs) {
+ List<String> inputIds =
getInputIds(ReadonlyConfig.fromConfig(config));
+ inputIds.forEach(
+ inputId ->
+ vertexStatusMap.compute(
+ inputId,
+ (id, old) -> {
+ if (old == null) {
+ throw new JobDefineCheckException(
+ String.format(
+ "The '%s' option
configured in [%s] is incorrect, and the source/transform[%s] is not found.",
+
SOURCE_TABLE_NAME.key(),
+
config.getString(PLUGIN_NAME.key()),
+ id));
+ }
+ return new Tuple2<>(old._1(),
VertexStatus.LINKED);
+ }));
+ }
+ }
+
+ private static void checkLinked(Map<String, Tuple2<Config, VertexStatus>>
vertexStatusMap) {
+ vertexStatusMap.forEach(
+ (id, vertex) -> {
+ if (vertex._2() == VertexStatus.CREATED) {
+ throw new JobDefineCheckException(
+ String.format(
+ "The '%s' option configured is
incorrect, this table(%s) belonging to source/transform(%s) is not used.",
+ SOURCE_TABLE_NAME.key(),
+ id,
+
vertex._1().getString(PLUGIN_NAME.key())));
+ }
+ });
+ }
+
+ private static void checkExistTableId(List<? extends Config> configs) {
+ for (Config config : configs) {
+ if (!config.hasPath(RESULT_TABLE_NAME.key())) {
+ throw new JobDefineCheckException(
+ String.format(
+ "The source/transform(%s) is not configured
with '%s' option",
+ config.getString(PLUGIN_NAME.key()),
RESULT_TABLE_NAME.key()),
+ new OptionValidationException(RESULT_TABLE_NAME));
+ }
+ }
+ }
+
+ private static void checkExistInputTableId(List<? extends Config> configs)
{
+ for (Config config : configs) {
+ if (!config.hasPath(SOURCE_TABLE_NAME.key())) {
+ throw new JobDefineCheckException(
+ String.format(
+ "The transform/sink(%s) is not configured with
'%s' option",
+ config.getString(PLUGIN_NAME.key()),
SOURCE_TABLE_NAME.key()),
+ new OptionValidationException(SOURCE_TABLE_NAME));
+ }
+ }
+ }
+
+ private static String getTableId(ReadonlyConfig config) {
+ return config.getOptional(RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+ }
+
+ static List<String> getInputIds(ReadonlyConfig config) {
+ return
config.getOptional(SOURCE_TABLE_NAME).orElse(Collections.singletonList(DEFAULT_ID));
+ }
+
+ public static String getFactoryId(ReadonlyConfig readonlyConfig) {
+ return
readonlyConfig.getOptional(FACTORY_ID).orElse(readonlyConfig.get(PLUGIN_NAME));
+ }
+
+ private enum VertexStatus {
+ CREATED,
+ LINKED
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
index 32f824c24..820b92c89 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
@@ -21,12 +21,10 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -61,16 +59,6 @@ public class ConnectorInstanceLoader {
SeaTunnelSource seaTunnelSource =
sourcePluginDiscovery.createPluginInstance(pluginIdentifier,
pluginJars);
- seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setJobContext(jobContext);
- if (jobContext.getJobMode() == JobMode.BATCH
- && seaTunnelSource.getBoundedness()
- ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
- throw new UnsupportedOperationException(
- String.format(
- "'%s' source don't support off-line job.",
- seaTunnelSource.getPluginName()));
- }
return new ImmutablePair<>(seaTunnelSource, new
HashSet<>(pluginJarPaths));
}
@@ -87,13 +75,6 @@ public class ConnectorInstanceLoader {
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>
seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier,
pluginJars);
- seaTunnelSink.prepare(sinkConfig);
- seaTunnelSink.setJobContext(jobContext);
- if
(seaTunnelSink.getClass().isAssignableFrom(SupportDataSaveMode.class)) {
- SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
- saveModeSink.checkOptions(sinkConfig);
- }
-
return new ImmutablePair<>(seaTunnelSink, new
HashSet<>(pluginJarPaths));
}
@@ -111,8 +92,6 @@ public class ConnectorInstanceLoader {
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelTransform<?> seaTunnelTransform =
transformPluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
- seaTunnelTransform.prepare(transformConfig);
- seaTunnelTransform.setJobContext(jobContext);
return new ImmutablePair<>(seaTunnelTransform, new
HashSet<>(pluginJarPaths));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 04dcbe8d0..9b6812763 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -20,496 +20,187 @@ package org.apache.seatunnel.engine.core.parse;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
-import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import com.google.common.collect.Lists;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.Data;
import lombok.NonNull;
import scala.Serializable;
+import scala.Tuple2;
import java.net.URL;
-import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.DEFAULT_ID;
+import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
+import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.ensureJobModeMatch;
+import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
@Data
public class JobConfigParser {
private static final ILogger LOGGER =
Logger.getLogger(JobConfigParser.class);
- private String jobDefineFilePath;
private IdGenerator idGenerator;
- private Map<Action, String> alreadyTransformActionMap = new HashMap<>();
-
- private Map<String, List<Config>> transformResultTableNameMap = new
HashMap<>();
- private Map<String, List<Config>> transformSourceTableNameMap = new
HashMap<>();
-
- private Map<String, List<Config>> sourceResultTableNameMap = new
HashMap<>();
-
- private List<Action> actions = new ArrayList<>();
- private Set<URL> jarUrlsSet = new HashSet<>();
-
- private JobConfig jobConfig;
-
- private Config seaTunnelJobConfig;
-
- private Config envConfigs;
-
private List<URL> commonPluginJars;
- public JobConfigParser(
- @NonNull String jobDefineFilePath,
- @NonNull IdGenerator idGenerator,
- @NonNull JobConfig jobConfig) {
- this(jobDefineFilePath, idGenerator, jobConfig,
Collections.emptyList());
- }
-
- public JobConfigParser(
- @NonNull String jobDefineFilePath,
- @NonNull IdGenerator idGenerator,
- @NonNull JobConfig jobConfig,
- @NonNull List<URL> commonPluginJars) {
- this.jobDefineFilePath = jobDefineFilePath;
+ public JobConfigParser(@NonNull IdGenerator idGenerator, @NonNull
List<URL> commonPluginJars) {
this.idGenerator = idGenerator;
- this.jobConfig = jobConfig;
- this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath));
- this.envConfigs = seaTunnelJobConfig.getConfig("env");
this.commonPluginJars = commonPluginJars;
}
- public ImmutablePair<List<Action>, Set<URL>> parse() {
- Thread.currentThread()
- .setContextClassLoader(new SeaTunnelChildFirstClassLoader(new
ArrayList<>()));
- List<? extends Config> sinkConfigs =
seaTunnelJobConfig.getConfigList("sink");
- List<? extends Config> transformConfigs =
- TypesafeConfigUtils.getConfigList(
- seaTunnelJobConfig, "transform",
Collections.emptyList());
- List<? extends Config> sourceConfigs =
seaTunnelJobConfig.getConfigList("source");
-
- if (CollectionUtils.isEmpty(sinkConfigs) ||
CollectionUtils.isEmpty(sourceConfigs)) {
- throw new JobDefineCheckException("Source And Sink can not be
null");
- }
-
- jobConfigAnalyze(envConfigs);
-
- if (sinkConfigs.size() == 1
- && sourceConfigs.size() == 1
- && (CollectionUtils.isEmpty(transformConfigs) ||
transformConfigs.size() == 1)) {
- sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
- } else {
- complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
- }
- actions.forEach(this::addCommonPluginJarsToAction);
- jarUrlsSet.addAll(commonPluginJars);
- return new ImmutablePair<>(actions, jarUrlsSet);
- }
-
- private void addCommonPluginJarsToAction(Action action) {
- action.getJarUrls().addAll(commonPluginJars);
- if (!action.getUpstream().isEmpty()) {
- action.getUpstream().forEach(this::addCommonPluginJarsToAction);
- }
- }
-
- void jobConfigAnalyze(@NonNull Config envConfigs) {
- if (envConfigs.hasPath(EnvCommonOptions.JOB_MODE.key())) {
- jobConfig
- .getJobContext()
- .setJobMode(envConfigs.getEnum(JobMode.class,
EnvCommonOptions.JOB_MODE.key()));
- } else {
-
jobConfig.getJobContext().setJobMode(EnvCommonOptions.JOB_MODE.defaultValue());
- }
-
- if (StringUtils.isEmpty(jobConfig.getName())
- || jobConfig.getName().equals(Constants.LOGO)) {
- if (envConfigs.hasPath(EnvCommonOptions.JOB_NAME.key())) {
-
jobConfig.setName(envConfigs.getString(EnvCommonOptions.JOB_NAME.key()));
- } else {
- jobConfig.setName(EnvCommonOptions.JOB_NAME.defaultValue());
- }
- }
-
- if (envConfigs.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- jobConfig
- .getEnvOptions()
- .put(
- EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
-
envConfigs.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
- }
- }
-
- /**
- * If there are multiple sources or multiple transforms or multiple sink,
We will rely on
- * source_table_name and result_table_name to build actions pipeline. So
in this case
- * result_table_name is necessary for the Source Connector and all of
result_table_name and
- * source_table_name are necessary for Transform Connector. By the end,
source_table_name is
- * necessary for Sink Connector.
- */
- private void complexAnalyze(
- List<? extends Config> sourceConfigs,
- List<? extends Config> transformConfigs,
- List<? extends Config> sinkConfigs) {
- initRelationMap(sourceConfigs, transformConfigs);
-
- for (int configIndex = 0; configIndex < sinkConfigs.size();
configIndex++) {
- Config config = sinkConfigs.get(configIndex);
- ImmutablePair<
- SeaTunnelSink<SeaTunnelRow, Serializable,
Serializable, Serializable>,
- Set<URL>>
- sinkListImmutablePair =
- ConnectorInstanceLoader.loadSinkInstance(
- config, jobConfig.getJobContext(),
commonPluginJars);
-
- String sinkActionName =
- createSinkActionName(
- configIndex,
- sinkListImmutablePair.getLeft().getPluginName(),
- getTableName(config));
- SinkAction sinkAction =
- createSinkAction(
- idGenerator.getNextId(),
- sinkActionName,
- sinkListImmutablePair.getLeft(),
- sinkListImmutablePair.getRight());
-
- actions.add(sinkAction);
- if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
- throw new JobDefineCheckException(
- CommonOptions.SOURCE_TABLE_NAME
- + " must be set in the sink plugin config when
the job have complex dependencies");
- }
- String sourceTableName =
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
- List<Config> transformConfigList =
transformResultTableNameMap.get(sourceTableName);
- SeaTunnelDataType<?> dataType;
- if (CollectionUtils.isEmpty(transformConfigList)) {
- dataType = sourceAnalyze(sourceTableName, sinkAction);
- } else if (transformConfigList.size() > 1) {
- throw new JobDefineCheckException(
- "Only UnionTransform can have more than one upstream, "
- + sinkAction.getName()
- + " is not UnionTransform Connector");
- } else {
- dataType = transformAnalyze(sourceTableName, sinkAction);
- }
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> seaTunnelSink =
- sinkListImmutablePair.getLeft();
- seaTunnelSink.setTypeInfo((SeaTunnelRowType) dataType);
- if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
- DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
- saveModeSink.handleSaveMode(dataSaveMode);
- }
- }
- }
-
- private SeaTunnelDataType sourceAnalyze(String sourceTableName, Action
action) {
- List<Config> sourceConfigList =
sourceResultTableNameMap.get(sourceTableName);
- if (CollectionUtils.isEmpty(sourceConfigList)) {
- throw new JobDefineCheckException(
- action.getName()
- + " source table name ["
- + sourceTableName
- + "] can not be found");
- }
-
- // If a transform have more than one upstream action, the parallelism
of this transform is
- // the sum of the parallelism
- // of its upstream action.
- SeaTunnelDataType dataType = null;
- AtomicInteger totalParallelism = new AtomicInteger();
- for (int configIndex = 0; configIndex < sourceConfigList.size();
configIndex++) {
- Config sourceConfig = sourceConfigList.get(configIndex);
- ImmutablePair<SeaTunnelSource, Set<URL>>
seaTunnelSourceListImmutablePair =
- ConnectorInstanceLoader.loadSourceInstance(
- sourceConfig, jobConfig.getJobContext(),
commonPluginJars);
- dataType =
seaTunnelSourceListImmutablePair.getLeft().getProducedType();
- String sourceActionName =
- createSourceActionName(
- configIndex,
-
sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
- getTableName(sourceConfig));
- SourceAction sourceAction =
- createSourceAction(
- idGenerator.getNextId(),
- sourceActionName,
- seaTunnelSourceListImmutablePair.getLeft(),
- seaTunnelSourceListImmutablePair.getRight());
-
- int sourceParallelism = getSourceParallelism(sourceConfig);
- sourceAction.setParallelism(sourceParallelism);
- totalParallelism.set(totalParallelism.get() + sourceParallelism);
- action.addUpstream(sourceAction);
- action.setParallelism(totalParallelism.get());
- }
- return dataType;
- }
-
- private SeaTunnelDataType<?> transformAnalyze(String sourceTableName,
Action action) {
- // find upstream transform node
- List<Config> transformConfigList =
transformResultTableNameMap.get(sourceTableName);
- if (CollectionUtils.isEmpty(transformConfigList)) {
- return sourceAnalyze(sourceTableName, action);
+ public Tuple2<CatalogTable, Action> parseSource(
+ Config config, JobConfig jobConfig, String tableId, int
parallelism) {
+ ImmutablePair<SeaTunnelSource, Set<URL>> tuple =
+ ConnectorInstanceLoader.loadSourceInstance(
+ config, jobConfig.getJobContext(), commonPluginJars);
+ final SeaTunnelSource source = tuple.getLeft();
+ // old logic: prepare(initialization) -> set job context
+ source.prepare(config);
+ source.setJobContext(jobConfig.getJobContext());
+ ensureJobModeMatch(jobConfig.getJobContext(), source);
+ String actionName =
+ createSourceActionName(
+ 0, config.getString(CollectionConstants.PLUGIN_NAME),
getTableName(config));
+ SourceAction action =
+ new SourceAction(
+ idGenerator.getNextId(), actionName, tuple.getLeft(),
tuple.getRight());
+ action.setParallelism(parallelism);
+ SeaTunnelRowType producedType = (SeaTunnelRowType)
tuple.getLeft().getProducedType();
+ CatalogTable catalogTable = CatalogTableUtil.getCatalogTable(tableId,
producedType);
+ return new Tuple2<>(catalogTable, action);
+ }
+
+ public Tuple2<CatalogTable, Action> parseTransform(
+ Config config,
+ JobConfig jobConfig,
+ String tableId,
+ int parallelism,
+ SeaTunnelRowType rowType,
+ Set<Action> inputActions) {
+ final ImmutablePair<SeaTunnelTransform<?>, Set<URL>> tuple =
+ ConnectorInstanceLoader.loadTransformInstance(
+ config, jobConfig.getJobContext(), commonPluginJars);
+ final SeaTunnelTransform<?> transform = tuple.getLeft();
+ // old logic: prepare(initialization) -> set job context -> set row
type (There is a logical
+ // judgment that depends on before and after, not a simple set)
+ transform.prepare(config);
+ transform.setJobContext(jobConfig.getJobContext());
+ transform.setTypeInfo((SeaTunnelDataType) rowType);
+ final String actionName =
+ createTransformActionName(0, tuple.getLeft().getPluginName(),
getTableName(config));
+ final TransformAction action =
+ new TransformAction(
+ idGenerator.getNextId(),
+ actionName,
+ new ArrayList<>(inputActions),
+ transform,
+ tuple.getRight());
+ action.setParallelism(parallelism);
+ CatalogTable catalogTable =
+ CatalogTableUtil.getCatalogTable(
+ tableId, (SeaTunnelRowType)
transform.getProducedType());
+ return new Tuple2<>(catalogTable, action);
+ }
+
+ public List<SinkAction<?, ?, ?, ?>> parseSinks(
+ List<List<Tuple2<CatalogTable, Action>>> inputVertices,
+ Config sinkConfig,
+ JobConfig jobConfig) {
+ List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
+ int spareParallelism =
inputVertices.get(0).get(0)._2().getParallelism();
+ if (inputVertices.size() > 1) {
+ // union
+ Set<Action> inputActions =
+ inputVertices.stream()
+ .flatMap(Collection::stream)
+ .map(Tuple2::_2)
+ .collect(Collectors.toSet());
+ checkProducedTypeEquals(inputActions);
+ SinkAction<?, ?, ?, ?> sinkAction =
+ parseSink(
+ sinkConfig,
+ jobConfig,
+ spareParallelism,
+ inputVertices
+ .get(0)
+ .get(0)
+ ._1()
+ .getTableSchema()
+ .toPhysicalRowDataType(),
+ inputActions);
+ sinkActions.add(sinkAction);
} else {
- AtomicInteger totalParallelism = new AtomicInteger();
- SeaTunnelDataType<?> dataTypeResult = null;
- for (int configIndex = 0; configIndex <
transformConfigList.size(); configIndex++) {
- Config config = transformConfigList.get(configIndex);
- ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
transformListImmutablePair =
- ConnectorInstanceLoader.loadTransformInstance(
- config, jobConfig.getJobContext(),
commonPluginJars);
- String transformActionName =
- createTransformActionName(
- configIndex,
-
transformListImmutablePair.getLeft().getPluginName(),
- getTableName(config));
- TransformAction transformAction =
- createTransformAction(
- idGenerator.getNextId(),
- transformActionName,
- transformListImmutablePair.getLeft(),
- transformListImmutablePair.getRight());
-
- action.addUpstream(transformAction);
- SeaTunnelDataType dataType =
- transformAnalyze(
-
config.getString(CommonOptions.SOURCE_TABLE_NAME.key()),
- transformAction);
- transformListImmutablePair.getLeft().setTypeInfo(dataType);
- dataTypeResult =
transformListImmutablePair.getLeft().getProducedType();
- totalParallelism.set(totalParallelism.get() +
transformAction.getParallelism());
- action.setParallelism(totalParallelism.get());
+ // sink template
+ for (Tuple2<CatalogTable, Action> tableTuple :
inputVertices.get(0)) {
+ CatalogTable catalogTable = tableTuple._1();
+ Action inputAction = tableTuple._2();
+ int parallelism = inputAction.getParallelism();
+ SinkAction<?, ?, ?, ?> sinkAction =
+ parseSink(
+ sinkConfig,
+ jobConfig,
+ parallelism,
+
catalogTable.getTableSchema().toPhysicalRowDataType(),
+ Collections.singleton(inputAction));
+ sinkActions.add(sinkAction);
}
- return dataTypeResult;
}
+ return sinkActions;
}
- private void initRelationMap(
- List<? extends Config> sourceConfigs, List<? extends Config>
transformConfigs) {
- for (Config config : sourceConfigs) {
- if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
- throw new JobDefineCheckException(
- CommonOptions.RESULT_TABLE_NAME.key()
- + " must be set in the source plugin config
when the job have complex dependencies");
- }
- String resultTableName =
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
- sourceResultTableNameMap.computeIfAbsent(resultTableName, k -> new
ArrayList<>());
- sourceResultTableNameMap.get(resultTableName).add(config);
- }
-
- for (Config config : transformConfigs) {
- if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
- throw new JobDefineCheckException(
- CommonOptions.RESULT_TABLE_NAME.key()
- + " must be set in the transform plugin config
when the job have complex dependencies");
- }
-
- if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
- throw new JobDefineCheckException(
- CommonOptions.SOURCE_TABLE_NAME.key()
- + " must be set in the transform plugin config
when the job have complex dependencies");
- }
- String resultTableName =
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
- String sourceTableName =
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
- if (Objects.equals(sourceTableName, resultTableName)) {
- throw new JobDefineCheckException(
- String.format(
- "Source{%s} and result{%s} table name cannot
be equals",
- sourceTableName, resultTableName));
- }
-
- transformResultTableNameMap.computeIfAbsent(resultTableName, k ->
new ArrayList<>());
- transformResultTableNameMap.get(resultTableName).add(config);
-
- transformSourceTableNameMap.computeIfAbsent(sourceTableName, k ->
new ArrayList<>());
- transformSourceTableNameMap.get(sourceTableName).add(config);
- }
- }
-
- /**
- * If there is only one Source and one Sink and at most one Transform, We
simply build actions
- * pipeline in the following order Source | Transform(If have) | Sink
- */
- private void sampleAnalyze(
- List<? extends Config> sourceConfigs,
- List<? extends Config> transformConfigs,
- List<? extends Config> sinkConfigs) {
- ImmutablePair<SeaTunnelSource, Set<URL>> pair =
- ConnectorInstanceLoader.loadSourceInstance(
- sourceConfigs.get(0), jobConfig.getJobContext(),
commonPluginJars);
- String sourceActionName =
- createSourceActionName(0, pair.getLeft().getPluginName(),
"default");
- SourceAction sourceAction =
- createSourceAction(
- idGenerator.getNextId(), sourceActionName,
pair.getLeft(), pair.getRight());
-
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
- SeaTunnelDataType dataType =
sourceAction.getSource().getProducedType();
-
- Action sinkUpstreamAction = sourceAction;
-
- if (!CollectionUtils.isEmpty(transformConfigs)) {
- ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
transformListImmutablePair =
- ConnectorInstanceLoader.loadTransformInstance(
- transformConfigs.get(0),
jobConfig.getJobContext(), commonPluginJars);
- transformListImmutablePair.getLeft().setTypeInfo(dataType);
-
- dataType = transformListImmutablePair.getLeft().getProducedType();
- String transformActionName =
- createTransformActionName(
- 0,
transformListImmutablePair.getLeft().getPluginName(), "default");
- TransformAction transformAction =
- createTransformAction(
- idGenerator.getNextId(),
- transformActionName,
- Lists.newArrayList(sourceAction),
- transformListImmutablePair.getLeft(),
- transformListImmutablePair.getRight());
-
- initTransformParallelism(
- transformConfigs,
- sourceAction,
- transformListImmutablePair.getLeft(),
- transformAction);
-
- sinkUpstreamAction = transformAction;
- }
-
- ImmutablePair<
+ private SinkAction<?, ?, ?, ?> parseSink(
+ Config config,
+ JobConfig jobConfig,
+ int parallelism,
+ SeaTunnelRowType rowType,
+ Set<Action> inputActions) {
+ final ImmutablePair<
SeaTunnelSink<SeaTunnelRow, Serializable,
Serializable, Serializable>,
Set<URL>>
- sinkListImmutablePair =
+ tuple =
ConnectorInstanceLoader.loadSinkInstance(
- sinkConfigs.get(0), jobConfig.getJobContext(),
commonPluginJars);
- String sinkActionName =
- createSinkActionName(0,
sinkListImmutablePair.getLeft().getPluginName(), "default");
- SinkAction sinkAction =
- createSinkAction(
+ config, jobConfig.getJobContext(),
commonPluginJars);
+ final SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> sink =
+ tuple.getLeft();
+ // old logic: prepare(initialization) -> set job context -> set row
type (There is a logical
+ // judgment that depends on before and after, not a simple set)
+ sink.prepare(config);
+ sink.setJobContext(jobConfig.getJobContext());
+ sink.setTypeInfo(rowType);
+ handleSaveMode(sink);
+ final String actionName =
+ createSinkActionName(0, tuple.getLeft().getPluginName(),
getTableName(config));
+ final SinkAction action =
+ new SinkAction<>(
idGenerator.getNextId(),
- sinkActionName,
- Lists.newArrayList(sinkUpstreamAction),
- sinkListImmutablePair.getLeft(),
- sinkListImmutablePair.getRight());
- SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkAction.getSink();
- seaTunnelSink.setTypeInfo((SeaTunnelRowType) dataType);
- sinkAction.setParallelism(sinkUpstreamAction.getParallelism());
- if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
- DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
- saveModeSink.handleSaveMode(dataSaveMode);
- }
- actions.add(sinkAction);
- }
-
- private void initTransformParallelism(
- List<? extends Config> transformConfigs,
- Action upstreamAction,
- SeaTunnelTransform seaTunnelTransform,
- TransformAction transformAction) {
- if (seaTunnelTransform instanceof PartitionSeaTunnelTransform
- &&
transformConfigs.get(0).hasPath(CommonOptions.PARALLELISM.key())) {
- transformAction.setParallelism(
-
transformConfigs.get(0).getInt(CommonOptions.PARALLELISM.key()));
- } else {
- // If transform type is not RePartitionTransform, Using the
parallelism of its upstream
- // operators.
- transformAction.setParallelism(upstreamAction.getParallelism());
- }
- }
-
- private int getSourceParallelism(Config sourceConfig) {
- if (sourceConfig.hasPath(CommonOptions.PARALLELISM.key())) {
- int sourceParallelism =
sourceConfig.getInt(CommonOptions.PARALLELISM.key());
- return Math.max(sourceParallelism, 1);
- }
- int executionParallelism = 0;
- if (envConfigs.hasPath(CommonOptions.PARALLELISM.key())) {
- executionParallelism =
envConfigs.getInt(CommonOptions.PARALLELISM.key());
- }
- return Math.max(executionParallelism, 1);
- }
-
- private SourceAction createSourceAction(
- long id, @NonNull String name, @NonNull SeaTunnelSource source,
Set<URL> jarUrls) {
- if (!CollectionUtils.isEmpty(jarUrls)) {
- jarUrlsSet.addAll(jarUrls);
- }
- return new SourceAction(id, name, source, jarUrls);
- }
-
- private TransformAction createTransformAction(
- long id,
- @NonNull String name,
- @NonNull List<Action> upstreams,
- @NonNull SeaTunnelTransform transformation,
- Set<URL> jarUrls) {
- if (!CollectionUtils.isEmpty(jarUrls)) {
- jarUrlsSet.addAll(jarUrls);
- }
- return new TransformAction(id, name, upstreams, transformation,
jarUrls);
- }
-
- private SinkAction createSinkAction(
- long id,
- @NonNull String name,
- @NonNull List<Action> upstreams,
- @NonNull SeaTunnelSink sink,
- Set<URL> jarUrls) {
- if (!CollectionUtils.isEmpty(jarUrls)) {
- jarUrlsSet.addAll(jarUrls);
- }
- return new SinkAction(id, name, upstreams, sink, jarUrls);
- }
-
- private TransformAction createTransformAction(
- long id,
- @NonNull String name,
- @NonNull SeaTunnelTransform transformation,
- Set<URL> jarUrls) {
- if (!CollectionUtils.isEmpty(jarUrls)) {
- jarUrlsSet.addAll(jarUrls);
- }
- return new TransformAction(id, name, transformation, jarUrls);
- }
-
- private SinkAction createSinkAction(
- long id, @NonNull String name, @NonNull SeaTunnelSink sink,
Set<URL> jarUrls) {
- if (!CollectionUtils.isEmpty(jarUrls)) {
- jarUrlsSet.addAll(jarUrls);
- }
- return new SinkAction(id, name, sink, jarUrls);
+ actionName,
+ new ArrayList<>(inputActions),
+ sink,
+ tuple.getRight());
+ action.setParallelism(parallelism);
+ return action;
}
static String createSourceActionName(int configIndex, String pluginName,
String tableName) {
@@ -525,27 +216,17 @@ public class JobConfigParser {
}
static String getTableName(Config config) {
- return getTableName(config, "default");
+ return getTableName(config, DEFAULT_ID);
}
static String getTableName(Config config, String defaultValue) {
- String sourceTableName = null;
- if (config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
- sourceTableName =
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
- }
String resultTableName = null;
if (config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
resultTableName =
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
}
- if (sourceTableName != null && resultTableName != null) {
- return String.format("%s_%s", sourceTableName, resultTableName);
- }
- if (sourceTableName == null) {
- return resultTableName;
- }
if (resultTableName == null) {
- return sourceTableName;
+ return defaultValue;
}
- return defaultValue;
+ return resultTableName;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 4c96a20e8..8777a9228 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.core.parse;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.env.ParsingMode;
@@ -27,17 +28,22 @@ import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.source.SourceSplit;
-import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -48,6 +54,7 @@ import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -55,6 +62,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
import java.io.IOException;
@@ -62,18 +70,30 @@ import java.io.Serializable;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
+import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls;
+import static
org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputIds;
+
+@Slf4j
public class MultipleTableJobConfigParser {
- private static final ILogger LOGGER =
Logger.getLogger(JobConfigParser.class);
+ private static final ILogger LOGGER =
Logger.getLogger(MultipleTableJobConfigParser.class);
+
+ static final String DEFAULT_ID = "default-identifier";
private final IdGenerator idGenerator;
private final JobConfig jobConfig;
@@ -85,8 +105,6 @@ public class MultipleTableJobConfigParser {
private final JobConfigParser fallbackParser;
- private final ParsingMode parsingMode;
-
public MultipleTableJobConfigParser(
String jobDefineFilePath, IdGenerator idGenerator, JobConfig
jobConfig) {
this(jobDefineFilePath, idGenerator, jobConfig,
Collections.emptyList());
@@ -102,15 +120,10 @@ public class MultipleTableJobConfigParser {
this.commonPluginJars = commonPluginJars;
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath));
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
- this.fallbackParser =
- new JobConfigParser(jobDefineFilePath, idGenerator, jobConfig,
commonPluginJars);
- this.parsingMode = envOptions.get(EnvCommonOptions.DAG_PARSING_MODE);
+ this.fallbackParser = new JobConfigParser(idGenerator,
commonPluginJars);
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
- if (parsingMode == ParsingMode.SINGLENESS) {
- return fallbackParser.parse();
- }
List<URL> connectorJars = new ArrayList<>();
try {
connectorJars =
FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
@@ -119,24 +132,35 @@ public class MultipleTableJobConfigParser {
}
ClassLoader classLoader = new
SeaTunnelChildFirstClassLoader(connectorJars);
Thread.currentThread().setContextClassLoader(classLoader);
- // TODO: Support configuration transform
- List<? extends Config> sourceConfigs =
seaTunnelJobConfig.getConfigList("source");
- List<? extends Config> sinkConfigs =
seaTunnelJobConfig.getConfigList("sink");
- if (CollectionUtils.isEmpty(sourceConfigs) ||
CollectionUtils.isEmpty(sinkConfigs)) {
- throw new JobDefineCheckException("Source And Sink can not be
null");
- }
+ List<? extends Config> sourceConfigs =
+ TypesafeConfigUtils.getConfigList(
+ seaTunnelJobConfig, "source", Collections.emptyList());
+ List<? extends Config> transformConfigs =
+ TypesafeConfigUtils.getConfigList(
+ seaTunnelJobConfig, "transform",
Collections.emptyList());
+ List<? extends Config> sinkConfigs =
+ TypesafeConfigUtils.getConfigList(
+ seaTunnelJobConfig, "sink", Collections.emptyList());
+
+ ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs,
sinkConfigs);
+
this.fillJobConfig();
- Map<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap =
new HashMap<>();
+
+ LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap =
+ new LinkedHashMap<>();
+
for (Config sourceConfig : sourceConfigs) {
Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
- parserSource(sourceConfig, classLoader);
+ parseSource(sourceConfig, classLoader);
tableWithActionMap.put(tuple2._1(), tuple2._2());
}
+
+ parseTransforms(transformConfigs, classLoader, tableWithActionMap);
+
List<Action> sinkActions = new ArrayList<>();
for (int configIndex = 0; configIndex < sinkConfigs.size();
configIndex++) {
Config sinkConfig = sinkConfigs.get(configIndex);
- sinkActions.addAll(
- parserSink(configIndex, sinkConfig, classLoader,
tableWithActionMap));
+ sinkActions.addAll(parseSink(configIndex, sinkConfig, classLoader,
tableWithActionMap));
}
Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
factoryUrls.addAll(commonPluginJars);
@@ -182,44 +206,86 @@ public class MultipleTableJobConfigParser {
.put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(), interval));
}
- public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parserSource(
+ private static <T extends Factory> boolean isFallback(
+ ClassLoader classLoader,
+ Class<T> factoryClass,
+ String factoryId,
+ Consumer<T> virtualCreator) {
+ Optional<T> factory =
+ FactoryUtil.discoverOptionalFactory(classLoader, factoryClass,
factoryId);
+ if (!factory.isPresent()) {
+ return true;
+ }
+ try {
+ virtualCreator.accept(factory.get());
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private int getParallelism(ReadonlyConfig config) {
+ return Math.max(
+ 1,
+ config.getOptional(CommonOptions.PARALLELISM)
+ .orElse(envOptions.get(CommonOptions.PARALLELISM)));
+ }
+
+ public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource(
Config sourceConfig, ClassLoader classLoader) {
- List<CatalogTable> catalogTables =
+ final ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sourceConfig);
+ final String factoryId = getFactoryId(readonlyConfig);
+ final String tableId =
+
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+
+ final int parallelism = getParallelism(readonlyConfig);
+
+ boolean fallback =
+ isFallback(
+ classLoader,
+ TableSourceFactory.class,
+ factoryId,
+ (factory) -> factory.createSource(null));
+ if (fallback) {
+ Tuple2<CatalogTable, Action> tuple =
+ fallbackParser.parseSource(sourceConfig, jobConfig,
tableId, parallelism);
+ return new Tuple2<>(tableId, Collections.singletonList(tuple));
+ }
+
+ final List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
if (catalogTables.isEmpty()) {
throw new JobDefineCheckException(
"The source needs catalog table, please configure
`catalog` or `schema` options.");
}
- ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sourceConfig);
- String factoryId = getFactoryId(readonlyConfig);
- String tableId =
-
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse("default");
-
- if (parsingMode == ParsingMode.SHARDING) {
- catalogTables = Collections.singletonList(catalogTables.get(0));
+ if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) ==
ParsingMode.SHARDING) {
+ CatalogTable shardingTable = catalogTables.get(0);
+ catalogTables.clear();
+ catalogTables.add(shardingTable);
}
+
List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>>>
sources =
FactoryUtil.createAndPrepareSource(
catalogTables, readonlyConfig, classLoader,
factoryId);
- // get factory urls
- Set<URL> factoryUrls = new HashSet<>();
- URL factoryUrl =
- FactoryUtil.getFactoryUrl(
- FactoryUtil.discoverFactory(
- classLoader, TableSourceFactory.class,
factoryId));
- factoryUrls.add(factoryUrl);
- getCatalogFactoryUrl(sourceConfig,
classLoader).ifPresent(factoryUrls::add);
+ Set<URL> factoryUrls =
+ getFactoryUrls(readonlyConfig, classLoader,
TableSourceFactory.class, factoryId);
List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
- int parallelism = getParallelism(readonlyConfig);
for (int configIndex = 0; configIndex < sources.size(); configIndex++)
{
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> tuple2 =
sources.get(configIndex);
long id = idGenerator.getNextId();
String actionName =
JobConfigParser.createSourceActionName(configIndex,
factoryId, tableId);
+ SeaTunnelSource<Object, SourceSplit, Serializable> source =
tuple2._1();
+ source.setJobContext(jobConfig.getJobContext());
+ ensureJobModeMatch(jobConfig.getJobContext(), source);
SourceAction<Object, SourceSplit, Serializable> action =
new SourceAction<>(id, actionName, tuple2._1(),
factoryUrls);
action.setParallelism(parallelism);
@@ -230,107 +296,266 @@ public class MultipleTableJobConfigParser {
return new Tuple2<>(tableId, actions);
}
- public static Optional<URL> getCatalogFactoryUrl(Config config,
ClassLoader classLoader) {
- // catalog url
- ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
- Map<String, String> catalogOptions =
-
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new
HashMap<>());
- // TODO: fallback key
- String factoryId =
- catalogOptions.getOrDefault(
- CommonOptions.FACTORY_ID.key(),
- readonlyConfig.get(CommonOptions.PLUGIN_NAME));
- Optional<CatalogFactory> optionalFactory =
- FactoryUtil.discoverOptionalFactory(classLoader,
CatalogFactory.class, factoryId);
- return optionalFactory.map(FactoryUtil::getFactoryUrl);
+ public static void ensureJobModeMatch(JobContext jobContext,
SeaTunnelSource source) {
+ if (jobContext.getJobMode() == JobMode.BATCH
+ && source.getBoundedness()
+ ==
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+ throw new JobDefineCheckException(
+ String.format(
+ "'%s' source don't support off-line job.",
source.getPluginName()));
+ }
}
- private int getParallelism(ReadonlyConfig config) {
- return Math.max(
- 1,
- config.getOptional(CommonOptions.PARALLELISM)
- .orElse(envOptions.get(CommonOptions.PARALLELISM)));
+ public void parseTransforms(
+ List<? extends Config> transformConfigs,
+ ClassLoader classLoader,
+ LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap) {
+ if (CollectionUtils.isEmpty(transformConfigs) ||
transformConfigs.size() == 0) {
+ return;
+ }
+ Queue<Config> configList = new LinkedList<>(transformConfigs);
+ parseTransform(configList, classLoader, tableWithActionMap);
+ }
+
+ private void parseTransform(
+ Queue<Config> transforms,
+ ClassLoader classLoader,
+ LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap) {
+ Config config = transforms.poll();
+ final ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(config);
+ final String factoryId = getFactoryId(readonlyConfig);
+ final List<String> inputIds = getInputIds(readonlyConfig);
+
+ List<Tuple2<CatalogTable, Action>> inputs =
+ inputIds.stream()
+ .map(tableWithActionMap::get)
+ .filter(Objects::nonNull)
+ .peek(
+ input -> {
+ if (input.size() > 1) {
+ throw new JobDefineCheckException(
+ "Adding transform to
multi-table source is not supported.");
+ }
+ })
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ if (inputs.isEmpty()) {
+ if (transforms.isEmpty()) {
+                // Tolerates an incorrect configuration of a simple graph
+ inputs = findLast(tableWithActionMap);
+ } else {
+ // The previous transform has not been created
+ transforms.offer(config);
+ return;
+ }
+ }
+
+ final String tableId =
+
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+
+ boolean fallback =
+ isFallback(
+ classLoader,
+ TableTransformFactory.class,
+ factoryId,
+ (factory) -> factory.createTransform(null));
+
+ Set<Action> inputActions =
inputs.stream().map(Tuple2::_2).collect(Collectors.toSet());
+ SeaTunnelDataType<?> expectedType =
getProducedType(inputs.get(0)._2());
+ checkProducedTypeEquals(inputActions);
+ int spareParallelism = inputs.get(0)._2().getParallelism();
+ if (fallback) {
+ int parallelism =
+
readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
+ Tuple2<CatalogTable, Action> tuple =
+ fallbackParser.parseTransform(
+ config,
+ jobConfig,
+ tableId,
+ parallelism,
+ (SeaTunnelRowType) expectedType,
+ inputActions);
+ tableWithActionMap.put(tableId, Collections.singletonList(tuple));
+ return;
+ }
+
+ // TODO: TableTransformFactory is not available.
+ return;
+ }
+
+ public static SeaTunnelDataType<?> getProducedType(Action action) {
+ if (action instanceof SourceAction) {
+ return ((SourceAction<?, ?, ?>)
action).getSource().getProducedType();
+ } else if (action instanceof TransformAction) {
+ return ((TransformAction) action).getTransform().getProducedType();
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ public static void checkProducedTypeEquals(Set<Action> inputActions) {
+ SeaTunnelDataType<?> expectedType = getProducedType(new
ArrayList<>(inputActions).get(0));
+ for (Action action : inputActions) {
+ SeaTunnelDataType<?> producedType = getProducedType(action);
+ if (!expectedType.equals(producedType)) {
+ throw new JobDefineCheckException(
+ "Transform/Sink don't support processing data with two
different structures.");
+ }
+ }
+ }
+
+ @Deprecated
+ private static <T> T findLast(LinkedHashMap<?, T> map) {
+ int size = map.size();
+ int i = 1;
+ for (T value : map.values()) {
+ if (i == size) {
+ return value;
+ }
+ i++;
+ }
+        // never executed
+ return null;
}
- public List<SinkAction<?, ?, ?, ?>> parserSink(
+ public List<SinkAction<?, ?, ?, ?>> parseSink(
int configIndex,
Config sinkConfig,
ClassLoader classLoader,
- Map<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap) {
+ LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap) {
+
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
+ String factoryId = getFactoryId(readonlyConfig);
+ List<String> inputIds = getInputIds(readonlyConfig);
+
+ List<List<Tuple2<CatalogTable, Action>>> inputVertices =
+ inputIds.stream()
+ .map(tableWithActionMap::get)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ if (inputVertices.isEmpty()) {
+            // Tolerates an incorrect configuration of a simple graph
+ inputVertices =
Collections.singletonList(findLast(tableWithActionMap));
+ } else if (inputVertices.size() > 1) {
+ for (List<Tuple2<CatalogTable, Action>> inputVertex :
inputVertices) {
+ if (inputVertex.size() > 1) {
+ throw new JobDefineCheckException(
+ "Sink don't support simultaneous writing of data
from multi-table source and other sources.");
+ }
+ }
+ }
+
+ boolean fallback =
+ isFallback(
+ classLoader,
+ TableSinkFactory.class,
+ factoryId,
+ (factory) -> factory.createSink(null));
+ if (fallback) {
+ return fallbackParser.parseSinks(inputVertices, sinkConfig,
jobConfig);
+ }
+
Map<TablePath, CatalogTable> tableMap =
CatalogTableUtil.getCatalogTables(sinkConfig,
classLoader).stream()
.collect(
Collectors.toMap(
catalogTable ->
catalogTable.getTableId().toTablePath(),
catalogTable -> catalogTable));
- ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
- String factoryId = getFactoryId(readonlyConfig);
- String leftTableId =
-
readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse("default");
- List<Tuple2<CatalogTable, Action>> tableTuples =
tableWithActionMap.get(leftTableId);
// get factory urls
- Set<URL> factoryUrls = new HashSet<>();
- URL factoryUrl =
- FactoryUtil.getFactoryUrl(
- FactoryUtil.discoverFactory(
- classLoader, TableSinkFactory.class,
factoryId));
- factoryUrls.add(factoryUrl);
- getCatalogFactoryUrl(sinkConfig,
classLoader).ifPresent(factoryUrls::add);
-
+ Set<URL> factoryUrls =
+ getFactoryUrls(readonlyConfig, classLoader,
TableSinkFactory.class, factoryId);
List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
- for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
- CatalogTable catalogTable = tableTuple._1();
- Action leftAction = tableTuple._2();
- Optional<CatalogTable> insteadTable;
- if (parsingMode == ParsingMode.SHARDING) {
- insteadTable = tableMap.values().stream().findFirst();
- } else {
- // TODO: another table full name map
- insteadTable =
-
Optional.ofNullable(tableMap.get(catalogTable.getTableId().toTablePath()));
- }
- if (insteadTable.isPresent()) {
- catalogTable = insteadTable.get();
- }
- SeaTunnelSink<?, ?, ?, ?> sink =
- FactoryUtil.createAndPrepareSink(
- catalogTable, readonlyConfig, classLoader,
factoryId);
- SinkConfig actionConfig =
- new
SinkConfig(catalogTable.getTableId().toTablePath().toString());
- long id = idGenerator.getNextId();
- String actionName =
- JobConfigParser.createSinkActionName(
- configIndex,
+
+ // union
+ if (inputVertices.size() > 1) {
+ Set<Action> inputActions =
+ inputVertices.stream()
+ .flatMap(Collection::stream)
+ .map(Tuple2::_2)
+ .collect(Collectors.toSet());
+ checkProducedTypeEquals(inputActions);
+ Tuple2<CatalogTable, Action> inputActionSample =
inputVertices.get(0).get(0);
+ SinkAction<?, ?, ?, ?> sinkAction =
+ createSinkAction(
+ inputActionSample._1(),
+ tableMap,
+ inputActions,
+ readonlyConfig,
+ classLoader,
+ factoryUrls,
factoryId,
- String.format(
- "%s(%s)", leftTableId,
actionConfig.getMultipleRowTableId()));
+ inputActionSample._2().getParallelism(),
+ configIndex);
+ sinkActions.add(sinkAction);
+ return sinkActions;
+ }
+
+ // sink template
+ for (Tuple2<CatalogTable, Action> tuple : inputVertices.get(0)) {
SinkAction<?, ?, ?, ?> sinkAction =
- new SinkAction<>(
- id,
- actionName,
- Collections.singletonList(leftAction),
- sink,
+ createSinkAction(
+ tuple._1(),
+ tableMap,
+ Collections.singleton(tuple._2()),
+ readonlyConfig,
+ classLoader,
factoryUrls,
- actionConfig);
- handleSaveMode(sink);
- sinkAction.setParallelism(leftAction.getParallelism());
+ factoryId,
+ tuple._2().getParallelism(),
+ configIndex);
sinkActions.add(sinkAction);
}
return sinkActions;
}
- private static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
+ private SinkAction<?, ?, ?, ?> createSinkAction(
+ CatalogTable catalogTable,
+ Map<TablePath, CatalogTable> sinkTableMap,
+ Set<Action> inputActions,
+ ReadonlyConfig readonlyConfig,
+ ClassLoader classLoader,
+ Set<URL> factoryUrls,
+ String factoryId,
+ int parallelism,
+ int configIndex) {
+ Optional<CatalogTable> insteadTable;
+ if (sinkTableMap.size() == 1) {
+ insteadTable = sinkTableMap.values().stream().findFirst();
+ } else {
+ // TODO: another table full name map
+ insteadTable =
+
Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
+ }
+ if (insteadTable.isPresent()) {
+ catalogTable = insteadTable.get();
+ }
+ SeaTunnelSink<?, ?, ?, ?> sink =
+ FactoryUtil.createAndPrepareSink(
+ catalogTable, readonlyConfig, classLoader, factoryId);
+ SinkConfig actionConfig =
+ new
SinkConfig(catalogTable.getTableId().toTablePath().toString());
+ long id = idGenerator.getNextId();
+ String actionName =
+ JobConfigParser.createSinkActionName(
+ configIndex, factoryId,
actionConfig.getMultipleRowTableId());
+ SinkAction<?, ?, ?, ?> sinkAction =
+ new SinkAction<>(
+ id,
+ actionName,
+ new ArrayList<>(inputActions),
+ sink,
+ factoryUrls,
+ actionConfig);
+ handleSaveMode(sink);
+ sinkAction.setParallelism(parallelism);
+ return sinkAction;
+ }
+
+ public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
}
-
- private static String getFactoryId(ReadonlyConfig readonlyConfig) {
- return readonlyConfig
- .getOptional(CommonOptions.FACTORY_ID)
- .orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index d5f34f3fa..0481256ce 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -34,7 +34,7 @@ import
org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -109,7 +109,7 @@ public class TestUtils {
IdGenerator idGenerator = new IdGenerator();
ImmutablePair<List<Action>, Set<URL>> immutablePair =
- new JobConfigParser(filePath, idGenerator, jobConfig).parse();
+ new MultipleTableJobConfigParser(filePath, idGenerator,
jobConfig).parse();
LogicalDagGenerator logicalDagGenerator =
new LogicalDagGenerator(immutablePair.getLeft(), jobConfig,
idGenerator);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
index 657888b14..334518618 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java
@@ -40,9 +40,9 @@ import static org.awaitility.Awaitility.await;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class JobHistoryServiceTest extends AbstractSeaTunnelServerTest {
- private static final Long JOB_1 = 1L;
- private static final Long JOB_2 = 2L;
- private static final Long JOB_3 = 3L;
+ private static final Long JOB_1 = System.currentTimeMillis() + 1L;
+ private static final Long JOB_2 = System.currentTimeMillis() + 2L;
+ private static final Long JOB_3 = System.currentTimeMillis() + 3L;
@Test
public void testlistJobState() throws Exception {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 5e7afd3cf..5b0a75abf 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -50,20 +50,20 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class JobMetricsTest extends AbstractSeaTunnelServerTest {
- private static final Long JOB_1 = 145234L;
- private static final Long JOB_2 = 223452L;
- private static final Long JOB_3 = 323475L;
-
@Test
public void testGetJobMetrics() throws Exception {
- startJob(JOB_1, "fake_to_console_job_metrics.conf", false);
- startJob(JOB_2, "fake_to_console_job_metrics.conf", false);
+
+ long jobId1 = System.currentTimeMillis() + 145234L;
+ long jobId2 = System.currentTimeMillis() + 223452L;
+
+ startJob(jobId1, "fake_to_console_job_metrics.conf", false);
+ startJob(jobId2, "fake_to_console_job_metrics.conf", false);
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
JobMetrics jobMetrics =
-
server.getCoordinatorService().getJobMetrics(JOB_1);
+
server.getCoordinatorService().getJobMetrics(jobId1);
if (jobMetrics.get(SINK_WRITE_COUNT).size() > 0) {
assertTrue(
(Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value() > 0);
@@ -75,7 +75,7 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
}
});
- // waiting for JOB_1 status turn to FINISHED
+ // waiting for jobId1 status turn to FINISHED
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
@@ -86,9 +86,9 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
.contains(
String.format(
"\"jobId\":%s,\"jobName\":\"Test\",\"jobStatus\":\"FINISHED\"",
- JOB_1))));
+ jobId1))));
- JobMetrics jobMetrics =
server.getCoordinatorService().getJobMetrics(JOB_1);
+ JobMetrics jobMetrics =
server.getCoordinatorService().getJobMetrics(jobId1);
assertEquals(30, (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
assertEquals(30, (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
assertTrue((Double) jobMetrics.get(SOURCE_RECEIVED_QPS).get(0).value()
> 0);
@@ -98,22 +98,25 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
@Test
@SuppressWarnings("checkstyle:RegexpSingleline")
public void testMetricsOnJobRestart() throws InterruptedException {
+
+ long jobId3 = System.currentTimeMillis() + 323475L;
+
CoordinatorService coordinatorService = server.getCoordinatorService();
- startJob(JOB_3, "stream_fake_to_console.conf", false);
+ startJob(jobId3, "stream_fake_to_console.conf", false);
// waiting for job status turn to running
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.RUNNING,
-
server.getCoordinatorService().getJobStatus(JOB_3)));
+
server.getCoordinatorService().getJobStatus(jobId3)));
Thread.sleep(10000);
-
System.out.println(coordinatorService.getJobMetrics(JOB_3).toJsonString());
+
System.out.println(coordinatorService.getJobMetrics(jobId3).toJsonString());
// start savePoint
- coordinatorService.savePoint(JOB_3);
+ coordinatorService.savePoint(jobId3);
// waiting job FINISHED
await().atMost(120000, TimeUnit.MILLISECONDS)
@@ -121,25 +124,27 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
-
server.getCoordinatorService().getJobStatus(JOB_3)));
+
server.getCoordinatorService().getJobStatus(jobId3)));
// restore job
- startJob(JOB_3, "stream_fake_to_console.conf", true);
+ startJob(jobId3, "stream_fake_to_console.conf", true);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.RUNNING,
-
server.getCoordinatorService().getJobStatus(JOB_3)));
+
server.getCoordinatorService().getJobStatus(jobId3)));
Thread.sleep(20000);
// check metrics
- JobMetrics jobMetrics = coordinatorService.getJobMetrics(JOB_3);
+ JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
System.out.println(jobMetrics.toJsonString());
assertTrue(40 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
assertTrue(40 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
assertTrue(40 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
assertTrue(40 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
+
+ server.getCoordinatorService().cancelJob(jobId3);
}
private void startJob(Long jobid, String path, boolean
isStartWithSavePoint) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
index 6b0599131..9e04be8be 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
@@ -40,7 +40,7 @@ source {
}
FakeSource {
- result_table_name = "fake"
+ result_table_name = "fake2"
parallelism = 1
schema = {
fields {
@@ -56,6 +56,6 @@ transform {
sink {
console {
- source_table_name="fake"
+ source_table_name = "fake,fake2"
}
}
\ No newline at end of file