TyrantLucifer commented on code in PR #5100:
URL: https://github.com/apache/seatunnel/pull/5100#discussion_r1413563310
##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java:
##########
@@ -83,16 +84,24 @@ public void close() {
@Override
@SuppressWarnings("magicnumber")
- public void pollNext(Collector<SeaTunnelRow> output) {
- while (!pendingSplits.isEmpty()) {
- synchronized (output.getCheckpointLock()) {
- AmazonDynamoDBSourceSplit split = pendingSplits.poll();
+ public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
+ synchronized (output.getCheckpointLock()) {
+ AmazonDynamoDBSourceSplit split = pendingSplits.poll();
+ if (split == null) {
+ log.info(
+ "AmazonDynamoDB Source Reader [{}] waiting for splits",
+ context.getIndexOfSubtask());
+ if (noMoreSplit) {
+ // signal to the source that we have reached the end of
the data.
+ log.info("Closed the bounded amazonDynamodb source");
+ context.signalNoMoreElement();
+ Thread.sleep(2000L);
Review Comment:
avoid read data duplicated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]