The doc (which wasn't linked in https://s.apache.org/beam-design-docs, but is now) doesn't actually definitively state whether PeriodicImpulse is bounded or not, only that conceptually it can trigger what would be a bounded read, pre-wrapped in an appropriate fixed window.
This, combined with ProcessContinuation implying unbounded, leads me to conclude that PeriodicImpulse must be producing unbounded PCollections.

On Thu, May 19, 2022, 4:44 PM Chamikara Jayalath <[email protected]> wrote:

>
> On Thu, May 19, 2022 at 4:30 PM Yi Hu <[email protected]> wrote:
>
>> Dear Cham,
>>
>> Thanks for the feedback. Java PeriodicImpulse indeed returns an
>> unbounded PCollection. This can be verified by adding the assertion
>> `assertEquals(PCollection.IsBounded.UNBOUNDED, result.isBounded());`
>> to the unit test here:
>>
>> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java#L71
>>
>> Then run:
>>
>> ./gradlew :runners:direct-java:needsRunnerTests --tests org.apache.beam.sdk.transforms.PeriodicImpulseTest
>>
>> The test passes. (Asserting BOUNDED instead fails with
>> java.lang.AssertionError: expected:<BOUNDED> but was:<UNBOUNDED>.)
>>
>> The unbounded PCollection results from the return type of
>> DoFn.ProcessElement here:
>>
>> https://github.com/apache/beam/blob/f3041e078643abe4f7608a7a11459f81b0d20b3f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java#L182
>>
>> The behavior is documented here:
>>
>> https://github.com/apache/beam/blob/6390bcd265512f077c92124a551419ee0349c84c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L877
>> "If ProcessElement returns DoFn.ProcessContinuation, assume it is
>> unbounded."
>>
>
> Is the current Java behavior potentially incorrect? Reading the original
> document, it seems the use case for PeriodicImpulse was to support
> reading a bounded side input that is consistent within a given window.
>
> Thanks,
> Cham
>
>
>>
>> Best,
>> Yi
>>
>> On Thu, May 19, 2022 at 6:38 PM Chamikara Jayalath <[email protected]> wrote:
>>
>>>
>>> On Wed, May 18, 2022 at 12:32 PM Yi Hu <[email protected]> wrote:
>>>
>>>> Hi dev group,
>>>>
>>>> TL;DR: The PeriodicImpulse transform in the Java SDK generates an unbounded
>>>> PCollection, while in the Python SDK it generates a bounded PCollection. The
>>>> latter may cause issues in streaming.
>>>>
>>>> Thank you for your attention. There are periodic impulse transforms in
>>>> both the Java and Python SDKs, implemented in a quite similar way. However,
>>>> one difference is that Java PeriodicImpulse generates an *unbounded*
>>>> PCollection of time series, while the PCollection generated by
>>>> Python PeriodicImpulse is *bounded*.
>>>>
>>>> This difference causes an issue when a downstream PTransform
>>>> contains a Reshuffle op. The GBK inside it holds incoming elements and
>>>> fires them as in a batch pipeline, regardless of the window settings.
>>>>
>>>> Here is a working example. Nothing is printed to the console,
>>>> but with Reshuffle() commented out, the timestamps are printed
>>>> continuously. (The behavior is the same with apply_windowing or the
>>>> streaming flag removed.)
>>>>
>>>> ```
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.transforms.periodicsequence import PeriodicImpulse
>>>> from apache_beam.transforms.util import Reshuffle
>>>>
>>>>
>>>> if __name__ == '__main__':
>>>>     pipeline_options = PipelineOptions(flags=['--streaming'])
>>>>
>>>>     with beam.Pipeline(options=pipeline_options) as p:
>>>>         result = (
>>>>             p
>>>>             | PeriodicImpulse(fire_interval=1, apply_windowing=True)
>>>>             | Reshuffle()  # with this line commented out, the timestamps get printed
>>>>             | beam.Map(print)
>>>>         )
>>>> ```
>>>>
>>>> Having looked into the background, I found the design doc of BEAM-9650
>>>> (https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit#heading=h.iluf68hiykcs)
>>>> and learned that the Python implementation was originally designed in the
>>>> context of slowly changing sources, which are themselves "bounded" over a
>>>> short period (e.g., days) but can change. In the Java SDK, on the other hand,
>>>> the unbounded PCollection is a result of the ProcessContinuation return type
>>>> of the DoFn's ProcessElement method. In Python this is not set, so the output
>>>> PCollection is inferred to be bounded.
>>>>
>>>
>>> That design doc, I believe, was for both Java and Python, so there shouldn't
>>> be behavior differences between the Java and Python implementations. Seems
>>> like the Java implementation is bounded as well?
>>>
>>> https://github.com/apache/beam/blob/e9cfd8e441017085c9e9064a4a8cdd3576e3da43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java#L69
>>>
>>>
>>>> Now the question is:
>>>> (1) Should we change the Python implementation to generate an unbounded
>>>> PCollection, in alignment with Java?
>>>> (2) Either way, should we introduce a parameter to set the boundedness of
>>>> PeriodicImpulse? However, this seems not aligned with current Beam sources,
>>>> where boundedness is a fixed property of a specific source.
>>>>
>>>
>>> I think you are trying to use this to implement watchForNewFiles, right?
>>> If so, "PeriodicImpulse" might not be the correct thing to base this on.
>>> We might actually need to develop a Watch transform (SDF) for the Python
>>> SDK.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>> Regards,
>>>> Yi
>>>>
>>>> --
>>>>
>>>> Yi Hu, (he/him/his)
>>>>
>>>> Software Engineer
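The boundedness rule Yi quotes from DoFnSignatures, and the batch-style GroupByKey behavior he reports, can be illustrated with a small toy model. This is a plain-Python sketch under stated assumptions: `IsBounded`, `ProcessContinuation`, `infer_boundedness`, `batch_style_group_by_key`, `finite_ticks`, and the two `*_process` functions are hypothetical stand-ins, not Beam APIs, and the real Java inference may consider more than the return type.

```python
import enum
import inspect
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple


class IsBounded(enum.Enum):
    """Hypothetical stand-in for Java's PCollection.IsBounded."""
    BOUNDED = "BOUNDED"
    UNBOUNDED = "UNBOUNDED"


class ProcessContinuation:
    """Hypothetical stand-in for Java's DoFn.ProcessContinuation."""

    def __init__(self, resume: bool) -> None:
        self.resume = resume


def infer_boundedness(process_element: Callable) -> IsBounded:
    """Toy version of the quoted DoFnSignatures rule: a process method
    declared to return ProcessContinuation is assumed unbounded;
    anything else is assumed bounded."""
    declared = inspect.signature(process_element).return_annotation
    if declared is ProcessContinuation:
        return IsBounded.UNBOUNDED
    return IsBounded.BOUNDED


def java_style_process(element: int) -> ProcessContinuation:
    # Declares ProcessContinuation, like the Java PeriodicSequence DoFn.
    return ProcessContinuation(resume=True)


def python_style_process(element: int) -> Iterator[int]:
    # Plain generator with no ProcessContinuation, as described for Python.
    yield element


def batch_style_group_by_key(
        elements: Iterable[Tuple[str, int]]) -> Iterator[Tuple[str, List[int]]]:
    """Toy batch-mode GroupByKey: reads its entire input before emitting
    anything, so an input that never ends never produces output."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in elements:
        groups[key].append(value)
    yield from groups.items()


pulled: List[int] = []  # records which source elements have been read


def finite_ticks(n: int) -> Iterator[Tuple[str, int]]:
    # Hypothetical finite stand-in for a periodic impulse source.
    for i in range(n):
        pulled.append(i)
        yield ("tick", i)


print(infer_boundedness(java_style_process).value)    # UNBOUNDED
print(infer_boundedness(python_style_process).value)  # BOUNDED

first = next(batch_style_group_by_key(finite_ticks(5)))
print(first, len(pulled))  # ('tick', [0, 1, 2, 3, 4]) 5
```

In the toy model, the first grouped output appears only after all five ticks have been read. With a never-ending source such as a periodic impulse, that point never arrives, which matches the symptom of nothing being printed when Reshuffle() is present.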
