This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 3abf728a Add boundedness to source reader context, so that we can 
control to stop this reader when produce data (#1885)
3abf728a is described below

commit 3abf728a77299dafe221bceb5bd5a43d5163ca59
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 16 11:17:14 2022 +0800

    Add boundedness to source reader context, so that we can control to stop 
this reader when produce data (#1885)
---
 pom.xml                                            | 43 +++++++++++++++-----
 seatunnel-api/pom.xml                              |  4 --
 .../apache/seatunnel/api/source/Boundedness.java   |  4 ++
 .../org/apache/seatunnel/api/source/Collector.java |  5 +++
 .../seatunnel/api/source/SeaTunnelSource.java      | 39 +++++++++++++++++-
 .../apache/seatunnel/api/source/SourceEvent.java   |  2 +-
 .../apache/seatunnel/api/source/SourceReader.java  | 39 +++++++++++++++++-
 .../api/source/SourceSplitEnumerator.java          | 12 ++++++
 .../java/org/apache/seatunnel/api/state/State.java |  4 +-
 .../seatunnel/api/table/factory/Factory.java       |  3 ++
 .../api/table/factory/TableSinkFactory.java        |  9 +++++
 .../api/table/factory/TableSourceFactory.java      |  4 ++
 .../seatunnel/common/utils/SerializationUtils.java |  8 ++--
 .../seatunnel/console/sink/ConsoleSink.java        |  3 ++
 .../org.apache.seatunnel.api.sink.SeaTunnelSink    | 17 --------
 .../seatunnel/fake/source/FakeSource.java          |  8 +++-
 .../seatunnel/fake/source/FakeSourceReader.java    |  7 +++-
 .../seatunnel/fake/source/ObjectSerializer.java    | 47 ----------------------
 ...org.apache.seatunnel.api.source.SeaTunnelSource | 17 --------
 seatunnel-dist/release-docs/LICENSE                |  3 --
 seatunnel-dist/src/main/assembly/assembly-bin.xml  | 10 +++++
 .../translation/source/ParallelReaderContext.java  |  9 +++++
 .../translation/source/ParallelSource.java         |  9 +++--
 .../util/ThreadPoolExecutorFactory.java            |  1 +
 tools/dependencies/known-dependencies.txt          |  2 -
 25 files changed, 195 insertions(+), 114 deletions(-)

diff --git a/pom.xml b/pom.xml
index 893a8bd3..7880a2ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,8 @@
         <skipIT>true</skipIT>
         <elasticsearch>7</elasticsearch>
         <slf4j.version>1.7.25</slf4j.version>
+        <guava.version>19.0</guava.version>
+        <auto-service.version>1.0.1</auto-service.version>
     </properties>
 
     <dependencyManagement>
@@ -607,20 +609,43 @@
                 <version>${scala.version}</version>
             </dependency>
 
-            <!-- logging -->
             <dependency>
-                <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-api</artifactId>
-                <version>${slf4j.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-log4j12</artifactId>
-                <version>${slf4j.version}</version>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>${guava.version}</version>
             </dependency>
         </dependencies>
     </dependencyManagement>
 
+    <dependencies>
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <version>${auto-service.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
     <build>
 
         
<finalName>${project.artifactId}-${project.version}-${scala.version}</finalName>
diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index f3efb218..7b6829a0 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -38,9 +38,5 @@
             <artifactId>seatunnel-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
index bceb3cdb..c7b3fcd9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.api.source;
 
+/**
+ * Used to define the boundedness of a source. In batch mode, the source is 
{@link Boundedness#BOUNDED}.
+ * In streaming mode, the source is {@link  Boundedness#UNBOUNDED}.
+ */
 public enum Boundedness {
     /**
      * A BOUNDED stream is a stream with finite records.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index f36a7f87..6bd08eec 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.api.source;
 
+/**
+ * A {@link Collector} is used to collect data from {@link SourceReader}.
+ *
+ * @param <T> data type.
+ */
 public interface Collector<T> {
 
     void collect(T record);
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 729753fc..7244891a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
  *
  * @param <T>      The type of records produced by the source.
  * @param <SplitT> The type of splits handled by the source.
+ * @param <StateT> The type of checkpoint states.
  */
 public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT> 
extends Serializable {
 
@@ -37,14 +38,48 @@ public interface SeaTunnelSource<T, SplitT extends 
SourceSplit, StateT> extends
      */
     Boundedness getBoundedness();
 
+    /**
+     * Create source reader, used to produce data.
+     *
+     * @param readerContext reader context.
+     * @return source reader.
+     * @throws Exception when create reader failed.
+     */
     SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) 
throws Exception;
 
+    /**
+     * Create split serializer, use to serialize/deserialize split generated 
by {@link SourceSplitEnumerator}.
+     *
+     * @return split serializer.
+     */
     Serializer<SplitT> getSplitSerializer();
 
-    SourceSplitEnumerator<SplitT, StateT> 
createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) 
throws Exception;
+    /**
+     * Create source split enumerator, used to generate splits. This method 
will be called only once when start a source.
+     *
+     * @param enumeratorContext enumerator context.
+     * @return source split enumerator.
+     * @throws Exception when create enumerator failed.
+     */
+    SourceSplitEnumerator<SplitT, StateT> 
createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext)
+        throws Exception;
 
-    SourceSplitEnumerator<SplitT, StateT> 
restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, 
StateT checkpointState) throws Exception;
+    /**
+     * Create source split enumerator, used to generate splits. This method 
will be called when restore from checkpoint.
+     *
+     * @param enumeratorContext enumerator context.
+     * @param checkpointState   checkpoint state.
+     * @return source split enumerator.
+     * @throws Exception when create enumerator failed.
+     */
+    SourceSplitEnumerator<SplitT, StateT> 
restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext,
+                                                            StateT 
checkpointState) throws Exception;
 
+    /**
+     * Create enumerator state serializer, used to serialize/deserialize 
checkpoint state.
+     *
+     * @return enumerator state serializer.
+     */
     Serializer<StateT> getEnumeratorStateSerializer();
 
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
index 4d2374b4..2f4558fb 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.source;
 import java.io.Serializable;
 
 /**
- * An base class for the events passed between the SourceReaders and 
Enumerators.
+ * A base class for the events passed between the {@link SourceReader} and 
{@link SourceSplitEnumerator}.
  */
 public interface SourceEvent extends Serializable {
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index 8131e835..f43b5a55 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -22,8 +22,17 @@ import org.apache.seatunnel.api.state.CheckpointListener;
 import java.io.IOException;
 import java.util.List;
 
+/**
+ * The {@link SourceReader} is used to generate source record, and it will be 
running at worker.
+ *
+ * @param <T>      record type.
+ * @param <SplitT> source split type.
+ */
 public interface SourceReader<T, SplitT extends SourceSplit> extends 
AutoCloseable, CheckpointListener {
 
+    /**
+     * Open the source reader.
+     */
     void open();
 
     /**
@@ -33,10 +42,28 @@ public interface SourceReader<T, SplitT extends 
SourceSplit> extends AutoCloseab
     @Override
     void close() throws IOException;
 
+    /**
+     * Generate the next batch of records.
+     *
+     * @param output output collector.
+     * @throws Exception if error occurs.
+     */
     void pollNext(Collector<T> output) throws Exception;
 
+    /**
+     * Get the current split checkpoint state by checkpointId.
+     *
+     * @param checkpointId checkpoint Id.
+     * @return split checkpoint state.
+     * @throws Exception if error occurs.
+     */
     List<SplitT> snapshotState(long checkpointId) throws Exception;
 
+    /**
+     * Add the split checkpoint state to reader.
+     *
+     * @param splits split checkpoint state.
+     */
     void addSplits(List<SplitT> splits);
 
     /**
@@ -48,6 +75,11 @@ public interface SourceReader<T, SplitT extends SourceSplit> 
extends AutoCloseab
      */
     void handleNoMoreSplits();
 
+    /**
+     * Handle the source event form {@link SourceSplitEnumerator}.
+     *
+     * @param sourceEvent source event.
+     */
     default void handleSourceEvent(SourceEvent sourceEvent) {
     }
 
@@ -59,7 +91,12 @@ public interface SourceReader<T, SplitT extends SourceSplit> 
extends AutoCloseab
         int getIndexOfSubtask();
 
         /**
-         * Indicator that the input has reached the end of data.
+         * @return boundedness of this reader.
+         */
+        Boundedness getBoundedness();
+
+        /**
+         * Indicator that the input has reached the end of data. Then will 
cancel this reader.
          */
         void signalNoMoreElement();
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 77e75c8f..4ad8df6a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -24,6 +24,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+/**
+ * The {@link SourceSplitEnumerator} is responsible for enumerating the splits 
of a source. It will run at master.
+ *
+ * @param <SplitT>       source split type
+ * @param <StateT>source split state type
+ */
 public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> 
extends AutoCloseable, CheckpointListener {
 
     void open();
@@ -52,6 +58,12 @@ public interface SourceSplitEnumerator<SplitT extends 
SourceSplit, StateT> exten
 
     StateT snapshotState(long checkpointId) throws Exception;
 
+    /**
+     * Handle the source event from {@link SourceReader}.
+     *
+     * @param subtaskId   The id of the subtask to which the source event from.
+     * @param sourceEvent source event.
+     */
     default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
     }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
index d7e43d44..4384100b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
@@ -17,5 +17,7 @@
 
 package org.apache.seatunnel.api.state;
 
-public interface State {
+import java.io.Serializable;
+
+public interface State extends Serializable {
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index 59b0fd51..cd35609b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.api.table.factory;
 
+/**
+ * This is the SPI interface.
+ */
 public interface Factory {
 
     /**
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index bdc3d1a6..8d941885 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -19,6 +19,15 @@ package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.table.connector.TableSink;
 
+/**
+ * This is an SPI interface, used to create {@link TableSink}. Each plugin 
need to have it own implementation.
+ * todo: now we have not use this interface, we directly use {@link 
org.apache.seatunnel.api.sink.SeaTunnelSink} as the SPI interface.
+ *
+ * @param <IN>                    row type
+ * @param <StateT>                state type
+ * @param <CommitInfoT>           commit info type
+ * @param <AggregatedCommitInfoT> aggregated commit info type
+ */
 public interface TableSinkFactory<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT> extends Factory {
 
     TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createSink(TableFactoryContext context);
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 2206a6bb..a75236e0 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -20,6 +20,10 @@ package org.apache.seatunnel.api.table.factory;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.connector.TableSource;
 
+/**
+ * This is an SPI interface, used to create {@link TableSource}. Each plugin 
need to have it own implementation.
+ * todo: now we have not use this interface, we directly use {@link 
org.apache.seatunnel.api.source.SeaTunnelSource} as the SPI interface
+ */
 public interface TableSourceFactory extends Factory {
 
     <T, SplitT extends SourceSplit, StateT> TableSource<T, SplitT, StateT> 
createSource(TableFactoryContext context);
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 43dead16..a1d0aa96 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -26,24 +26,24 @@ public class SerializationUtils {
 
     public static String objectToString(Serializable obj) {
         if (obj != null) {
-            return 
Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj));
+            return 
Base64.encodeBase64String(SerializationUtils.serialize(obj));
         }
         return null;
     }
 
     public static <T extends Serializable> T stringToObject(String str) {
         if (StringUtils.isNotEmpty(str)) {
-            return 
org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str));
+            return SerializationUtils.deserialize(Base64.decodeBase64(str));
         }
         return null;
     }
 
     public static <T extends Serializable> byte[] serialize(T obj) {
-        return org.apache.commons.lang3.SerializationUtils.serialize(obj);
+        return SerializationUtils.serialize(obj);
     }
 
     public static <T extends Serializable> T deserialize(byte[] bytes) {
-        return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
+        return SerializationUtils.deserialize(bytes);
     }
 
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index d7eab44b..ea7f69cb 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -22,8 +22,11 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
 
+import com.google.auto.service.AutoService;
+
 import java.util.List;
 
+@AutoService(SeaTunnelSink.class)
 public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, 
ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
deleted file mode 100644
index 12b49983..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink
\ No newline at end of file
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 9f18668b..9c12e4da 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -25,6 +26,9 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
 
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
 public class FakeSource implements SeaTunnelSource<SeaTunnelRow, 
FakeSourceSplit, FakeState> {
 
     @Override
@@ -39,7 +43,7 @@ public class FakeSource implements 
SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
 
     @Override
     public Serializer<FakeSourceSplit> getSplitSerializer() {
-        return new ObjectSerializer<>();
+        return new DefaultSerializer<>();
     }
 
     @Override
@@ -57,6 +61,6 @@ public class FakeSource implements 
SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
 
     @Override
     public Serializer<FakeState> getEnumeratorStateSerializer() {
-        return new ObjectSerializer<>();
+        return new DefaultSerializer<>();
     }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index bf2cb2f6..e6bdd0d4 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -57,7 +58,6 @@ public class FakeSourceReader implements 
SourceReader<SeaTunnelRow, FakeSourceSp
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws 
InterruptedException {
-        Thread.sleep(1000L);
         int i = random.nextInt(names.length);
         Map<String, Object> fieldMap = new HashMap<>(4);
         fieldMap.put("name", names[i]);
@@ -65,6 +65,11 @@ public class FakeSourceReader implements 
SourceReader<SeaTunnelRow, FakeSourceSp
         fieldMap.put("timestamp", System.currentTimeMillis());
         SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i], 
ages[i], System.currentTimeMillis()}, fieldMap);
         output.collect(seaTunnelRow);
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            context.signalNoMoreElement();
+        }
+        Thread.sleep(1000L);
     }
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
deleted file mode 100644
index bd53b4a2..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-public class ObjectSerializer<T> implements Serializer<T> {
-
-    @Override
-    public byte[] serialize(T obj) throws IOException {
-        try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
-             ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(byteArrayOutputStream)) {
-            objectOutputStream.writeObject(obj);
-            return byteArrayOutputStream.toByteArray();
-        }
-    }
-
-    @Override
-    public T deserialize(byte[] serialized) throws IOException {
-        try {
-            return (T) new ObjectInputStream(new 
ByteArrayInputStream(serialized)).readObject();
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("deserialize split error", e);
-        }
-    }
-}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
deleted file mode 100644
index b21a0581..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index 636eb0d1..0c696160 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -755,9 +755,6 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Google HTTP Client Library for 
Java (com.google.http-client:google-http-client:1.26.0 - 
https://github.com/googleapis/google-http-java-client/google-http-client)
      (The Apache Software License, Version 2.0) Google OAuth Client Library 
for Java (com.google.oauth-client:google-oauth-client:1.26.0 - 
https://github.com/googleapis/google-oauth-java-client/google-oauth-client)
      (The Apache Software License, Version 2.0) Gson 
(com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/)
-     (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:11.0.2 - 
http://code.google.com/p/guava-libraries/guava)
-     (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:13.0.1 - 
http://code.google.com/p/guava-libraries/guava)
-     (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:16.0.1 - 
http://code.google.com/p/guava-libraries/guava)
      (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
      (The Apache Software License, Version 2.0) HPPC Collections 
(com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
      (The Apache Software License, Version 2.0) HPPC Collections 
(com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml 
b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index aaa25e23..aa5380bc 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -108,6 +108,16 @@
             </excludes>
             <outputDirectory>/connectors/spark</outputDirectory>
         </fileSet>
+        <fileSet>
+            
<directory>../seatunnel-connectors/seatunnel-connectors-seatunnel-dist/target/lib</directory>
+            <includes>
+                <include>seatunnel-connector-seatunnel*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+            </excludes>
+            <outputDirectory>/connectors/seatunnel</outputDirectory>
+        </fileSet>
         <fileSet>
             <directory>../seatunnel-connectors</directory>
             <includes>
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
index 0535645f..b2667ea8 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -17,17 +17,20 @@
 
 package org.apache.seatunnel.translation.source;
 
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
 
 public class ParallelReaderContext implements SourceReader.Context {
 
     protected final ParallelSource<?, ?, ?> parallelSource;
+    protected final Boundedness boundedness;
     protected final Integer subtaskId;
 
     public ParallelReaderContext(ParallelSource<?, ?, ?> parallelSource,
                                  Integer subtaskId) {
         this.parallelSource = parallelSource;
+        this.boundedness = parallelSource.source.getBoundedness();
         this.subtaskId = subtaskId;
     }
 
@@ -36,8 +39,14 @@ public class ParallelReaderContext implements 
SourceReader.Context {
         return subtaskId;
     }
 
+    @Override
+    public Boundedness getBoundedness() {
+        return boundedness;
+    }
+
     @Override
     public void signalNoMoreElement() {
+        // todo: if we have multiple subtasks, we need to know if all subtask 
is stopped
         parallelSource.handleNoMoreElement();
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index af67eb66..59c51eb4 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -30,10 +30,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements 
AutoCloseable, CheckpointListener {
 
+    private final long splitEnumeratorTimeInterval = 5L;
+
     protected final SeaTunnelSource<T, SplitT, StateT> source;
     protected final ParallelEnumeratorContext<SplitT> 
parallelEnumeratorContext;
     protected final ParallelReaderContext readerContext;
@@ -47,7 +50,7 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
 
     protected transient volatile SourceSplitEnumerator<SplitT, StateT> 
splitEnumerator;
     protected transient volatile SourceReader<T, SplitT> reader;
-    protected transient volatile ExecutorService executorService;
+    protected transient volatile ScheduledThreadPoolExecutor executorService;
 
     /**
      * Flag indicating whether the consumer is still running.
@@ -99,7 +102,7 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
     }
 
     public void run(Collector<T> collector) throws Exception {
-        executorService.execute(() -> splitEnumerator.run());
+        executorService.scheduleAtFixedRate(() -> splitEnumerator.run(), 0L, 
splitEnumeratorTimeInterval, TimeUnit.SECONDS);
         while (running) {
             reader.pollNext(collector);
         }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
index cd5dffa6..7ce81a8f 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
@@ -28,6 +28,7 @@ public class ThreadPoolExecutorFactory {
         AtomicInteger cnt = new AtomicInteger(0);
         return new ScheduledThreadPoolExecutor(corePoolSize, runnable -> {
             Thread thread = new Thread(runnable);
+            thread.setDaemon(true);
             thread.setName(name + "-" + cnt.incrementAndGet());
             return thread;
         });
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 9d9b4b7b..0b2f9822 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -188,8 +188,6 @@ google-http-client-1.26.0.jar
 google-http-client-jackson2-1.26.0.jar
 google-oauth-client-1.26.0.jar
 gson-2.2.4.jar
-guava-13.0.1.jar
-guava-16.0.1.jar
 guava-19.0.jar
 guice-3.0.jar
 guice-4.1.0.jar

Reply via email to