This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new b063bd56 Add basic fake source (#1864)
b063bd56 is described below
commit b063bd564fe7a2633cfdfadbcc698fd606f2ac1b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu May 12 20:55:28 2022 +0800
Add basic fake source (#1864)
---
seatunnel-api/pom.xml | 18 ++++++
.../seatunnel/api/table/connector/TableSink.java | 4 +-
.../seatunnel/api/table/connector/TableSource.java | 5 +-
.../seatunnel/api/table/factory/FactoryUtil.java | 32 +++++-----
.../api/table/factory/TableSinkFactory.java | 4 +-
.../api/table/factory/TableSourceFactory.java | 3 +-
seatunnel-connectors/pom.xml | 2 +
.../seatunnel-connectors-seatunnel-dist/pom.xml | 67 ++++++++++++++++++++
.../{ => seatunnel-connectors-seatunnel}/pom.xml | 15 ++---
.../pom.xml | 22 +++----
.../console/sink/ConsoleAggregatedCommitInfo.java | 8 +--
.../seatunnel/console/sink/ConsoleCommitInfo.java | 8 +--
.../seatunnel/console/sink/ConsoleSink.java | 68 ++++++++++++++++++++
.../seatunnel/console/sink/ConsoleSinkWriter.java | 29 +++++++--
.../seatunnel/console/state/ConsoleState.java | 7 +--
.../seatunnel-connectors-seatunnel-fake}/pom.xml | 21 ++++---
.../seatunnel/fake/source/FakeSource.java | 61 ++++++++++++++++++
.../seatunnel/fake/source/FakeSourceEvent.java | 28 +++++++--
.../seatunnel/fake/source/FakeSourceReader.java | 73 ++++++++++++++++++++++
.../seatunnel/fake/source/FakeSourceSplit.java | 22 +++++--
.../fake/source/FakeSourceSplitEnumerator.java | 73 ++++++++++++++++++++++
.../fake/source/FakeSupportCoordinate.java | 8 +--
.../seatunnel/fake/source/ObjectSerializer.java | 47 ++++++++++++++
.../connectors/seatunnel/fake/state/FakeState.java | 8 +--
24 files changed, 543 insertions(+), 90 deletions(-)
diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index e86d7301..f3efb218 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -1,4 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
index 9cd8efbe..8461e7dc 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.connector;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-public interface TableSink {
+public interface TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> {
- SeaTunnelSink<?, ?, ?, ?> createSink();
+ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink();
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index 7727735a..edb8ee69 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -18,8 +18,9 @@
package org.apache.seatunnel.api.table.connector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
-public interface TableSource {
+public interface TableSource<T, SplitT extends SourceSplit, StateT> {
- SeaTunnelSource<?, ?, ?> createSource();
+ SeaTunnelSource<T, SplitT, StateT> createSource();
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index b0677676..c260c264 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSource;
@@ -41,37 +42,40 @@ public final class FactoryUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FactoryUtil.class);
- public static List<SeaTunnelSource> createAndPrepareSource(
- List<CatalogTable> multipleTables,
- Map<String, String> options,
- ClassLoader classLoader,
- String factoryIdentifier) {
+ public static <T, SplitT extends SourceSplit, StateT>
List<SeaTunnelSource<T, SplitT, StateT>> createAndPrepareSource(
+ List<CatalogTable> multipleTables,
+ Map<String, String> options,
+ ClassLoader classLoader,
+ String factoryIdentifier) {
try {
-
final TableSourceFactory factory = discoverFactory(classLoader,
TableSourceFactory.class, factoryIdentifier);
- List<SeaTunnelSource> sources = new
ArrayList<>(multipleTables.size());
+ List<SeaTunnelSource<T, SplitT, StateT>> sources = new
ArrayList<>(multipleTables.size());
if (factory instanceof SupportMultipleTable) {
TableFactoryContext context = new
TableFactoryContext(multipleTables, options, classLoader);
SupportMultipleTable multipleTableSourceFactory =
(SupportMultipleTable) factory;
// TODO: create all source
SupportMultipleTable.Result result =
multipleTableSourceFactory.applyTables(context);
- TableSource multipleTableSource = factory.createSource(new
TableFactoryContext(result.getAcceptedTables(), options, classLoader));
+ TableSource<T, SplitT, StateT> multipleTableSource =
factory.createSource(
+ new TableFactoryContext(result.getAcceptedTables(),
options, classLoader));
// TODO: handle reading metadata
- SeaTunnelSource<?, ?, ?> source =
multipleTableSource.createSource();
+ SeaTunnelSource<T, SplitT, StateT> source =
multipleTableSource.createSource();
sources.add(source);
}
return sources;
} catch (Throwable t) {
throw new FactoryException(
- String.format(
- "Unable to create a source for identifier '%s'.",
factoryIdentifier),
- t);
+ String.format(
+ "Unable to create a source for identifier '%s'.",
factoryIdentifier),
+ t);
}
}
- public static List<SeaTunnelSink> createAndPrepareSink() {
- return null;
+ public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createAndPrepareSink(
+ ClassLoader classLoader, String factoryIdentifier) {
+ // todo: do we need to set table?
+ TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
factory = discoverFactory(classLoader, TableSinkFactory.class,
factoryIdentifier);
+ return factory.createSink(null).createSink();
}
public static Catalog createCatalog(String catalogName,
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index bb92a76a..bdc3d1a6 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.table.connector.TableSink;
-public interface TableSinkFactory extends Factory {
+public interface TableSinkFactory<IN, StateT, CommitInfoT,
AggregatedCommitInfoT> extends Factory {
- TableSink createSink(TableFactoryContext context);
+ TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createSink(TableFactoryContext context);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 241deeb0..2206a6bb 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -17,9 +17,10 @@
package org.apache.seatunnel.api.table.factory;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;
public interface TableSourceFactory extends Factory {
- TableSource createSource(TableFactoryContext context);
+ <T, SplitT extends SourceSplit, StateT> TableSource<T, SplitT, StateT>
createSource(TableFactoryContext context);
}
diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/pom.xml
index 69a02445..482cf788 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/pom.xml
@@ -35,6 +35,8 @@
<module>seatunnel-connectors-flink-dist</module>
<module>seatunnel-connectors-spark</module>
<module>seatunnel-connectors-spark-dist</module>
+ <module>seatunnel-connectors-seatunnel</module>
+ <module>seatunnel-connectors-seatunnel-dist</module>
</modules>
</project>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
new file mode 100644
index 00000000..b487fe1a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-connectors-seatunnel-dist</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-fake</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-connector</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <type>jar</type>
+ <includeTypes>jar</includeTypes>
+
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
similarity index 79%
copy from seatunnel-connectors/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 69a02445..7897a316 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -21,20 +21,17 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>seatunnel-connectors</artifactId>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
-
- <artifactId>seatunnel-connectors</artifactId>
<packaging>pom</packaging>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
+
<modules>
- <module>seatunnel-connectors-flink</module>
- <module>seatunnel-connectors-flink-dist</module>
- <module>seatunnel-connectors-spark</module>
- <module>seatunnel-connectors-spark-dist</module>
+ <module>seatunnel-connectors-seatunnel-console</module>
+ <module>seatunnel-connectors-seatunnel-fake</module>
</modules>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml
similarity index 76%
copy from seatunnel-connectors/pom.xml
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml
index 69a02445..9bc8680f 100644
--- a/seatunnel-connectors/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml
@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
-
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
@@ -15,26 +14,25 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel</artifactId>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-connectors</artifactId>
- <packaging>pom</packaging>
+ <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
- <modules>
- <module>seatunnel-connectors-flink</module>
- <module>seatunnel-connectors-flink-dist</module>
- <module>seatunnel-connectors-spark</module>
- <module>seatunnel-connectors-spark-dist</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
similarity index 81%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
index 9cd8efbe..d866e084 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
@@ -15,11 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.connector;
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-
-public interface TableSink {
-
- SeaTunnelSink<?, ?, ?, ?> createSink();
+public class ConsoleAggregatedCommitInfo {
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
similarity index 81%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
index 9cd8efbe..6c41be0c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
@@ -15,11 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.connector;
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-
-public interface TableSink {
-
- SeaTunnelSink<?, ?, ?, ?> createSink();
+public class ConsoleCommitInfo {
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
new file mode 100644
index 00000000..947b3125
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+
+import java.util.List;
+import java.util.Optional;
+
+public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState,
ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
+
+ @Override
+ public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState>
createWriter(SinkWriter.Context context) {
+ return new ConsoleSinkWriter();
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState>
restoreWriter(
+ SinkWriter.Context context, List<ConsoleState> states) {
+ return restoreWriter(context, states);
+ }
+
+ @Override
+ public Optional<Serializer<ConsoleState>> getWriterStateSerializer() {
+ return getWriterStateSerializer();
+ }
+
+ @Override
+ public Optional<SinkCommitter<ConsoleCommitInfo>> createCommitter() {
+ return createCommitter();
+ }
+
+ @Override
+ public Optional<Serializer<ConsoleCommitInfo>> getCommitInfoSerializer() {
+ return getCommitInfoSerializer();
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<ConsoleCommitInfo,
ConsoleAggregatedCommitInfo>> createAggregatedCommitter() {
+ return createAggregatedCommitter();
+ }
+
+ @Override
+ public Optional<Serializer<ConsoleAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
+ return getAggregatedCommitInfoSerializer();
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
similarity index 50%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 241deeb0..e395ad3e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -15,11 +15,32 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
-public interface TableSourceFactory extends Factory {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- TableSource createSource(TableFactoryContext context);
+public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow,
ConsoleCommitInfo, ConsoleState> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsoleSinkWriter.class);
+
+ @Override
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public void write(SeaTunnelRow element) {
+ System.out.println(element.toString());
+ }
+
+ @Override
+ public ConsoleCommitInfo prepareCommit() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
similarity index 82%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
index 9cd8efbe..b2abaeff 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.connector;
+package org.apache.seatunnel.connectors.seatunnel.console.state;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.state.State;
-public interface TableSink {
+public class ConsoleState implements State {
- SeaTunnelSink<?, ?, ?, ?> createSink();
}
diff --git a/seatunnel-connectors/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml
similarity index 76%
copy from seatunnel-connectors/pom.xml
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml
index 69a02445..cc2daab7 100644
--- a/seatunnel-connectors/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml
@@ -22,19 +22,20 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel</artifactId>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-connectors</artifactId>
- <packaging>pom</packaging>
+ <artifactId>seatunnel-connectors-seatunnel-fake</artifactId>
- <modules>
- <module>seatunnel-connectors-flink</module>
- <module>seatunnel-connectors-flink-dist</module>
- <module>seatunnel-connectors-spark</module>
- <module>seatunnel-connectors-spark-dist</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
-</project>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
new file mode 100644
index 00000000..0c3fb31b
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+
+public class FakeSource implements SeaTunnelSource<FakeSourceEvent,
FakeSourceSplit, FakeState> {
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<FakeSourceEvent, FakeSourceSplit>
createReader(SourceReader.Context readerContext) {
+ return new FakeSourceReader(readerContext);
+ }
+
+ @Override
+ public Serializer<FakeSourceSplit> getSplitSerializer() {
+ return new ObjectSerializer<>();
+ }
+
+ @Override
+ public SourceSplitEnumerator<FakeSourceSplit, FakeState> createEnumerator(
+ SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
+ return new FakeSourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<FakeSourceSplit, FakeState> restoreEnumerator(
+ SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
FakeState checkpointState) {
+ // todo:
+ return null;
+ }
+
+ @Override
+ public Serializer<FakeState> getEnumeratorStateSerializer() {
+ return new ObjectSerializer<>();
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
similarity index 57%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
index 241deeb0..76e0a3eb 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
@@ -15,11 +15,31 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.source.SourceEvent;
-public interface TableSourceFactory extends Factory {
+public class FakeSourceEvent implements SourceEvent {
- TableSource createSource(TableFactoryContext context);
+ private final String name;
+ private final int age;
+ private final long timestamp;
+
+ public FakeSourceEvent(String name, int age, long timestamp) {
+ this.name = name;
+ this.age = age;
+ this.timestamp = timestamp;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
new file mode 100644
index 00000000..fa1c28c4
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FakeSourceReader implements SourceReader<FakeSourceEvent,
FakeSourceSplit> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FakeSourceReader.class);
+
+ private final SourceReader.Context context;
+
+ public FakeSourceReader(SourceReader.Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ @SuppressWarnings("magicnumber")
+ public void pollNext(Collector<FakeSourceEvent> output) {
+ output.collect(new FakeSourceEvent("Tom", 19,
System.currentTimeMillis()));
+ }
+
+ @Override
+ public List<FakeSourceSplit> snapshotState(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public void addSplits(List<FakeSourceSplit> splits) {
+
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
similarity index 63%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
index 241deeb0..05e85061 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
@@ -15,11 +15,25 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.source.SourceSplit;
-public interface TableSourceFactory extends Factory {
+import java.io.Serializable;
+
+public class FakeSourceSplit implements SourceSplit, Serializable {
+
+ private static final long serialVersionUID = -1L;
+
+ private final String splitId;
+
+ public FakeSourceSplit(String splitId) {
+ this.splitId = splitId;
+ }
+
+ @Override
+ public String splitId() {
+ return splitId;
+ }
- TableSource createSource(TableFactoryContext context);
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
new file mode 100644
index 00000000..e41d96c0
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FakeSourceSplitEnumerator implements
SourceSplitEnumerator<FakeSourceSplit, FakeState> {
+
+ private final SourceSplitEnumerator.Context<FakeSourceSplit>
enumeratorContext;
+
+ public
FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit>
enumeratorContext) {
+ this.enumeratorContext = enumeratorContext;
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void run() {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
+
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+
+ }
+
+ @Override
+ public FakeState snapshotState(long checkpointId) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
similarity index 81%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
index 9cd8efbe..f7a9b098 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
@@ -15,11 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.connector;
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SupportCoordinate;
-public interface TableSink {
-
- SeaTunnelSink<?, ?, ?, ?> createSink();
+public class FakeSupportCoordinate implements SupportCoordinate {
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
new file mode 100644
index 00000000..bd53b4a2
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+public class ObjectSerializer<T> implements Serializer<T> {
+
+ @Override
+ public byte[] serialize(T obj) throws IOException {
+ try (ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new
ObjectOutputStream(byteArrayOutputStream)) {
+ objectOutputStream.writeObject(obj);
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ @Override
+ public T deserialize(byte[] serialized) throws IOException {
+ try {
+ return (T) new ObjectInputStream(new
ByteArrayInputStream(serialized)).readObject();
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("deserialize split error", e);
+ }
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
similarity index 82%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
index 9cd8efbe..a920f830 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
@@ -15,11 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.connector;
+package org.apache.seatunnel.connectors.seatunnel.fake.state;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.state.State;
-public interface TableSink {
-
- SeaTunnelSink<?, ?, ?, ?> createSink();
+public class FakeState implements State {
}