shunping commented on code in PR #35137:
URL: https://github.com/apache/beam/pull/35137#discussion_r2150109684


##########
sdks/python/apache_beam/io/iobase.py:
##########
@@ -1152,22 +1197,339 @@ def expand(self, pcoll):
           | 'Pair' >> core.Map(lambda x: (None, x))
           | core.GroupByKey()
           | 'Extract' >> core.FlatMap(lambda x: x[1]))
-    # PreFinalize should run before FinalizeWrite, and the two should not be
-    # fused.
+
     pre_finalize_coll = (
         do_once
         | 'PreFinalize' >> core.FlatMap(
             _pre_finalize,
             self.sink,
             AsSingleton(init_result_coll),
             AsIter(write_result_coll)))
-    return do_once | 'FinalizeWrite' >> core.FlatMap(
-        _finalize_write,
-        self.sink,
-        AsSingleton(init_result_coll),
-        AsIter(write_result_coll),
-        min_shards,
-        AsSingleton(pre_finalize_coll)).with_output_types(str)
+    return (
+        do_once | 'FinalizeWrite' >> core.FlatMap(
+            _finalize_write,
+            self.sink,
+            AsSingleton(init_result_coll),
+            AsIter(write_result_coll),
+            min_shards,
+            AsSingleton(pre_finalize_coll)).with_output_types(str))
+
+  def _apply_windowing(self, pcoll):
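+    """Windows the PCollection into fixed windows sized by the sink's
+    triggering_frequency, firing at the watermark and discarding late data."""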
+    return (
+        # TODO: Use GroupIntoBatches and trigger indefinitely per frequency.
+        pcoll
+        | core.WindowInto(
+            window.FixedWindows(self.sink.triggering_frequency),
+            trigger=beam.transforms.trigger.AfterWatermark(),
+            accumulation_mode=beam.transforms.trigger.AccumulationMode.
+            DISCARDING,
+            allowed_lateness=beam.utils.timestamp.Duration(seconds=0)))
+
+  def _expand_unbounded(self, pcoll, min_shards):
+    """Handles the expansion logic for an unbounded PCollection."""
+    if min_shards >= 1:
+      # An unbounded PCollection needs to be written per window.
+      if isinstance(pcoll.windowing.windowfn, window.GlobalWindows):
+        if (self.sink.triggering_frequency is None or
+            self.sink.triggering_frequency == 0):
+          raise ValueError(
+              'To write a GlobalWindow unbounded PCollection, '
+              'triggering_frequency must be set and be greater than 0')
+        windowed_pcoll = self._apply_windowing(pcoll)
+      else:
+        # Keep the user's windowing unless triggering_frequency is specified.
+        if (self.sink.triggering_frequency is not None and
+            self.sink.triggering_frequency > 0):
+          windowed_pcoll = self._apply_windowing(pcoll)
+        else:
+          windowed_pcoll = pcoll
+      if self.sink.convert_fn is not None:
+        windowed_pcoll = windowed_pcoll | core.ParDo(self.sink.convert_fn)
+      if min_shards == 1:
+        keyed_pcoll = windowed_pcoll | core.Map(lambda x: (None, x))
+      else:
+        keyed_pcoll = windowed_pcoll | core.ParDo(
+            _RoundRobinKeyFn(), count=min_shards)
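+      # Grouping under a single key ensures initialize_write() runs once
+      # per window rather than once per element.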
+      init_result_window_coll = (
+          keyed_pcoll
+          | 'Pair init' >> core.Map(lambda x: (None, x))
+          | 'Pair init gbk' >> core.GroupByKey()
+          | 'InitializeWindowedWrite' >> core.Map(
+              lambda _, sink: sink.initialize_write(), self.sink))
+
+      write_result_coll = (
+          keyed_pcoll
+          | 'Group by random key' >> core.GroupByKey()
+          | 'WriteWindowedBundles' >> core.ParDo(
+              _WriteWindowedBundleDoFn(sink=self.sink, per_key=True),
+              AsSingleton(init_result_window_coll))
+          | 'Pair' >> core.Map(lambda x: (None, x))
+          | core.GroupByKey()
+          | 'Extract' >> core.Map(lambda x: x[1]))
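+      # PreFinalize should run before FinalizeWrite, and the two should not
+      # be fused.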
+      pre_finalized_write_result_coll = (
+          write_result_coll
+          | 'PreFinalize' >> core.ParDo(
+              _PreFinalizeWindowedBundleDoFn(self.sink),
+              AsSingleton(init_result_window_coll)))
+      finalized_write_result_coll = (
+          pre_finalized_write_result_coll
+          | 'FinalizeWrite' >> core.FlatMap(
+              _finalize_windowed_write,
+              self.sink,
+              AsSingleton(init_result_window_coll),
+              AsSingleton(write_result_coll),
+              min_shards,
+              AsIter(pre_finalized_write_result_coll)).with_output_types(str))
+      return finalized_write_result_coll
+    else:
+      _LOGGER.info(
+          'WriteImpl: min_shards is unset, so it defaults to 1 and we write '
+          'per bundle.')
+      # An unbounded PCollection needs to be written per window.
+      if isinstance(pcoll.windowing.windowfn, window.GlobalWindows):
+        if (self.sink.triggering_frequency is None or
+            self.sink.triggering_frequency == 0):
+          raise ValueError(
+              'To write a GlobalWindow PCollection, triggering_frequency must'
+              ' be set and be greater than 0')
+        windowed_pcoll = self._apply_windowing(pcoll)
+      else:
+        # Keep the user's windowing unless triggering_frequency is specified.
+        if (self.sink.triggering_frequency is not None and
+            self.sink.triggering_frequency > 0):
+          windowed_pcoll = self._apply_windowing(pcoll)
+        else:
+          windowed_pcoll = pcoll
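+      # As above, a single-key group ensures one initialize_write() per
+      # window.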
+      init_result_window_coll = (
+          windowed_pcoll
+          | 'Pair init' >> core.Map(lambda x: (None, x))
+          | 'Pair init gbk' >> core.GroupByKey()
+          | 'InitializeWindowedWrite' >> core.Map(
+              lambda _, sink: sink.initialize_write(), self.sink))
+      if self.sink.convert_fn is not None:
+        windowed_pcoll = windowed_pcoll | core.ParDo(self.sink.convert_fn)
+      write_result_coll = (
+          windowed_pcoll
+          | 'WriteWindowedBundles' >> core.ParDo(
+              _WriteWindowedBundleDoFn(self.sink),
+              AsSingleton(init_result_window_coll))
+          | 'Pair' >> core.Map(lambda x: (None, x))
+          | core.GroupByKey()
+          | 'Extract' >> core.Map(lambda x: x[1]))
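+      # PreFinalize should run before FinalizeWrite, and the two should not
+      # be fused.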
+      pre_finalized_write_result_coll = (
+          write_result_coll
+          | 'PreFinalize' >> core.ParDo(
+              _PreFinalizeWindowedBundleDoFn(self.sink),
+              AsSingleton(init_result_window_coll)))
+      finalized_write_result_coll = (
+          pre_finalized_write_result_coll
+          | 'FinalizeWrite' >> core.FlatMap(
+              _finalize_windowed_write,
+              self.sink,
+              AsSingleton(init_result_window_coll),
+              AsSingleton(write_result_coll),
+              min_shards,
+              AsIter(pre_finalized_write_result_coll)).with_output_types(str))
+      return finalized_write_result_coll
+
+  def expand(self, pcoll):
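+    """Dispatches to the bounded or unbounded expansion path based on
+    whether the input PCollection is bounded."""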
+    min_shards = getattr(self.sink, 'num_shards', 0)
+
+    if pcoll.is_bounded:
+      return self._expand_bounded(pcoll, min_shards)
+    else:
+      return self._expand_unbounded(pcoll, min_shards)
+
+  def expandOld(self, pcoll):

Review Comment:
   Do we still need `expandOld`, or can it be removed now that `expand` dispatches to `_expand_bounded` / `_expand_unbounded`?


