This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b7038c414ccd82567c90578c3b28bef04ab2271 Author: Xu Huang <huangxu.wal...@gmail.com> AuthorDate: Mon Jan 13 15:19:15 2025 +0800 [FLINK-37112][API] Introduce EventTimeWatermark definition and EventTimeWatermarkGeneratorBuilder for DataStream V2 --- .../common/eventtime/WatermarksWithIdleness.java | 6 +- .../extension/eventtime/EventTimeExtension.java | 139 +++++++++++++ .../eventtime/strategy/EventTimeExtractor.java | 31 +++ .../EventTimeWatermarkGeneratorBuilder.java | 125 +++++++++++ .../strategy/EventTimeWatermarkStrategy.java | 98 +++++++++ .../eventtime/EventTimeExtensionImpl.java | 45 ++++ .../functions/ExtractEventTimeProcessFunction.java | 194 +++++++++++++++++ .../datastream/impl/operators/ProcessOperator.java | 12 ++ .../ExtractEventTimeProcessFunctionTest.java | 230 +++++++++++++++++++++ 9 files changed, 878 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java index 74a4efcc2d0..4cc651cf6e9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.eventtime; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.clock.Clock; @@ -84,7 +85,8 @@ public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> { // ------------------------------------------------------------------------ @VisibleForTesting - static final class IdlenessTimer { + @Internal + public static final class IdlenessTimer { /** The clock used to measure elapsed time. 
*/ private final RelativeClock clock; @@ -104,7 +106,7 @@ public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> { /** The duration before the output is marked as idle. */ private final long maxIdleTimeNanos; - IdlenessTimer(RelativeClock clock, Duration idleTimeout) { + public IdlenessTimer(RelativeClock clock, Duration idleTimeout) { this.clock = clock; long idleNanos; diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java new file mode 100644 index 00000000000..9417baef1d5 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.extension.eventtime; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration; +import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclarations; +import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor; +import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder; +import org.apache.flink.datastream.api.function.ProcessFunction; + +/** + * The entry point for the event-time extension, which provides the following functionality: + * + * <ul> + * <li>defines the event-time watermark and idle status watermark. If you use the {@link + * EventTimeWatermarkGeneratorBuilder} below, then you don't need to declare these watermarks + * manually in your application; otherwise you need to declare them in your own {@link + * ProcessFunction}. + * <li>provides the {@link EventTimeWatermarkGeneratorBuilder} to facilitate the generation of + * event time watermarks. An example of using {@link EventTimeWatermarkGeneratorBuilder} is as + * follows: + * <pre>{@code + * OneInputStreamProcessFunction<POJO, POJO> watermarkGeneratorProcessFunction + * = EventTimeExtension + * .newWatermarkGeneratorBuilder(POJO::getEventTime) + * .periodicWatermark() + * .buildAsProcessFunction(); + * source.process(watermarkGeneratorProcessFunction) + * .process(...) + * }</pre> + * </ul> + */ +@Experimental +public class EventTimeExtension { + + // =============== Event Time related Watermark Declarations =============== + + /** + * Definition of EventTimeWatermark. The EventTimeWatermark represents a specific timestamp, + * signifying the passage of time. 
Once a process function receives an EventTimeWatermark, it + * will no longer receive events with a timestamp earlier than that watermark. + */ + public static final LongWatermarkDeclaration EVENT_TIME_WATERMARK_DECLARATION = + WatermarkDeclarations.newBuilder("BUILTIN_API_EVENT_TIME") + .typeLong() + .combineFunctionMin() + .combineWaitForAllChannels(true) + .defaultHandlingStrategyForward() + .build(); + + /** + * Definition of IdleStatusWatermark. The IdleStatusWatermark indicates that a particular input + * is in an idle state. When a ProcessFunction receives an IdleStatusWatermark from an input, it + * should ignore that input when combining EventTimeWatermarks. + */ + public static final BoolWatermarkDeclaration IDLE_STATUS_WATERMARK_DECLARATION = + WatermarkDeclarations.newBuilder("BUILTIN_API_EVENT_TIME_IDLE") + .typeBool() + .combineFunctionAND() + .combineWaitForAllChannels(true) + .defaultHandlingStrategyForward() + .build(); + + /** + * Determine if the received watermark is an EventTimeWatermark. + * + * @param watermark The watermark to be checked. + * @return true if the watermark is an EventTimeWatermark; false otherwise. + */ + public static boolean isEventTimeWatermark(Watermark watermark) { + return isEventTimeWatermark(watermark.getIdentifier()); + } + + /** + * Determine if the received watermark is an EventTimeWatermark by watermark identifier. + * + * @param watermarkIdentifier The identifier of the watermark to be checked. + * @return true if the watermark is an EventTimeWatermark; false otherwise. + */ + public static boolean isEventTimeWatermark(String watermarkIdentifier) { + return watermarkIdentifier.equals(EVENT_TIME_WATERMARK_DECLARATION.getIdentifier()); + } + + /** + * Determine if the received watermark is an IdleStatusWatermark. + * + * @param watermark The watermark to be checked. + * @return true if the watermark is an IdleStatusWatermark; false otherwise. 
+ */ + public static boolean isIdleStatusWatermark(Watermark watermark) { + return isIdleStatusWatermark(watermark.getIdentifier()); + } + + /** + * Determine if the received watermark is an IdleStatusWatermark by watermark identifier. + * + * @param watermarkIdentifier The identifier of the watermark to be checked. + * @return true if the watermark is an IdleStatusWatermark; false otherwise. + */ + public static boolean isIdleStatusWatermark(String watermarkIdentifier) { + return watermarkIdentifier.equals(IDLE_STATUS_WATERMARK_DECLARATION.getIdentifier()); + } + + // ======== EventTimeWatermarkGeneratorBuilder to generate event time watermarks ========= + + /** + * Create an instance of {@link EventTimeWatermarkGeneratorBuilder}, which contains a {@code + * EventTimeExtractor}. + * + * @param eventTimeExtractor An instance of {@code EventTimeExtractor} used to extract event + * time information from data records. + * @param <T> The type of data records. + * @return An instance of {@code EventTimeWatermarkGeneratorBuilder} containing the specified + * event time extractor. + */ + public static <T> EventTimeWatermarkGeneratorBuilder<T> newWatermarkGeneratorBuilder( + EventTimeExtractor<T> eventTimeExtractor) { + return new EventTimeWatermarkGeneratorBuilder<>(eventTimeExtractor); + } +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeExtractor.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeExtractor.java new file mode 100644 index 00000000000..cafa443cc06 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeExtractor.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; + +/** A user function designed to extract event time from an event. */ +@Experimental +public interface EventTimeExtractor<T> extends Serializable { + + /** Extract the event time from the event, with the result provided in milliseconds. */ + long extractTimestamp(T event); +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkGeneratorBuilder.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkGeneratorBuilder.java new file mode 100644 index 00000000000..a330eff23dd --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkGeneratorBuilder.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.strategy; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; + +import java.time.Duration; + +/** + * A utility class for constructing a processing function that extracts event time and generates + * event time watermarks in the {@link EventTimeExtension}. + */ +@Experimental +public class EventTimeWatermarkGeneratorBuilder<T> { + // how to extract event time from event + private EventTimeExtractor<T> eventTimeExtractor; + + // what frequency to generate event time watermark + private EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode generateMode = + EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC; + + // if not set, it will default to the value of the "pipeline.auto-watermark-interval" + // configuration. 
+ private Duration periodicWatermarkInterval = Duration.ZERO; + + // if set to zero, it will not generate idle status watermark + private Duration idleTimeout = Duration.ZERO; + + // max out-of-order time + private Duration maxOutOfOrderTime = Duration.ZERO; + + // ========= how to extract event times from events ========= + + public EventTimeWatermarkGeneratorBuilder(EventTimeExtractor<T> eventTimeExtractor) { + this.eventTimeExtractor = eventTimeExtractor; + } + + // ========= generate the event time watermark with what value ========= + + public EventTimeWatermarkGeneratorBuilder<T> withIdleness(Duration idleTimeout) { + this.idleTimeout = idleTimeout; + return this; + } + + public EventTimeWatermarkGeneratorBuilder<T> withMaxOutOfOrderTime(Duration maxOutOfOrderTime) { + this.maxOutOfOrderTime = maxOutOfOrderTime; + return this; + } + + // ========= when to generate event time watermark ========= + + public EventTimeWatermarkGeneratorBuilder<T> noWatermark() { + this.generateMode = EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.NO_WATERMARK; + return this; + } + + /** + * The periodic watermark interval will be set to the value specified by + * PipelineOptions#AUTO_WATERMARK_INTERVAL. 
+ */ + public EventTimeWatermarkGeneratorBuilder<T> periodicWatermark() { + this.generateMode = EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC; + return this; + } + + public EventTimeWatermarkGeneratorBuilder<T> periodicWatermark( + Duration periodicWatermarkInterval) { + this.generateMode = EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC; + this.periodicWatermarkInterval = periodicWatermarkInterval; + return this; + } + + public EventTimeWatermarkGeneratorBuilder<T> perEventWatermark() { + this.generateMode = EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT; + return this; + } + + // ========= build the watermark generator as process function ========= + + public OneInputStreamProcessFunction<T, T> buildAsProcessFunction() { + EventTimeWatermarkStrategy<T> watermarkStrategy = + new EventTimeWatermarkStrategy<>( + this.eventTimeExtractor, + this.generateMode, + this.periodicWatermarkInterval, + this.idleTimeout, + this.maxOutOfOrderTime); + + try { + return (OneInputStreamProcessFunction<T, T>) + getEventTimeExtensionImplClass() + .getMethod("buildAsProcessFunction", EventTimeWatermarkStrategy.class) + .invoke(null, watermarkStrategy); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static Class<?> getEventTimeExtensionImplClass() { + try { + return Class.forName( + "org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Please ensure flink-datastream is in your class path", e); + } + } +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java new file mode 100644 index 00000000000..9f6a7a6f585 --- /dev/null +++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.strategy; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; +import java.time.Duration; + +/** Component which encapsulates the logic of how and when to extract event time and watermarks. */ +@Experimental +public class EventTimeWatermarkStrategy<T> implements Serializable { + // how to extract event time from event + private final EventTimeExtractor<T> eventTimeExtractor; + + // what frequency to generate event time watermark + private final EventTimeWatermarkGenerateMode generateMode; + + // if not set, it will default to the value of the "pipeline.auto-watermark-interval" + // configuration. 
+ private final Duration periodicWatermarkInterval; + + // if set to zero, it will not generate idle status watermark + private final Duration idleTimeout; + + // max out-of-order time + private final Duration maxOutOfOrderTime; + + public EventTimeWatermarkStrategy(EventTimeExtractor<T> eventTimeExtractor) { + this( + eventTimeExtractor, + EventTimeWatermarkGenerateMode.PERIODIC, + Duration.ZERO, + Duration.ZERO, + Duration.ZERO); + } + + public EventTimeWatermarkStrategy( + EventTimeExtractor<T> eventTimeExtractor, + EventTimeWatermarkGenerateMode generateMode, + Duration periodicWatermarkInterval, + Duration idleTimeout, + Duration maxOutOfOrderTime) { + this.eventTimeExtractor = eventTimeExtractor; + this.generateMode = generateMode; + this.periodicWatermarkInterval = periodicWatermarkInterval; + this.idleTimeout = idleTimeout; + this.maxOutOfOrderTime = maxOutOfOrderTime; + } + + public EventTimeExtractor<T> getEventTimeExtractor() { + return eventTimeExtractor; + } + + public EventTimeWatermarkGenerateMode getGenerateMode() { + return generateMode; + } + + public Duration getPeriodicWatermarkInterval() { + return periodicWatermarkInterval; + } + + public Duration getIdleTimeout() { + return idleTimeout; + } + + public Duration getMaxOutOfOrderTime() { + return maxOutOfOrderTime; + } + + /** + * {@link EventTimeWatermarkGenerateMode} indicates the frequency at which event-time watermarks + * are generated. 
+ */ + @Internal + public enum EventTimeWatermarkGenerateMode { + NO_WATERMARK, + PERIODIC, + PER_EVENT, + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java new file mode 100644 index 00000000000..a4a26bc9490 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime; + +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction; + +/** The implementation of {@link EventTimeExtension}. 
*/ +public class EventTimeExtensionImpl { + + // ============= Extract Event Time Process Function ============= + + /** + * Build an {@link ExtractEventTimeProcessFunction} to extract event time according to {@link + * EventTimeWatermarkStrategy}. + */ + public static <T> OneInputStreamProcessFunction<T, T> buildAsProcessFunction( + EventTimeWatermarkStrategy<T> strategy) { + return new ExtractEventTimeProcessFunction<>(strategy); + } + + // ============= Other Utils ============= + public static boolean isEventTimeExtensionWatermark(Watermark watermark) { + return EventTimeExtension.isEventTimeWatermark(watermark) + || EventTimeExtension.isIdleStatusWatermark(watermark); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunction.java new file mode 100644 index 00000000000..db2c3bec7e3 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunction.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarksWithIdleness.IdlenessTimer; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkManager; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** A specialized process function designed for extracting event timestamps. */ +public class ExtractEventTimeProcessFunction<IN> + implements OneInputStreamProcessFunction<IN, IN>, + ProcessingTimeService.ProcessingTimeCallback { + + /** User-defined watermark strategy. */ + private final EventTimeWatermarkStrategy<IN> watermarkStrategy; + + /** The maximum timestamp encountered so far. */ + private long currentMaxEventTime = Long.MIN_VALUE; + + private long lastEmittedEventTime = Long.MIN_VALUE; + + /** + * The periodic processing timer interval; if not configured by user in {@link + * EventTimeWatermarkStrategy}, it will default to the value specified by {@link + * PipelineOptions#AUTO_WATERMARK_INTERVAL}. + */ + private long periodicTimerInterval = 0; + + /** + * Whether enable create and send {@link EventTimeExtension#IDLE_STATUS_WATERMARK_DECLARATION}. 
+ */ + private boolean enableIdleStatus; + + /** The {@link IdlenessTimer} is utilized to check whether the input is currently idle. */ + private IdlenessTimer idlenessTimer; + + private boolean isIdleNow = false; + + private final long maxOutOfOrderTimeInMs; + + private ProcessingTimeService processingTimeService; + + private WatermarkManager watermarkManager; + + public ExtractEventTimeProcessFunction(EventTimeWatermarkStrategy<IN> watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + if (watermarkStrategy.getIdleTimeout().toMillis() > 0) { + this.enableIdleStatus = true; + } + this.maxOutOfOrderTimeInMs = watermarkStrategy.getMaxOutOfOrderTime().toMillis(); + } + + public void initEventTimeExtension( + ExecutionConfig config, + WatermarkManager watermarkManager, + ProcessingTimeService processingTimeService) { + this.processingTimeService = processingTimeService; + this.watermarkManager = watermarkManager; + + if (enableIdleStatus) { + this.idlenessTimer = + new IdlenessTimer( + processingTimeService.getClock(), watermarkStrategy.getIdleTimeout()); + } + + // A timer may be needed to check whether the input is idle and to periodically send event + // time watermarks + boolean needRegisterTimer = + watermarkStrategy.getGenerateMode() + == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode + .PERIODIC + || enableIdleStatus; + // the timer interval defaults to the config option {@link + // PipelineOptions#AUTO_WATERMARK_INTERVAL} + this.periodicTimerInterval = config.getAutoWatermarkInterval(); + if (watermarkStrategy.getGenerateMode() + == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC + && !watermarkStrategy.getPeriodicWatermarkInterval().isZero()) { + this.periodicTimerInterval = + watermarkStrategy.getPeriodicWatermarkInterval().toMillis(); + } + checkState( + !needRegisterTimer || periodicTimerInterval > 0, + "Watermark interval " + periodicTimerInterval + " should be larger than 0."); + + if (needRegisterTimer) { + processingTimeService.registerTimer( 
+ processingTimeService.getCurrentProcessingTime() + periodicTimerInterval, this); + } + } + + @Override + public Set<? extends WatermarkDeclaration> declareWatermarks() { + // declare EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION + // if idle status is enabled, also declare + // EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION. + Set<WatermarkDeclaration> watermarkDeclarations = new HashSet<>(); + watermarkDeclarations.add(EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION); + if (enableIdleStatus) { + watermarkDeclarations.add(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION); + } + return watermarkDeclarations; + } + + @Override + public void processRecord(IN record, Collector<IN> output, PartitionedContext<IN> ctx) + throws Exception { + if (enableIdleStatus) { + if (isIdleNow) { + watermarkManager.emitWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false)); + isIdleNow = false; + } + + // mark current input as active + idlenessTimer.activity(); + } + + // extract event time from record + long extractedEventTime = + watermarkStrategy.getEventTimeExtractor().extractTimestamp(record); + currentMaxEventTime = Math.max(currentMaxEventTime, extractedEventTime); + output.collectAndOverwriteTimestamp(record, extractedEventTime); + + if (watermarkStrategy.getGenerateMode() + == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT) { + // If the per event watermark is utilized, create event time watermark and send + tryEmitEventTimeWatermark(ctx.getNonPartitionedContext().getWatermarkManager()); + } + } + + /** + * The processing timer has two goals: 1. check whether the input is idle 2. 
periodically emit + * event time watermark + */ + @Override + public void onProcessingTime(long time) throws IOException, InterruptedException, Exception { + if (enableIdleStatus && idlenessTimer.checkIfIdle()) { + if (!isIdleNow) { + watermarkManager.emitWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true)); + isIdleNow = true; + } + } else if (watermarkStrategy.getGenerateMode() + == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC) { + tryEmitEventTimeWatermark(watermarkManager); + } + + processingTimeService.registerTimer(time + periodicTimerInterval, this); + } + + private void tryEmitEventTimeWatermark(WatermarkManager watermarkManager) { + if (currentMaxEventTime == Long.MIN_VALUE) { + return; + } + + long needEmittedEventTime = currentMaxEventTime - maxOutOfOrderTimeInMs; + if (needEmittedEventTime > lastEmittedEventTime) { + watermarkManager.emitWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark( + needEmittedEventTime)); + lastEmittedEventTime = needEmittedEventTime; + } + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index 684cbf5530d..cde50f26a74 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction; import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.streaming.api.operators.BoundedOneInput; @@ -98,6 +99,16 @@ public class ProcessOperator<IN, OUT> outputCollector = getOutputCollector(); nonPartitionedContext = getNonPartitionedContext(); partitionedContext.setNonPartitionedContext(nonPartitionedContext); + + // Initialize event time extension related ProcessFunction + if (userFunction instanceof ExtractEventTimeProcessFunction) { + ((ExtractEventTimeProcessFunction<IN>) userFunction) + .initEventTimeExtension( + getExecutionConfig(), + partitionedContext.getNonPartitionedContext().getWatermarkManager(), + getProcessingTimeService()); + } + userFunction.open(nonPartitionedContext); } @@ -140,6 +151,7 @@ public class ProcessOperator<IN, OUT> } else { return (r, k) -> { Object oldKey = currentKey(); + setCurrentKey(k); try { r.run(); } finally { diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunctionTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunctionTest.java new file mode 100644 index 00000000000..1de5040e519 --- /dev/null +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunctionTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor; +import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy; +import org.apache.flink.datastream.impl.operators.ProcessOperator; +import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.watermark.WatermarkUtils; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ExtractEventTimeProcessFunction}. 
*/ +public class ExtractEventTimeProcessFunctionTest { + + @Test + void testDoNotGenerateEventTimeWatermark() throws Exception { + EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy = + new EventTimeWatermarkStrategy<>( + (EventTimeExtractor<Tuple2<Long, String>>) event -> event.f0, + EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.NO_WATERMARK, + Duration.ZERO, + Duration.ZERO, + Duration.ZERO); + OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, String>> + operatorTestHarness = getOperatorTestHarness(watermarkStartegy); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345678L, "hello"))); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput()); + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345679L, "hello"))); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput()); + + operatorTestHarness.close(); + } + + @Test + void testGenerateEventTimeWatermarkPerEvent() throws Exception { + EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy = + new EventTimeWatermarkStrategy<>( + (EventTimeExtractor<Tuple2<Long, String>>) event -> event.f0, + EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT, + Duration.ZERO, + Duration.ZERO, + Duration.ZERO); + OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, String>> + operatorTestHarness = getOperatorTestHarness(watermarkStartegy); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345678L, "hello"))); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345679L, "hello"))); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L, 12345679L); + + operatorTestHarness.close(); + } + + @Test + void testGenerateEventTimeWatermarkPeriodic() throws Exception { + EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy = + new 
EventTimeWatermarkStrategy<>( + (EventTimeExtractor<Tuple2<Long, String>>) event -> event.f0, + EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC, + Duration.ofMillis(200), + Duration.ZERO, + Duration.ZERO); + OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, String>> + operatorTestHarness = getOperatorTestHarness(watermarkStartegy); + + operatorTestHarness.getProcessingTimeService().advance(200); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput()); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345678L, "hello"))); + operatorTestHarness.getProcessingTimeService().advance(200); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345679L, "hello"))); + operatorTestHarness.getProcessingTimeService().advance(200); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L, 12345679L); + + operatorTestHarness.getProcessingTimeService().advance(1000); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L, 12345679L); + + operatorTestHarness.close(); + } + + @Test + void testGenerateEventTimeWatermarkWithOutOfOrderTime() throws Exception { + EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy = + new EventTimeWatermarkStrategy<>( + (EventTimeExtractor<Tuple2<Long, String>>) event -> event.f0, + EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT, + Duration.ZERO, + Duration.ZERO, + Duration.ofMillis(100)); + OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, String>> + operatorTestHarness = getOperatorTestHarness(watermarkStartegy); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345678L, "hello"))); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L - 100); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345679L, "hello"))); + 
checkOutputEventTimeWatermarks( + operatorTestHarness.getOutput(), 12345678L - 100, 12345679L - 100); + + operatorTestHarness.close(); + } + + @Test + void testGenerateIdleStatusWatermark() throws Exception { + EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy = + new EventTimeWatermarkStrategy<>( + (EventTimeExtractor<Tuple2<Long, String>>) event -> event.f0, + EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT, + Duration.ZERO, + Duration.ofMillis(200), + Duration.ZERO); + OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, String>> + operatorTestHarness = getOperatorTestHarness(watermarkStartegy); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345678L, "hello"))); + checkOutputIdleStatusWatermarks(operatorTestHarness.getOutput()); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L); + + // simulate time advancing multiple times and thus triggering the timer multiple times, + // otherwise {@link ExtractEventTimeProcessFunction.onProcessingTime} will get the wrong + // time when judging the idle in the timer + operatorTestHarness.getProcessingTimeService().advance(200); + operatorTestHarness.getProcessingTimeService().advance(200); + operatorTestHarness.getProcessingTimeService().advance(200); + operatorTestHarness.getProcessingTimeService().advance(200); + operatorTestHarness.getProcessingTimeService().advance(200); + checkOutputIdleStatusWatermarks(operatorTestHarness.getOutput(), true); + + operatorTestHarness.processElement(new StreamRecord<>(new Tuple2<>(12345679L, "hello"))); + checkOutputIdleStatusWatermarks(operatorTestHarness.getOutput(), true, false); + checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 12345678L, 12345679L); + + operatorTestHarness.close(); + } + + private OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, String>> + getOperatorTestHarness( + EventTimeWatermarkStrategy<Tuple2<Long, String>> 
watermarkStrategy) + throws Exception { + ExtractEventTimeProcessFunction<Tuple2<Long, String>> processFunction = + new ExtractEventTimeProcessFunction<>(watermarkStrategy); + ProcessOperator<Tuple2<Long, String>, Tuple2<Long, String>> processOperator = + new ProcessOperator<>(processFunction); + OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, String>> testHarness = + new OneInputStreamOperatorTestHarness<>(processOperator); + Set<WatermarkDeclaration> watermarkDeclarations = + (Set<WatermarkDeclaration>) processFunction.declareWatermarks(); + byte[] serializedWatermarkDeclarations = + InstantiationUtil.serializeObject( + WatermarkUtils.convertToInternalWatermarkDeclarations( + watermarkDeclarations)); + testHarness.getStreamConfig().setWatermarkDeclarations(serializedWatermarkDeclarations); + testHarness.open(); + return testHarness; + } + + private void checkOutputEventTimeWatermarks( + ConcurrentLinkedQueue<Object> output, Long... expectedEventTimeWatermarks) { + List<Long> actualEventTimeWatermarks = + output.stream() + .filter( + object -> + object instanceof WatermarkEvent + && EventTimeExtension.isEventTimeWatermark( + ((WatermarkEvent) object).getWatermark())) + .map( + watermarkEvent -> + ((LongWatermark) + ((WatermarkEvent) watermarkEvent) + .getWatermark()) + .getValue()) + .collect(Collectors.toList()); + assertThat(actualEventTimeWatermarks).containsExactly(expectedEventTimeWatermarks); + } + + private void checkOutputIdleStatusWatermarks( + ConcurrentLinkedQueue<Object> output, Boolean... 
expectedIdleStatusWatermarks) { + List<Boolean> actualEventTimeWatermarks = + output.stream() + .filter( + object -> + object instanceof WatermarkEvent + && EventTimeExtension.isIdleStatusWatermark( + ((WatermarkEvent) object).getWatermark())) + .map( + watermarkEvent -> + ((BoolWatermark) + ((WatermarkEvent) watermarkEvent) + .getWatermark()) + .getValue()) + .collect(Collectors.toList()); + assertThat(actualEventTimeWatermarks).containsExactly(expectedIdleStatusWatermarks); + } +}