je-ik commented on code in PR #30905:
URL: https://github.com/apache/beam/pull/30905#discussion_r1562135025


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java:
##########
@@ -61,8 +55,6 @@
  */
 public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
-  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
-  private final Map<Integer, Long> consumedFromSplit = new HashMap<>();

Review Comment:
   This is related, because this was introduced in an attempt to fix exactly
this issue of avoiding re-emission of elements from Impulse. Unfortunately, it
does not work as expected and, more seriously, it can cause data loss for
sources that are not 'stable', e.g. NoSQL databases, where the data in a split
can change between restarts. In that case, an element that was always present
but changed its position within the split might be skipped.
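The failure mode above can be sketched as a standalone Java program. This is a hypothetical illustration, not Beam or Flink code. `CountBasedSkipDemo` and `readSkipping` are made-up names. The sketch also assumes that the removed `consumedFromSplit` map recorded how many elements had been emitted per split, and that a restored reader skipped that many elements from the start of the split.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration (not Beam code) of count-based replay skipping:
 * on restore, the reader skips the first N elements of a split, where N is
 * the checkpointed number of elements already emitted from that split.
 */
public class CountBasedSkipDemo {

  /** Emits the split's elements, skipping the first {@code alreadyConsumed} of them. */
  public static List<String> readSkipping(List<String> splitContents, long alreadyConsumed) {
    List<String> emitted = new ArrayList<>();
    long position = 0;
    for (String element : splitContents) {
      if (position++ < alreadyConsumed) {
        continue; // assumed to have been emitted before the checkpoint
      }
      emitted.add(element);
    }
    return emitted;
  }

  public static void main(String[] args) {
    // Hypothetical state: split id -> number of elements emitted before the checkpoint.
    Map<Integer, Long> consumedFromSplit = new HashMap<>();

    // Before the failure, split 0 yields a, b, c; the reader emits only "a" and checkpoints.
    List<String> beforeRestart = List.of("a", "b", "c");
    List<String> emittedBefore = readSkipping(beforeRestart, 0).subList(0, 1);
    consumedFromSplit.put(0, (long) emittedBefore.size());

    // After the restart, an unstable source (e.g. a NoSQL scan) returns the
    // same elements in a different order.
    List<String> afterRestart = List.of("b", "a", "c");
    List<String> emittedAfter = readSkipping(afterRestart, consumedFromSplit.get(0));

    System.out.println("before checkpoint: " + emittedBefore); // [a]
    System.out.println("after restore:     " + emittedAfter);  // [a, c]
    // "a" is emitted twice, and "b" is never emitted because it moved into
    // the skipped position 0.
  }
}
```

The skip count only identifies elements correctly when a split returns the same elements in the same order on every read. That holds for stable sources but not for ones whose contents or ordering can change between restarts.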



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
