This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 98f2ad0c1 [hotfix][zeta] fix zeta multi-table parser error (#4193)
98f2ad0c1 is described below

commit 98f2ad0c194d64df3a2c8aee6b38ce00cf1df52a
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Feb 23 14:19:28 2023 +0800

    [hotfix][zeta] fix zeta multi-table parser error (#4193)
---
 .../connectors/cdc/base/config/JdbcSourceConfigFactory.java      | 2 +-
 .../seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java | 9 ++++++---
 .../connectors/seatunnel/console/sink/ConsoleSinkFactory.java    | 7 +++++++
 .../connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java   | 4 ++++
 .../seatunnel/starrocks/catalog/StarRocksCatalogFactory.java     | 4 ++++
 .../connectors/seatunnel/starrocks/sink/StarRocksSink.java       | 2 ++
 .../org/apache/seatunnel/engine/core/parse/JobConfigParser.java  | 2 +-
 .../engine/core/parse/MultipleTableJobConfigParser.java          | 4 ++--
 .../engine/server/dag/execution/ExecutionPlanGenerator.java      | 6 +++++-
 .../org/apache/seatunnel/engine/server/master/JobMaster.java     | 2 +-
 10 files changed, 33 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 8b764bac4..5ac33abdb 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -192,7 +192,7 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
         // TODO: support multi-table
         this.databaseList = 
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
         this.tableList = 
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME)
-            + "." + config.get(JdbcSourceOptions.TABLE_NAME));
+            + "\\." + config.get(JdbcSourceOptions.TABLE_NAME));
         this.distributionFactorUpper = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         this.distributionFactorLower = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 9613b40c8..897bfe9e7 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -28,13 +28,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
 
+@NoArgsConstructor
 @AutoService(SeaTunnelSink.class)
 public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-
-    private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
 
+    public ConsoleSink(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
     @Override
     public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         this.seaTunnelRowType = seaTunnelRowType;
@@ -57,7 +61,6 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public void prepare(Config pluginConfig) {
-        this.pluginConfig = pluginConfig;
     }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 5363f2395..26607c37a 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 
 import com.google.auto.service.AutoService;
@@ -34,4 +36,9 @@ public class ConsoleSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder().build();
     }
+
+    @Override
+    public TableSink createSink(TableFactoryContext context) {
+        return () -> new 
ConsoleSink(context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
index d9bd85f08..f3246ab6c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
@@ -21,7 +21,11 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
 
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
 public class MySqlCatalogFactory implements CatalogFactory {
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index e17b8591a..5f1fa98b5 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -21,8 +21,12 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
 
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
 public class StarRocksCatalogFactory implements CatalogFactory {
     public static final String IDENTIFIER = "StarRocks";
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 19c0ce853..add3967ca 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -38,11 +38,13 @@ import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Collections;
 import java.util.List;
 
+@NoArgsConstructor
 @AutoService(SeaTunnelSink.class)
 public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> 
implements SupportDataSaveMode {
 
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index f1ffca0a1..389cdcbd9 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -158,7 +158,7 @@ public class JobConfigParser {
         if (envConfigs.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
             jobConfig.getEnvOptions()
                 .put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(),
-                    
envConfigs.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+                    
envConfigs.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index e607a95fe..830b7e7ad 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -109,11 +109,11 @@ public class MultipleTableJobConfigParser {
         if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
             return fallbackParser.parse();
         }
-        List<URL> connectorJars = null;
+        List<URL> connectorJars = new ArrayList<>();
         try {
             connectorJars = 
FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            LOGGER.info(e);
         }
         ClassLoader classLoader = new 
SeaTunnelChildFirstClassLoader(connectorJars);
         Thread.currentThread().setContextClassLoader(classLoader);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index cea095d9f..8863a70d3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -31,6 +31,7 @@ import 
org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -101,7 +102,10 @@ public class ExecutionPlanGenerator {
                 ((ShuffleAction) action).getConfig());
         } else if (action instanceof SinkAction) {
             newAction = new SinkAction<>(id, action.getName(),
-                ((SinkAction<?, ?, ?, ?>) action).getSink(), 
action.getJarUrls());
+                new ArrayList<>(),
+                ((SinkAction<?, ?, ?, ?>) action).getSink(),
+                action.getJarUrls(),
+                (SinkConfig) action.getConfig());
         } else if (action instanceof SourceAction) {
             newAction = new SourceAction<>(id, action.getName(),
                 ((SourceAction<?, ?, ?>) action).getSource(), 
action.getJarUrls());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6d53deffe..bad0872f5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -212,7 +212,7 @@ public class JobMaster {
         jobCheckpointConfig.setStorage(jobCheckpointStorageConfig);
 
         if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            jobCheckpointConfig.setCheckpointInterval((Integer) 
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+            jobCheckpointConfig.setCheckpointInterval((Long) 
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
         }
         return jobCheckpointConfig;
     }

Reply via email to