This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a04add6991 [Improve] Add default implement for `SeaTunnelSource::getProducedType` (#5670)
a04add6991 is described below

commit a04add6991b3c015dbba230586649b9b0d703760
Author: Jia Fan <[email protected]>
AuthorDate: Tue Oct 24 10:22:33 2023 +0800

    [Improve] Add default implement for `SeaTunnelSource::getProducedType` (#5670)
---
 .../seatunnel/api/source/SeaTunnelSource.java      |  2 +-
 .../cdc/base/source/IncrementalSource.java         | 22 ---------------
 .../seatunnel/fake/source/FakeSource.java          | 31 ----------------------
 .../core/starter/execution/PluginUtil.java         |  1 -
 .../flink/execution/SinkExecuteProcessor.java      |  1 -
 .../flink/execution/SinkExecuteProcessor.java      |  1 -
 .../spark/execution/SinkExecuteProcessor.java      |  1 -
 .../spark/execution/SinkExecuteProcessor.java      |  1 -
 .../apache/seatunnel/engine/server/TestUtils.java  |  1 -
 .../server/checkpoint/CheckpointPlanTest.java      |  1 -
 .../seatunnel/engine/server/dag/TaskTest.java      |  4 +--
 11 files changed, 2 insertions(+), 64 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 924c2e5244..916f076f57 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -56,7 +56,7 @@ public interface SeaTunnelSource<T, SplitT extends 
SourceSplit, StateT extends S
      */
     @Deprecated
     default SeaTunnelDataType<T> getProducedType() {
-        throw new UnsupportedOperationException("getProducedType method has 
not been implemented.");
+        return (SeaTunnelDataType) 
getProducedCatalogTables().get(0).getSeaTunnelRowType();
     }
 
     /**
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 4c8ee235a9..11769ab87a 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -17,9 +17,6 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -118,20 +115,6 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
         this.offsetFactory = createOffsetFactory(readonlyConfig);
     }
 
-    @Override
-    public final void prepare(Config pluginConfig) throws PrepareFailException 
{
-        this.readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
-
-        this.startupConfig = getStartupConfig(readonlyConfig);
-        this.stopConfig = getStopConfig(readonlyConfig);
-        this.stopMode = stopConfig.getStopMode();
-        this.incrementalParallelism = 
readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
-        this.configFactory = createSourceConfigFactory(readonlyConfig);
-        this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
-        this.deserializationSchema = 
createDebeziumDeserializationSchema(readonlyConfig);
-        this.offsetFactory = createOffsetFactory(readonlyConfig);
-    }
-
     protected StartupConfig getStartupConfig(ReadonlyConfig config) {
         return new StartupConfig(
                 config.get(getStartupModeOption()),
@@ -178,11 +161,6 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
         return stopMode == StopMode.NEVER ? Boundedness.UNBOUNDED : 
Boundedness.BOUNDED;
     }
 
-    @Override
-    public SeaTunnelDataType<T> getProducedType() {
-        return deserializationSchema.getProducedType();
-    }
-
     @SuppressWarnings("MagicNumber")
     @Override
     public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context 
readerContext)
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index e634347fde..d3bf9c6430 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -31,27 +28,19 @@ import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
 
 import org.apache.commons.collections4.CollectionUtils;
 
-import com.google.auto.service.AutoService;
 import com.google.common.collect.Lists;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
-@AutoService(SeaTunnelSource.class)
 public class FakeSource
         implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, 
FakeSourceState>,
                 SupportParallelism,
@@ -96,11 +85,6 @@ public class FakeSource
                 .collect(Collectors.toList());
     }
 
-    @Override
-    public SeaTunnelRowType getProducedType() {
-        return catalogTable.getSeaTunnelRowType();
-    }
-
     @Override
     public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> 
createEnumerator(
             SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) 
throws Exception {
@@ -126,21 +110,6 @@ public class FakeSource
         return "FakeSource";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(pluginConfig, 
TableSchemaOptions.SCHEMA.key());
-        if (!result.isSuccess()) {
-            throw new FakeConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
-        this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);
-    }
-
     @Override
     public void setJobContext(JobContext jobContext) {
         this.jobContext = jobContext;
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 0b1f34d3c1..0dc4209a8b 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -117,7 +117,6 @@ public class PluginUtil {
                             .equals(e.getMessage())) {
                 return true;
             }
-            return true;
         }
         return false;
     }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 86caa6939a..4f02b679b5 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -138,7 +138,6 @@ public class SinkExecuteProcessor
                             .equals(e.getMessage())) {
                 return true;
             }
-            return true;
         }
         return false;
     }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 2109ffe88f..c796a99400 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -141,7 +141,6 @@ public class SinkExecuteProcessor
                             .equals(e.getMessage())) {
                 return true;
             }
-            return true;
         }
         return false;
     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 5546890e83..156fd9e699 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -157,7 +157,6 @@ public class SinkExecuteProcessor
                             .equals(e.getMessage())) {
                 return true;
             }
-            return true;
         }
         return false;
     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index b3d978b1cb..64d9fcaf77 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -159,7 +159,6 @@ public class SinkExecuteProcessor
                             .equals(e.getMessage())) {
                 return true;
             }
-            return true;
         }
         return false;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 0e47404b8e..6de34c3366 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -63,7 +63,6 @@ public class TestUtils {
                                         "fields", ImmutableMap.of("id", "int", 
"name", "string"))));
         FakeSource fakeSource = new 
FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
         fakeSource.setJobContext(jobContext);
-        fakeSource.prepare(fakeSourceConfig);
 
         Action fake =
                 new SourceAction<>(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 19faf3df56..77ff157bd1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -115,7 +115,6 @@ public class CheckpointPlanTest extends 
AbstractSeaTunnelServerTest {
                                 Collections.singletonMap(
                                         "fields", ImmutableMap.of("id", "int", 
"name", "string"))));
         FakeSource fakeSource = new 
FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
-        fakeSource.prepare(fakeSourceConfig);
         fakeSource.setJobContext(jobContext);
 
         Action fake =
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index f4d4a03566..e7fb77e37a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -165,8 +165,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
                                 "schema",
                                 Collections.singletonMap(
                                         "fields", ImmutableMap.of("id", "int", 
"name", "string"))));
-        FakeSource fakeSource = new 
FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
-        fakeSource.prepare(fakeSourceConfig);
-        return fakeSource;
+        return new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
     }
 }

Reply via email to