Eliaaazzz commented on code in PR #38724:
URL: https://github.com/apache/beam/pull/38724#discussion_r3333026458


##########
sdks/python/apache_beam/io/unbounded_source_test.py:
##########
@@ -0,0 +1,1185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for apache_beam.io.unbounded_source.
+
+Strategy: checkpoint/resume/watermark/coder semantics are covered by
+deterministic unit tests (no pipeline, no wall clock). A single end-to-end
+DirectRunner test asserts only ordering + termination -- no defer-timing
+assertions, which would be flaky (cf. periodicsequence_test which skips
+processing-time tests for the same reason).
+"""
+
+# pytype: skip-file
+
+import gc
+import logging
+import os
+import tempfile
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.io import unbounded_source as _unbounded_source_module
+from apache_beam.io.unbounded_source import _NO_DATA
+from apache_beam.io.unbounded_source import CheckpointMark
+from apache_beam.io.unbounded_source import ReadFromUnboundedSource
+from apache_beam.io.unbounded_source import UnboundedReader
+from apache_beam.io.unbounded_source import UnboundedSource
+from apache_beam.io.unbounded_source import _set_watermark_if_greater
+from apache_beam.io.unbounded_source import _UnboundedSourceRestriction
+from apache_beam.io.unbounded_source import _UnboundedSourceRestrictionCoder
+from apache_beam.io.unbounded_source import _UnboundedSourceRestrictionProvider
+from apache_beam.io.unbounded_source import _UnboundedSourceRestrictionTracker
+from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
+from apache_beam.runners import sdf_utils
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import core
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+
+# pylint: disable=expression-not-assigned
+
+# ------------------------------------------------------------------------------
+# A tiny in-memory demo source: emits the integers 0..count-1, one per record,
+# with event time Timestamp(index). It self-terminates (watermark -> MAX after
+# the last record) so a pipeline reading it ends. Resumes from a checkpoint at
+# (last_index + 1).
+# ------------------------------------------------------------------------------
+
+
+class _CountingCheckpointMark(CheckpointMark):
+  """Checkpoint mark for the in-memory counting demo source.
+
+  Holds the index of the last record covered by this checkpoint. Equality
+  and hashing depend only on ``last_index``; the optional finalize log is
+  not part of a mark's identity.
+  """
+  def __init__(self, last_index, finalize_log=None):
+    # Position covered by this checkpoint; the demo source resumes reading
+    # at last_index + 1.
+    self.last_index = last_index
+    # Optional list that finalize_checkpoint() appends to, so tests can
+    # observe which checkpoints were finalized. None disables recording.
+    self._finalize_log = finalize_log
+
+  def finalize_checkpoint(self):
+    """Record ``last_index`` in the finalize log, if one was supplied."""
+    if self._finalize_log is not None:
+      self._finalize_log.append(self.last_index)
+
+  def __eq__(self, other):
+    # Compare by position only; _finalize_log does not affect equality.
+    return (
+        isinstance(other, _CountingCheckpointMark) and
+        other.last_index == self.last_index)
+
+  def __hash__(self):
+    # Hashes the same field __eq__ compares, keeping the two consistent.
+    return hash(self.last_index)
+
+  def __repr__(self):
+    return '_CountingCheckpointMark(last_index=%r)' % (self.last_index, )
+
+
+class _CountingReader(UnboundedReader):
+  def __init__(self, count, start_index, finalize_log=None):
+    self._count = count
+    self._next = start_index
+    self._current = None
+    self._exhausted = False
+    self._finalize_log = finalize_log
+    self.closed = False
+
+  def _read_next(self):
+    if self._next >= self._count:
+      self._exhausted = True
+      return False
+    self._current = self._next
+    self._next += 1
+    return True
+
+  def start(self):
+    return self._read_next()
+
+  def advance(self):
+    return self._read_next()
+
+  def get_current(self):
+    return self._current
+
+  def get_current_timestamp(self):
+    return Timestamp(self._current)

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to