This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18c03f2e6c593a772f64cdb5c089e2911d3cbc89
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Mon Nov 27 23:29:27 2023 +0100

    [FLINK-28229][connectors] Deprecate StreamExecutionEnvironment#fromCollection()
---
 .../program/StreamContextEnvironmentTest.java      | 16 ++---
 .../client/testjar/ForbidConfigurationJob.java     |  2 +-
 .../flink/streaming/examples/join/WindowJoin.java  | 16 +++--
 .../examples/join/WindowJoinSampleData.java        | 75 ++++++----------------
 .../examples/java/basics/StreamSQLExample.java     |  4 +-
 .../flink/formats/protobuf/ProtobufTestHelper.java |  2 +-
 .../apache/flink/state/api/SavepointWriter.java    |  4 +-
 .../flink/state/api/SavepointWriterITCase.java     | 12 ++--
 .../api/SavepointWriterUidModificationITCase.java  |  4 +-
 .../state/api/SavepointWriterWindowITCase.java     |  8 +--
 .../flink/state/api/WritableSavepointITCase.java   |  8 +--
 .../flink/python/util/PythonConfigUtilTest.java    |  2 +-
 .../environment/StreamExecutionEnvironment.java    | 15 +++++
 .../StreamExecutionEnvironmentTest.java            | 57 ++++++----------
 .../ExecutorDiscoveryAndJobClientTest.java         |  2 +-
 .../MultipleInputNodeCreationProcessorTest.java    | 14 +++-
 .../runtime/stream/sql/DataStreamJavaITCase.java   |  6 +-
 .../stream/table/TimeAttributesITCase.scala        | 15 +++--
 .../expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  4 +-
 .../table/planner/utils/testTableSourceSinks.scala | 10 +--
 .../multipleinput/MultipleInputTestBase.java       |  2 +-
 .../testframe/testsuites/SinkTestSuiteBase.java    |  2 +-
 .../test/accumulators/AccumulatorLiveITCase.java   |  2 +-
 .../streaming/runtime/BroadcastStateITCase.java    |  4 +-
 .../streaming/runtime/DataStreamPojoITCase.java    |  8 +--
 .../test/streaming/runtime/IterateITCase.java      |  4 +-
 .../streaming/runtime/LatencyMarkerITCase.java     | 10 +--
 .../test/streaming/runtime/PartitionerITCase.java  |  2 +-
 .../test/streaming/runtime/SideOutputITCase.java   | 46 ++++++-------
 .../flink/test/streaming/runtime/SinkITCase.java   |  6 +-
 .../flink/test/streaming/runtime/SinkV2ITCase.java |  2 +-
 tools/maven/checkstyle.xml                         |  2 +-
 33 files changed, 174 insertions(+), 194 deletions(-)
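
For reference, the core migration in this commit is a one-for-one swap of
fromCollection() for fromData(). A minimal sketch (class and variable names
are illustrative, not from this patch):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FromDataMigrationSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            List<Integer> data = Arrays.asList(1, 2, 3);

            // Before (deprecated by this commit):
            // DataStream<Integer> stream = env.fromCollection(data);

            // After:
            DataStream<Integer> stream = env.fromData(data);

            stream.print();
            env.execute("fromData migration sketch");
        }
    }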

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index 1af115a402a..ac157b04f93 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -78,7 +78,7 @@ class StreamContextEnvironmentTest {
         // Add/mutate values in the configuration
         environment.configure(programConfig);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -106,7 +106,7 @@ class StreamContextEnvironmentTest {
         // Change the CheckpointConfig
         environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -114,7 +114,7 @@ class StreamContextEnvironmentTest {
 
         environment.getCheckpointConfig().setCheckpointStorage(new 
JobManagerCheckpointStorage());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -143,7 +143,7 @@ class StreamContextEnvironmentTest {
                         false,
                         Collections.emptyList());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -169,7 +169,7 @@ class StreamContextEnvironmentTest {
         // Change the CheckpointConfig
         environment.getCheckpointConfig().setCheckpointStorage(allowedPath);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
     }
@@ -186,7 +186,7 @@ class StreamContextEnvironmentTest {
         final StreamContextEnvironment environment =
                 constructStreamContextEnvironment(clusterConfig, 
Collections.emptyList());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
     }
@@ -202,7 +202,7 @@ class StreamContextEnvironmentTest {
         final StreamContextEnvironment environment =
                 constructStreamContextEnvironment(clusterConfig, 
Collections.emptyList());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
     }
@@ -235,7 +235,7 @@ class StreamContextEnvironmentTest {
         environment.configure(jobConfig);
         environment.getConfig().setMaxParallelism(1024);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new 
DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
         assertThat(environment.getConfig().getGlobalJobParameters().toMap())
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java
 
b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java
index 19d84990f91..8e7f0222e2a 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java
@@ -37,7 +37,7 @@ public class ForbidConfigurationJob {
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(config);
 
-        env.fromCollection(Lists.newArrayList(1, 2, 3)).sinkTo(new 
DiscardingSink<>());
+        env.fromData(Lists.newArrayList(1, 2, 3)).sinkTo(new 
DiscardingSink<>());
         env.execute();
     }
 }
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index e6332090289..3b6d6c2bd89 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -37,8 +37,6 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import 
org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource;
-import 
org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource;
 
 import java.time.Duration;
 
@@ -76,12 +74,18 @@ public class WindowJoin {
 
         // create the data sources for both grades and salaries
         DataStream<Tuple2<String, Integer>> grades =
-                GradeSource.getSource(env, rate)
-                        
.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
+                env.fromSource(
+                                
WindowJoinSampleData.getGradeGeneratorSource(rate),
+                                IngestionTimeWatermarkStrategy.create(),
+                                "Grades Data Generator")
+                        .setParallelism(1);
 
         DataStream<Tuple2<String, Integer>> salaries =
-                SalarySource.getSource(env, rate)
-                        
.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
+                env.fromSource(
+                                
WindowJoinSampleData.getSalaryGeneratorSource(rate),
+                                IngestionTimeWatermarkStrategy.create(),
+                                "Salaries Data Generator")
+                        .setParallelism(1);
 
         // run the actual window join program
         // for testability, this functionality is in a separate method.
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
index fbaec8e64de..55f59ed436a 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
@@ -20,13 +20,11 @@ package org.apache.flink.streaming.examples.join;
 
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.utils.ThrottledIterator;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 
-import java.io.Serializable;
-import java.util.Iterator;
 import java.util.Random;
 
 /** Sample data for the {@link WindowJoin} example. */
@@ -38,58 +36,27 @@ public class WindowJoinSampleData {
     static final int SALARY_MAX = 10000;
 
     /** Continuously generates (name, grade). */
-    public static class GradeSource implements Iterator<Tuple2<String, 
Integer>>, Serializable {
-
-        private final Random rnd = new Random(hashCode());
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public Tuple2<String, Integer> next() {
-            return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], 
rnd.nextInt(GRADE_COUNT) + 1);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        public static DataStream<Tuple2<String, Integer>> getSource(
-                StreamExecutionEnvironment env, long rate) {
-            return env.fromCollection(
-                    new ThrottledIterator<>(new GradeSource(), rate),
-                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() 
{}));
-        }
+    public static DataGeneratorSource<Tuple2<String, Integer>> 
getGradeGeneratorSource(
+            double elementsPerSecond) {
+        return getTupleGeneratorSource(GRADE_COUNT, elementsPerSecond);
     }
 
     /** Continuously generates (name, salary). */
-    public static class SalarySource implements Iterator<Tuple2<String, 
Integer>>, Serializable {
-
-        private final Random rnd = new Random(hashCode());
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public Tuple2<String, Integer> next() {
-            return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], 
rnd.nextInt(SALARY_MAX) + 1);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
+    public static DataGeneratorSource<Tuple2<String, Integer>> 
getSalaryGeneratorSource(
+            double elementsPerSecond) {
+        return getTupleGeneratorSource(SALARY_MAX, elementsPerSecond);
+    }
 
-        public static DataStream<Tuple2<String, Integer>> getSource(
-                StreamExecutionEnvironment env, long rate) {
-            return env.fromCollection(
-                    new ThrottledIterator<>(new SalarySource(), rate),
-                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() 
{}));
-        }
+    private static DataGeneratorSource<Tuple2<String, Integer>> 
getTupleGeneratorSource(
+            int maxValue, double elementsPerSecond) {
+        final Random rnd = new Random();
+        final GeneratorFunction<Long, Tuple2<String, Integer>> 
generatorFunction =
+                index -> new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], 
rnd.nextInt(maxValue) + 1);
+
+        return new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(elementsPerSecond),
+                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() 
{}));
     }
 }
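
For context, the DataGeneratorSource built above replaces the removed
ThrottledIterator: rate limiting now comes from RateLimiterStrategy, and the
source is attached via env.fromSource() as in the WindowJoin change. A
self-contained sketch of the same pattern (rate and names are illustrative):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
    import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    import org.apache.flink.connector.datagen.source.GeneratorFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ThrottledGeneratorSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // One Long per generated index, throttled to 10 elements per second.
            GeneratorFunction<Long, Long> generator = index -> index;
            DataGeneratorSource<Long> source =
                    new DataGeneratorSource<>(
                            generator,
                            Long.MAX_VALUE, // effectively unbounded
                            RateLimiterStrategy.perSecond(10),
                            Types.LONG);

            DataStream<Long> stream =
                    env.fromSource(
                                    source,
                                    WatermarkStrategy.noWatermarks(),
                                    "Throttled Generator")
                            .setParallelism(1);
            stream.print();
            env.execute("throttled generator sketch");
        }
    }
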
diff --git 
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
 
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
index 194b7e1f90b..ad7ad7ef40e 100644
--- 
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
+++ 
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
@@ -55,14 +55,14 @@ public final class StreamSQLExample {
         final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
 
         final DataStream<Order> orderA =
-                env.fromCollection(
+                env.fromData(
                         Arrays.asList(
                                 new Order(1L, "beer", 3),
                                 new Order(1L, "diaper", 4),
                                 new Order(3L, "rubber", 2)));
 
         final DataStream<Order> orderB =
-                env.fromCollection(
+                env.fromData(
                         Arrays.asList(
                                 new Order(2L, "pen", 3),
                                 new Order(2L, "rubber", 3),
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java
index daaa8d68fcc..c24b1909aa8 100644
--- 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java
@@ -56,7 +56,7 @@ public class ProtobufTestHelper {
                 (Row) 
DataFormatConverters.getConverterForDataType(rowDataType).toExternal(rowData);
         TypeInformation<Row> rowTypeInfo =
                 (TypeInformation<Row>) 
TypeConversions.fromDataTypeToLegacyInfo(rowDataType);
-        DataStream<Row> rows = 
env.fromCollection(Collections.singletonList(row), rowTypeInfo);
+        DataStream<Row> rows = env.fromData(Collections.singletonList(row), 
rowTypeInfo);
 
         Table table = tableEnv.fromDataStream(rows);
         tableEnv.createTemporaryView("t", table);
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
index 99f73fa524b..01f51ca997a 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
@@ -355,9 +355,7 @@ public class SavepointWriter {
         }
 
         DataStream<OperatorState> existingOperatorStates =
-                executionEnvironment
-                        .fromCollection(existingOperators)
-                        .name("existingOperatorStates");
+                
executionEnvironment.fromData(existingOperators).name("existingOperatorStates");
 
         existingOperatorStates
                 .flatMap(new StatePathExtractor())
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index f5fb6c16dbc..13e6c47ac75 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -113,12 +113,12 @@ public class SavepointWriterITCase extends 
AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         StateBootstrapTransformation<Account> transformation =
-                
OperatorTransformation.bootstrapWith(env.fromCollection(accounts))
+                OperatorTransformation.bootstrapWith(env.fromData(accounts))
                         .keyBy(acc -> acc.id)
                         .transform(new AccountBootstrapper());
 
         StateBootstrapTransformation<CurrencyRate> broadcastTransformation =
-                
OperatorTransformation.bootstrapWith(env.fromCollection(currencyRates))
+                
OperatorTransformation.bootstrapWith(env.fromData(currencyRates))
                         .transform(new CurrencyBootstrapFunction());
 
         SavepointWriter writer =
@@ -141,15 +141,15 @@ public class SavepointWriterITCase extends 
AbstractTestBase {
         }
 
         DataStream<Account> stream =
-                env.fromCollection(accounts)
+                env.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
         final CloseableIterator<Account> results = stream.collectAsync();
 
-        env.fromCollection(currencyRates)
-                
.connect(env.fromCollection(currencyRates).broadcast(descriptor))
+        env.fromData(currencyRates)
+                .connect(env.fromData(currencyRates).broadcast(descriptor))
                 .process(new CurrencyValidationFunction())
                 .uid(CURRENCY_UID)
                 .sinkTo(new DiscardingSink<>());
@@ -192,7 +192,7 @@ public class SavepointWriterITCase extends AbstractTestBase 
{
         }
 
         DataStream<Account> stream =
-                sEnv.fromCollection(accounts)
+                sEnv.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
index e6c83ec7c30..1b6b665d687 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
@@ -161,7 +161,7 @@ public class SavepointWriterUidModificationITCase {
 
     private static StateBootstrapTransformation<Integer> bootstrap(
             StreamExecutionEnvironment env, Collection<Integer> data) {
-        return OperatorTransformation.bootstrapWith(env.fromCollection(data))
+        return OperatorTransformation.bootstrapWith(env.fromData(data))
                 .keyBy(v -> v)
                 .transform(new StateBootstrapper());
     }
@@ -194,7 +194,7 @@ public class SavepointWriterUidModificationITCase {
         final List<CloseableIterator<Integer>> iterators = new ArrayList<>();
         for (Tuple2<Collection<Integer>, String> assertion : assertions) {
             iterators.add(
-                    env.fromCollection(assertion.f0)
+                    env.fromData(assertion.f0)
                             .keyBy(v -> v)
                             .map(new StateReader())
                             .uid(assertion.f1)
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
index 32345098aca..bc08d725d2f 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
@@ -146,7 +146,7 @@ public class SavepointWriterWindowITCase extends 
AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1))
                         .returns(TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
@@ -190,7 +190,7 @@ public class SavepointWriterWindowITCase extends 
AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1))
                         .returns(TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
@@ -236,7 +236,7 @@ public class SavepointWriterWindowITCase extends 
AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1), TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
                                 WatermarkStrategy.<Tuple2<String, 
Integer>>noWatermarks()
@@ -283,7 +283,7 @@ public class SavepointWriterWindowITCase extends 
AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1))
                         .returns(TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
index 1d386ef084d..e94ad57ee42 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
@@ -151,15 +151,15 @@ public class WritableSavepointITCase extends 
AbstractTestBase {
         sEnv.setStateBackend(backend);
 
         DataStream<Account> stream =
-                sEnv.fromCollection(accounts)
+                sEnv.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
         CloseableIterator<Account> results = stream.collectAsync();
 
-        sEnv.fromCollection(currencyRates)
-                
.connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
+        sEnv.fromData(currencyRates)
+                .connect(sEnv.fromData(currencyRates).broadcast(descriptor))
                 .process(new CurrencyValidationFunction())
                 .uid(CURRENCY_UID)
                 .sinkTo(new DiscardingSink<>());
@@ -195,7 +195,7 @@ public class WritableSavepointITCase extends 
AbstractTestBase {
         sEnv.setStateBackend(backend);
 
         DataStream<Account> stream =
-                sEnv.fromCollection(accounts)
+                sEnv.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
 
b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
index f9e0362181b..322f21a34ba 100644
--- 
a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
@@ -39,7 +39,7 @@ class PythonConfigUtilTest {
         config.set(PipelineOptions.NAME, jobName);
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
 
-        env.fromCollection(Collections.singletonList("test")).sinkTo(new 
DiscardingSink<>());
+        env.fromData(Collections.singletonList("test")).sinkTo(new 
DiscardingSink<>());
         StreamGraph streamGraph = env.getStreamGraph(true);
         assertThat(streamGraph.getJobName()).isEqualTo(jobName);
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 18dc49d3895..4b116805166 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.Utils;
@@ -1474,6 +1475,8 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      * @param data The collection of elements to create the data stream from.
      * @param <OUT> The generic type of the returned data stream.
      * @return The data stream representing the given collection
+     * @deprecated This method will be removed in a future release, possibly as 
early as version 2.0.
+     *     Use {@link #fromData(Collection)} instead.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
         TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data);
@@ -1490,6 +1493,8 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      * @param typeInfo The TypeInformation for the produced data stream
      * @param <OUT> The type of the returned data stream
      * @return The data stream representing the given collection
+     * @deprecated This method will be removed in a future release, possibly as 
early as version 2.0.
+     *     Use {@link #fromData(Collection, TypeInformation)} instead.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(
             Collection<OUT> data, TypeInformation<OUT> typeInfo) {
@@ -1519,6 +1524,11 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      * @return The data stream representing the elements in the iterator
      * @see #fromCollection(java.util.Iterator,
      *     org.apache.flink.api.common.typeinfo.TypeInformation)
+     * @deprecated This method will be removed in a future release, possibly as 
early as version 2.0.
+     *     Use {@link #fromData(Collection, TypeInformation)} instead. For 
rate-limited data
+     *     generation, use {@link DataGeneratorSource} with {@link 
RateLimiterStrategy}. If you need
+     *     to use a fixed set of elements in such a scenario, combine it with 
{@link
+     *     FromElementsGeneratorFunction}.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, 
Class<OUT> type) {
         return fromCollection(data, TypeExtractor.getForClass(type));
@@ -1540,6 +1550,11 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      * @param typeInfo The TypeInformation for the produced data stream
      * @param <OUT> The type of the returned data stream
      * @return The data stream representing the elements in the iterator
+     * @deprecated This method will be removed in a future release, possibly as 
early as version 2.0.
+     *     Use {@link #fromData(Collection, TypeInformation)} instead. For 
rate-limited data
+     *     generation, use {@link DataGeneratorSource} with {@link 
RateLimiterStrategy}. If you need
+     *     to use a fixed set of elements in such a scenario, combine it with 
{@link
+     *     FromElementsGeneratorFunction}.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(
             Iterator<OUT> data, TypeInformation<OUT> typeInfo) {
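
The deprecation notes above point iterator-based callers to DataGeneratorSource
plus RateLimiterStrategy, combined with FromElementsGeneratorFunction for a
fixed element set. A hedged sketch of that combination; the
FromElementsGeneratorFunction package and constructor used here (a
TypeInformation plus varargs elements) are assumptions to verify against your
Flink version:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
    import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
    import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FixedElementsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Assumed constructor: FromElementsGeneratorFunction(typeInfo, elements...)
            FromElementsGeneratorFunction<String> generator =
                    new FromElementsGeneratorFunction<>(Types.STRING, "a", "b", "c");

            DataGeneratorSource<String> source =
                    new DataGeneratorSource<>(
                            generator,
                            3, // emit exactly the three fixed elements
                            RateLimiterStrategy.perSecond(1),
                            Types.STRING);

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Fixed Elements")
                    .print();
            env.execute("fixed-elements sketch");
        }
    }
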
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index 0f1aa2c7a08..b2be45482fb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -141,14 +141,6 @@ class StreamExecutionEnvironmentTest {
             TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
             StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-            DataStreamSource<Integer> dataStream1 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), 
typeInfo);
-
-            assertThatThrownBy(() -> dataStream1.setParallelism(4))
-                    .isInstanceOf(IllegalArgumentException.class);
-
-            dataStream1.sinkTo(new DiscardingSink<>());
-
             DataStreamSource<Integer> dataStream2 =
                     env.fromParallelCollection(new 
DummySplittableIterator<Integer>(), typeInfo)
                             .setParallelism(4);
@@ -158,9 +150,6 @@ class StreamExecutionEnvironmentTest {
             final StreamGraph streamGraph = env.getStreamGraph();
             streamGraph.getStreamingPlanAsJSON();
 
-            
assertThat(streamGraph.getStreamNode(dataStream1.getId()).getParallelism())
-                    .as("Parallelism of collection source must be 1.")
-                    .isOne();
             
assertThat(streamGraph.getStreamNode(dataStream2.getId()).getParallelism())
                     .as("Parallelism of parallel collection source must be 4.")
                     .isEqualTo(4);
@@ -172,6 +161,8 @@ class StreamExecutionEnvironmentTest {
 
     @Test
     void testSources() {
+        // TODO: remove this test when the SourceFunction API gets removed together with the
+        //  deprecated StreamExecutionEnvironment generateSequence() and fromCollection() methods
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         SourceFunction<Integer> srcFun =
@@ -329,33 +320,25 @@ class StreamExecutionEnvironmentTest {
 
     @Test
     void testGetStreamGraph() {
-        try {
-            TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-            DataStreamSource<Integer> dataStream1 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), 
typeInfo);
-            dataStream1.sinkTo(new DiscardingSink<>());
-            
assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-            DataStreamSource<Integer> dataStream2 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), 
typeInfo);
-            dataStream2.sinkTo(new DiscardingSink<>());
-            
assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
-
-            DataStreamSource<Integer> dataStream3 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), 
typeInfo);
-            dataStream3.sinkTo(new DiscardingSink<>());
-            // Does not clear the transformations.
-            env.getExecutionPlan();
-            DataStreamSource<Integer> dataStream4 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), 
typeInfo);
-            dataStream4.sinkTo(new DiscardingSink<>());
-            
assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(4);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        DataStreamSource<Integer> dataStream1 = env.fromData(1, 2, 3);
+        dataStream1.sinkTo(new DiscardingSink<>());
+        assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
+
+        DataStreamSource<Integer> dataStream2 = env.fromData(1, 2, 3);
+        dataStream2.sinkTo(new DiscardingSink<>());
+        // The previous getStreamGraph() call cleared the dataStream1 transformations.
+        assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
+
+        DataStreamSource<Integer> dataStream3 = env.fromData(1, 2, 3);
+        dataStream3.sinkTo(new DiscardingSink<>());
+        // getExecutionPlan() does not clear the transformations.
+        env.getExecutionPlan();
+        DataStreamSource<Integer> dataStream4 = env.fromData(1, 2, 3);
+        dataStream4.sinkTo(new DiscardingSink<>());
+        // The dataStream3 transformations are preserved.
+        assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(4);
     }
 
     @Test
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index 516424be6f8..d71ccd343db 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -65,7 +65,7 @@ public class ExecutorDiscoveryAndJobClientTest {
     private JobExecutionResult executeTestJobBasedOnConfig(final Configuration 
configuration)
             throws Exception {
         final StreamExecutionEnvironment env = new 
StreamExecutionEnvironment(configuration);
-        env.fromCollection(Collections.singletonList(42)).sinkTo(new 
DiscardingSink<>());
+        env.fromData(Collections.singletonList(42)).sinkTo(new 
DiscardingSink<>());
         return env.execute();
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
index 6caf9a5a3aa..b976d0ab650 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
@@ -39,7 +40,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -125,8 +125,7 @@ class MultipleInputNodeCreationProcessorTest extends 
TableTestBase {
     }
 
     private void createNonChainableStream(TableTestUtil util) {
-        DataStreamSource<Integer> dataStream =
-                util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));
+        DataStreamSource<Integer> dataStream = 
util.getStreamEnv().addSource(new LegacySource());
         TableTestUtil.createTemporaryView(
                 util.tableEnv(),
                 "nonChainableStream",
@@ -172,4 +171,13 @@ class MultipleInputNodeCreationProcessorTest extends 
TableTestBase {
                         + "'\n"
                         + ")");
     }
+
+    private static class LegacySource implements SourceFunction<Integer> {
+        public void run(SourceContext<Integer> sourceContext) {
+            sourceContext.collect(1);
+        }
+
+        @Override
+        public void cancel() {}
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index a756210d768..f8ea2a19b9b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -226,7 +226,7 @@ class DataStreamJavaITCase {
                     Row.of(null, Row.of(false, null), 
Collections.singletonMap("world", null))
                 };
 
-        final DataStream<Row> dataStream = 
env.fromCollection(Arrays.asList(rows), typeInfo);
+        final DataStream<Row> dataStream = env.fromData(Arrays.asList(rows), 
typeInfo);
 
         final TableResult result = 
tableEnv.fromDataStream(dataStream).execute();
 
@@ -305,7 +305,7 @@ class DataStreamJavaITCase {
                         Tuple2.of(DayOfWeek.MONDAY, ZoneOffset.UTC),
                         Tuple2.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
 
-        final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = 
env.fromCollection(rawRecords);
+        final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = 
env.fromData(rawRecords);
 
         // verify incoming type information
         assertThat(dataStream.getType()).isInstanceOf(TupleTypeInfo.class);
@@ -945,7 +945,7 @@ class DataStreamJavaITCase {
 
     private DataStream<Tuple3<Long, Integer, String>> 
getWatermarkedDataStream() {
         final DataStream<Tuple3<Long, Integer, String>> dataStream =
-                env.fromCollection(
+                env.fromData(
                         Arrays.asList(
                                 Tuple3.of(1L, 42, "a"),
                                 Tuple3.of(2L, 5, "a"),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
index 22a08755939..60628f53ef6 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.table
 
 import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
 import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.table.api._
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
@@ -36,10 +36,17 @@ import java.time.{Duration, Instant, LocalDateTime, 
ZoneOffset}
 class TimeAttributesITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
 
   @TestTemplate
-  def testMissingTimeAttributeThrowsCorrectException(): Unit = {
-    val data = List(1L -> "hello", 2L -> "world")
-    val stream = env.fromCollection[(Long, String)](data)
+  def testMissingTimeAttributeInLegacySourceThrowsCorrectException(): Unit = {
+    // TODO: this test can be removed when SourceFunction gets removed (FLIP-27 sources set TimestampAssigner.NO_TIMESTAMP as the event time when no timestamp is provided; see SourceOutputWithWatermarks#collect())
+    val stream = env
+      .addSource(new SourceFunction[(Long, String)]() {
+        def run(ctx: SourceFunction.SourceContext[(Long, String)]) {
+          ctx.collect(1L -> "hello")
+          ctx.collect(2L -> "world")
+        }
 
+        def cancel() {}
+      })
     tEnv.createTemporaryView("test", stream, $"event_time".rowtime(), $"data")
     val result = tEnv.sqlQuery("SELECT * FROM test")
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index f0f6e622952..dee5eda6111 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -115,7 +115,7 @@ abstract class ExpressionTestBase(isStreaming: Boolean = 
true) {
       tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
     }
     if (containsLegacyTypes) {
-      val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
+      val ds = env.fromData(Collections.emptyList[Row](), typeInfo)
       tEnv.createTemporaryView(tableName, ds, 
typeInfo.getFieldNames.map(api.$): _*)
       functions.foreach(f => tEnv.createTemporarySystemFunction(f._1, f._2))
     } else {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 26193a650f4..9d79525373f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -91,6 +91,8 @@ import java.nio.file.{Files, Path, Paths}
 import java.time.Duration
 import java.util.Collections
 
+import scala.collection.JavaConverters._
+
 /** Test base for testing Table API / SQL plans. */
 abstract class TableTestBase {
 
@@ -1409,7 +1411,7 @@ class TestTableSource(override val isBounded: Boolean, 
schema: TableSchema)
   extends StreamTableSource[Row] {
 
   override def getDataStream(execEnv: environment.StreamExecutionEnvironment): 
DataStream[Row] = {
-    execEnv.fromCollection(List[Row](), getReturnType)
+    execEnv.fromData(List[Row]().asJava, getReturnType)
   }
 
   override def getReturnType: TypeInformation[Row] = {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
index dcd3f200a43..ed2c7811947 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
@@ -202,7 +202,7 @@ class TestTableSourceWithTime[T](
   with DefinedFieldMapping {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): 
DataStream[T] = {
-    val dataStream = execEnv.fromCollection(values, returnType)
+    val dataStream = execEnv.fromData(values.asJava, returnType)
     dataStream.getTransformation.setMaxParallelism(1)
     dataStream
   }
@@ -528,7 +528,7 @@ class TestLegacyFilterableTableSource(
     }
 
     execEnv
-      .fromCollection[Row](
+      .fromData[Row](
         applyPredicatesToRows(records).asJava,
         fromDataTypeToTypeInfo(getProducedDataType).asInstanceOf[RowTypeInfo])
       .setParallelism(1)
@@ -929,7 +929,7 @@ class TestStreamTableSource(tableSchema: TableSchema, 
values: Seq[Row])
   extends StreamTableSource[Row] {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
-    execEnv.fromCollection(values, tableSchema.toRowType)
+    execEnv.fromData(values.asJava, tableSchema.toRowType)
   }
 
   override def getProducedDataType: DataType = tableSchema.toRowDataType
@@ -1087,7 +1087,7 @@ class TestPartitionableTableSource(
       data.values.flatten
     }
 
-    execEnv.fromCollection[Row](remainingData, 
getReturnType).setParallelism(1).setMaxParallelism(1)
+    execEnv.fromData[Row](remainingData.toSeq.asJava, 
getReturnType).setParallelism(1).setMaxParallelism(1)
   }
 
   override def explainSource(): String = {
@@ -1218,7 +1218,7 @@ class WithoutTimeAttributesTableSource(bounded: Boolean) 
extends StreamTableSour
     )
     val dataStream =
       execEnv
-        .fromCollection(data)
+        .fromData(data.asJava)
         
.returns(fromDataTypeToTypeInfo(getProducedDataType).asInstanceOf[RowTypeInfo])
     dataStream.getTransformation.setMaxParallelism(1)
     dataStream
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java
index 24bdf3ca17b..8420d70221f 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
 public class MultipleInputTestBase {
 
     protected Transformation<RowData> createSource(StreamExecutionEnvironment 
env, String... data) {
-        return env.fromCollection(
+        return env.fromData(
                         Arrays.stream(data)
                                 .map(StringData::fromString)
                                 .map(GenericRowData::of)
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
index faeac88842a..461f8dfa432 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -141,7 +141,7 @@ public abstract class SinkTestSuiteBase<T extends 
Comparable<T>> {
                                 .build());
         execEnv.enableCheckpointing(50);
         DataStream<T> dataStream =
-                execEnv.fromCollection(testRecords)
+                execEnv.fromData(testRecords)
                         .name("sourceInSinkTest")
                         .setParallelism(1)
                         .returns(externalContext.getProducedType());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index d048048f8a2..c0b5e017865 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -133,7 +133,7 @@ public class AccumulatorLiveITCase extends TestLogger {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
 
-        DataStream<Integer> input = env.fromCollection(inputData);
+        DataStream<Integer> input = env.fromData(inputData);
         input.flatMap(new NotifyingMapper())
                 .writeUsingOutputFormat(new DummyOutputFormat())
                 .disableChaining();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 01c0987a60d..e9822515434 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -85,7 +85,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
                         .keyBy((KeySelector<Long, Long>) value -> value);
 
         final DataStream<String> srcTwo =
-                env.fromCollection(expected.values())
+                env.fromData(expected.values())
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<String>() {
 
@@ -145,7 +145,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
                                 });
 
         final DataStream<String> srcTwo =
-                env.fromCollection(expected.values())
+                env.fromData(expected.values())
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<String>() {
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
index 8c195ae854a..6accaf64034 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
@@ -53,7 +53,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
         see.getConfig().disableObjectReuse();
         see.setParallelism(3);
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
 
         DataStream<Data> summedStream =
                 dataStream
@@ -105,7 +105,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
         see.getConfig().disableObjectReuse();
         see.setParallelism(4);
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
 
         DataStream<Data> summedStream =
                 dataStream
@@ -160,7 +160,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
         see.getConfig().disableObjectReuse();
         see.setParallelism(4);
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
 
         DataStream<Data> summedStream =
                 dataStream
@@ -198,7 +198,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
     public void testFailOnNestedPojoFieldAccessor() throws Exception {
         StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
         dataStream.keyBy("aaa", "stats.count").sum("stats.nonExistingField");
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 6790436f778..147a17dc823 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -420,7 +420,7 @@ public class IterateITCase extends AbstractTestBase {
                 iterated = new boolean[parallelism];
 
                 DataStream<Boolean> source =
-                        env.fromCollection(Collections.nCopies(parallelism * 
2, false))
+                        env.fromData(Collections.nCopies(parallelism * 2, 
false))
                                 .map(noOpBoolMap)
                                 .name("ParallelizeMap");
 
@@ -693,7 +693,7 @@ public class IterateITCase extends AbstractTestBase {
         env.enableCheckpointing();
 
         DataStream<Boolean> source =
-                env.fromCollection(Collections.nCopies(parallelism * 2, false))
+                env.fromData(Collections.nCopies(parallelism * 2, false))
                         .map(noOpBoolMap)
                         .name("ParallelizeMap");
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java
index 1340377a519..43f0a835a21 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,7 +31,6 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -55,13 +53,11 @@ public class LatencyMarkerITCase {
 
         List<Integer> broadcastData =
                 IntStream.range(0, inputCount).boxed().collect(Collectors.toList());
-        DataStream<Integer> broadcastDataStream =
-                env.fromCollection(broadcastData).setParallelism(1);
+        DataStream<Integer> broadcastDataStream = env.fromData(broadcastData).setParallelism(1);
 
         // broadcast the configurations and create the broadcast state
 
-        DataStream<String> streamWithoutData =
-                env.fromCollection(Collections.emptyList(), TypeInformation.of(String.class));
+        DataStream<String> dataStream = env.fromData("test");
 
         MapStateDescriptor<String, Integer> stateDescriptor =
                 new MapStateDescriptor<>(
@@ -70,7 +66,7 @@ public class LatencyMarkerITCase {
                         BasicTypeInfo.INT_TYPE_INFO);
 
         SingleOutputStreamOperator<Integer> processor =
-                streamWithoutData
+                dataStream
                         .connect(broadcastDataStream.broadcast(stateDescriptor))
                         .process(
                                 new BroadcastProcessFunction<String, Integer, Integer>() {
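
Note: the LatencyMarkerITCase hunk above is the one non-mechanical change in this
patch. The old code built an empty stream, so the element type had to be passed
explicitly via TypeInformation; the new code starts from a single dummy element,
from which fromData() can infer the type on its own. A minimal before/after sketch
(the env variable and surrounding names are illustrative, not part of the patch):

    // Before: the collection is empty, so the String type must be supplied explicitly.
    DataStream<String> streamWithoutData =
            env.fromCollection(Collections.emptyList(), TypeInformation.of(String.class));

    // After: a single dummy element is enough for a test that only needs a
    // correctly typed stream, and the element type is inferred from it.
    DataStream<String> dataStream = env.fromData("test");
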
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
index 8489bf9b2aa..ae988039083 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
@@ -99,7 +99,7 @@ public class PartitionerITCase extends AbstractTestBase {
         env.setParallelism(PARALLELISM);
 
         DataStream<Tuple1<String>> src =
-                env.fromCollection(INPUT.stream().map(Tuple1::of).collect(Collectors.toList()));
+                env.fromData(INPUT.stream().map(Tuple1::of).collect(Collectors.toList()));
 
         // partition by hash
         src.keyBy(0).map(new SubtaskIndexAssigner()).addSink(hashPartitionResultSink);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index c8d42a64280..db26f2e1e3e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -231,7 +231,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(3);
 
-        DataStream<Integer> dataStream = env.fromCollection(elements);
+        DataStream<Integer> dataStream = env.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -273,7 +273,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         env.getConfig().enableObjectReuse();
         env.setParallelism(3);
 
-        DataStream<Integer> dataStream = env.fromCollection(elements);
+        DataStream<Integer> dataStream = env.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -316,7 +316,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         env.getConfig().enableObjectReuse();
         env.setParallelism(3);
 
-        DataStream<Integer> dataStream = env.fromCollection(elements);
+        DataStream<Integer> dataStream = env.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -356,7 +356,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -390,7 +390,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -427,8 +427,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.connect(ds2)
@@ -482,8 +482,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.connect(ds2)
@@ -538,7 +538,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream
@@ -586,8 +586,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -640,8 +640,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -707,8 +707,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -766,8 +766,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -830,7 +830,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         dataStream
                 .process(
@@ -886,7 +886,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(1);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};
 
@@ -939,7 +939,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};
 
@@ -989,7 +989,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};
 
@@ -1036,7 +1036,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(1);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
index fdc79986223..2eaa88b045c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
@@ -151,7 +151,7 @@ public class SinkITCase extends AbstractTestBase {
     public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSink.newBuilder()
                                 .setDefaultCommitter(
@@ -193,7 +193,7 @@ public class SinkITCase extends AbstractTestBase {
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSink.newBuilder()
                                 .setDefaultCommitter(
@@ -238,7 +238,7 @@ public class SinkITCase extends AbstractTestBase {
     public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSink.newBuilder()
                                 .setCommittableSerializer(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 70d94004748..b23dc34ed74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -107,7 +107,7 @@ public class SinkV2ITCase extends AbstractTestBase {
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
                                 .setDefaultCommitter(
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index f03fea8d578..f313c69eba0 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -61,7 +61,7 @@ This file is based on the checkstyle file of Apache Beam.
        -->
 
        <module name="FileLength">
-               <property name="max" value="3000"/>
+               <property name="max" value="3100"/>
        </module>
 
        <!-- All Java AST specific tests live under TreeWalker module. -->
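
Note: every other hunk in this patch is the same mechanical rename, switching a
fromCollection() call site to fromData(), the replacement that this commit
deprecates fromCollection() in favor of. A minimal, self-contained sketch of the
migration (class and variable names here are illustrative only, assuming a Flink
version that contains this change):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FromDataMigrationSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Integer> elements = Arrays.asList(1, 2, 3);

            // Deprecated by this commit:
            // DataStream<Integer> stream = env.fromCollection(elements);

            // Replacement; the element type is still inferred from the collection:
            DataStream<Integer> stream = env.fromData(elements);

            stream.print();
            env.execute("fromData migration sketch");
        }
    }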
