[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269679#comment-16269679 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653661
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/util.py
 ##########
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
 
 Review comment:
   > **robertwb** wrote:
   > A DoFn with nothing but a process method can be more simply implemented via beam.Map or beam.FlatMap.
   
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
>                 Key: BEAM-1872
>                 URL: https://issues.apache.org/jira/browse/BEAM-1872
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>              Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
