[ 
https://issues.apache.org/jira/browse/BEAM-9454?focusedWorklogId=402295&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-402295
 ]

ASF GitHub Bot logged work on BEAM-9454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Mar/20 16:28
            Start Date: 12/Mar/20 16:28
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state
URL: https://github.com/apache/beam/pull/11060#discussion_r391727529
 
 

 ##########
 File path: sdks/python/apache_beam/runners/sdf_utils.py
 ##########
 @@ -244,3 +251,63 @@ def get_estimator_state(self):
         return None
 
     return _NoOpWatermarkEstimator()
+
+
+class DeduplictaionWithinDuration(ptransform.PTransform):
+  """ A PTransform which deduplicate input records over a time domain and
+  threshold. Values in different windows will NOT be considered duplicates of
+  each other. Deduplication is best effort.
+
+  The durations specified may impose memory and/or storage requirements within
+  a runner and care might need to be used to ensure that the deduplication time
+  limit is long enough to remove duplicates but short enough to not cause
+  performance problems within a runner. Each runner may provide an optimized
+  implementation of their choice using the deduplication time domain and
+  threshold specified.
+
+  Does not preserve any order the input PCollection might have had.
+  """
+  def __init__(
+      self,
+      time_domain=userstate.TimeDomain.REAL_TIME,
+      duration=Duration(10 * 60)):
+    self.time_domain = time_domain
+    self.duration = duration
+
+  def _create_deduplicate_fn(self):
+    timer_spec = userstate.TimerSpec('expiry_timer', self.time_domain)
+    state_spec = userstate.BagStateSpec('seen', BooleanCoder())
+    duration = self.duration
+    domain = self.time_domain
+
+    class DeduplicationFn(DoFn):
+      def process(
+          self,
+          element,
+          ts=DoFn.TimestampParam,
+          seen_state=DoFn.StateParam(state_spec),
+          expiry_timer=DoFn.TimerParam(timer_spec)):
+        if True not in seen_state.read():
+          if domain is userstate.TimeDomain.REAL_TIME:
+            expiry_timer.set(Timestamp.now() + duration)
+          else:
+            expiry_timer.set(ts + duration)
+          seen_state.add(True)
+          value, _ = element
+          yield value
 
 Review comment:
   I should have used a guard-style statement in the Java implementation 
instead of nesting the logic inside the `if`.
   
   ```suggestion
           if True in seen_state.read():
             return
   
           if domain is userstate.TimeDomain.REAL_TIME:
             expiry_timer.set(Timestamp.now() + duration)
           else:
             expiry_timer.set(ts + duration)
           seen_state.add(True)
           value, _ = element
           yield value
   ```
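
The guard-style control flow in the suggestion can be tried without Beam. The following is a minimal sketch in plain Python, not the PR's code: `FakeKeyState`, `dedup_process`, and `run` are hypothetical stand-ins for Beam's per-key bag state and expiry timer, and timestamps are assumed to be plain numbers of seconds.

```python
class FakeKeyState:
    """Hypothetical stand-in for per-key state; not a Beam class."""
    def __init__(self):
        self.seen = []      # mimics the bag state holding booleans
        self.expiry = None  # mimics the time the expiry timer was set to


def dedup_process(element, ts, state, duration, real_time=False, now=None):
    """Yield the first component of `element` only on first sight of its key."""
    # Guard clause: leave early for duplicates instead of nesting the
    # main logic inside an `if True not in ...` block.
    if True in state.seen:
        return

    # First occurrence: schedule expiry, mark as seen, then emit.
    base = now if real_time else ts
    state.expiry = base + duration
    state.seen.append(True)
    value, _ = element
    yield value


def run(elements, duration=600):
    """Feed (timestamp, element) pairs through dedup_process, keyed by element[0]."""
    states = {}
    out = []
    for ts, element in elements:
        state = states.setdefault(element[0], FakeKeyState())
        out.extend(dedup_process(element, ts, state, duration))
    return out, states


emitted, states = run([(0, ("a", None)), (5, ("a", None)), (7, ("b", None))])
print(emitted)
```

Because `dedup_process` is a generator, the bare `return` in the guard simply ends it without yielding anything, so both forms of the condition produce the same output. Only the nesting depth differs.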
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 402295)
    Time Spent: 20m  (was: 10m)

> Add Deduplication transform for SDF
> -----------------------------------
>
>                 Key: BEAM-9454
>                 URL: https://issues.apache.org/jira/browse/BEAM-9454
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Boyuan Zhang
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When SDF is used as a source-like operation, it's necessary to provide a 
> default Deduplication transform for the SDF user to deduplicate values by 
> certain unique id.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
