kennknowles commented on code in PR #34348:
URL: https://github.com/apache/beam/pull/34348#discussion_r2014523298


##########
sdks/go/pkg/beam/runners/prism/internal/urns/urns.go:
##########
@@ -124,6 +124,7 @@ var (
        CoderTimer              = cdrUrn(pipepb.StandardCoders_TIMER)
 
        CoderKV                  = cdrUrn(pipepb.StandardCoders_KV)
+       CoderTuple               = "beam:coder:tuple:v1"

Review Comment:
   This seems suspiciously out of place, being a magic string and also not a 
standard coder. What is the story behind it?



##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -949,6 +952,97 @@ def is_compat_version_prior_to(options, breaking_change_version):
   return False
 
 
+def _default_window_reify_functions(should_keep_paneinfo):
+  globally_windowed = window.GlobalWindows.windowed_value(None)
+  MIN_TIMESTAMP = window.MIN_TIMESTAMP
+
+  if should_keep_paneinfo:
+
+    def reify_metadata(

Review Comment:
   Nothing about this needs to be defined in a nested context. For readability, 
just define the functions at the top level and then the if statement can be 
concise and clear.
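
   A minimal sketch of the shape this suggests, not the PR's actual code. It
   uses plain-Python stand-ins so it runs without Beam: `MIN_TIMESTAMP` here is
   a hypothetical sentinel in place of `window.MIN_TIMESTAMP`, the timestamp and
   pane info are passed explicitly rather than through `DoFn.TimestampParam` and
   `DoFn.PaneInfoParam`, and `select_reify_function` is an illustrative name.

```python
# Hypothetical stand-in for window.MIN_TIMESTAMP so the sketch runs without Beam.
MIN_TIMESTAMP = object()


def reify_timestamps(element, timestamp):
  # Top-level variant that carries only the timestamp alongside the value.
  key, value = element
  if timestamp == MIN_TIMESTAMP:
    timestamp = None
  return key, (value, timestamp)


def reify_metadata(element, timestamp, pane_info):
  # Top-level variant that also carries the pane info.
  key, value = element
  if timestamp == MIN_TIMESTAMP:
    timestamp = None
  return key, (value, timestamp, pane_info)


def select_reify_function(should_keep_paneinfo):
  # With both functions at module level, the branch is a single expression.
  return reify_metadata if should_keep_paneinfo else reify_timestamps
```

   The restore functions could presumably be lifted out the same way, leaving
   the selector as the only place where the flag is consulted.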



##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -949,6 +952,97 @@ def is_compat_version_prior_to(options, breaking_change_version):
   return False
 
 
+def _default_window_reify_functions(should_keep_paneinfo):
+  globally_windowed = window.GlobalWindows.windowed_value(None)
+  MIN_TIMESTAMP = window.MIN_TIMESTAMP
+
+  if should_keep_paneinfo:
+
+    def reify_metadata(
+        element, timestamp=DoFn.TimestampParam, pane_info=DoFn.PaneInfoParam):
+      key, value = element
+      if timestamp == MIN_TIMESTAMP:
+        timestamp = None
+      return key, (value, timestamp, pane_info)
+
+    def restore_metadata(element):
+      key, values = element
+      return [
+          globally_windowed.with_value((key, value))
+          if timestamp is None else window.GlobalWindows.windowed_value(
+              value=(key, value), timestamp=timestamp, pane_info=pane_info)
+          for (value, timestamp, pane_info) in values
+      ]
+
+    return reify_metadata, restore_metadata
+
+  def reify_timestamps(element, timestamp=DoFn.TimestampParam):
+    key, value = element
+    if timestamp == MIN_TIMESTAMP:
+      timestamp = None
+    return key, (value, timestamp)
+
+  def restore_timestamps(element):
+    key, values = element
+    return [
+        globally_windowed.with_value((key, value))
+        if timestamp is None else window.GlobalWindows.windowed_value(
+            value=(key, value), timestamp=timestamp)
+        for (value, timestamp) in values
+    ]
+
+  return reify_timestamps, restore_timestamps
+
+
+def _custom_window_reify_functions(should_keep_paneinfo):
+  def restore_timestamps(element):
+    key, windowed_values = element
+    return [wv.with_value((key, wv.value)) for wv in windowed_values]
+
+  if should_keep_paneinfo:
+
+    def reify_metadata(

Review Comment:
   Ditto: these functions do not need to be nested either, so define them at 
the top level as well.



##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -959,52 +1053,19 @@ class ReshufflePerKey(PTransform):
   """
   def expand(self, pcoll):
     windowing_saved = pcoll.windowing
-    if windowing_saved.is_default():
-      # In this (common) case we can use a trivial trigger driver
-      # and avoid the (expensive) window param.
-      globally_windowed = window.GlobalWindows.windowed_value(None)
-      MIN_TIMESTAMP = window.MIN_TIMESTAMP
-
-      def reify_timestamps(element, timestamp=DoFn.TimestampParam):
-        key, value = element
-        if timestamp == MIN_TIMESTAMP:
-          timestamp = None
-        return key, (value, timestamp)
-
-      def restore_timestamps(element):
-        key, values = element
-        return [
-            globally_windowed.with_value((key, value)) if timestamp is None else
-            window.GlobalWindows.windowed_value((key, value), timestamp)
-            for (value, timestamp) in values
-        ]
-
-      if is_compat_version_prior_to(pcoll.pipeline.options,
-                                    RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
-        pre_gbk_map = Map(reify_timestamps).with_output_types(Any)
-      else:
-        pre_gbk_map = Map(reify_timestamps).with_input_types(
-            tuple[K, V]).with_output_types(
-                tuple[K, tuple[V, Optional[Timestamp]]])
-    else:
-
-      # typing: All conditional function variants must have identical signatures
-      def reify_timestamps(  # type: ignore[misc]
-          element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam):
-        key, value = element
-        # Transport the window as part of the value and restore it later.
-        return key, windowed_value.WindowedValue(value, timestamp, [window])
-
-      def restore_timestamps(element):
-        key, windowed_values = element
-        return [wv.with_value((key, wv.value)) for wv in windowed_values]
-
-      if is_compat_version_prior_to(pcoll.pipeline.options,
-                                    RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
-        pre_gbk_map = Map(reify_timestamps).with_output_types(Any)
-      else:
-        pre_gbk_map = Map(reify_timestamps).with_input_types(
-            tuple[K, V]).with_output_types(tuple[K, TypedWindowedValue[V]])
+    is_default_windowing = windowing_saved.is_default()
+    should_keep_paneinfo = not is_compat_version_prior_to(

Review Comment:
   Instead of adding flags that increase the cyclomatic complexity of `expand`, 
prefer to immediately fork `expand` to another version (which we will not 
modify ever again, really). See 
https://github.com/apache/beam/blob/4fe34db0f42633aac1045591b14fdaabd664a43c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L87
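
   A rough sketch of the forking pattern, under stated assumptions. The class
   `ReshufflePerKeySketch`, the `use_legacy_behavior` flag, and the two
   `_expand_*` method names are hypothetical, and a list of
   `(key, value, timestamp, pane_info)` tuples stands in for a PCollection. In
   the real transform the flag would come from the compat-version check rather
   than a constructor argument.

```python
class ReshufflePerKeySketch:
  """Hypothetical stand-in for the transform; not the real Beam class."""
  def __init__(self, use_legacy_behavior):
    self._use_legacy_behavior = use_legacy_behavior

  def expand(self, pcoll):
    # Fork immediately so each version stays simple and the legacy
    # path can be left frozen.
    if self._use_legacy_behavior:
      return self._expand_legacy(pcoll)
    return self._expand_current(pcoll)

  def _expand_legacy(self, pcoll):
    # Frozen behavior: keeps the timestamp, drops the pane info.
    return [(key, (value, ts)) for key, value, ts, _pane in pcoll]

  def _expand_current(self, pcoll):
    # Current behavior: keeps both the timestamp and the pane info.
    return [(key, (value, ts, pane)) for key, value, ts, pane in pcoll]
```

   The point of the pattern is that `expand` has exactly one branch, and no
   flag needs to be threaded through helper functions.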



