This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7a90a05e82d [FLINK-35089][runtime] Initialize lastRecordAttributes in
AbstractStreamOperator during setup
7a90a05e82d is described below
commit 7a90a05e82ddfb3438e611d44fd329858255de6b
Author: sxnan <[email protected]>
AuthorDate: Fri Apr 12 11:35:27 2024 +0800
[FLINK-35089][runtime] Initialize lastRecordAttributes in
AbstractStreamOperator during setup
This closes #24655
---
.../api/operators/AbstractStreamOperator.java | 9 +-
.../runtime/RecordAttributesPropagationITCase.java | 279 +++++++++++++++++++++
2 files changed, 284 insertions(+), 4 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f816fa6633d..632b89b16ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -153,10 +153,8 @@ public abstract class AbstractStreamOperator<OUT>
protected transient ProcessingTimeService processingTimeService;
- protected transient RecordAttributes lastRecordAttributes1 =
- RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
- protected transient RecordAttributes lastRecordAttributes2 =
- RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
+ protected transient RecordAttributes lastRecordAttributes1;
+ protected transient RecordAttributes lastRecordAttributes2;
// ------------------------------------------------------------------------
// Life Cycle
@@ -235,6 +233,9 @@ public abstract class AbstractStreamOperator<OUT>
stateKeySelector1 = config.getStatePartitioner(0,
getUserCodeClassloader());
stateKeySelector2 = config.getStatePartitioner(1,
getUserCodeClassloader());
+
+ lastRecordAttributes1 = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
+ lastRecordAttributes2 = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
}
/**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase.java
new file mode 100644
index 00000000000..733d79b8f78
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/RecordAttributesPropagationITCase.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for {@link RecordAttributes} propagation. */
+public class RecordAttributesPropagationITCase {
+
+ @Test
+ void testRecordAttributesPropagation() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ final SourceWithBacklog source1 = new SourceWithBacklog();
+ final SourceWithBacklog source2 = new SourceWithBacklog();
+
+ env.fromSource(source1, WatermarkStrategy.noWatermarks(), "source1")
+ .returns(Long.class)
+ .transform("my_op1", Types.LONG, new OneInputOperator())
+ .connect(
+                        env.fromSource(source2, WatermarkStrategy.noWatermarks(), "source2")
+ .returns(Long.class))
+ .transform("my_op2", Types.LONG, new TwoInputOperator())
+ .addSink(new DiscardingSink<>());
+ env.execute();
+ final RecordAttributes backlog =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
+ final RecordAttributes nonBacklog =
+                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
+
+        assertThat(OneInputOperator.receivedRecordAttributes).containsExactly(backlog, nonBacklog);
+        assertThat(TwoInputOperator.receivedRecordAttributes1).containsExactly(backlog, nonBacklog);
+        assertThat(TwoInputOperator.receivedRecordAttributes2).containsExactly(backlog, nonBacklog);
+ }
+
+ static class SourceWithBacklog implements Source<Long, MockSplit, Long> {
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SplitEnumerator<MockSplit, Long> createEnumerator(
+ SplitEnumeratorContext<MockSplit> enumContext) {
+ return new SplitEnumeratorWithBacklog(enumContext);
+ }
+
+ @Override
+ public SplitEnumerator<MockSplit, Long> restoreEnumerator(
+ SplitEnumeratorContext<MockSplit> enumContext, Long
checkpoint) {
+ return new SplitEnumeratorWithBacklog(enumContext);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<MockSplit> getSplitSerializer() {
+ return new SimpleVersionedSerializer<MockSplit>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(MockSplit obj) {
+ return new byte[0];
+ }
+
+ @Override
+ public MockSplit deserialize(int version, byte[] serialized) {
+ return new MockSplit();
+ }
+ };
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Long>
getEnumeratorCheckpointSerializer() {
+ return new SimpleVersionedSerializer<Long>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(Long obj) {
+ return new byte[0];
+ }
+
+ @Override
+ public Long deserialize(int version, byte[] serialized) {
+ return 0L;
+ }
+ };
+ }
+
+ @Override
+ public SourceReader<Long, MockSplit> createReader(SourceReaderContext
readerContext) {
+ return new SourceReader<Long, MockSplit>() {
+ private boolean noMoreSplit;
+
+ @Override
+ public void start() {}
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Long> output) {
+ if (noMoreSplit) {
+ return InputStatus.END_OF_INPUT;
+ }
+ return InputStatus.MORE_AVAILABLE;
+ }
+
+ @Override
+ public List<MockSplit> snapshotState(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return null;
+ }
+
+ @Override
+ public void addSplits(List<MockSplit> splits) {}
+
+ @Override
+ public void notifyNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+ }
+
+ static class SplitEnumeratorWithBacklog implements
SplitEnumerator<MockSplit, Long> {
+
+ private final SplitEnumeratorContext<MockSplit> context;
+ private final ExecutorService executor;
+
+ SplitEnumeratorWithBacklog(SplitEnumeratorContext<MockSplit> context) {
+ this.context = context;
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @Override
+ public void start() {
+ executor.submit(
+ () -> {
+ context.setIsProcessingBacklog(true);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ context.setIsProcessingBacklog(false);
+ for (int i = 0; i < context.currentParallelism(); i++)
{
+ context.signalNoMoreSplits(i);
+ }
+ });
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String
requesterHostname) {}
+
+ @Override
+ public void addSplitsBack(List<MockSplit> splits, int subtaskId) {}
+
+ @Override
+ public void addReader(int subtaskId) {}
+
+ @Override
+ public Long snapshotState(long checkpointId) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {}
+ }
+
+ static class MockSplit implements SourceSplit, Serializable {
+ @Override
+ public String splitId() {
+ return "0";
+ }
+ }
+
+ static class OneInputOperator extends AbstractStreamOperator<Long>
+ implements OneInputStreamOperator<Long, Long> {
+ private static final List<RecordAttributes> receivedRecordAttributes =
new ArrayList<>();
+
+ @Override
+ public void processElement(StreamRecord<Long> element) throws
Exception {
+ output.collect(element);
+ }
+
+ @Override
+ public void processRecordAttributes(RecordAttributes recordAttributes)
throws Exception {
+ receivedRecordAttributes.add(recordAttributes);
+ super.processRecordAttributes(recordAttributes);
+ }
+ }
+
+ static class TwoInputOperator extends AbstractStreamOperator<Long>
+ implements TwoInputStreamOperator<Long, Long, Long> {
+
+ private static final List<RecordAttributes> receivedRecordAttributes1
= new ArrayList<>();
+ private static final List<RecordAttributes> receivedRecordAttributes2
= new ArrayList<>();
+
+ @Override
+ public void processRecordAttributes1(RecordAttributes
recordAttributes) {
+ receivedRecordAttributes1.add(recordAttributes);
+ super.processRecordAttributes1(recordAttributes);
+ }
+
+ @Override
+ public void processRecordAttributes2(RecordAttributes
recordAttributes) {
+ receivedRecordAttributes2.add(recordAttributes);
+ super.processRecordAttributes2(recordAttributes);
+ }
+
+ @Override
+ public void processElement1(StreamRecord<Long> element) {
+ output.collect(element);
+ }
+
+ @Override
+ public void processElement2(StreamRecord<Long> element) {
+ output.collect(element);
+ }
+ }
+}