[ 
https://issues.apache.org/jira/browse/BEAM-3905?focusedWorklogId=110065&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110065
 ]

ASF GitHub Bot logged work on BEAM-3905:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jun/18 10:03
            Start Date: 08/Jun/18 10:03
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #5578: 
[BEAM-3905] Update Flink Runner to Flink 1.5.0
URL: https://github.com/apache/beam/pull/5578#discussion_r194008629
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
 ##########
 @@ -53,135 +39,59 @@
  */
 public class DedupingOperator<T> extends 
AbstractStreamOperator<WindowedValue<T>>
     implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, 
WindowedValue<T>>,
-    KeyGroupCheckpointedOperator {
+    Triggerable<ByteBuffer, VoidNamespace>{
 
   private static final long MAX_RETENTION_SINCE_ACCESS = 
Duration.standardMinutes(10L).getMillis();
-  private static final long MAX_CACHE_SIZE = 100_000L;
 
-  private transient LoadingCache<Integer, LoadingCache<ByteBuffer, 
AtomicBoolean>> dedupingCache;
-  private transient KeyedStateBackend<ByteBuffer> keyedStateBackend;
+  // we keep the time when we last saw an element id for cleanup
+  private ValueStateDescriptor<Long> dedupingStateDescriptor =
+      new ValueStateDescriptor<>("dedup-cache", LongSerializer.INSTANCE);
+
+  private transient InternalTimerService<VoidNamespace> timerService;
 
   @Override
-  public void open() throws Exception {
-    super.open();
-    checkInitCache();
-    keyedStateBackend = getKeyedStateBackend();
-  }
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
 
-  private void checkInitCache() {
-    if (dedupingCache == null) {
-      dedupingCache = CacheBuilder.newBuilder().build(new KeyGroupLoader());
-    }
-  }
+    timerService =
+        getInternalTimerService("dedup-cleanup-timer", 
VoidNamespaceSerializer.INSTANCE, this);
 
-  private static class KeyGroupLoader extends
-      CacheLoader<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> {
-    @Override
-    public LoadingCache<ByteBuffer, AtomicBoolean> load(Integer ignore) throws 
Exception {
-      return CacheBuilder.newBuilder()
-          .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
-          .maximumSize(MAX_CACHE_SIZE).build(new TrueBooleanLoader());
-    }
-  }
-
-  private static class TrueBooleanLoader extends CacheLoader<ByteBuffer, 
AtomicBoolean> {
-    @Override
-    public AtomicBoolean load(ByteBuffer ignore) throws Exception {
-      return new AtomicBoolean(true);
-    }
   }
 
   @Override
   public void processElement(
       StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws 
Exception {
-    ByteBuffer currentKey = keyedStateBackend.getCurrentKey();
-    int groupIndex = keyedStateBackend.getCurrentKeyGroupIndex();
-    if (shouldOutput(groupIndex, currentKey)) {
-      WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
-      
output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
-    }
-  }
 
-  private boolean shouldOutput(int groupIndex, ByteBuffer id) throws 
ExecutionException {
-    return dedupingCache.get(groupIndex).getUnchecked(id).getAndSet(false);
-  }
+    ValueState<Long> dedupingState = 
getPartitionedState(dedupingStateDescriptor);
 
-  @Override
-  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) 
throws Exception {
-    checkInitCache();
-    Integer size = VarIntCoder.of().decode(in, Context.NESTED);
-    for (int i = 0; i < size; i++) {
-      byte[] idBytes = ByteArrayCoder.of().decode(in, Context.NESTED);
-      // restore the ids which not expired.
-      shouldOutput(keyGroupIndex, ByteBuffer.wrap(idBytes));
-    }
-  }
+    Long lastSeenTimestamp = dedupingState.value();
 
-  @Override
-  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) 
throws Exception {
-    Set<ByteBuffer> ids = dedupingCache.get(keyGroupIndex).asMap().keySet();
-    VarIntCoder.of().encode(ids.size(), out, Context.NESTED);
-    for (ByteBuffer id : ids) {
-      ByteArrayCoder.of().encode(id.array(), out, Context.NESTED);
+    if (lastSeenTimestamp == null) {
+      // we have never seen this, emit
+      WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
+      
output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
     }
+
+    long currentProcessingTime = timerService.currentProcessingTime();
+    dedupingState.update(currentProcessingTime);
+    timerService.registerProcessingTimeTimer(
+        VoidNamespace.INSTANCE, currentProcessingTime + 
MAX_RETENTION_SINCE_ACCESS);
   }
 
   @Override
-  public void snapshotState(StateSnapshotContext context) throws Exception {
-    // copy from AbstractStreamOperator
-    if (getKeyedStateBackend() != null) {
-      KeyedStateCheckpointOutputStream out;
-
-      try {
-        out = context.getRawKeyedOperatorStateOutput();
-      } catch (Exception exception) {
-        throw new Exception("Could not open raw keyed operator state stream 
for "
-            + getOperatorName() + '.', exception);
-      }
-
-      try {
-        KeyGroupsList allKeyGroups = out.getKeyGroupList();
-        for (int keyGroupIdx : allKeyGroups) {
-          out.startNewKeyGroup(keyGroupIdx);
-
-          DataOutputViewStreamWrapper dov = new 
DataOutputViewStreamWrapper(out);
-
-          // if (this instanceof KeyGroupCheckpointedOperator)
-          snapshotKeyGroupState(keyGroupIdx, dov);
-
-        }
-      } catch (Exception exception) {
-        throw new Exception("Could not write timer service of " + 
getOperatorName()
-            + " to checkpoint state stream.", exception);
-      } finally {
-        try {
-          out.close();
-        } catch (Exception closeException) {
-          LOG.warn("Could not close raw keyed operator state stream for {}. 
This "
-                  + "might have prevented deleting some state data.", 
getOperatorName(),
-              closeException);
-        }
-      }
-    }
+  public void onEventTime(InternalTimer<ByteBuffer, VoidNamespace> 
internalTimer) {
+    // will never happen
   }
 
   @Override
-  public void initializeState(StateInitializationContext context) throws 
Exception {
-    if (getKeyedStateBackend() != null) {
-      KeyGroupsList localKeyGroupRange = 
getKeyedStateBackend().getKeyGroupRange();
-
-      for (KeyGroupStatePartitionStreamProvider streamProvider : 
context.getRawKeyedStateInputs()) {
-        DataInputViewStreamWrapper div = new 
DataInputViewStreamWrapper(streamProvider.getStream());
-
-        int keyGroupIdx = streamProvider.getKeyGroupId();
-        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
-            "Key Group " + keyGroupIdx + " does not belong to the local 
range.");
-
-        // if (this instanceof KeyGroupRestoringOperator)
-        restoreKeyGroupState(keyGroupIdx, div);
-
-      }
+  public void onProcessingTime(InternalTimer<ByteBuffer, VoidNamespace> 
internalTimer)
+      throws Exception {
+    ValueState<Long> dedupingState = 
getPartitionedState(dedupingStateDescriptor);
+
+    Long lastSeenTimestamp = dedupingState.value();
+    if (lastSeenTimestamp != null
+        && lastSeenTimestamp.equals(internalTimer.getTimestamp() - 
MAX_RETENTION_SINCE_ACCESS)) {
 
 Review comment:
   Question: Shouldn't this be `>=` (or something similar), or does it have to be an exact match?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 110065)
    Time Spent: 2h 10m  (was: 2h)

> Update Flink Runner to Flink 1.5.0
> ----------------------------------
>
>                 Key: BEAM-3905
>                 URL: https://issues.apache.org/jira/browse/BEAM-3905
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to