This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 9463c9b155e [FLINK-32695] [Tests] Removed SourceFunction Dependency from YarnTestCacheJob.
9463c9b155e is described below

commit 9463c9b155e20b753bd4da697113f12e4333d5fe
Author: Poorvank <poorv...@uber.com>
AuthorDate: Wed Jul 23 09:33:18 2025 +0530

    [FLINK-32695] [Tests] Removed SourceFunction Dependency from YarnTestCacheJob.
---
 .../flink/yarn/testjob/YarnTestCacheJob.java | 31 +---------------------
 1 file changed, 1 insertion(+), 30 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
index 4cdd28aeeb6..3b4d34f28fb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
@@ -21,11 +21,9 @@ package org.apache.flink.yarn.testjob;
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;

 import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;

@@ -54,7 +52,7 @@ public class YarnTestCacheJob {
         env.registerCachedFile(testDirectory.getAbsolutePath(), TEST_DIRECTORY_NAME);
         env.registerCachedFile(cacheFilePath, "cacheFile", false);

-        env.addSource(new GenericSourceFunction(LIST, TypeInformation.of(String.class)))
+        env.fromData(LIST)
                 .setParallelism(1)
                 .map(new MapperFunction(), TypeInformation.of(String.class))
                 .setParallelism(1)
@@ -94,31 +92,4 @@ public class YarnTestCacheJob {
             return value;
         }
     }
-
-    private static class GenericSourceFunction<T>
-            implements SourceFunction<T>, ResultTypeQueryable<T> {
-        private List<T> inputDataset;
-        private TypeInformation returnType;
-
-        GenericSourceFunction(List<T> inputDataset, TypeInformation returnType) {
-            this.inputDataset = inputDataset;
-            this.returnType = returnType;
-        }
-
-        @Override
-        public void run(SourceContext<T> ctx) throws Exception {
-
-            for (T t : inputDataset) {
-                ctx.collect(t);
-            }
-        }
-
-        @Override
-        public void cancel() {}
-
-        @Override
-        public TypeInformation getProducedType() {
-            return this.returnType;
-        }
-    }
 }