This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4a0c2e40506 [FLINK-37521][runtime] Implement async state version of KeyedCoProcessOperator (#26328) 4a0c2e40506 is described below commit 4a0c2e40506558e877b6b4188ab2ea5218c908a8 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Thu Apr 3 10:27:45 2025 +0800 [FLINK-37521][runtime] Implement async state version of KeyedCoProcessOperator (#26328) --- .../DeclaringAsyncKeyedCoProcessFunction.java | 130 ++++ .../AbstractAsyncStateStreamOperator.java | 18 +- .../{ => co}/AsyncIntervalJoinOperator.java | 8 +- .../operators/co/AsyncKeyedCoProcessOperator.java | 238 +++++++ ...ncKeyedCoProcessOperatorWithWatermarkDelay.java | 48 ++ .../streaming/api/datastream/ConnectedStreams.java | 38 +- .../streaming/api/datastream/KeyedStream.java | 2 +- .../transformations/TwoInputTransformation.java | 11 + .../AbstractAsyncStateStreamOperatorTest.java | 7 +- .../operators/AsyncIntervalJoinOperatorTest.java | 1 + .../operators/AsyncKeyedCoProcessOperatorTest.java | 746 +++++++++++++++++++++ .../nodes/exec/stream/StreamExecIntervalJoin.java | 4 +- 12 files changed, 1236 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java new file mode 100644 index 00000000000..ab27c0095d3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.function.ThrowingConsumer; + +/** + * A function that processes elements of two keyed streams and produces a single output stream. + * + * <p>The function will be called for every element in the input streams and can produce zero or + * more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also query the + * time (both event and processing) and set timers, through the provided {@link Context}. When + * reacting to the firing of timers the function can emit yet more elements. + * + * <p>An example use case for connected streams is the application of a set of rules that change + * over time ({@code stream A}) to the elements contained in another stream (stream {@code B}). 
The + * rules contained in {@code stream A} can be stored in the state and wait for new elements to + * arrive on {@code stream B}. Upon reception of a new element on {@code stream B}, the function can + * apply the previously stored rules to the element and emit a result, and/or register a timer that + * will trigger an action in the future. + * + * @param <K> Type of the key. + * @param <IN1> Type of the first input. + * @param <IN2> Type of the second input. + * @param <OUT> Output type. + */ +@Internal +public abstract class DeclaringAsyncKeyedCoProcessFunction<K, IN1, IN2, OUT> + extends KeyedCoProcessFunction<K, IN1, IN2, OUT> { + + private static final long serialVersionUID = 1L; + + /** Override and finalize this method. Please use {@link #declareProcess1} instead. */ + @Override + public final void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception { + throw new IllegalAccessException("This method is replaced by declareProcess1."); + } + + /** Override and finalize this method. Please use {@link #declareProcess2} instead. */ + @Override + public final void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception { + throw new IllegalAccessException("This method is replaced by declareProcess2."); + } + + /** Override and finalize this method. Please use {@link #declareOnTimer} instead. */ + public final void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) + throws Exception { + throw new IllegalAccessException("This method is replaced by declareOnTimer."); + } + + /** + * Declaring variables before {@link #declareProcess1} and {@link #declareProcess2} and {@link + * #declareOnTimer}. + */ + public void declareVariables(DeclarationContext context) {} + + /** + * Declare a process for one element from the first of the connected streams. 
+ * + * <p>This function can output zero or more elements using the {@link Collector} parameter and + * also update internal state or set timers using the {@link Context} parameter. + * + * @param context the context that provides useful methods to define named callbacks. + * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a + * {@link TimerService} for registering timers and querying the time. The context is only + * valid during the invocation of this method, do not store it. + * @param out The collector for returning result values. + * @return the whole processing logic just like {@code processElement}. + */ + public abstract ThrowingConsumer<IN1, Exception> declareProcess1( + DeclarationContext context, Context ctx, Collector<OUT> out) + throws DeclarationException; + + /** + * Declare a process for one element from the second of the connected streams. + * + * <p>This function can output zero or more elements using the {@link Collector} parameter and + * also update internal state or set timers using the {@link Context} parameter. + * + * @param context the context that provides useful methods to define named callbacks. + * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a + * {@link TimerService} for registering timers and querying the time. The context is only + * valid during the invocation of this method, do not store it. + * @param out The collector for returning result values. + * @return the whole processing logic just like {@code processElement}. + */ + public abstract ThrowingConsumer<IN2, Exception> declareProcess2( + DeclarationContext context, Context ctx, Collector<OUT> out) + throws DeclarationException; + + /** + * Declare a procedure which is called when a timer set using {@link TimerService} fires. + * + * @param context the context that provides useful methods to define named callbacks. 
+ * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link + * TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for + * registering timers and querying the time. The context is only valid during the invocation + * of this method, do not store it. + * @param out The collector for returning result values. + */ + public ThrowingConsumer<Long, Exception> declareOnTimer( + DeclarationContext context, OnTimerContext ctx, Collector<OUT> out) + throws DeclarationException { + return (t) -> {}; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index df79c512a1d..0c702ed483e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -383,12 +383,18 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre } /** - * A hook that will be invoked after finishing advancing the watermark. It is not recommended to - * perform async state here. Only some synchronous logic is suggested. + * A hook that will be invoked after finishing advancing the watermark and right before the + * watermark is emitted downstream. It is a chance to customize the watermark to be emitted, + * e.g. to delay it. It is not recommended to perform async state here. Only some synchronous logic is + * suggested. + * + * @param watermark the advanced watermark. + * @return the watermark that should be emitted downstream, or null if no watermark should be + * emitted at all.
*/ - public void postProcessWatermark(Watermark watermark) throws Exception {} + public Watermark postProcessWatermark(Watermark watermark) throws Exception { + return watermark; + } /** * Process a watermark when receiving it. Do not override this method since the async processing @@ -425,8 +431,10 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre }, () -> { if (watermarkRef.get() != null) { - output.emitWatermark(watermarkRef.get()); - postProcessWatermark(watermarkRef.get()); + Watermark postProcessWatermark = postProcessWatermark(watermarkRef.get()); + if (postProcessWatermark != null) { + output.emitWatermark(postProcessWatermark); + } } }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java index 1480fc0e4cd..9eb54a23a5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.runtime.asyncprocessing.operators; +package org.apache.flink.runtime.asyncprocessing.operators.co; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; @@ -28,6 +28,8 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; +import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -417,12 +419,12 @@ public class AsyncIntervalJoinOperator<K, T1, T2, OUT> } @VisibleForTesting - MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() { + public MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() { return leftBuffer; } @VisibleForTesting - MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> getRightBuffer() { + public MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> getRightBuffer() { return rightBuffer; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java new file mode 100644 index 00000000000..03454e22709 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; +import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import 
org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.function.ThrowingConsumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing keyed {@link + * KeyedCoProcessFunction KeyedCoProcessFunction}. + */ +@Internal +public class AsyncKeyedCoProcessOperator<K, IN1, IN2, OUT> + extends AbstractAsyncStateUdfStreamOperator<OUT, KeyedCoProcessFunction<K, IN1, IN2, OUT>> + implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> { + + private static final long serialVersionUID = 1L; + + // Shared timestamp variable for collector, context and onTimerContext. + private transient DeclaredVariable<Long> sharedTimestamp; + + private transient TimestampedCollectorWithDeclaredVariable<OUT> collector; + + private transient ContextImpl<K, IN1, IN2, OUT> context; + + private transient OnTimerContextImpl<K, IN1, IN2, OUT> onTimerContext; + + private transient ThrowingConsumer<IN1, Exception> processor1; + private transient ThrowingConsumer<IN2, Exception> processor2; + private transient ThrowingConsumer<Long, Exception> timerProcessor; + + public AsyncKeyedCoProcessOperator( + KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction) { + super(keyedCoProcessFunction); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public void open() throws Exception { + super.open(); + sharedTimestamp = + declarationContext.declareVariable( + LongSerializer.INSTANCE, + "_AsyncCoKeyedProcessOperator$sharedTimestamp", + null); + + collector = new TimestampedCollectorWithDeclaredVariable<>(output, sharedTimestamp); + + InternalTimerService<VoidNamespace> internalTimerService = + getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this); + + TimerService timerService = new SimpleTimerService(internalTimerService); + + context = new ContextImpl<>(userFunction, timerService, sharedTimestamp); + onTimerContext = new OnTimerContextImpl<>(userFunction, timerService, declarationContext); + if (userFunction instanceof DeclaringAsyncKeyedCoProcessFunction) { + DeclaringAsyncKeyedCoProcessFunction declaringFunction = + (DeclaringAsyncKeyedCoProcessFunction) userFunction; + declaringFunction.declareVariables(declarationContext); + processor1 = declaringFunction.declareProcess1(declarationContext, context, collector); + processor2 = declaringFunction.declareProcess2(declarationContext, context, collector); + timerProcessor = + declaringFunction.declareOnTimer(declarationContext, onTimerContext, collector); + } else { + processor1 = (in) -> userFunction.processElement1(in, context, collector); + processor2 = (in) -> userFunction.processElement2(in, context, collector); + timerProcessor = (in) -> userFunction.onTimer(in, onTimerContext, collector); + } + } + + @Override + public void processElement1(StreamRecord<IN1> element) throws Exception { + collector.setTimestamp(element); + processor1.accept(element.getValue()); + } + + @Override + public void processElement2(StreamRecord<IN2> element) throws Exception { + collector.setTimestamp(element); + processor2.accept(element.getValue()); + } + + @Override + public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception { + collector.setAbsoluteTimestamp(timer.getTimestamp()); + invokeUserFunction(TimeDomain.EVENT_TIME, timer); + } + + @Override + public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception { + collector.eraseTimestamp(); + invokeUserFunction(TimeDomain.PROCESSING_TIME, timer); + } + + private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer) + throws Exception { + onTimerContext.setTime(timer.getTimestamp(), timeDomain); + 
timerProcessor.accept(timer.getTimestamp()); + } + + public class ContextImpl<K, IN1, IN2, OUT> + extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.Context { + + private final TimerService timerService; + + private final DeclaredVariable<Long> timestamp; + + ContextImpl( + KeyedCoProcessFunction<K, IN1, IN2, OUT> function, + TimerService timerService, + DeclaredVariable<Long> timestamp) { + function.super(); + this.timerService = checkNotNull(timerService); + this.timestamp = timestamp; + } + + @Override + public Long timestamp() { + return timestamp.get(); + } + + @Override + public TimerService timerService() { + return timerService; + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + if (outputTag == null) { + throw new IllegalArgumentException("OutputTag must not be null."); + } + + output.collect(outputTag, new StreamRecord<>(value, timestamp.get())); + } + + @Override + public K getCurrentKey() { + return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey(); + } + } + + private class OnTimerContextImpl<K, IN1, IN2, OUT> + extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.OnTimerContext { + + private final TimerService timerService; + + private final DeclaredVariable<String> timeDomain; + + private final DeclaredVariable<Long> timestamp; + + OnTimerContextImpl( + KeyedCoProcessFunction<K, IN1, IN2, OUT> function, + TimerService timerService, + DeclarationContext declarationContext) { + function.super(); + this.timerService = checkNotNull(timerService); + this.timeDomain = + declarationContext.declareVariable( + StringSerializer.INSTANCE, "_OnTimerContextImpl$timeDomain", null); + this.timestamp = + declarationContext.declareVariable( + LongSerializer.INSTANCE, "_OnTimerContextImpl$timestamp", null); + } + + public void setTime(long time, TimeDomain one) { + timestamp.set(time); + timeDomain.set(one.name()); + } + + @Override + public Long timestamp() { + checkState(timestamp.get() != null); + return timestamp.get(); + } + + @Override 
+ public TimerService timerService() { + return timerService; + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + if (outputTag == null) { + throw new IllegalArgumentException("OutputTag must not be null."); + } + + output.collect(outputTag, new StreamRecord<>(value, timestamp())); + } + + @Override + public TimeDomain timeDomain() { + checkState(timeDomain.get() != null); + return TimeDomain.valueOf(timeDomain.get()); + } + + @Override + public K getCurrentKey() { + return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java new file mode 100644 index 00000000000..ace1297a0a9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.asyncprocessing.operators.co; + +import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; +import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.Preconditions; + +/** An async {@link KeyedCoProcessOperator} that holds back watermarks by a static delay. */ +public class AsyncKeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT> + extends AsyncKeyedCoProcessOperator<K, IN1, IN2, OUT> { + private static final long serialVersionUID = 1L; + + private final long watermarkDelay; + + public AsyncKeyedCoProcessOperatorWithWatermarkDelay( + KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction, long watermarkDelay) { + super(keyedCoProcessFunction); + Preconditions.checkArgument( + watermarkDelay >= 0, "The watermark delay should be non-negative."); + this.watermarkDelay = watermarkDelay; + } + + @Override + public Watermark postProcessWatermark(Watermark watermark) throws Exception { + if (watermarkDelay == 0) { + return watermark; + } else { + return new Watermark(watermark.getTimestamp() - watermarkDelay); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index 3c9347c9a25..45296ddfad8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; @@ -26,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; @@ -69,12 +71,20 @@ public class ConnectedStreams<IN1, IN2> { protected final StreamExecutionEnvironment environment; protected final DataStream<IN1> inputStream1; protected final DataStream<IN2> inputStream2; + protected boolean isEnableAsyncState; protected ConnectedStreams( StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) { this.environment = requireNonNull(env); this.inputStream1 = requireNonNull(input1); this.inputStream2 = requireNonNull(input2); + if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) { + this.isEnableAsyncState = + ((KeyedStream) inputStream1).isEnableAsyncState() + && ((KeyedStream) inputStream2).isEnableAsyncState(); + } else { + this.isEnableAsyncState = false; + } } public StreamExecutionEnvironment getExecutionEnvironment() { @@ -439,7 +449,12 @@ public class ConnectedStreams<IN1, IN2> { TwoInputStreamOperator<IN1, IN2, R> operator; if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) { - operator = new KeyedCoProcessOperator<>(inputStream1.clean(keyedCoProcessFunction)); + operator = + isEnableAsyncState + ? 
new AsyncKeyedCoProcessOperator<>( + inputStream1.clean(keyedCoProcessFunction)) + : new KeyedCoProcessOperator<>( + inputStream1.clean(keyedCoProcessFunction)); } else { throw new UnsupportedOperationException( "KeyedCoProcessFunction can only be used " @@ -523,4 +538,25 @@ public class ConnectedStreams<IN1, IN2> { return returnStream; } + + /** + * Enable the async state processing for following keyed processing function on connected + * streams. This also requires only State V2 APIs are used in the function. + * + * @return the configured ConnectedStreams itself. + */ + @Experimental + public ConnectedStreams<IN1, IN2> enableAsyncState() { + if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) { + ((KeyedStream<?, ?>) inputStream1).enableAsyncState(); + ((KeyedStream<?, ?>) inputStream2).enableAsyncState(); + this.isEnableAsyncState = true; + } else { + throw new UnsupportedOperationException( + "The connected streams do not support async state, " + + "please ensure that two input streams of your connected streams are " + + "keyed stream(not behind a keyBy())."); + } + return this; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index dd4818fe30f..f082f11b8e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -35,8 +35,8 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator; import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap; +import 
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java index 007f908c809..43566988b1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import java.util.ArrayList; import java.util.List; @@ -239,4 +240,14 @@ public class TwoInputTransformation<IN1, IN2, OUT> extends PhysicalTransformatio public boolean isInternalSorterSupported() { return operatorFactory.getOperatorAttributes().isInternalSorterSupported(); } + + @Override + public void enableAsyncState() { + TwoInputStreamOperator<IN1, IN2, OUT> operator = + (TwoInputStreamOperator<IN1, IN2, OUT>) + ((SimpleOperatorFactory<OUT>) operatorFactory).getOperator(); + if (!(operator instanceof AsyncStateProcessingOperator)) { + super.enableAsyncState(); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java index bb234af0eef..e5452f51656 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java @@ -371,8 +371,8 @@ public class AbstractAsyncStateStreamOperatorTest { expectedOutput.add(new StreamRecord<>(1002L)); expectedOutput.add(new StreamRecord<>(1L)); expectedOutput.add(new StreamRecord<>(3L)); - expectedOutput.add(new Watermark(3L)); expectedOutput.add(new StreamRecord<>(103L)); + expectedOutput.add(new Watermark(3L)); testHarness.processWatermark1(new Watermark(4L)); testHarness.processWatermark2(new Watermark(4L)); expectedOutput.add(new StreamRecord<>(1004L)); @@ -380,8 +380,8 @@ public class AbstractAsyncStateStreamOperatorTest { testHarness.processWatermark2(new Watermark(5L)); expectedOutput.add(new StreamRecord<>(1005L)); expectedOutput.add(new StreamRecord<>(4L)); - expectedOutput.add(new Watermark(6L)); expectedOutput.add(new StreamRecord<>(106L)); + expectedOutput.add(new Watermark(6L)); TestHarnessUtil.assertOutputEquals( "Output was not correct", expectedOutput, testHarness.getOutput()); @@ -690,10 +690,11 @@ public class AbstractAsyncStateStreamOperatorTest { } @Override - public void postProcessWatermark(Watermark watermark) throws Exception { + public Watermark postProcessWatermark(Watermark watermark) throws Exception { if (postProcessFunction != null) { postProcessFunction.accept(watermark); } + return watermark; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java index dc08ed7f76c..2c250380969 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java new file mode 100644 index 00000000000..d7113471c35 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java @@ -0,0 +1,746 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.asyncprocessing.operators; + +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException; +import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link AsyncKeyedCoProcessOperator}. 
*/ +class AsyncKeyedCoProcessOperatorTest { + + @Test + void testDeclareProcessor() throws Exception { + TestChainDeclarationFunction function = new TestChainDeclarationFunction(); + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(function); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + ArrayList<StreamRecord<String>> expectedOutput = new ArrayList<>(); + + testHarness.open(); + testHarness.processElement1(new StreamRecord<>(5)); + expectedOutput.add(new StreamRecord<>("11")); + assertThat(function.value.get()).isEqualTo(11); + testHarness.processElement2(new StreamRecord<>("6")); + expectedOutput.add(new StreamRecord<>("6")); + assertThat(function.value.get()).isEqualTo(17); + assertThat(testHarness.getOutput()).containsExactly(expectedOutput.toArray()); + } + + @Test + void testTimestampAndWatermarkQuerying() throws Exception { + + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(17)); + testHarness.processWatermark2(new Watermark(17)); + testHarness.processElement1(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark1(new Watermark(42)); + testHarness.processWatermark2(new Watermark(42)); + testHarness.processElement2(new StreamRecord<>("6", 13L)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + 
expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L)); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + void testTimestampAndProcessingTimeQuerying() throws Exception { + + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(new ProcessingTimeQueryingProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement1(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement2(new StreamRecord<>("6")); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17 TS:null")); + expectedOutput.add(new StreamRecord<>("6PT:42 TS:null")); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + void testEventTimeTimers() throws Exception { + + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17, 
42L)); + testHarness.processElement2(new StreamRecord<>("18", 42L)); + + testHarness.processWatermark1(new Watermark(5)); + testHarness.processWatermark2(new Watermark(5)); + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L)); + expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L)); + expectedOutput.add(new StreamRecord<>("17:1777", 5L)); + expectedOutput.add(new Watermark(5L)); + expectedOutput.add(new StreamRecord<>("18:1777", 6L)); + expectedOutput.add(new Watermark(6L)); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + void testProcessingTimeTimers() throws Exception { + + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17)); + testHarness.processElement2(new StreamRecord<>("18")); + + testHarness.setProcessingTime(5); + testHarness.setProcessingTime(6); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT2:18")); + expectedOutput.add(new StreamRecord<>("1777")); + expectedOutput.add(new StreamRecord<>("1777")); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** Verifies that we 
don't have leakage between different keys. */ + @Test + void testEventTimeTimerWithState() throws Exception { + + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6 + testHarness.processElement1(new StreamRecord<>(13, 0L)); // should set timer for 6 + + testHarness.processWatermark1(new Watermark(2)); + testHarness.processWatermark2(new Watermark(2)); + testHarness.processElement1(new StreamRecord<>(13, 1L)); // should delete timer + testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7 + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + testHarness.processWatermark1(new Watermark(7)); + testHarness.processWatermark2(new Watermark(7)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(1L)); + expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L)); + expectedOutput.add(new StreamRecord<>("INPUT1:13", 0L)); + expectedOutput.add(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L)); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + expectedOutput.add(new Watermark(7L)); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + 
testHarness.close(); + } + + /** Verifies that we don't have leakage between different keys. */ + @Test + void testProcessingTimeTimerWithState() throws Exception { + + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>( + new ProcessingTimeTriggeringStatefulProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(1); + testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6 + testHarness.processElement1(new StreamRecord<>(13)); // should set timer for 6 + + testHarness.setProcessingTime(2); + testHarness.processElement1(new StreamRecord<>(13)); // should delete timer again + testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7 + + testHarness.setProcessingTime(6); + testHarness.setProcessingTime(7); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT1:13")); + expectedOutput.add(new StreamRecord<>("INPUT2:42")); + expectedOutput.add(new StreamRecord<>("STATE:17")); + expectedOutput.add(new StreamRecord<>("STATE:42")); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + void testSnapshotAndRestore() throws Exception { + + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(new BothTriggeringProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new 
IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(5, 12L)); + testHarness.processElement2(new StreamRecord<>("5", 12L)); + + // snapshot and restore from scratch + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); + + testHarness.close(); + + operator = new AsyncKeyedCoProcessOperator<>(new BothTriggeringProcessFunction()); + + testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(5); + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("PROC:1777")); + expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); + expectedOutput.add(new Watermark(6)); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + void testGetCurrentKeyFromContext() throws Exception { + AsyncKeyedCoProcessOperator<String, Integer, String, String> operator = + new AsyncKeyedCoProcessOperator<>(new AppendCurrentKeyProcessFunction()); + + AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(5)); + testHarness.processElement1(new StreamRecord<>(6)); + testHarness.processElement2(new StreamRecord<>("hello")); + testHarness.processElement2(new 
StreamRecord<>("world")); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5,5")); + expectedOutput.add(new StreamRecord<>("6,6")); + expectedOutput.add(new StreamRecord<>("hello,hello")); + expectedOutput.add(new StreamRecord<>("world,world")); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Integer value) throws Exception { + return "" + value; + } + } + + private static class IdentityKeySelector<T> implements KeySelector<T, T> { + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) throws Exception { + return value; + } + } + + private static class TestChainDeclarationFunction + extends DeclaringAsyncKeyedCoProcessFunction<String, Integer, String, String> { + + final AtomicInteger value = new AtomicInteger(0); + + @Override + public ThrowingConsumer<Integer, Exception> declareProcess1( + DeclarationContext context, + KeyedCoProcessFunction<String, Integer, String, String>.Context ctx, + Collector<String> out) + throws DeclarationException { + ContextVariable<Integer> inputValue = context.declareVariable(null); + return context.<Integer>declareChain() + .thenCompose( + e -> { + if (inputValue.get() == null) { + inputValue.set(e); + } + value.addAndGet(e); + return StateFutureUtils.completedVoidFuture(); + }) + .thenCompose(v -> StateFutureUtils.completedFuture(value.incrementAndGet())) + .withName("adder") + .thenAccept( + (v) -> { + value.addAndGet(inputValue.get()); + out.collect(String.valueOf(value.get())); + }) + .withName("doubler") + .finish(); + } + + @Override + public ThrowingConsumer<String, Exception> declareProcess2( + DeclarationContext context, + 
KeyedCoProcessFunction<String, Integer, String, String>.Context ctx, + Collector<String> out) + throws DeclarationException { + return context.<String>declareChain() + .thenAccept( + v -> { + out.collect(v); + value.addAndGet(Integer.valueOf(v)); + }) + .withName("pass") + .finish(); + } + } + + private static class WatermarkQueryingProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + out.collect( + value + + "WM:" + + ctx.timerService().currentWatermark() + + " TS:" + + ctx.timestamp()); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + out.collect( + value + + "WM:" + + ctx.timerService().currentWatermark() + + " TS:" + + ctx.timestamp()); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) + throws Exception {} + } + + private static class EventTimeTriggeringProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + out.collect("INPUT1:" + value); + ctx.timerService().registerEventTimeTimer(5); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + out.collect("INPUT2:" + value); + ctx.timerService().registerEventTimeTimer(6); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) + throws Exception { + assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.EVENT_TIME); + out.collect(ctx.getCurrentKey() + ":" + 1777); + } + } + + private static class EventTimeTriggeringStatefulProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, 
String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<String> state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + handleValue(value, out, ctx.timerService(), 1); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + handleValue(value, out, ctx.timerService(), 2); + } + + private void handleValue( + Object value, Collector<String> out, TimerService timerService, int channel) + throws IOException { + final ValueState<String> state = getRuntimeContext().getState(this.state); + state.asyncValue() + .thenAccept( + v -> { + if (v == null) { + state.asyncUpdate(String.valueOf(value)) + .thenAccept( + VO -> + out.collect( + "INPUT" + channel + ":" + + value)); + timerService.registerEventTimeTimer( + timerService.currentWatermark() + 5); + } else { + state.asyncClear(); + timerService.deleteEventTimeTimer( + timerService.currentWatermark() + 4); + } + }); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) + throws Exception { + assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.EVENT_TIME); + getRuntimeContext() + .getState(state) + .asyncValue() + .thenAccept( + v -> { + out.collect("STATE:" + v); + }); + } + } + + private static class ProcessingTimeTriggeringProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + out.collect("INPUT1:" + value); + ctx.timerService().registerProcessingTimeTimer(5); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + out.collect("INPUT2:" + value); + 
ctx.timerService().registerProcessingTimeTimer(6); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) + throws Exception { + assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.PROCESSING_TIME); + out.collect("" + 1777); + } + } + + private static class ProcessingTimeQueryingProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + out.collect( + value + + "PT:" + + ctx.timerService().currentProcessingTime() + + " TS:" + + ctx.timestamp()); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + out.collect( + value + + "PT:" + + ctx.timerService().currentProcessingTime() + + " TS:" + + ctx.timestamp()); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) + throws Exception {} + } + + private static class ProcessingTimeTriggeringStatefulProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<String> state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + handleValue(value, out, ctx.timerService(), 1); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + handleValue(value, out, ctx.timerService(), 2); + } + + private void handleValue( + Object value, Collector<String> out, TimerService timerService, int channel) + throws IOException { + final ValueState<String> state = getRuntimeContext().getState(this.state); + state.asyncValue() + .thenAccept( + v -> { + if (v == null) { + 
state.asyncUpdate(String.valueOf(value)) + .thenAccept( + VO -> + out.collect( + "INPUT" + channel + ":" + + value)); + timerService.registerProcessingTimeTimer( + timerService.currentProcessingTime() + 5); + } else { + state.asyncClear(); + timerService.deleteProcessingTimeTimer( + timerService.currentProcessingTime() + 4); + } + }); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) + throws Exception { + assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.PROCESSING_TIME); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class BothTriggeringProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + ctx.timerService().registerProcessingTimeTimer(3); + ctx.timerService().registerEventTimeTimer(6); + ctx.timerService().deleteProcessingTimeTimer(3); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + ctx.timerService().registerEventTimeTimer(4); + ctx.timerService().registerProcessingTimeTimer(5); + ctx.timerService().deleteEventTimeTimer(4); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) + throws Exception { + if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { + out.collect("EVENT:1777"); + } else { + out.collect("PROC:1777"); + } + } + } + + private static class AppendCurrentKeyProcessFunction + extends KeyedCoProcessFunction<String, Integer, String, String> { + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) + throws Exception { + out.collect(value + "," + ctx.getCurrentKey()); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) + throws Exception { + 
out.collect(value + "," + ctx.getCurrentKey()); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java index 3d89e414ae7..2ac0af781e0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java @@ -367,7 +367,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData> leftTypeInfo, rightTypeInfo, joinFunction); - + // TODO: add async version procJoinFunc to use AsyncKeyedCoProcessOperator return ExecNodeUtil.createTwoInputTransformation( leftInputTransform, rightInputTransform, @@ -404,7 +404,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData> joinFunction, windowBounds.getLeftTimeIdx(), windowBounds.getRightTimeIdx()); - + // TODO: add async version rowJoinFunc to use AsyncKeyedCoProcessOperator return ExecNodeUtil.createTwoInputTransformation( leftInputTransform, rightInputTransform,