This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a04add6991 [Improve] Add default implement for
`SeaTunnelSource::getProducedType` (#5670)
a04add6991 is described below
commit a04add6991b3c015dbba230586649b9b0d703760
Author: Jia Fan <[email protected]>
AuthorDate: Tue Oct 24 10:22:33 2023 +0800
[Improve] Add default implement for `SeaTunnelSource::getProducedType`
(#5670)
---
.../seatunnel/api/source/SeaTunnelSource.java | 2 +-
.../cdc/base/source/IncrementalSource.java | 22 ---------------
.../seatunnel/fake/source/FakeSource.java | 31 ----------------------
.../core/starter/execution/PluginUtil.java | 1 -
.../flink/execution/SinkExecuteProcessor.java | 1 -
.../flink/execution/SinkExecuteProcessor.java | 1 -
.../spark/execution/SinkExecuteProcessor.java | 1 -
.../spark/execution/SinkExecuteProcessor.java | 1 -
.../apache/seatunnel/engine/server/TestUtils.java | 1 -
.../server/checkpoint/CheckpointPlanTest.java | 1 -
.../seatunnel/engine/server/dag/TaskTest.java | 4 +--
11 files changed, 2 insertions(+), 64 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 924c2e5244..916f076f57 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -56,7 +56,7 @@ public interface SeaTunnelSource<T, SplitT extends
SourceSplit, StateT extends S
*/
@Deprecated
default SeaTunnelDataType<T> getProducedType() {
- throw new UnsupportedOperationException("getProducedType method has
not been implemented.");
+ return (SeaTunnelDataType)
getProducedCatalogTables().get(0).getSeaTunnelRowType();
}
/**
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 4c8ee235a9..11769ab87a 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -17,9 +17,6 @@
package org.apache.seatunnel.connectors.cdc.base.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -118,20 +115,6 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
this.offsetFactory = createOffsetFactory(readonlyConfig);
}
- @Override
- public final void prepare(Config pluginConfig) throws PrepareFailException
{
- this.readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
-
- this.startupConfig = getStartupConfig(readonlyConfig);
- this.stopConfig = getStopConfig(readonlyConfig);
- this.stopMode = stopConfig.getStopMode();
- this.incrementalParallelism =
readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
- this.configFactory = createSourceConfigFactory(readonlyConfig);
- this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
- this.deserializationSchema =
createDebeziumDeserializationSchema(readonlyConfig);
- this.offsetFactory = createOffsetFactory(readonlyConfig);
- }
-
protected StartupConfig getStartupConfig(ReadonlyConfig config) {
return new StartupConfig(
config.get(getStartupModeOption()),
@@ -178,11 +161,6 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
return stopMode == StopMode.NEVER ? Boundedness.UNBOUNDED :
Boundedness.BOUNDED;
}
- @Override
- public SeaTunnelDataType<T> getProducedType() {
- return deserializationSchema.getProducedType();
- }
-
@SuppressWarnings("MagicNumber")
@Override
public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context
readerContext)
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index e634347fde..d3bf9c6430 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -31,27 +28,19 @@ import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
-import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
import org.apache.commons.collections4.CollectionUtils;
-import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
-@AutoService(SeaTunnelSource.class)
public class FakeSource
implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit,
FakeSourceState>,
SupportParallelism,
@@ -96,11 +85,6 @@ public class FakeSource
.collect(Collectors.toList());
}
- @Override
- public SeaTunnelRowType getProducedType() {
- return catalogTable.getSeaTunnelRowType();
- }
-
@Override
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState>
createEnumerator(
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext)
throws Exception {
@@ -126,21 +110,6 @@ public class FakeSource
return "FakeSource";
}
- @Override
- public void prepare(Config pluginConfig) {
- CheckResult result =
- CheckConfigUtil.checkAllExists(pluginConfig,
TableSchemaOptions.SCHEMA.key());
- if (!result.isSuccess()) {
- throw new FakeConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
- this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);
- }
-
@Override
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 0b1f34d3c1..0dc4209a8b 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -117,7 +117,6 @@ public class PluginUtil {
.equals(e.getMessage())) {
return true;
}
- return true;
}
return false;
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 86caa6939a..4f02b679b5 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -138,7 +138,6 @@ public class SinkExecuteProcessor
.equals(e.getMessage())) {
return true;
}
- return true;
}
return false;
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 2109ffe88f..c796a99400 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -141,7 +141,6 @@ public class SinkExecuteProcessor
.equals(e.getMessage())) {
return true;
}
- return true;
}
return false;
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 5546890e83..156fd9e699 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -157,7 +157,6 @@ public class SinkExecuteProcessor
.equals(e.getMessage())) {
return true;
}
- return true;
}
return false;
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index b3d978b1cb..64d9fcaf77 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -159,7 +159,6 @@ public class SinkExecuteProcessor
.equals(e.getMessage())) {
return true;
}
- return true;
}
return false;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 0e47404b8e..6de34c3366 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -63,7 +63,6 @@ public class TestUtils {
"fields", ImmutableMap.of("id", "int",
"name", "string"))));
FakeSource fakeSource = new
FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
fakeSource.setJobContext(jobContext);
- fakeSource.prepare(fakeSourceConfig);
Action fake =
new SourceAction<>(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 19faf3df56..77ff157bd1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -115,7 +115,6 @@ public class CheckpointPlanTest extends
AbstractSeaTunnelServerTest {
Collections.singletonMap(
"fields", ImmutableMap.of("id", "int",
"name", "string"))));
FakeSource fakeSource = new
FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
- fakeSource.prepare(fakeSourceConfig);
fakeSource.setJobContext(jobContext);
Action fake =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index f4d4a03566..e7fb77e37a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -165,8 +165,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
"schema",
Collections.singletonMap(
"fields", ImmutableMap.of("id", "int",
"name", "string"))));
- FakeSource fakeSource = new
FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
- fakeSource.prepare(fakeSourceConfig);
- return fakeSource;
+ return new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
}
}