[
https://issues.apache.org/jira/browse/BEAM-3905?focusedWorklogId=110062&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110062
]
ASF GitHub Bot logged work on BEAM-3905:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Jun/18 10:03
Start Date: 08/Jun/18 10:03
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #5578:
[BEAM-3905] Update Flink Runner to Flink 1.5.0
URL: https://github.com/apache/beam/pull/5578#discussion_r194008095
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
##########
@@ -53,135 +39,59 @@
*/
public class DedupingOperator<T> extends
AbstractStreamOperator<WindowedValue<T>>
implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>,
WindowedValue<T>>,
- KeyGroupCheckpointedOperator {
+ Triggerable<ByteBuffer, VoidNamespace>{
private static final long MAX_RETENTION_SINCE_ACCESS =
Duration.standardMinutes(10L).getMillis();
- private static final long MAX_CACHE_SIZE = 100_000L;
- private transient LoadingCache<Integer, LoadingCache<ByteBuffer,
AtomicBoolean>> dedupingCache;
- private transient KeyedStateBackend<ByteBuffer> keyedStateBackend;
+ // we keep the time when we last saw an element id for cleanup
+ private ValueStateDescriptor<Long> dedupingStateDescriptor =
+ new ValueStateDescriptor<>("dedup-cache", LongSerializer.INSTANCE);
+
+ private transient InternalTimerService<VoidNamespace> timerService;
@Override
- public void open() throws Exception {
- super.open();
- checkInitCache();
- keyedStateBackend = getKeyedStateBackend();
- }
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
- private void checkInitCache() {
- if (dedupingCache == null) {
- dedupingCache = CacheBuilder.newBuilder().build(new KeyGroupLoader());
- }
- }
+ timerService =
+ getInternalTimerService("dedup-cleanup-timer",
VoidNamespaceSerializer.INSTANCE, this);
- private static class KeyGroupLoader extends
- CacheLoader<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> {
- @Override
- public LoadingCache<ByteBuffer, AtomicBoolean> load(Integer ignore) throws
Exception {
- return CacheBuilder.newBuilder()
- .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
- .maximumSize(MAX_CACHE_SIZE).build(new TrueBooleanLoader());
- }
- }
-
- private static class TrueBooleanLoader extends CacheLoader<ByteBuffer,
AtomicBoolean> {
- @Override
- public AtomicBoolean load(ByteBuffer ignore) throws Exception {
- return new AtomicBoolean(true);
- }
}
@Override
public void processElement(
StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws
Exception {
- ByteBuffer currentKey = keyedStateBackend.getCurrentKey();
- int groupIndex = keyedStateBackend.getCurrentKeyGroupIndex();
- if (shouldOutput(groupIndex, currentKey)) {
- WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
-
output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
- }
- }
- private boolean shouldOutput(int groupIndex, ByteBuffer id) throws
ExecutionException {
- return dedupingCache.get(groupIndex).getUnchecked(id).getAndSet(false);
- }
+ ValueState<Long> dedupingState =
getPartitionedState(dedupingStateDescriptor);
- @Override
- public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in)
throws Exception {
- checkInitCache();
- Integer size = VarIntCoder.of().decode(in, Context.NESTED);
- for (int i = 0; i < size; i++) {
- byte[] idBytes = ByteArrayCoder.of().decode(in, Context.NESTED);
- // restore the ids which not expired.
- shouldOutput(keyGroupIndex, ByteBuffer.wrap(idBytes));
- }
- }
+ Long lastSeenTimestamp = dedupingState.value();
- @Override
- public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out)
throws Exception {
- Set<ByteBuffer> ids = dedupingCache.get(keyGroupIndex).asMap().keySet();
- VarIntCoder.of().encode(ids.size(), out, Context.NESTED);
- for (ByteBuffer id : ids) {
- ByteArrayCoder.of().encode(id.array(), out, Context.NESTED);
+ if (lastSeenTimestamp == null) {
+ // we have never seen this, emit
+ WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
+
output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
Review comment:
`value.withValue(value.getValue().getValue())` obvious :P
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 110062)
Time Spent: 2h (was: 1h 50m)
> Update Flink Runner to Flink 1.5.0
> ----------------------------------
>
> Key: BEAM-3905
> URL: https://issues.apache.org/jira/browse/BEAM-3905
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)