This is an automated email from the ASF dual-hosted git repository. gaborgsomogyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 99e459456b6 [FLINK-37851][state] Migrate state processor API from source API v1 to source API v2 99e459456b6 is described below commit 99e459456b6714cb7febac603871b3f3773632aa Author: Gabor Somogyi <gabor_somog...@apple.com> AuthorDate: Thu Jun 26 14:33:10 2025 +0200 [FLINK-37851][state] Migrate state processor API from source API v1 to source API v2 --- .../e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 | 2 - .../connector/source/RichSourceReaderContext.java | 42 +++ .../state/api/input/OperatorStateInputFormat.java | 1 + .../flink/state/api/input/SourceBuilder.java | 12 +- .../flink/state/api/SavepointDeepCopyTest.java | 1 + .../connector/source/lib/InputFormatSource.java | 328 +++++++++++++++++++++ .../streaming/api/operators/SourceOperator.java | 14 +- 7 files changed, 388 insertions(+), 12 deletions(-) diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 index c53c7b0d544..7e0ecf4979f 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 @@ -19,7 +19,5 @@ Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfi Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerConfiguration.java:240) Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerServices.java:481) Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerServices.java:479) -Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.asClassLoader()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:321) -Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerReleaseHookIfAbsent(java.lang.String, java.lang.Runnable)> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:327) Method <org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in (SourceOperatorStreamTask.java:101) Method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.isIdle()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isDefaultActionAvailable()> in (MailboxExecutorImpl.java:64) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/RichSourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/RichSourceReaderContext.java new file mode 100644 index 00000000000..c398cf80c8a --- /dev/null +++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/RichSourceReaderContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Preconditions; + +/** + * A {@link SourceReaderContext} that additionally gives source readers access to the + * {@link RuntimeContext} of the task they run in, via {@link #getRuntimeContext()}. + * + * <p>A {@link Source} whose reader needs the runtime context can check whether the context + * passed to {@link Source#createReader(SourceReaderContext)} is an instance of this class.
+ */ +@Experimental +public abstract class RichSourceReaderContext implements SourceReaderContext { + + private transient RuntimeContext runtimeContext; + + /** + * Sets the runtime context that {@link #getRuntimeContext()} returns. + * + * @param runtimeContext the runtime context of the task running the source reader + */ + public void setRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + /** + * Returns the runtime context of the task running the source reader. + * + * @return the runtime context, never {@code null} + * @throws NullPointerException if no runtime context has been set via {@link #setRuntimeContext} + */ + public RuntimeContext getRuntimeContext() { + Preconditions.checkNotNull(runtimeContext, "The runtime context must not be null."); + return runtimeContext; + } +} diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java index 88672c70eb5..978160dc0e0 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java @@ -120,6 +120,7 @@ abstract class OperatorStateInputFormat<OT> extends RichInputFormat<OT, Operator return new DefaultInputSplitAssigner(inputSplits); } + @Override public OperatorStateInputSplit[] createInputSplits(int minNumSplits) { OperatorStateInputSplit[] splits = getOperatorStateInputSplits(minNumSplits); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java index 1fd89fe07bb..874d5f03a84 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java @@ -19,13 +19,13 @@ package org.apache.flink.state.api.input; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import
org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.lib.InputFormatSource; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction; -import org.apache.flink.streaming.api.operators.StreamSource; /** A utility for constructing {@link InputFormat} based sources that are marked as BOUNDED. */ @Internal @@ -48,13 +48,11 @@ public final class SourceBuilder { StreamExecutionEnvironment env, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) { - InputFormatSourceFunction<OUT> function = - new InputFormatSourceFunction<>(inputFormat, typeInfo); + InputFormatSource<OUT> source = new InputFormatSource<>(Boundedness.BOUNDED, inputFormat); - env.clean(function); + env.clean(source); - final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); return new DataStreamSource<>( - env, typeInfo, sourceOperator, true, SOURCE_NAME, Boundedness.BOUNDED); + env, source, WatermarkStrategy.noWatermarks(), typeInfo, SOURCE_NAME); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java index 0f2b3f3f98a..144aeb9d1c6 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java @@ -135,6 +135,7 @@ public class SavepointDeepCopyTest extends AbstractTestBaseJUnit4 { @Test public void testSavepointDeepCopy() throws Exception { StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); DataStream<String> words = env.fromData(TEXT.split(" ")); diff --git a/flink-runtime/src/main/java/org/apache/flink/api/connector/source/lib/InputFormatSource.java b/flink-runtime/src/main/java/org/apache/flink/api/connector/source/lib/InputFormatSource.java new file mode 100644 index 00000000000..cf1ae6d3e68 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/api/connector/source/lib/InputFormatSource.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.RichSourceReaderContext; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.InstantiationUtil; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * A {@link Source} that reads data using an {@link InputFormat}. + * + * <p>When the enumerator starts, it creates the format's {@link InputSplit}s, wraps each of them + * into a {@link SourceSplit} and assigns them one at a time to readers that request a split. Once + * all splits are handed out, requesting readers are told that no more splits will come.
+ */ +@Internal +public class InputFormatSource<OUT> implements Source<OUT, SourceSplit, Void> { + private static final long serialVersionUID = 1L; + + private final Boundedness boundedness; + private final InputFormat<OUT, InputSplit> format; + + @SuppressWarnings("unchecked") + public InputFormatSource(Boundedness boundedness, InputFormat<OUT, ?> format) { + this.boundedness = boundedness; + this.format = (InputFormat<OUT, InputSplit>) format; + } + + @Override + public Boundedness getBoundedness() { + return boundedness; + } + + @Override + public SplitEnumerator<SourceSplit, Void> createEnumerator( + SplitEnumeratorContext<SourceSplit> context) throws Exception { + return new InputFormatSplitEnumerator<>(format, context); + } + + @Override + public SplitEnumerator<SourceSplit, Void> restoreEnumerator( + SplitEnumeratorContext<SourceSplit> context, Void checkpoint) throws Exception { + return new InputFormatSplitEnumerator<>(format, context); + } + + @Override + public SimpleVersionedSerializer<SourceSplit> getSplitSerializer() { + return new SimpleVersionedSerializer<>() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(SourceSplit split) throws IOException { + return InstantiationUtil.serializeObject(split); + } + + @Override + public SourceSplit deserialize(int version, byte[] serialized) throws IOException { + try { + return InstantiationUtil.deserializeObject( + serialized, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Failed to deserialize SourceSplit.", e); + } + } + }; + } + + @Override + public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() { + return new SimpleVersionedSerializer<>() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(Void obj) { + return new byte[0]; + } + + @Override + public Void deserialize(int version, byte[] serialized) { + return null; + } + }; + } + +
@Override + public SourceReader<OUT, SourceSplit> createReader(SourceReaderContext readerContext) + throws Exception { + RuntimeContext runtimeContext = null; + if (readerContext instanceof RichSourceReaderContext) { + runtimeContext = ((RichSourceReaderContext) readerContext).getRuntimeContext(); + } + return new InputFormatSourceReader<>(readerContext, format, runtimeContext); + } + + private static class InputSplitWrapperSourceSplit implements SourceSplit, Serializable { + private final InputSplit inputSplit; + private final String id; + + public InputSplitWrapperSourceSplit(InputSplit inputSplit) { + this.inputSplit = inputSplit; + this.id = String.valueOf(inputSplit.getSplitNumber()); + } + + public InputSplit getInputSplit() { + return inputSplit; + } + + @Override + public String splitId() { + return id; + } + } + + private static class InputFormatSplitEnumerator<OUT> + implements SplitEnumerator<SourceSplit, Void> { + private final InputFormat<OUT, InputSplit> format; + private final SplitEnumeratorContext<SourceSplit> context; + private Queue<SourceSplit> remainingSplits; + + public InputFormatSplitEnumerator( + InputFormat<OUT, InputSplit> format, SplitEnumeratorContext<SourceSplit> context) { + this.format = format; + this.context = context; + } + + @Override + public void start() { + try { + remainingSplits = + Arrays.stream(format.createInputSplits(context.currentParallelism())) + .map(InputSplitWrapperSourceSplit::new) + .collect(Collectors.toCollection(LinkedList::new)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + final SourceSplit nextSplit = remainingSplits.poll(); + if (nextSplit != null) { + context.assignSplit(nextSplit, subtaskId); + } else { + context.signalNoMoreSplits(subtaskId); + } + } + + @Override + public void addSplitsBack(List<SourceSplit> splits, int subtaskId) { + remainingSplits.addAll(splits); + } + +
@Override + public void addReader(int subtaskId) {} + + @Override + public Void snapshotState(long checkpointId) { + return null; + } + + @Override + public void close() {} + } + + /** + * Reads the splits assigned to this subtask one after another with the wrapped format. It + * requests a split when it starts and again whenever it opens the last locally queued split. + */ + private static class InputFormatSourceReader<OUT> implements SourceReader<OUT, SourceSplit> { + private final SourceReaderContext readerContext; + private final InputFormat<OUT, InputSplit> format; + @Nullable private final RuntimeContext runtimeContext; + @Nullable private Counter completedSplitsCounter; + private Queue<SourceSplit> remainingSplits; + private boolean noMoreSplits; + private boolean isFormatOpen; + private OUT lastElement; + + // Completed whenever pollNext() should be called again; replaced by an incomplete future + // while the reader waits for the enumerator to answer a split request. + private CompletableFuture<Void> availability = CompletableFuture.completedFuture(null); + + public InputFormatSourceReader( + SourceReaderContext readerContext, + InputFormat<OUT, InputSplit> format, + @Nullable RuntimeContext runtimeContext) { + this.format = format; + this.runtimeContext = runtimeContext; + this.readerContext = readerContext; + } + + @Override + public void start() { + this.remainingSplits = new LinkedList<>(); + if (runtimeContext != null) { + completedSplitsCounter = + runtimeContext.getMetricGroup().counter("numSplitsProcessed"); + } + this.noMoreSplits = false; + this.isFormatOpen = false; + this.lastElement = null; + + if (format instanceof RichInputFormat) { + ((RichInputFormat<?, ?>) format).setRuntimeContext(runtimeContext); + } + format.configure(readerContext.getConfiguration()); + if (format instanceof RichInputFormat) { + try { + ((RichInputFormat<?, ?>) format).openInputFormat(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + readerContext.sendSplitRequest(); + } + + @Override + public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception { + // If no split is open, try to open the next locally queued one + if (!isFormatOpen) { + InputSplitWrapperSourceSplit split = + (InputSplitWrapperSourceSplit) remainingSplits.poll(); + if (split != null) { + format.open(split.getInputSplit()); + isFormatOpen = true; + + // Request the next split while this one is being read + if (remainingSplits.isEmpty() && !noMoreSplits) { +
readerContext.sendSplitRequest(); + } + } + } + + if (isFormatOpen) { + // Emit the next record of the open split, if it has one + if (!format.reachedEnd()) { + final OUT next = format.nextRecord(lastElement); + // A null record ends the split early (as in the legacy InputFormatSourceFunction); + // it must not be passed downstream + if (next != null) { + lastElement = next; + output.collect(next); + return InputStatus.MORE_AVAILABLE; + } + } + // The open split is exhausted: close it and count it. A format that was never + // opened is neither closed nor counted as a processed split. + format.close(); + isFormatOpen = false; + if (completedSplitsCounter != null) { + completedSplitsCounter.inc(); + } + } + + if (!remainingSplits.isEmpty()) { + // Locally queued splits are opened in the next round + return InputStatus.MORE_AVAILABLE; + } + if (noMoreSplits) { + // No further data so signal end + return InputStatus.END_OF_INPUT; + } + // Waiting for the enumerator to answer a split request: reset the availability future + // so the task parks until addSplits() or notifyNoMoreSplits() completes it, instead of + // calling pollNext() in a busy loop + if (availability.isDone()) { + availability = new CompletableFuture<>(); + } + return InputStatus.NOTHING_AVAILABLE; + } + + @Override + public List<SourceSplit> snapshotState(long checkpointId) { + return List.of(); + } + + @Override + public CompletableFuture<Void> isAvailable() { + return availability; + } + + @Override + public void addSplits(List<SourceSplit> splits) { + remainingSplits.addAll(splits); + // Wake up the reader in case it is waiting for splits + availability.complete(null); + } + + @Override + public void notifyNoMoreSplits() { + noMoreSplits = true; + // Wake up the reader so that it can signal END_OF_INPUT + availability.complete(null); + } + + @Override + public void close() throws Exception { + if (isFormatOpen) { + format.close(); + isFormatOpen = false; + } + if (format instanceof RichInputFormat) { + ((RichInputFormat<?, ?>) format).closeInputFormat(); + } + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index b5fff8915da..e6cb4b99518 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -21,10 +21,12 @@ import
org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.RichSourceReaderContext; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; @@ -278,10 +280,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr return; } - final int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + StreamingRuntimeContext runtimeContext = getRuntimeContext(); + final int subtaskIndex = runtimeContext.getTaskInfo().getIndexOfThisSubtask(); - final SourceReaderContext context = - new SourceReaderContext() { + final RichSourceReaderContext context = + new RichSourceReaderContext() { @Override public SourceReaderMetricGroup metricGroup() { return sourceMetricGroup; @@ -345,6 +348,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr watermark, watermarkIsAlignedMap.get(watermark.getIdentifier()))); } + + @Override + public RuntimeContext getRuntimeContext() { + return runtimeContext; + } }; sourceReader = readerFactory.apply(context);