lukecwik commented on a change in pull request #10972: Add DeduplicationByUniqueId transform
URL: https://github.com/apache/beam/pull/10972#discussion_r387321457
 
 

 ##########
 File path: sdks/python/apache_beam/runners/sdf_utils.py
 ##########
 @@ -244,3 +248,68 @@ def get_estimator_state(self):
         return None
 
     return _NoOpWatermarkEstimator()
+
+
+class DeduplicationByUniqueId(ptransform.PTransform):
+  """A Transform that performs deduplication based on the unique id.
+
+  The input should be a pair of (key, value), where key stands for the unique_id
+  of the value. The transform outputs the value with original windowing strategy.
+  By default, the deduplication will be performed with Session windows. The
+  default size of Session is one day.
+  """
+  def __init__(self, session_size=24 * 60 * 60):
+    self._session_size = session_size
+
+  class OutputValueWithOriginalWindowing(core.ParDo):
+    class _DropUniqueId(core.DoFn):
+      def process(self, element):
+        _, windowed_value = element
+        yield windowed_value
+
+    def __init__(self, original_windowing):
+      self.windowing = original_windowing
+      super(DeduplicationByUniqueId.OutputValueWithOriginalWindowing,
+            self).__init__(self._DropUniqueId())
+
+    def get_windowing(self, unused_inputs):
+      return self.windowing
+
+  class _DeduplicationCombineFn(core.CombineFn):
+    def create_accumulator(self):
+      return []
+
+    def add_input(self, accumulator, element):
+      if len(accumulator) == 0:
+        accumulator.append(element)
+      return accumulator
+
+    def merge_accumulators(self, accumulators):
+      for accumulator in accumulators:
+        if len(accumulator) > 0:
+          return accumulator
+
+    def extract_output(self, accumulator):
+      return accumulator[0]
+
+  class KeepValueWindowingInfo(core.DoFn):
+    def process(
+        self,
+        kv,
+        ts=core.DoFn.TimestampParam,
+        window=core.DoFn.WindowParam,
+        paneinfo=core.DoFn.PaneInfoParam):
+      id, value = kv
+      yield (id, WindowedValue(value, ts, [window], paneinfo))
+
+  def expand(self, pcoll):
+    windowing = pcoll.windowing
+    return (
+        pcoll
+        | core.ParDo(self.KeepValueWindowingInfo())
+        | core.WindowInto(
+            Sessions(self._session_size),
+            trigger=trigger.AfterCount(1),
+            accumulation_mode=trigger.AccumulationMode.DISCARDING)
+        | core.CombinePerKey(self._DeduplicationCombineFn())
 
 Review comment:
   This should be a GBK (GroupByKey) followed by a ParDo that emits elements only when the pane is the first pane.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to