This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9463c9b155e [FLINK-32695] [Tests] Removed SourceFunction Dependency 
from YarnTestCacheJob.
9463c9b155e is described below

commit 9463c9b155e20b753bd4da697113f12e4333d5fe
Author: Poorvank <poorv...@uber.com>
AuthorDate: Wed Jul 23 09:33:18 2025 +0530

    [FLINK-32695] [Tests] Removed SourceFunction Dependency from 
YarnTestCacheJob.
---
 .../flink/yarn/testjob/YarnTestCacheJob.java       | 31 +---------------------
 1 file changed, 1 insertion(+), 30 deletions(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
index 4cdd28aeeb6..3b4d34f28fb 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
@@ -21,11 +21,9 @@ package org.apache.flink.yarn.testjob;
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;
 
@@ -54,7 +52,7 @@ public class YarnTestCacheJob {
         env.registerCachedFile(testDirectory.getAbsolutePath(), 
TEST_DIRECTORY_NAME);
         env.registerCachedFile(cacheFilePath, "cacheFile", false);
 
-        env.addSource(new GenericSourceFunction(LIST, 
TypeInformation.of(String.class)))
+        env.fromData(LIST)
                 .setParallelism(1)
                 .map(new MapperFunction(), TypeInformation.of(String.class))
                 .setParallelism(1)
@@ -94,31 +92,4 @@ public class YarnTestCacheJob {
             return value;
         }
     }
-
-    private static class GenericSourceFunction<T>
-            implements SourceFunction<T>, ResultTypeQueryable<T> {
-        private List<T> inputDataset;
-        private TypeInformation returnType;
-
-        GenericSourceFunction(List<T> inputDataset, TypeInformation 
returnType) {
-            this.inputDataset = inputDataset;
-            this.returnType = returnType;
-        }
-
-        @Override
-        public void run(SourceContext<T> ctx) throws Exception {
-
-            for (T t : inputDataset) {
-                ctx.collect(t);
-            }
-        }
-
-        @Override
-        public void cancel() {}
-
-        @Override
-        public TypeInformation getProducedType() {
-            return this.returnType;
-        }
-    }
 }

Reply via email to