hailin0 commented on code in PR #2974:
URL:
https://github.com/apache/incubator-seatunnel/pull/2974#discussion_r994087873
##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java:
##########
@@ -19,24 +19,27 @@
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import lombok.extern.slf4j.Slf4j;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
@Slf4j
-public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
-
- private final SingleSplitReaderContext context;
+public class FakeSourceReader implements SourceReader<SeaTunnelRow,
FakeSourceSplit> {
+ private final SourceReader.Context context;
+ private final Deque<FakeSourceSplit> splits = new LinkedList<>();
Review Comment:
```suggestion
private final Queue<FakeSourceSplit> splits = new LinkedList<>();
```
##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java:
##########
@@ -52,16 +55,46 @@ public void close() {
@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
- // Generate a random number of rows to emit.
- List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows();
- for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
- output.collect(seaTunnelRow);
- }
- if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
- // signal to the source that we have reached the end of the data.
- log.info("Closed the bounded fake source");
- context.signalNoMoreElement();
+ synchronized (output.getCheckpointLock()) {
+ FakeSourceSplit split = splits.poll();
+ if (null != split) {
+ // Generate a random number of rows to emit.
+ List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows();
+ for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
+ output.collect(seaTunnelRow);
+ }
+ } else {
+ if (noMoreSplit &&
Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of
the data.
+ log.info("Closed the bounded fake source");
+ context.signalNoMoreElement();
+ }
+ if (!noMoreSplit) {
+ log.info("wait split!");
+ }
+ Thread.sleep(1000L);
+ }
+
}
Review Comment:
move sleep to synchronized block outside?
```suggestion
}
}
Thread.sleep(1000L);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]