This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4e3cd986cd4 [FLINK-30601][runtime] Omit "setKeyContextElement" call
for non-keyed stream/operators to improve performance
4e3cd986cd4 is described below
commit 4e3cd986cd4304c699d5c7d368ffd6e0ead5a096
Author: Lijie Wang <[email protected]>
AuthorDate: Mon Jan 9 14:15:02 2023 +0800
[FLINK-30601][runtime] Omit "setKeyContextElement" call for non-keyed
stream/operators to improve performance
This closes #21621
---
.../state/api/output/BootstrapStreamTask.java | 10 +-
.../flink/state/api/output/BoundedStreamTask.java | 8 +-
.../streaming/api/operators/AbstractInput.java | 7 +-
.../api/operators/AbstractStreamOperator.java | 13 +
.../streaming/api/operators/KeyContextHandler.java | 67 ++++
.../streaming/runtime/io/RecordProcessorUtils.java | 188 ++++++++++
.../io/StreamMultipleInputProcessorFactory.java | 8 +-
.../runtime/io/StreamTwoInputProcessorFactory.java | 20 +-
.../streaming/runtime/tasks/ChainingOutput.java | 7 +-
.../runtime/tasks/CopyingChainingOutput.java | 3 +-
.../runtime/tasks/OneInputStreamTask.java | 7 +-
.../runtime/io/RecordProcessorUtilsTest.java | 407 +++++++++++++++++++++
.../operators/multipleinput/input/InputBase.java | 12 +-
13 files changed, 723 insertions(+), 34 deletions(-)
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
index 8a5e832b28f..936fe88b846 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BootstrapStreamTask.java
@@ -27,12 +27,14 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
@@ -53,6 +55,8 @@ class BootstrapStreamTask<IN, OUT, OP extends
OneInputStreamOperator<IN, OUT> &
private final Output<StreamRecord<OUT>> output;
+ private ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor;
+
BootstrapStreamTask(
Environment environment,
BlockingQueue<StreamElement> input,
@@ -82,16 +86,14 @@ class BootstrapStreamTask<IN, OUT, OP extends
OneInputStreamOperator<IN, OUT> &
mainOperator = mainOperatorAndTimeService.f0;
mainOperator.initializeState(createStreamTaskStateInitializer());
mainOperator.open();
+ recordProcessor =
RecordProcessorUtils.getRecordProcessor(mainOperator);
}
@Override
protected void processInput(MailboxDefaultAction.Controller controller)
throws Exception {
StreamElement element = input.take();
if (element.isRecord()) {
- StreamRecord<IN> streamRecord = element.asRecord();
-
- mainOperator.setKeyContextElement1(streamRecord);
- mainOperator.processElement(streamRecord);
+ recordProcessor.accept(element.asRecord());
} else {
mainOperator.endInput();
mainOperator.finish();
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index c8e6ac37f9e..c1b35bed4c5 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -37,6 +38,7 @@ import
org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
import java.util.Iterator;
import java.util.Optional;
@@ -60,6 +62,8 @@ class BoundedStreamTask<IN, OUT, OP extends
OneInputStreamOperator<IN, OUT> & Bo
private final Timestamper<IN> timestamper;
+ private ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor;
+
BoundedStreamTask(
Environment environment,
Iterable<IN> input,
@@ -91,6 +95,7 @@ class BoundedStreamTask<IN, OUT, OP extends
OneInputStreamOperator<IN, OUT> & Bo
mainOperator = mainOperatorAndTimeService.f0;
mainOperator.initializeState(createStreamTaskStateInitializer());
mainOperator.open();
+ recordProcessor =
RecordProcessorUtils.getRecordProcessor(mainOperator);
}
@Override
@@ -103,8 +108,7 @@ class BoundedStreamTask<IN, OUT, OP extends
OneInputStreamOperator<IN, OUT> & Bo
streamRecord.setTimestamp(timestamp);
}
- mainOperator.setKeyContextElement1(streamRecord);
- mainOperator.processElement(streamRecord);
+ recordProcessor.accept(streamRecord);
} else {
mainOperator.endInput();
mainOperator.finish();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
index 8cbb28bb67d..f8d0a74fa4d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
@@ -34,7 +34,7 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
* AbstractStreamOperatorV2}.
*/
@Experimental
-public abstract class AbstractInput<IN, OUT> implements Input<IN> {
+public abstract class AbstractInput<IN, OUT> implements Input<IN>,
KeyContextHandler {
/**
* {@code KeySelector} for extracting a key from an element being
processed. This is used to
* scope keyed state to a key. This is null if the operator is not a keyed
operator.
@@ -75,4 +75,9 @@ public abstract class AbstractInput<IN, OUT> implements
Input<IN> {
public void setKeyContextElement(StreamRecord record) throws Exception {
owner.internalSetKeyContextElement(record, stateKeySelector);
}
+
+ @Override
+ public boolean hasKeyContext() {
+ return stateKeySelector != null; // selector is null for non-keyed inputs, so no key context to set
+ }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index d2a24dee4c7..78fb35af4e0 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -89,6 +89,7 @@ public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>,
SetupableStreamOperator<OUT>,
CheckpointedStreamOperator,
+ KeyContextHandler,
Serializable {
private static final long serialVersionUID = 1L;
@@ -483,6 +484,18 @@ public abstract class AbstractStreamOperator<OUT>
setKeyContextElement(record, stateKeySelector2);
}
+ @Internal
+ @Override
+ public boolean hasKeyContext1() {
+ return stateKeySelector1 != null; // presumably null when input 1 is not keyed; setKeyContextElement only acts on a non-null selector
+ }
+
+ @Internal
+ @Override
+ public boolean hasKeyContext2() {
+ return stateKeySelector2 != null; // presumably null when input 2 is not keyed; setKeyContextElement only acts on a non-null selector
+ }
+
private <T> void setKeyContextElement(StreamRecord<T> record,
KeySelector<T, ?> selector)
throws Exception {
if (selector != null) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java
new file mode 100644
index 00000000000..56d6a417b8e
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContextHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This interface is used to optimize the calls of {@link
Input#setKeyContextElement}, {@link
+ * StreamOperator#setKeyContextElement1} and {@link
StreamOperator#setKeyContextElement2}. We can
+ * decide (at inputs/operators initialization time) whether to omit the calls of
+ * "setKeyContextElement" according to the return value of {@link
#hasKeyContext} (or {@link #hasKeyContext1}/{@link #hasKeyContext2} for operators). In this way, we
+ * can omit the calls of "setKeyContextElement" for inputs/operators that
don't have "KeyContext".
+ *
+ * <p>All inputs/operators that want to optimize the "setKeyContextElement"
calls should implement
+ * this interface.
+ */
+@Internal
+public interface KeyContextHandler {
+
+ /**
+ * Whether the {@link Input} has "KeyContext". If false, we can omit the
call of {@link
+ * Input#setKeyContextElement} for each record. Defaults to {@link #hasKeyContext1()}.
+ *
+ * @return True if the {@link Input} has "KeyContext", false otherwise.
+ */
+ default boolean hasKeyContext() {
+ return hasKeyContext1();
+ }
+
+ /**
+ * Whether the first input of {@link StreamOperator} has "KeyContext". If
false, we can omit the
+ * call of {@link StreamOperator#setKeyContextElement1} for each record
arriving on the first
+ * input.
+ *
+ * @return True if the first input has "KeyContext", false otherwise. Defaults to true (never omit).
+ */
+ default boolean hasKeyContext1() {
+ return true;
+ }
+
+ /**
+ * Whether the second input of {@link StreamOperator} has "KeyContext". If
false, we can omit
+ * the call of {@link StreamOperator#setKeyContextElement2} for each
record arriving on the
+ * second input.
+ *
+ * @return True if the second input has "KeyContext", false otherwise. Defaults to true (never omit).
+ */
+ default boolean hasKeyContext2() {
+ return true;
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java
new file mode 100644
index 00000000000..f1636f3285d
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.KeyContextHandler;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/** Utility class for creating record processors for {@link Input} and {@link
StreamOperator}. */
+public class RecordProcessorUtils {
+
+ private static final String METHOD_SET_KEY_CONTEXT_ELEMENT =
"setKeyContextElement";
+ private static final String METHOD_SET_KEY_CONTEXT_ELEMENT1 =
"setKeyContextElement1";
+ private static final String METHOD_SET_KEY_CONTEXT_ELEMENT2 =
"setKeyContextElement2";
+
+ /**
+ * Get the record processor for {@link Input}, which will omit the call of {@link
+ * Input#setKeyContextElement} if it doesn't have key context.
+ *
+ * @param input the {@link Input}
+ * @return the record processor
+ */
+ public static <T> ThrowingConsumer<StreamRecord<T>, Exception>
getRecordProcessor(
+ Input<T> input) {
+ boolean canOmitSetKeyContext;
+ if (input instanceof AbstractStreamOperator) { // operators are checked via their first input
+ canOmitSetKeyContext =
canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0);
+ } else {
+ canOmitSetKeyContext =
+ input instanceof KeyContextHandler
+ && !((KeyContextHandler) input).hasKeyContext();
+ }
+
+ if (canOmitSetKeyContext) {
+ return input::processElement;
+ } else {
+ return record -> {
+ input.setKeyContextElement(record);
+ input.processElement(record);
+ };
+ }
+ }
+
+ /**
+ * Get the record processor for the first input of {@link
TwoInputStreamOperator}, which will omit
+ * the call of {@link StreamOperator#setKeyContextElement1} if it doesn't have
key context.
+ *
+ * @param operator the {@link TwoInputStreamOperator}
+ * @return the record processor
+ */
+ public static <T> ThrowingConsumer<StreamRecord<T>, Exception>
getRecordProcessor1(
+ TwoInputStreamOperator<T, ?, ?> operator) {
+ boolean canOmitSetKeyContext;
+ if (operator instanceof AbstractStreamOperator) {
+ canOmitSetKeyContext =
canOmitSetKeyContext((AbstractStreamOperator<?>) operator, 0);
+ } else {
+ canOmitSetKeyContext =
+ operator instanceof KeyContextHandler
+ && !((KeyContextHandler)
operator).hasKeyContext1();
+ }
+
+ if (canOmitSetKeyContext) {
+ return operator::processElement1;
+ } else {
+ return record -> {
+ operator.setKeyContextElement1(record);
+ operator.processElement1(record);
+ };
+ }
+ }
+
+ /**
+ * Get the record processor for the second input of {@link
TwoInputStreamOperator}, which will omit
+ * the call of {@link StreamOperator#setKeyContextElement2} if it doesn't have
key context.
+ *
+ * @param operator the {@link TwoInputStreamOperator}
+ * @return the record processor
+ */
+ public static <T> ThrowingConsumer<StreamRecord<T>, Exception>
getRecordProcessor2(
+ TwoInputStreamOperator<?, T, ?> operator) {
+ boolean canOmitSetKeyContext;
+ if (operator instanceof AbstractStreamOperator) {
+ canOmitSetKeyContext =
canOmitSetKeyContext((AbstractStreamOperator<?>) operator, 1);
+ } else {
+ canOmitSetKeyContext =
+ operator instanceof KeyContextHandler
+ && !((KeyContextHandler)
operator).hasKeyContext2();
+ }
+
+ if (canOmitSetKeyContext) {
+ return operator::processElement2;
+ } else {
+ return record -> {
+ operator.setKeyContextElement2(record);
+ operator.processElement2(record);
+ };
+ }
+ }
+
+ private static boolean canOmitSetKeyContext(
+ AbstractStreamOperator<?> streamOperator, int input) {
+ // Since AbstractStreamOperator is @PublicEvolving, we need to check
whether the
+ // "setKeyContextElement" method is overridden by the (user-implemented)
subclass. If it is
+ // overridden, we cannot omit it because the subclass may maintain
different key selectors on
+ // its own. `input` is 0 for the first input and 1 for the second input.
+ return !hasKeyContext(streamOperator, input)
+ && !methodSetKeyContextIsOverridden(streamOperator, input);
+ }
+
+ private static boolean hasKeyContext(AbstractStreamOperator<?> operator,
int input) {
+ if (input == 0) { // any value other than 0 is treated as the second input
+ return operator.hasKeyContext1();
+ } else {
+ return operator.hasKeyContext2();
+ }
+ }
+
+ private static boolean methodSetKeyContextIsOverridden(
+ AbstractStreamOperator<?> operator, int input) {
+ if (input == 0) {
+ if (operator instanceof OneInputStreamOperator) { // a one-input operator may override either setKeyContextElement or setKeyContextElement1
+ return methodIsOverridden(
+ operator,
+ OneInputStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT,
+ StreamRecord.class)
+ || methodIsOverridden(
+ operator,
+ AbstractStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT1,
+ StreamRecord.class);
+ } else {
+ return methodIsOverridden(
+ operator,
+ AbstractStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT1,
+ StreamRecord.class);
+ }
+ } else {
+ return methodIsOverridden(
+ operator,
+ AbstractStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT2,
+ StreamRecord.class);
+ }
+ }
+
+ private static boolean methodIsOverridden(
+ AbstractStreamOperator<?> operator,
+ Class<?> expectedDeclaringClass,
+ String methodName,
+ Class<?>... parameterTypes) {
+ try {
+ Class<?> methodDeclaringClass =
+ operator.getClass().getMethod(methodName,
parameterTypes).getDeclaringClass();
+ return methodDeclaringClass != expectedDeclaringClass; // the public method resolved on the runtime class is declared elsewhere => overridden
+ } catch (NoSuchMethodException exception) { // NOTE(review): the caught exception is not passed as the cause below
+ throw new FlinkRuntimeException(
+ String.format(
+ "BUG: Can't find '%s' method in '%s'",
+ methodName, operator.getClass()));
+ }
+ }
+
+ /** Private constructor to prevent instantiation. */
+ private RecordProcessorUtils() {}
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index 9afa00e4cbd..f06e9e71fe8 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -50,6 +50,7 @@ import
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.function.ThrowingConsumer;
import java.util.Arrays;
import java.util.List;
@@ -248,6 +249,9 @@ public class StreamMultipleInputProcessorFactory {
private final Counter networkRecordsIn;
+ /** The function way is only used for frequent record processing as
for JIT optimization. */
+ private final ThrowingConsumer<StreamRecord<T>, Exception>
recordConsumer;
+
private StreamTaskNetworkOutput(
Input<T> input,
WatermarkGauge inputWatermarkGauge,
@@ -257,12 +261,12 @@ public class StreamMultipleInputProcessorFactory {
this.inputWatermarkGauge = checkNotNull(inputWatermarkGauge);
this.mainOperatorRecordsIn = mainOperatorRecordsIn;
this.networkRecordsIn = networkRecordsIn;
+ this.recordConsumer =
RecordProcessorUtils.getRecordProcessor(input);
}
@Override
public void emitRecord(StreamRecord<T> record) throws Exception {
- input.setKeyContextElement(record);
- input.processElement(record);
+ recordConsumer.accept(record);
mainOperatorRecordsIn.inc();
networkRecordsIn.inc();
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index 8b4330a1223..ea296a3283c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -180,7 +180,7 @@ public class StreamTwoInputProcessorFactory {
StreamTaskNetworkOutput<IN1> output1 =
new StreamTaskNetworkOutput<>(
streamOperator,
- record -> processRecord1(record, streamOperator),
+
RecordProcessorUtils.getRecordProcessor1(streamOperator),
input1WatermarkGauge,
0,
numRecordsIn,
@@ -191,7 +191,7 @@ public class StreamTwoInputProcessorFactory {
StreamTaskNetworkOutput<IN2> output2 =
new StreamTaskNetworkOutput<>(
streamOperator,
- record -> processRecord2(record, streamOperator),
+
RecordProcessorUtils.getRecordProcessor2(streamOperator),
input2WatermarkGauge,
1,
numRecordsIn,
@@ -209,22 +209,6 @@ public class StreamTwoInputProcessorFactory {
return (StreamTaskInput<IN1>) multiInput;
}
- private static <T> void processRecord1(
- StreamRecord<T> record, TwoInputStreamOperator<T, ?, ?>
streamOperator)
- throws Exception {
-
- streamOperator.setKeyContextElement1(record);
- streamOperator.processElement1(record);
- }
-
- private static <T> void processRecord2(
- StreamRecord<T> record, TwoInputStreamOperator<?, T, ?>
streamOperator)
- throws Exception {
-
- streamOperator.setKeyContextElement2(record);
- streamOperator.processElement2(record);
- }
-
/**
* The network data output implementation used for processing stream
elements from {@link
* StreamTaskNetworkInput} in two input selective processor.
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 0bf275b2cd7..0ff3785056f 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -23,11 +23,13 @@ import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +45,7 @@ class ChainingOutput<T> implements
WatermarkGaugeExposingOutput<StreamRecord<T>>
protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
@Nullable protected final OutputTag<T> outputTag;
protected WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;
+ protected final ThrowingConsumer<StreamRecord<T>, Exception>
recordProcessor;
public ChainingOutput(
Input<T> input,
@@ -59,6 +62,7 @@ class ChainingOutput<T> implements
WatermarkGaugeExposingOutput<StreamRecord<T>>
}
this.numRecordsIn =
curOperatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
this.outputTag = outputTag;
+ this.recordProcessor = RecordProcessorUtils.getRecordProcessor(input);
}
@Override
@@ -87,8 +91,7 @@ class ChainingOutput<T> implements
WatermarkGaugeExposingOutput<StreamRecord<T>>
numRecordsOut.inc();
numRecordsIn.inc();
- input.setKeyContextElement(castRecord);
- input.processElement(castRecord);
+ recordProcessor.accept(castRecord);
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
index c9091165ab7..c27d3a83490 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java
@@ -72,8 +72,7 @@ final class CopyingChainingOutput<T> extends
ChainingOutput<T> {
numRecordsOut.inc();
numRecordsIn.inc();
StreamRecord<T> copy =
castRecord.copy(serializer.copy(castRecord.getValue()));
- input.setKeyContextElement(copy);
- input.processElement(copy);
+ recordProcessor.accept(copy);
} catch (ClassCastException e) {
if (outputTag != null) {
// Enrich error message
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 1e445d05334..ddcb0182633 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -32,6 +32,7 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.SortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
@@ -44,6 +45,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.shaded.curator5.com.google.common.collect.Iterables;
@@ -217,6 +219,7 @@ public class OneInputStreamTask<IN, OUT> extends
StreamTask<OUT, OneInputStreamO
private final WatermarkGauge watermarkGauge;
private final Counter numRecordsIn;
+ private final ThrowingConsumer<StreamRecord<IN>, Exception>
recordProcessor;
private StreamTaskNetworkOutput(
Input<IN> operator, WatermarkGauge watermarkGauge, Counter
numRecordsIn) {
@@ -224,13 +227,13 @@ public class OneInputStreamTask<IN, OUT> extends
StreamTask<OUT, OneInputStreamO
this.operator = checkNotNull(operator);
this.watermarkGauge = checkNotNull(watermarkGauge);
this.numRecordsIn = checkNotNull(numRecordsIn);
+ this.recordProcessor =
RecordProcessorUtils.getRecordProcessor(operator);
}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
numRecordsIn.inc();
- operator.setKeyContextElement(record);
- operator.processElement(record);
+ recordProcessor.accept(record);
}
@Override
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java
new file mode 100644
index 00000000000..69d9703e007
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.KeyContextHandler;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RecordProcessorUtils}. */
+class RecordProcessorUtilsTest {
+
+ @Test
+ void testGetRecordProcessor() throws Exception {
+ TestOperator input1 = new TestOperator();
+ TestOperator input2 = new TestKeyContextHandlerOperator(true);
+ TestOperator input3 = new TestKeyContextHandlerOperator(false);
+
+ RecordProcessorUtils.getRecordProcessor(input1).accept(new
StreamRecord<>("test"));
+ assertThat(input1.setKeyContextElementCalled).isTrue();
+ assertThat(input1.processElementCalled).isTrue();
+
+ RecordProcessorUtils.getRecordProcessor(input2).accept(new
StreamRecord<>("test"));
+ assertThat(input2.setKeyContextElementCalled).isTrue();
+ assertThat(input2.processElementCalled).isTrue();
+
+ RecordProcessorUtils.getRecordProcessor(input3).accept(new
StreamRecord<>("test"));
+ assertThat(input3.setKeyContextElementCalled).isFalse();
+ assertThat(input3.processElementCalled).isTrue();
+ }
+
+ @Test
+ void testGetRecordProcessor1() throws Exception {
+ TestOperator operator1 = new TestOperator();
+ TestOperator operator2 = new TestKeyContextHandlerOperator(true, true);
+ TestOperator operator3 = new TestKeyContextHandlerOperator(false,
true);
+
+ RecordProcessorUtils.getRecordProcessor1(operator1).accept(new
StreamRecord<>("test"));
+ assertThat(operator1.setKeyContextElement1Called).isTrue();
+ assertThat(operator1.processElement1Called).isTrue();
+
+ RecordProcessorUtils.getRecordProcessor1(operator2).accept(new
StreamRecord<>("test"));
+ assertThat(operator2.setKeyContextElement1Called).isTrue();
+ assertThat(operator2.processElement1Called).isTrue();
+
+ RecordProcessorUtils.getRecordProcessor1(operator3).accept(new
StreamRecord<>("test"));
+ assertThat(operator3.setKeyContextElement1Called).isFalse();
+ assertThat(operator3.processElement1Called).isTrue();
+ }
+
+ @Test
+ void testGetRecordProcessor2() throws Exception {
+ TestOperator operator1 = new TestOperator();
+ TestOperator operator2 = new TestKeyContextHandlerOperator(true, true);
+ TestOperator operator3 = new TestKeyContextHandlerOperator(true,
false);
+
+ RecordProcessorUtils.getRecordProcessor2(operator1).accept(new
StreamRecord<>("test"));
+ assertThat(operator1.setKeyContextElement2Called).isTrue();
+ assertThat(operator1.processElement2Called).isTrue();
+
+ RecordProcessorUtils.getRecordProcessor2(operator2).accept(new
StreamRecord<>("test"));
+ assertThat(operator2.setKeyContextElement2Called).isTrue();
+ assertThat(operator2.processElement2Called).isTrue();
+
+ RecordProcessorUtils.getRecordProcessor2(operator3).accept(new
StreamRecord<>("test"));
+ assertThat(operator3.setKeyContextElement2Called).isFalse();
+ assertThat(operator3.processElement2Called).isTrue();
+ }
+
+ @Test
+ void testOverrideSetKeyContextElementForOneInputStreamOperator() throws
Exception {
+ // test no override
+ NoOverrideOneInputStreamOperator noOverride = new
NoOverrideOneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(noOverride).accept(new
StreamRecord<>("test"));
+ assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+ // test override "SetKeyContextElement"
+ OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+ new OverrideSetKeyContextOneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+ .accept(new StreamRecord<>("test"));
+ assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+ // test override "SetKeyContextElement1"
+ OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+ new OverrideSetKeyContext1OneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext1)
+ .accept(new StreamRecord<>("test"));
+
assertThat(overrideSetKeyContext1.setKeyContextElement1Called).isTrue();
+ }
+
+ @Test
+ void testOverrideSetKeyContextElementForTwoInputStreamOperator() throws
Exception {
+ // test no override
+ NoOverrideTwoInputStreamOperator noOverride = new
NoOverrideTwoInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor1(noOverride).accept(new
StreamRecord<>("test"));
+ RecordProcessorUtils.getRecordProcessor2(noOverride).accept(new
StreamRecord<>("test"));
+ assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+ // test override "SetKeyContextElement1" and "SetKeyContextElement2"
+ OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator override
=
+ new
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor1(override).accept(new
StreamRecord<>("test"));
+ RecordProcessorUtils.getRecordProcessor2(override).accept(new
StreamRecord<>("test"));
+ assertThat(override.setKeyContextElement1Called).isTrue();
+ assertThat(override.setKeyContextElement2Called).isTrue();
+ }
+
+ private static class NoOverrideOperator extends
AbstractStreamOperator<String> {
+
+ boolean setCurrentKeyCalled = false;
+
+ NoOverrideOperator() throws Exception {
+ super();
+ // For case that "SetKeyContextElement" has not been overridden,
+ // we can determine whether the "SetKeyContextElement" is called
through
+ // "setCurrentKey". According to the implementation, we need to
make the
+ // "stateKeySelector1/stateKeySelector2" not null. Besides, we
override the
+ // "hasKeyContext1" and "hasKeyContext2" to avoid
"stateKeySelector1/stateKeySelector2"
+ // from affecting the return value
+ Configuration configuration = new Configuration();
+ KeySelector keySelector = x -> x;
+ InstantiationUtil.writeObjectToConfig(keySelector, configuration,
"statePartitioner0");
+ InstantiationUtil.writeObjectToConfig(keySelector, configuration,
"statePartitioner1");
+ setup(
+ new StreamTaskITCase.NoOpStreamTask<>(new
DummyEnvironment()),
+ new MockStreamConfig(configuration, 1),
+ new MockOutput<>(new ArrayList<>()));
+ }
+
+ @Override
+ public boolean hasKeyContext1() {
+ return false;
+ }
+
+ @Override
+ public boolean hasKeyContext2() {
+ return false;
+ }
+
+ @Override
+ public void setCurrentKey(Object key) {
+ setCurrentKeyCalled = true;
+ }
+ }
+
+ private static class NoOverrideOneInputStreamOperator extends
NoOverrideOperator
+ implements OneInputStreamOperator<String, String> {
+
+ NoOverrideOneInputStreamOperator() throws Exception {
+ super();
+ }
+
+ @Override
+ public void processElement(StreamRecord<String> element) throws
Exception {}
+ }
+
+ private static class OverrideSetKeyContextOneInputStreamOperator
+ extends NoOverrideOneInputStreamOperator {
+ boolean setKeyContextElementCalled = false;
+
+ OverrideSetKeyContextOneInputStreamOperator() throws Exception {
+ super();
+ }
+
+ @Override
+ public void setKeyContextElement(StreamRecord<String> record) throws
Exception {
+ setKeyContextElementCalled = true;
+ }
+ }
+
+ private static class OverrideSetKeyContext1OneInputStreamOperator
+ extends NoOverrideOneInputStreamOperator {
+ boolean setKeyContextElement1Called = false;
+
+ OverrideSetKeyContext1OneInputStreamOperator() throws Exception {
+ super();
+ }
+
+ @Override
+ public void setKeyContextElement1(StreamRecord record) throws
Exception {
+ setKeyContextElement1Called = true;
+ }
+ }
+
+ private static class NoOverrideTwoInputStreamOperator extends
NoOverrideOperator
+ implements TwoInputStreamOperator<String, String, String> {
+
+ NoOverrideTwoInputStreamOperator() throws Exception {
+ super();
+ }
+
+ @Override
+ public void processElement1(StreamRecord<String> element) throws
Exception {}
+
+ @Override
+ public void processElement2(StreamRecord<String> element) throws
Exception {}
+ }
+
+ private static class
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator
+ extends NoOverrideTwoInputStreamOperator {
+
+ boolean setKeyContextElement1Called = false;
+
+ boolean setKeyContextElement2Called = false;
+
+ OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator() throws
Exception {
+ super();
+ }
+
+ @Override
+ public void setKeyContextElement1(StreamRecord record) throws
Exception {
+ setKeyContextElement1Called = true;
+ }
+
+ @Override
+ public void setKeyContextElement2(StreamRecord record) throws
Exception {
+ setKeyContextElement2Called = true;
+ }
+ }
+
+ private static class TestOperator
+ implements Input<String>, TwoInputStreamOperator<String, String,
String> {
+ boolean setKeyContextElementCalled = false;
+ boolean processElementCalled = false;
+
+ boolean setKeyContextElement1Called = false;
+ boolean processElement1Called = false;
+
+ boolean setKeyContextElement2Called = false;
+ boolean processElement2Called = false;
+
+ @Override
+ public void processElement(StreamRecord<String> element) throws
Exception {
+ processElementCalled = true;
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {}
+
+ @Override
+ public void processWatermarkStatus(WatermarkStatus watermarkStatus)
throws Exception {}
+
+ @Override
+ public void processLatencyMarker(LatencyMarker latencyMarker) throws
Exception {}
+
+ @Override
+ public void setKeyContextElement(StreamRecord<String> record) throws
Exception {
+ setKeyContextElementCalled = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws
Exception {}
+
+ @Override
+ public void setCurrentKey(Object key) {}
+
+ @Override
+ public Object getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public void open() throws Exception {}
+
+ @Override
+ public void finish() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws
Exception {}
+
+ @Override
+ public OperatorSnapshotFutures snapshotState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions,
+ CheckpointStreamFactory storageLocation)
+ throws Exception {
+ return null;
+ }
+
+ @Override
+ public void initializeState(StreamTaskStateInitializer
streamTaskStateManager)
+ throws Exception {}
+
+ @Override
+ public OperatorMetricGroup getMetricGroup() {
+ return null;
+ }
+
+ @Override
+ public OperatorID getOperatorID() {
+ return null;
+ }
+
+ @Override
+ public void setKeyContextElement1(StreamRecord<?> record) throws
Exception {
+ setKeyContextElement1Called = true;
+ }
+
+ @Override
+ public void setKeyContextElement2(StreamRecord<?> record) throws
Exception {
+ setKeyContextElement2Called = true;
+ }
+
+ @Override
+ public void processElement1(StreamRecord<String> element) throws
Exception {
+ processElement1Called = true;
+ }
+
+ @Override
+ public void processElement2(StreamRecord<String> element) throws
Exception {
+ processElement2Called = true;
+ }
+
+ @Override
+ public void processWatermark1(Watermark mark) throws Exception {}
+
+ @Override
+ public void processWatermark2(Watermark mark) throws Exception {}
+
+ @Override
+ public void processLatencyMarker1(LatencyMarker latencyMarker) throws
Exception {}
+
+ @Override
+ public void processLatencyMarker2(LatencyMarker latencyMarker) throws
Exception {}
+
+ @Override
+ public void processWatermarkStatus1(WatermarkStatus watermarkStatus)
throws Exception {}
+
+ @Override
+ public void processWatermarkStatus2(WatermarkStatus watermarkStatus)
throws Exception {}
+ }
+
+ private static class TestKeyContextHandlerOperator extends TestOperator
+ implements KeyContextHandler {
+ private final boolean hasKeyContext1;
+ private final boolean hasKeyContext2;
+
+ TestKeyContextHandlerOperator(boolean hasKeyContext) {
+ this.hasKeyContext1 = hasKeyContext;
+ this.hasKeyContext2 = true;
+ }
+
+ TestKeyContextHandlerOperator(boolean hasKeyContext1, boolean
hasKeyContext2) {
+ this.hasKeyContext1 = hasKeyContext1;
+ this.hasKeyContext2 = hasKeyContext2;
+ }
+
+ @Override
+ public boolean hasKeyContext() {
+ return hasKeyContext1;
+ }
+
+ @Override
+ public boolean hasKeyContext1() {
+ return hasKeyContext1;
+ }
+
+ @Override
+ public boolean hasKeyContext2() {
+ return hasKeyContext2;
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
index 2343441bebc..4d0a98ac4a6 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/InputBase.java
@@ -19,15 +19,25 @@
package org.apache.flink.table.runtime.operators.multipleinput.input;
import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.KeyContextHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import
org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase;
/** Base {@link Input} used in {@link MultipleInputStreamOperatorBase}. */
-public abstract class InputBase implements Input<RowData> {
+public abstract class InputBase implements Input<RowData>, KeyContextHandler {
@Override
public void setKeyContextElement(StreamRecord<RowData> record) throws
Exception {
// do nothing
}
+
+    /**
+     * Reports that this input carries no key context, so the per-record {@link
+     * #setKeyContextElement} call can be omitted.
+     *
+     * <p>Returning {@code false} is only valid while {@link #setKeyContextElement} stays a no-op.
+     * If that method ever gains a real implementation, this must be changed as well; otherwise
+     * the new implementation would never be invoked.
+     */
+    @Override
+    public boolean hasKeyContext() {
+        return false;
+    }
}