johnyangk commented on a change in pull request #135: [NEMO-230] Emit collected
data when receiving watermark in GroupByKeyAndWindowTransform
URL: https://github.com/apache/incubator-nemo/pull/135#discussion_r229180375
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##########
@@ -171,35 +227,73 @@ public String toString() {
return timerData;
}
+ /**
+ * Maintains state internals and timer internals for keys.
+ */
+ final class StateAndTimerInternalsFactory {
+ private final InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
+ private final InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
+
+ StateAndTimerInternalsFactory() {
+ final Map<K, StateAndTimerForKey> map = new HashMap<>();
+ this.inMemoryTimerInternalsFactory = new
InMemoryTimerInternalsFactory(map);
+ this.inMemoryStateInternalsFactory = new
InMemoryStateInternalsFactory(map);
+ }
+ }
+
+ /**
+ * State and timer internal.
+ */
+ final class StateAndTimerForKey {
+ private StateInternals stateInternals;
+ private TimerInternals timerInternals;
+
+ StateAndTimerForKey(final StateInternals stateInternals,
+ final TimerInternals timerInternals) {
+ this.stateInternals = stateInternals;
+ this.timerInternals = timerInternals;
+ }
+ }
+
/**
* InMemoryStateInternalsFactory.
*/
final class InMemoryStateInternalsFactory implements
StateInternalsFactory<K> {
- private final InMemoryStateInternals inMemoryStateInternals;
+ private final Map<K, StateAndTimerForKey> map;
- InMemoryStateInternalsFactory() {
- this.inMemoryStateInternals = InMemoryStateInternals.forKey(null);
+ InMemoryStateInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+ this.map = map;
}
@Override
public StateInternals stateInternalsForKey(final K key) {
- return inMemoryStateInternals;
+ map.putIfAbsent(key, new StateAndTimerForKey(null, null));
+ final StateAndTimerForKey stateAndTimerForKey = map.get(key);
+ if (stateAndTimerForKey.stateInternals == null) {
+ stateAndTimerForKey.stateInternals =
InMemoryStateInternals.forKey(key);
+ }
+ return stateAndTimerForKey.stateInternals;
}
}
/**
* InMemoryTimerInternalsFactory.
*/
final class InMemoryTimerInternalsFactory implements
TimerInternalsFactory<K> {
- private final InMemoryTimerInternals timerInternals;
+ private final Map<K, StateAndTimerForKey> map;
- InMemoryTimerInternalsFactory() {
- this.timerInternals = new InMemoryTimerInternals();
+ InMemoryTimerInternalsFactory(final Map<K, StateAndTimerForKey> map) {
+ this.map = map;
}
@Override
public TimerInternals timerInternalsForKey(final K key) {
- return timerInternals;
+ map.putIfAbsent(key, new StateAndTimerForKey(null, null));
Review comment:
Can you avoid instantiating `StateAndTimerForKey` with `null`s?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services