This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 98f2ad0c1 [hotfix][zeta] fix zeta multi-table parser error (#4193)
98f2ad0c1 is described below
commit 98f2ad0c194d64df3a2c8aee6b38ce00cf1df52a
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Feb 23 14:19:28 2023 +0800
[hotfix][zeta] fix zeta multi-table parser error (#4193)
---
.../connectors/cdc/base/config/JdbcSourceConfigFactory.java | 2 +-
.../seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java | 9 ++++++---
.../connectors/seatunnel/console/sink/ConsoleSinkFactory.java | 7 +++++++
.../connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java | 4 ++++
.../seatunnel/starrocks/catalog/StarRocksCatalogFactory.java | 4 ++++
.../connectors/seatunnel/starrocks/sink/StarRocksSink.java | 2 ++
.../org/apache/seatunnel/engine/core/parse/JobConfigParser.java | 2 +-
.../engine/core/parse/MultipleTableJobConfigParser.java | 4 ++--
.../engine/server/dag/execution/ExecutionPlanGenerator.java | 6 +++++-
.../org/apache/seatunnel/engine/server/master/JobMaster.java | 2 +-
10 files changed, 33 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 8b764bac4..5ac33abdb 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -192,7 +192,7 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
// TODO: support multi-table
this.databaseList =
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
this.tableList =
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME)
- + "." + config.get(JdbcSourceOptions.TABLE_NAME));
+ + "\\." + config.get(JdbcSourceOptions.TABLE_NAME));
this.distributionFactorUpper =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
this.distributionFactorLower =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 9613b40c8..897bfe9e7 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -28,13 +28,17 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-
- private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
+ public ConsoleSink(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
@@ -57,7 +61,6 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config pluginConfig) {
- this.pluginConfig = pluginConfig;
}
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 5363f2395..26607c37a 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import com.google.auto.service.AutoService;
@@ -34,4 +36,9 @@ public class ConsoleSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder().build();
}
+
+ @Override
+ public TableSink createSink(TableFactoryContext context) {
+ return () -> new
ConsoleSink(context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
index d9bd85f08..f3246ab6c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
@@ -21,7 +21,11 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
public class MySqlCatalogFactory implements CatalogFactory {
@Override
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index e17b8591a..5f1fa98b5 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -21,8 +21,12 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
public class StarRocksCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = "StarRocks";
@Override
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 19c0ce853..add3967ca 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -38,11 +38,13 @@ import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.List;
+@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportDataSaveMode {
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index f1ffca0a1..389cdcbd9 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -158,7 +158,7 @@ public class JobConfigParser {
if (envConfigs.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
jobConfig.getEnvOptions()
.put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
-
envConfigs.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+
envConfigs.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index e607a95fe..830b7e7ad 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -109,11 +109,11 @@ public class MultipleTableJobConfigParser {
if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
return fallbackParser.parse();
}
- List<URL> connectorJars = null;
+ List<URL> connectorJars = new ArrayList<>();
try {
connectorJars =
FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
} catch (IOException e) {
- throw new RuntimeException(e);
+ LOGGER.info(e);
}
ClassLoader classLoader = new
SeaTunnelChildFirstClassLoader(connectorJars);
Thread.currentThread().setContextClassLoader(classLoader);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index cea095d9f..8863a70d3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -31,6 +31,7 @@ import
org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -101,7 +102,10 @@ public class ExecutionPlanGenerator {
((ShuffleAction) action).getConfig());
} else if (action instanceof SinkAction) {
newAction = new SinkAction<>(id, action.getName(),
- ((SinkAction<?, ?, ?, ?>) action).getSink(),
action.getJarUrls());
+ new ArrayList<>(),
+ ((SinkAction<?, ?, ?, ?>) action).getSink(),
+ action.getJarUrls(),
+ (SinkConfig) action.getConfig());
} else if (action instanceof SourceAction) {
newAction = new SourceAction<>(id, action.getName(),
((SourceAction<?, ?, ?>) action).getSource(),
action.getJarUrls());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6d53deffe..bad0872f5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -212,7 +212,7 @@ public class JobMaster {
jobCheckpointConfig.setStorage(jobCheckpointStorageConfig);
if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- jobCheckpointConfig.setCheckpointInterval((Integer)
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+ jobCheckpointConfig.setCheckpointInterval((Long)
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
}
return jobCheckpointConfig;
}