This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 3abf728a Add boundedness to source reader context, so that we can
control when this reader stops while it produces data (#1885)
3abf728a is described below
commit 3abf728a77299dafe221bceb5bd5a43d5163ca59
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 16 11:17:14 2022 +0800
Add boundedness to source reader context, so that we can control when
this reader stops while it produces data (#1885)
---
pom.xml | 43 +++++++++++++++-----
seatunnel-api/pom.xml | 4 --
.../apache/seatunnel/api/source/Boundedness.java | 4 ++
.../org/apache/seatunnel/api/source/Collector.java | 5 +++
.../seatunnel/api/source/SeaTunnelSource.java | 39 +++++++++++++++++-
.../apache/seatunnel/api/source/SourceEvent.java | 2 +-
.../apache/seatunnel/api/source/SourceReader.java | 39 +++++++++++++++++-
.../api/source/SourceSplitEnumerator.java | 12 ++++++
.../java/org/apache/seatunnel/api/state/State.java | 4 +-
.../seatunnel/api/table/factory/Factory.java | 3 ++
.../api/table/factory/TableSinkFactory.java | 9 +++++
.../api/table/factory/TableSourceFactory.java | 4 ++
.../seatunnel/common/utils/SerializationUtils.java | 8 ++--
.../seatunnel/console/sink/ConsoleSink.java | 3 ++
.../org.apache.seatunnel.api.sink.SeaTunnelSink | 17 --------
.../seatunnel/fake/source/FakeSource.java | 8 +++-
.../seatunnel/fake/source/FakeSourceReader.java | 7 +++-
.../seatunnel/fake/source/ObjectSerializer.java | 47 ----------------------
...org.apache.seatunnel.api.source.SeaTunnelSource | 17 --------
seatunnel-dist/release-docs/LICENSE | 3 --
seatunnel-dist/src/main/assembly/assembly-bin.xml | 10 +++++
.../translation/source/ParallelReaderContext.java | 9 +++++
.../translation/source/ParallelSource.java | 9 +++--
.../util/ThreadPoolExecutorFactory.java | 1 +
tools/dependencies/known-dependencies.txt | 2 -
25 files changed, 195 insertions(+), 114 deletions(-)
diff --git a/pom.xml b/pom.xml
index 893a8bd3..7880a2ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,8 @@
<skipIT>true</skipIT>
<elasticsearch>7</elasticsearch>
<slf4j.version>1.7.25</slf4j.version>
+ <guava.version>19.0</guava.version>
+ <auto-service.version>1.0.1</auto-service.version>
</properties>
<dependencyManagement>
@@ -607,20 +609,43 @@
<version>${scala.version}</version>
</dependency>
- <!-- logging -->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ </dependencies>
+
<build>
<finalName>${project.artifactId}-${project.version}-${scala.version}</finalName>
diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index f3efb218..7b6829a0 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -38,9 +38,5 @@
<artifactId>seatunnel-common</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
index bceb3cdb..c7b3fcd9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.api.source;
+/**
+ * Used to define the boundedness of a source. In batch mode, the source is
{@link Boundedness#BOUNDED}.
+ * In streaming mode, the source is {@link Boundedness#UNBOUNDED}.
+ */
public enum Boundedness {
/**
* A BOUNDED stream is a stream with finite records.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index f36a7f87..6bd08eec 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.api.source;
+/**
+ * A {@link Collector} is used to collect data from {@link SourceReader}.
+ *
+ * @param <T> data type.
+ */
public interface Collector<T> {
void collect(T record);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 729753fc..7244891a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
*
* @param <T> The type of records produced by the source.
* @param <SplitT> The type of splits handled by the source.
+ * @param <StateT> The type of checkpoint states.
*/
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
extends Serializable {
@@ -37,14 +38,48 @@ public interface SeaTunnelSource<T, SplitT extends
SourceSplit, StateT> extends
*/
Boundedness getBoundedness();
+ /**
+ * Create source reader, used to produce data.
+ *
+ * @param readerContext reader context.
+ * @return source reader.
+ * @throws Exception when create reader failed.
+ */
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext)
throws Exception;
+ /**
+ * Create split serializer, used to serialize/deserialize splits generated
by {@link SourceSplitEnumerator}.
+ *
+ * @return split serializer.
+ */
Serializer<SplitT> getSplitSerializer();
- SourceSplitEnumerator<SplitT, StateT>
createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext)
throws Exception;
+ /**
+ * Create source split enumerator, used to generate splits. This method
will be called only once, when the source is started.
+ *
+ * @param enumeratorContext enumerator context.
+ * @return source split enumerator.
+ * @throws Exception when create enumerator failed.
+ */
+ SourceSplitEnumerator<SplitT, StateT>
createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext)
+ throws Exception;
- SourceSplitEnumerator<SplitT, StateT>
restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext,
StateT checkpointState) throws Exception;
+ /**
+ * Create source split enumerator, used to generate splits. This method
will be called when restoring from a checkpoint.
+ *
+ * @param enumeratorContext enumerator context.
+ * @param checkpointState checkpoint state.
+ * @return source split enumerator.
+ * @throws Exception when create enumerator failed.
+ */
+ SourceSplitEnumerator<SplitT, StateT>
restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext,
+ StateT
checkpointState) throws Exception;
+ /**
+ * Create enumerator state serializer, used to serialize/deserialize
checkpoint state.
+ *
+ * @return enumerator state serializer.
+ */
Serializer<StateT> getEnumeratorStateSerializer();
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
index 4d2374b4..2f4558fb 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.source;
import java.io.Serializable;
/**
- * An base class for the events passed between the SourceReaders and
Enumerators.
+ * A base class for the events passed between the {@link SourceReader} and
{@link SourceSplitEnumerator}.
*/
public interface SourceEvent extends Serializable {
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index 8131e835..f43b5a55 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -22,8 +22,17 @@ import org.apache.seatunnel.api.state.CheckpointListener;
import java.io.IOException;
import java.util.List;
+/**
+ * The {@link SourceReader} is used to generate source records, and it runs
on the worker.
+ *
+ * @param <T> record type.
+ * @param <SplitT> source split type.
+ */
public interface SourceReader<T, SplitT extends SourceSplit> extends
AutoCloseable, CheckpointListener {
+ /**
+ * Open the source reader.
+ */
void open();
/**
@@ -33,10 +42,28 @@ public interface SourceReader<T, SplitT extends
SourceSplit> extends AutoCloseab
@Override
void close() throws IOException;
+ /**
+ * Generate the next batch of records.
+ *
+ * @param output output collector.
+ * @throws Exception if error occurs.
+ */
void pollNext(Collector<T> output) throws Exception;
+ /**
+ * Get the current split checkpoint state by checkpointId.
+ *
+ * @param checkpointId checkpoint Id.
+ * @return split checkpoint state.
+ * @throws Exception if error occurs.
+ */
List<SplitT> snapshotState(long checkpointId) throws Exception;
+ /**
+ * Add the split checkpoint state to reader.
+ *
+ * @param splits split checkpoint state.
+ */
void addSplits(List<SplitT> splits);
/**
@@ -48,6 +75,11 @@ public interface SourceReader<T, SplitT extends SourceSplit>
extends AutoCloseab
*/
void handleNoMoreSplits();
+ /**
+ * Handle the source event from {@link SourceSplitEnumerator}.
+ *
+ * @param sourceEvent source event.
+ */
default void handleSourceEvent(SourceEvent sourceEvent) {
}
@@ -59,7 +91,12 @@ public interface SourceReader<T, SplitT extends SourceSplit>
extends AutoCloseab
int getIndexOfSubtask();
/**
- * Indicator that the input has reached the end of data.
+ * @return boundedness of this reader.
+ */
+ Boundedness getBoundedness();
+
+ /**
+ * Indicator that the input has reached the end of data. The reader will
then be cancelled.
*/
void signalNoMoreElement();
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 77e75c8f..4ad8df6a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -24,6 +24,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+/**
+ * The {@link SourceSplitEnumerator} is responsible for enumerating the splits
of a source. It runs on the master.
+ *
+ * @param <SplitT> source split type
+ * @param <StateT> source split state type
+ */
public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT>
extends AutoCloseable, CheckpointListener {
void open();
@@ -52,6 +58,12 @@ public interface SourceSplitEnumerator<SplitT extends
SourceSplit, StateT> exten
StateT snapshotState(long checkpointId) throws Exception;
+ /**
+ * Handle the source event from {@link SourceReader}.
+ *
+ * @param subtaskId The id of the subtask that sent the source event.
+ * @param sourceEvent source event.
+ */
default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
index d7e43d44..4384100b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
@@ -17,5 +17,7 @@
package org.apache.seatunnel.api.state;
-public interface State {
+import java.io.Serializable;
+
+public interface State extends Serializable {
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index 59b0fd51..cd35609b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.api.table.factory;
+/**
+ * This is the SPI interface.
+ */
public interface Factory {
/**
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index bdc3d1a6..8d941885 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -19,6 +19,15 @@ package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.table.connector.TableSink;
+/**
+ * This is an SPI interface, used to create {@link TableSink}. Each plugin
needs to have its own implementation.
+ * todo: this interface is not used yet; {@link
org.apache.seatunnel.api.sink.SeaTunnelSink} is used directly as the SPI interface.
+ *
+ * @param <IN> row type
+ * @param <StateT> state type
+ * @param <CommitInfoT> commit info type
+ * @param <AggregatedCommitInfoT> aggregated commit info type
+ */
public interface TableSinkFactory<IN, StateT, CommitInfoT,
AggregatedCommitInfoT> extends Factory {
TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createSink(TableFactoryContext context);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 2206a6bb..a75236e0 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -20,6 +20,10 @@ package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;
+/**
+ * This is an SPI interface, used to create {@link TableSource}. Each plugin
needs to have its own implementation.
+ * todo: this interface is not used yet; {@link
org.apache.seatunnel.api.source.SeaTunnelSource} is used directly as the SPI interface.
+ */
public interface TableSourceFactory extends Factory {
<T, SplitT extends SourceSplit, StateT> TableSource<T, SplitT, StateT>
createSource(TableFactoryContext context);
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 43dead16..a1d0aa96 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -26,24 +26,28 @@ public class SerializationUtils {
+ // NOTE: every delegation below must stay fully qualified. Inside this class the
+ // simple name SerializationUtils resolves to this class itself (a same-named
+ // import would not compile), so an unqualified SerializationUtils.serialize(obj)
+ // would call itself and recurse until StackOverflowError.
public static String objectToString(Serializable obj) {
if (obj != null) {
return
Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj));
}
return null;
}
public static <T extends Serializable> T stringToObject(String str) {
if (StringUtils.isNotEmpty(str)) {
return
org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str));
}
return null;
}
public static <T extends Serializable> byte[] serialize(T obj) {
return org.apache.commons.lang3.SerializationUtils.serialize(obj);
}
public static <T extends Serializable> T deserialize(byte[] bytes) {
return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index d7eab44b..ea7f69cb 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -22,8 +22,11 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+import com.google.auto.service.AutoService;
+
import java.util.List;
+@AutoService(SeaTunnelSink.class)
public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState,
ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
deleted file mode 100644
index 12b49983..00000000
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 9f18668b..9c12e4da 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -25,6 +26,9 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
public class FakeSource implements SeaTunnelSource<SeaTunnelRow,
FakeSourceSplit, FakeState> {
@Override
@@ -39,7 +43,7 @@ public class FakeSource implements
SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
@Override
public Serializer<FakeSourceSplit> getSplitSerializer() {
- return new ObjectSerializer<>();
+ return new DefaultSerializer<>();
}
@Override
@@ -57,6 +61,6 @@ public class FakeSource implements
SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
@Override
public Serializer<FakeState> getEnumeratorStateSerializer() {
- return new ObjectSerializer<>();
+ return new DefaultSerializer<>();
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index bf2cb2f6..e6bdd0d4 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -57,7 +58,6 @@ public class FakeSourceReader implements
SourceReader<SeaTunnelRow, FakeSourceSp
@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
- Thread.sleep(1000L);
int i = random.nextInt(names.length);
Map<String, Object> fieldMap = new HashMap<>(4);
fieldMap.put("name", names[i]);
@@ -65,6 +65,11 @@ public class FakeSourceReader implements
SourceReader<SeaTunnelRow, FakeSourceSp
fieldMap.put("timestamp", System.currentTimeMillis());
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i],
ages[i], System.currentTimeMillis()}, fieldMap);
output.collect(seaTunnelRow);
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ context.signalNoMoreElement();
+ }
+ Thread.sleep(1000L);
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
deleted file mode 100644
index bd53b4a2..00000000
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-public class ObjectSerializer<T> implements Serializer<T> {
-
- @Override
- public byte[] serialize(T obj) throws IOException {
- try (ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- ObjectOutputStream objectOutputStream = new
ObjectOutputStream(byteArrayOutputStream)) {
- objectOutputStream.writeObject(obj);
- return byteArrayOutputStream.toByteArray();
- }
- }
-
- @Override
- public T deserialize(byte[] serialized) throws IOException {
- try {
- return (T) new ObjectInputStream(new
ByteArrayInputStream(serialized)).readObject();
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("deserialize split error", e);
- }
- }
-}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
deleted file mode 100644
index b21a0581..00000000
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE
b/seatunnel-dist/release-docs/LICENSE
index 636eb0d1..0c696160 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -755,9 +755,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Google HTTP Client Library for
Java (com.google.http-client:google-http-client:1.26.0 -
https://github.com/googleapis/google-http-java-client/google-http-client)
(The Apache Software License, Version 2.0) Google OAuth Client Library
for Java (com.google.oauth-client:google-oauth-client:1.26.0 -
https://github.com/googleapis/google-oauth-java-client/google-oauth-client)
(The Apache Software License, Version 2.0) Gson
(com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/)
- (The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:11.0.2 -
http://code.google.com/p/guava-libraries/guava)
- (The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:13.0.1 -
http://code.google.com/p/guava-libraries/guava)
- (The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:16.0.1 -
http://code.google.com/p/guava-libraries/guava)
(The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
(The Apache Software License, Version 2.0) HPPC Collections
(com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
(The Apache Software License, Version 2.0) HPPC Collections
(com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml
b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index aaa25e23..aa5380bc 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -108,6 +108,16 @@
</excludes>
<outputDirectory>/connectors/spark</outputDirectory>
</fileSet>
+ <fileSet>
+
<directory>../seatunnel-connectors/seatunnel-connectors-seatunnel-dist/target/lib</directory>
+ <includes>
+ <include>seatunnel-connector-seatunnel*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+ </excludes>
+ <outputDirectory>/connectors/seatunnel</outputDirectory>
+ </fileSet>
<fileSet>
<directory>../seatunnel-connectors</directory>
<includes>
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
index 0535645f..b2667ea8 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -17,17 +17,20 @@
package org.apache.seatunnel.translation.source;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
public class ParallelReaderContext implements SourceReader.Context {
protected final ParallelSource<?, ?, ?> parallelSource;
+ protected final Boundedness boundedness;
protected final Integer subtaskId;
public ParallelReaderContext(ParallelSource<?, ?, ?> parallelSource,
Integer subtaskId) {
this.parallelSource = parallelSource;
+ this.boundedness = parallelSource.source.getBoundedness();
this.subtaskId = subtaskId;
}
@@ -36,8 +39,14 @@ public class ParallelReaderContext implements
SourceReader.Context {
return subtaskId;
}
+ @Override
+ public Boundedness getBoundedness() {
+ return boundedness;
+ }
+
@Override
public void signalNoMoreElement() {
+ // todo: if we have multiple subtasks, we need to know whether all subtasks
are stopped
parallelSource.handleNoMoreElement();
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index af67eb66..59c51eb4 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -30,10 +30,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements
AutoCloseable, CheckpointListener {
+ private final long splitEnumeratorTimeInterval = 5L;
+
protected final SeaTunnelSource<T, SplitT, StateT> source;
protected final ParallelEnumeratorContext<SplitT>
parallelEnumeratorContext;
protected final ParallelReaderContext readerContext;
@@ -47,7 +50,7 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
protected transient volatile SourceSplitEnumerator<SplitT, StateT>
splitEnumerator;
protected transient volatile SourceReader<T, SplitT> reader;
- protected transient volatile ExecutorService executorService;
+ protected transient volatile ScheduledThreadPoolExecutor executorService;
/**
* Flag indicating whether the consumer is still running.
@@ -99,7 +102,7 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
}
public void run(Collector<T> collector) throws Exception {
- executorService.execute(() -> splitEnumerator.run());
+ executorService.scheduleAtFixedRate(() -> splitEnumerator.run(), 0L,
splitEnumeratorTimeInterval, TimeUnit.SECONDS);
while (running) {
reader.pollNext(collector);
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
index cd5dffa6..7ce81a8f 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
@@ -28,6 +28,7 @@ public class ThreadPoolExecutorFactory {
AtomicInteger cnt = new AtomicInteger(0);
return new ScheduledThreadPoolExecutor(corePoolSize, runnable -> {
Thread thread = new Thread(runnable);
+ thread.setDaemon(true);
thread.setName(name + "-" + cnt.incrementAndGet());
return thread;
});
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 9d9b4b7b..0b2f9822 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -188,8 +188,6 @@ google-http-client-1.26.0.jar
google-http-client-jackson2-1.26.0.jar
google-oauth-client-1.26.0.jar
gson-2.2.4.jar
-guava-13.0.1.jar
-guava-16.0.1.jar
guava-19.0.jar
guice-3.0.jar
guice-4.1.0.jar