This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 99e459456b6 [FLINK-37851][state] Migrate state processor API from 
source API v1 to source API v2
99e459456b6 is described below

commit 99e459456b6714cb7febac603871b3f3773632aa
Author: Gabor Somogyi <gabor_somog...@apple.com>
AuthorDate: Thu Jun 26 14:33:10 2025 +0200

    [FLINK-37851][state] Migrate state processor API from source API v1 to 
source API v2
---
 .../e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5           |   2 -
 .../connector/source/RichSourceReaderContext.java  |  42 +++
 .../state/api/input/OperatorStateInputFormat.java  |   1 +
 .../flink/state/api/input/SourceBuilder.java       |  12 +-
 .../flink/state/api/SavepointDeepCopyTest.java     |   1 +
 .../connector/source/lib/InputFormatSource.java    | 328 +++++++++++++++++++++
 .../streaming/api/operators/SourceOperator.java    |  14 +-
 7 files changed, 388 insertions(+), 12 deletions(-)

diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
index c53c7b0d544..7e0ecf4979f 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
@@ -19,7 +19,5 @@ Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfi
 Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, 
java.lang.String, java.io.File)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)>
 in (TaskManagerConfiguration.java:240)
 Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, 
java.util.concurrent.Executor)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec,
 int)> in (TaskManagerServices.java:481)
 Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, 
java.util.concurrent.Executor)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)>
 in (TaskManagerServices.java:479)
-Method 
<org.apache.flink.streaming.api.operators.SourceOperator$1$1.asClassLoader()> 
calls method 
<org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> 
in (SourceOperator.java:321)
-Method 
<org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerReleaseHookIfAbsent(java.lang.String,
 java.lang.Runnable)> calls method 
<org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> 
in (SourceOperator.java:327)
 Method 
<org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> 
calls method 
<org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in 
(SourceOperatorStreamTask.java:101)
 Method 
<org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.isIdle()> 
calls method 
<org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isDefaultActionAvailable()>
 in (MailboxExecutorImpl.java:64)
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/RichSourceReaderContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/RichSourceReaderContext.java
new file mode 100644
index 00000000000..c398cf80c8a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/RichSourceReaderContext.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An abstract stub implementation for {@link SourceReaderContext}. Rich 
formats have access to
+ * their runtime execution context via {@link #getRuntimeContext()}.
+ */
+@Experimental
+public abstract class RichSourceReaderContext implements SourceReaderContext {
+
+    private transient RuntimeContext runtimeContext;
+
+    public void setRuntimeContext(RuntimeContext runtimeContext) {
+        this.runtimeContext = runtimeContext;
+    }
+
+    public RuntimeContext getRuntimeContext() {
+        Preconditions.checkNotNull(runtimeContext, "The runtime context must 
not be null.");
+        return runtimeContext;
+    }
+}
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
index 88672c70eb5..978160dc0e0 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
@@ -120,6 +120,7 @@ abstract class OperatorStateInputFormat<OT> extends 
RichInputFormat<OT, Operator
         return new DefaultInputSplitAssigner(inputSplits);
     }
 
+    @Override
     public OperatorStateInputSplit[] createInputSplits(int minNumSplits) {
         OperatorStateInputSplit[] splits = 
getOperatorStateInputSplits(minNumSplits);
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java
index 1fd89fe07bb..874d5f03a84 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SourceBuilder.java
@@ -19,13 +19,13 @@
 package org.apache.flink.state.api.input;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.lib.InputFormatSource;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
 
 /** A utility for constructing {@link InputFormat} based sources that are 
marked as BOUNDED. */
 @Internal
@@ -48,13 +48,11 @@ public final class SourceBuilder {
             StreamExecutionEnvironment env,
             InputFormat<OUT, ?> inputFormat,
             TypeInformation<OUT> typeInfo) {
-        InputFormatSourceFunction<OUT> function =
-                new InputFormatSourceFunction<>(inputFormat, typeInfo);
+        InputFormatSource<OUT> source = new 
InputFormatSource<>(Boundedness.BOUNDED, inputFormat);
 
-        env.clean(function);
+        env.clean(source);
 
-        final StreamSource<OUT, ?> sourceOperator = new 
StreamSource<>(function);
         return new DataStreamSource<>(
-                env, typeInfo, sourceOperator, true, SOURCE_NAME, 
Boundedness.BOUNDED);
+                env, source, WatermarkStrategy.noWatermarks(), typeInfo, 
SOURCE_NAME);
     }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
index 0f2b3f3f98a..144aeb9d1c6 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
@@ -135,6 +135,7 @@ public class SavepointDeepCopyTest extends 
AbstractTestBaseJUnit4 {
     @Test
     public void testSavepointDeepCopy() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
 
         DataStream<String> words = env.fromData(TEXT.split(" "));
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/api/connector/source/lib/InputFormatSource.java
 
b/flink-runtime/src/main/java/org/apache/flink/api/connector/source/lib/InputFormatSource.java
new file mode 100644
index 00000000000..cf1ae6d3e68
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/api/connector/source/lib/InputFormatSource.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.RichSourceReaderContext;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** A {@link Source} that reads data using an {@link InputFormat}. */
+@Internal
+public class InputFormatSource<OUT> implements Source<OUT, SourceSplit, Void> {
+    private static final long serialVersionUID = 1L;
+
+    private final Boundedness boundedness;
+    private final InputFormat<OUT, InputSplit> format;
+
+    @SuppressWarnings("unchecked")
+    public InputFormatSource(Boundedness boundedness, InputFormat<OUT, ?> 
format) {
+        this.boundedness = boundedness;
+        this.format = (InputFormat<OUT, InputSplit>) format;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return boundedness;
+    }
+
+    @Override
+    public SplitEnumerator<SourceSplit, Void> createEnumerator(
+            SplitEnumeratorContext<SourceSplit> context) throws Exception {
+        return new InputFormatSplitEnumerator<>(format, context);
+    }
+
+    @Override
+    public SplitEnumerator<SourceSplit, Void> restoreEnumerator(
+            SplitEnumeratorContext<SourceSplit> context, Void checkpoint) 
throws Exception {
+        return new InputFormatSplitEnumerator<>(format, context);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<SourceSplit> getSplitSerializer() {
+        return new SimpleVersionedSerializer<>() {
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+
+            @Override
+            public byte[] serialize(SourceSplit split) throws IOException {
+                return InstantiationUtil.serializeObject(split);
+            }
+
+            @Override
+            public SourceSplit deserialize(int version, byte[] serialized) 
throws IOException {
+                try {
+                    return InstantiationUtil.deserializeObject(
+                            serialized, 
Thread.currentThread().getContextClassLoader());
+                } catch (ClassNotFoundException e) {
+                    throw new IOException("Failed to deserialize 
SourceSplit.", e);
+                }
+            }
+        };
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() 
{
+        return new SimpleVersionedSerializer<>() {
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+
+            @Override
+            public byte[] serialize(Void obj) {
+                return new byte[0];
+            }
+
+            @Override
+            public Void deserialize(int version, byte[] serialized) {
+                return null;
+            }
+        };
+    }
+
+    @Override
+    public SourceReader<OUT, SourceSplit> createReader(SourceReaderContext 
readerContext)
+            throws Exception {
+        RuntimeContext runtimeContext = null;
+        if (readerContext instanceof RichSourceReaderContext) {
+            runtimeContext = ((RichSourceReaderContext) 
readerContext).getRuntimeContext();
+        }
+        return new InputFormatSourceReader<>(readerContext, format, 
runtimeContext);
+    }
+
+    private static class InputSplitWrapperSourceSplit implements SourceSplit, 
Serializable {
+        private final InputSplit inputSplit;
+        private final String id;
+
+        public InputSplitWrapperSourceSplit(InputSplit inputSplit) {
+            this.inputSplit = inputSplit;
+            this.id = String.valueOf(inputSplit.getSplitNumber());
+        }
+
+        public InputSplit getInputSplit() {
+            return inputSplit;
+        }
+
+        @Override
+        public String splitId() {
+            return id;
+        }
+    }
+
+    private static class InputFormatSplitEnumerator<OUT>
+            implements SplitEnumerator<SourceSplit, Void> {
+        private final InputFormat<OUT, InputSplit> format;
+        private final SplitEnumeratorContext<SourceSplit> context;
+        private Queue<SourceSplit> remainingSplits;
+
+        public InputFormatSplitEnumerator(
+                InputFormat<OUT, InputSplit> format, 
SplitEnumeratorContext<SourceSplit> context) {
+            this.format = format;
+            this.context = context;
+        }
+
+        @Override
+        public void start() {
+            try {
+                remainingSplits =
+                        
Arrays.stream(format.createInputSplits(context.currentParallelism()))
+                                .map(InputSplitWrapperSourceSplit::new)
+                                
.collect(Collectors.toCollection(LinkedList::new));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+            final SourceSplit nextSplit = remainingSplits.poll();
+            if (nextSplit != null) {
+                context.assignSplit(nextSplit, subtaskId);
+            } else {
+                context.signalNoMoreSplits(subtaskId);
+            }
+        }
+
+        @Override
+        public void addSplitsBack(List<SourceSplit> splits, int subtaskId) {
+            remainingSplits.addAll(splits);
+        }
+
+        @Override
+        public void addReader(int subtaskId) {}
+
+        @Override
+        public Void snapshotState(long checkpointId) {
+            return null;
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    private static class InputFormatSourceReader<OUT> implements 
SourceReader<OUT, SourceSplit> {
+        private final SourceReaderContext readerContext;
+        private final InputFormat<OUT, InputSplit> format;
+        @Nullable private final RuntimeContext runtimeContext;
+        @Nullable private Counter completedSplitsCounter;
+        private Queue<SourceSplit> remainingSplits;
+        private boolean noMoreSplits;
+        private boolean isFormatOpen;
+        private OUT lastElement;
+
+        public InputFormatSourceReader(
+                SourceReaderContext readerContext,
+                InputFormat<OUT, InputSplit> format,
+                @Nullable RuntimeContext runtimeContext) {
+            this.format = format;
+            this.runtimeContext = runtimeContext;
+            this.readerContext = readerContext;
+        }
+
+        @Override
+        public void start() {
+            this.remainingSplits = new LinkedList<>();
+            if (runtimeContext != null) {
+                completedSplitsCounter =
+                        
runtimeContext.getMetricGroup().counter("numSplitsProcessed");
+            }
+            this.noMoreSplits = false;
+            this.isFormatOpen = false;
+            this.lastElement = null;
+
+            if (format instanceof RichInputFormat) {
+                ((RichInputFormat<?, ?>) 
format).setRuntimeContext(runtimeContext);
+            }
+            format.configure(readerContext.getConfiguration());
+            if (format instanceof RichInputFormat) {
+                try {
+                    ((RichInputFormat<?, ?>) format).openInputFormat();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            readerContext.sendSplitRequest();
+        }
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception 
{
+            // If no open format then try to open it
+            if (!isFormatOpen) {
+                InputSplitWrapperSourceSplit split =
+                        (InputSplitWrapperSourceSplit) remainingSplits.poll();
+                if (split != null) {
+                    format.open(split.getInputSplit());
+                    isFormatOpen = true;
+
+                    // We send a request for more splits
+                    if (remainingSplits.isEmpty() && !noMoreSplits) {
+                        readerContext.sendSplitRequest();
+                    }
+                }
+            }
+
+            // If the format is open and not yet at the end, return the 
next record
+            if (isFormatOpen && !format.reachedEnd()) {
+                lastElement = format.nextRecord(lastElement);
+                output.collect(lastElement);
+                return InputStatus.MORE_AVAILABLE;
+            } else {
+                // Otherwise just close it
+                format.close();
+                isFormatOpen = false;
+                if (completedSplitsCounter != null) {
+                    completedSplitsCounter.inc();
+                }
+            }
+
+            // Here we have nothing to collect
+            if (remainingSplits.isEmpty()) {
+                if (noMoreSplits) {
+                    // No further data so signal end
+                    return InputStatus.END_OF_INPUT;
+                } else {
+                    // When splits are still pending remotely, we signal 
nothing available
+                    return InputStatus.NOTHING_AVAILABLE;
+                }
+            } else {
+                // When we have splits locally then we just need to process 
them in the next
+                // round
+                return InputStatus.MORE_AVAILABLE;
+            }
+        }
+
+        @Override
+        public List<SourceSplit> snapshotState(long checkpointId) {
+            return List.of();
+        }
+
+        @Override
+        public CompletableFuture<Void> isAvailable() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public void addSplits(List<SourceSplit> splits) {
+            remainingSplits.addAll(splits);
+        }
+
+        @Override
+        public void notifyNoMoreSplits() {
+            noMoreSplits = true;
+        }
+
+        @Override
+        public void close() throws Exception {
+            if (isFormatOpen) {
+                format.close();
+                isFormatOpen = false;
+            }
+            if (format instanceof RichInputFormat) {
+                ((RichInputFormat<?, ?>) format).closeInputFormat();
+            }
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b5fff8915da..e6cb4b99518 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -21,10 +21,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.RichSourceReaderContext;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -278,10 +280,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             return;
         }
 
-        final int subtaskIndex = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        StreamingRuntimeContext runtimeContext = getRuntimeContext();
+        final int subtaskIndex = 
runtimeContext.getTaskInfo().getIndexOfThisSubtask();
 
-        final SourceReaderContext context =
-                new SourceReaderContext() {
+        final RichSourceReaderContext context =
+                new RichSourceReaderContext() {
                     @Override
                     public SourceReaderMetricGroup metricGroup() {
                         return sourceMetricGroup;
@@ -345,6 +348,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                                         watermark,
                                         
watermarkIsAlignedMap.get(watermark.getIdentifier())));
                     }
+
+                    @Override
+                    public RuntimeContext getRuntimeContext() {
+                        return runtimeContext;
+                    }
                 };
 
         sourceReader = readerFactory.apply(context);

Reply via email to