[
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588286#comment-15588286
]
ASF GitHub Bot commented on FLINK-3674:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2570#discussion_r84038706
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
+ // ------------------------------------------------------------------------
+ // Watermark handling
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns a {@link InternalTimerService} that can be used to query current processing time
+ * and event time and to set timers. An operator can have several timer services, where
+ * each has its own namespace serializer. Timer services are differentiated by the string
+ * key that is given when requesting them, if you call this method with the same key
+ * multiple times you will get the same timer service instance in subsequent requests.
+ *
+ * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
+ * When a timer fires, this key will also be set as the currently active key.
+ *
+ * <p>Each timer has attached metadata, the namespace. Different timer services
+ * can have a different namespace type. If you don't need namespace differentiation you
+ * can use {@link VoidNamespaceSerializer} as the namespace serializer.
+ *
+ * @param name The name of the requested timer service. If no service exists under the given
+ * name a new one will be created and returned.
+ * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+ * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+ * @param triggerable The {@link Triggerable} that should be invoked when timers fire
+ *
+ * @param <K> The type of the timer keys.
+ * @param <N> The type of the timer namespace.
+ */
+ public <K, N> InternalTimerService<N> getInternalTimerService(
+ String name,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ Triggerable<K, N> triggerable) {
+
+ @SuppressWarnings("unchecked")
+ HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+ if (service == null) {
+ if (restoredServices != null && restoredServices.containsKey(name)) {
+ @SuppressWarnings("unchecked")
+ HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+ (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+ service = new HeapInternalTimerService<>(
+ keySerializer,
+ namespaceSerializer,
+ triggerable,
+ this,
+ getRuntimeContext().getProcessingTimeService(),
+ restoredService);
+
+ } else {
+ service = new HeapInternalTimerService<>(
+ keySerializer,
+ namespaceSerializer,
+ triggerable,
+ this,
+ getRuntimeContext().getProcessingTimeService());
+ }
+ timerServices.put(name, service);
+ }
+
+ return service;
+ }
+
+ public void processWatermark(Watermark mark) throws Exception {
+ for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+ service.advanceWatermark(mark.getTimestamp());
+ }
+ output.emitWatermark(mark);
+ }
+
+ public void processWatermark1(Watermark mark) throws Exception {
+ input1Watermark = mark.getTimestamp();
+ long newMin = Math.min(input1Watermark, input2Watermark);
+ if (newMin > combinedWatermark) {
+ combinedWatermark = newMin;
+ processWatermark(new Watermark(combinedWatermark));
+ }
+ }
+
+ public void processWatermark2(Watermark mark) throws Exception {
--- End diff --
Jip, I'm already trying to address that in another PR for changing the
operator hierarchy. 😄
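
For readers following the diff, the two-input watermark handling in `processWatermark1` can be sketched in isolation. The class below is a hypothetical stand-alone tracker, not part of Flink. It assumes that all three watermark fields start at `Long.MIN_VALUE` and that `processWatermark2` mirrors `processWatermark1`; neither is shown in the quoted diff.

```java
/**
 * Hypothetical helper (not a Flink class) that mirrors the logic of
 * processWatermark1 in the diff: the combined watermark is the minimum
 * of the two input watermarks and is only forwarded when it advances.
 */
final class TwoInputWatermarkTracker {
    // Assumption: initial values are Long.MIN_VALUE; the diff does not show them.
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Returns the new combined watermark if it advanced, otherwise null. */
    Long onWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    /** Assumed to be symmetric to onWatermark1. */
    Long onWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    private Long advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return newMin;
        }
        // The slower input still holds the combined watermark back.
        return null;
    }
}
```

Taking the minimum means event time for the operator can only progress as fast as the slower of the two inputs, and the `newMin > combinedWatermark` check keeps the forwarded watermark from repeating or going backwards.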
> Add an interface for Time aware User Functions
> ----------------------------------------------
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
>
> I suggest adding an interface that UDFs can implement, which will let them be
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}
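
As a rough illustration of how an operator might drive the proposed interface, the sketch below forwards each watermark to the user function when it implements `EventTimeFunction`. `Watermark` and `MapFunction` here are minimal stand-ins written for this example, not Flink's real classes, and `MapOperatorSketch` is a hypothetical name; the issue does not prescribe how the operator side would look.

```java
// Minimal stand-ins for the Flink types named in the issue, so the sketch compiles on its own.
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

interface MapFunction<I, O> {
    O map(I value);
}

// The interface proposed in the issue description.
interface EventTimeFunction {
    void onWatermark(Watermark watermark);
}

// The example UDF from the issue description.
final class MyMapper implements MapFunction<String, String>, EventTimeFunction {
    private long currentEventTime = Long.MIN_VALUE;

    @Override
    public String map(String value) {
        return value + " @ " + currentEventTime;
    }

    @Override
    public void onWatermark(Watermark watermark) {
        currentEventTime = watermark.getTimestamp();
    }
}

// Hypothetical operator wrapper: notifies the UDF of watermarks if it opted in.
final class MapOperatorSketch<I, O> {
    private final MapFunction<I, O> function;

    MapOperatorSketch(MapFunction<I, O> function) {
        this.function = function;
    }

    O processElement(I value) {
        return function.map(value);
    }

    void processWatermark(Watermark mark) {
        if (function instanceof EventTimeFunction) {
            ((EventTimeFunction) function).onWatermark(mark);
        }
        // A real operator would also forward the watermark downstream.
    }
}
```

With this arrangement, functions that do not implement `EventTimeFunction` are unaffected, which is one reason an opt-in interface may be attractive here.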