This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 18c03f2e6c593a772f64cdb5c089e2911d3cbc89 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Mon Nov 27 23:29:27 2023 +0100 [FLINK-28229][connectors] Deprecate StreamExecutionEnvironment#fromCollection() --- .../program/StreamContextEnvironmentTest.java | 16 ++--- .../client/testjar/ForbidConfigurationJob.java | 2 +- .../flink/streaming/examples/join/WindowJoin.java | 16 +++-- .../examples/join/WindowJoinSampleData.java | 75 ++++++---------------- .../examples/java/basics/StreamSQLExample.java | 4 +- .../flink/formats/protobuf/ProtobufTestHelper.java | 2 +- .../apache/flink/state/api/SavepointWriter.java | 4 +- .../flink/state/api/SavepointWriterITCase.java | 12 ++-- .../api/SavepointWriterUidModificationITCase.java | 4 +- .../state/api/SavepointWriterWindowITCase.java | 8 +-- .../flink/state/api/WritableSavepointITCase.java | 8 +-- .../flink/python/util/PythonConfigUtilTest.java | 2 +- .../environment/StreamExecutionEnvironment.java | 15 +++++ .../StreamExecutionEnvironmentTest.java | 57 ++++++---------- .../ExecutorDiscoveryAndJobClientTest.java | 2 +- .../MultipleInputNodeCreationProcessorTest.java | 14 +++- .../runtime/stream/sql/DataStreamJavaITCase.java | 6 +- .../stream/table/TimeAttributesITCase.scala | 15 +++-- .../expressions/utils/ExpressionTestBase.scala | 2 +- .../flink/table/planner/utils/TableTestBase.scala | 4 +- .../table/planner/utils/testTableSourceSinks.scala | 10 +-- .../multipleinput/MultipleInputTestBase.java | 2 +- .../testframe/testsuites/SinkTestSuiteBase.java | 2 +- .../test/accumulators/AccumulatorLiveITCase.java | 2 +- .../streaming/runtime/BroadcastStateITCase.java | 4 +- .../streaming/runtime/DataStreamPojoITCase.java | 8 +-- .../test/streaming/runtime/IterateITCase.java | 4 +- .../streaming/runtime/LatencyMarkerITCase.java | 10 +-- .../test/streaming/runtime/PartitionerITCase.java | 2 +- .../test/streaming/runtime/SideOutputITCase.java | 46 ++++++------- .../flink/test/streaming/runtime/SinkITCase.java | 6 +- .../flink/test/streaming/runtime/SinkV2ITCase.java | 2 +- tools/maven/checkstyle.xml | 2 +- 33 files changed, 174 insertions(+), 194 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java index 1af115a402a..ac157b04f93 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java @@ -78,7 +78,7 @@ class StreamContextEnvironmentTest { // Add/mutate values in the configuration environment.configure(programConfig); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(MutatedConfigurationException.class) .hasMessageContainingAll( @@ -106,7 +106,7 @@ class StreamContextEnvironmentTest { // Change the CheckpointConfig environment.getCheckpointConfig().setCheckpointStorage(disallowedPath); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(MutatedConfigurationException.class) .hasMessageContainingAll( @@ -114,7 +114,7 @@ class StreamContextEnvironmentTest { environment.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(MutatedConfigurationException.class) .hasMessageContainingAll( @@ -143,7 +143,7 @@ class StreamContextEnvironmentTest { false, Collections.emptyList()); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(MutatedConfigurationException.class) .hasMessageContainingAll( @@ -169,7 +169,7 @@ class StreamContextEnvironmentTest { // Change the CheckpointConfig environment.getCheckpointConfig().setCheckpointStorage(allowedPath); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(ExecutorReachedException.class); } @@ -186,7 +186,7 @@ class StreamContextEnvironmentTest { final StreamContextEnvironment environment = constructStreamContextEnvironment(clusterConfig, Collections.emptyList()); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(ExecutorReachedException.class); } @@ -202,7 +202,7 @@ class StreamContextEnvironmentTest { final StreamContextEnvironment environment = constructStreamContextEnvironment(clusterConfig, Collections.emptyList()); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(ExecutorReachedException.class); } @@ -235,7 +235,7 @@ class StreamContextEnvironmentTest { environment.configure(jobConfig); environment.getConfig().setMaxParallelism(1024); - environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); + environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>()); assertThatThrownBy(() -> executor.accept(environment)) .isInstanceOf(ExecutorReachedException.class); assertThat(environment.getConfig().getGlobalJobParameters().toMap()) diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java index 19d84990f91..8e7f0222e2a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java @@ -37,7 +37,7 @@ public class ForbidConfigurationJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); - env.fromCollection(Lists.newArrayList(1, 2, 3)).sinkTo(new DiscardingSink<>()); + env.fromData(Lists.newArrayList(1, 2, 3)).sinkTo(new DiscardingSink<>()); env.execute(); } } diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index e6332090289..3b6d6c2bd89 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -37,8 +37,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource; -import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource; import java.time.Duration; @@ -76,12 +74,18 @@ public class WindowJoin { // create the data sources for both grades and salaries DataStream<Tuple2<String, Integer>> grades = - GradeSource.getSource(env, rate) - .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()); + env.fromSource( + WindowJoinSampleData.getGradeGeneratorSource(rate), + IngestionTimeWatermarkStrategy.create(), + "Grades Data Generator") + .setParallelism(1); DataStream<Tuple2<String, Integer>> salaries = - SalarySource.getSource(env, rate) - .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()); + env.fromSource( + WindowJoinSampleData.getSalaryGeneratorSource(rate), + IngestionTimeWatermarkStrategy.create(), + "Grades Data Generator") + .setParallelism(1); // run the actual window join program // for testability, this functionality is in a separate method. diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java index fbaec8e64de..55f59ed436a 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java @@ -20,13 +20,11 @@ package org.apache.flink.streaming.examples.join; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.examples.utils.ThrottledIterator; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; -import java.io.Serializable; -import java.util.Iterator; import java.util.Random; /** Sample data for the {@link WindowJoin} example. */ @@ -38,58 +36,27 @@ public class WindowJoinSampleData { static final int SALARY_MAX = 10000; /** Continuously generates (name, grade). */ - public static class GradeSource implements Iterator<Tuple2<String, Integer>>, Serializable { - - private final Random rnd = new Random(hashCode()); - - @Override - public boolean hasNext() { - return true; - } - - @Override - public Tuple2<String, Integer> next() { - return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(GRADE_COUNT) + 1); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - public static DataStream<Tuple2<String, Integer>> getSource( - StreamExecutionEnvironment env, long rate) { - return env.fromCollection( - new ThrottledIterator<>(new GradeSource(), rate), - TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); - } + public static DataGeneratorSource<Tuple2<String, Integer>> getGradeGeneratorSource( + double elementsPerSecond) { + return getTupleGeneratorSource(GRADE_COUNT, elementsPerSecond); } /** Continuously generates (name, salary). */ - public static class SalarySource implements Iterator<Tuple2<String, Integer>>, Serializable { - - private final Random rnd = new Random(hashCode()); - - @Override - public boolean hasNext() { - return true; - } - - @Override - public Tuple2<String, Integer> next() { - return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(SALARY_MAX) + 1); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } + public static DataGeneratorSource<Tuple2<String, Integer>> getSalaryGeneratorSource( + double elementsPerSecond) { + return getTupleGeneratorSource(SALARY_MAX, elementsPerSecond); + } - public static DataStream<Tuple2<String, Integer>> getSource( - StreamExecutionEnvironment env, long rate) { - return env.fromCollection( - new ThrottledIterator<>(new SalarySource(), rate), - TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); - } + private static DataGeneratorSource<Tuple2<String, Integer>> getTupleGeneratorSource( + int maxValue, double elementsPerSecond) { + final Random rnd = new Random(); + final GeneratorFunction<Long, Tuple2<String, Integer>> generatorFunction = + index -> new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(maxValue) + 1); + + return new DataGeneratorSource<>( + generatorFunction, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(elementsPerSecond), + TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); } } diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java index 194b7e1f90b..ad7ad7ef40e 100644 --- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java @@ -55,14 +55,14 @@ public final class StreamSQLExample { final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); final DataStream<Order> orderA = - env.fromCollection( + env.fromData( Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2))); final DataStream<Order> orderB = - env.fromCollection( + env.fromData( Arrays.asList( new Order(2L, "pen", 3), new Order(2L, "rubber", 3), diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java index daaa8d68fcc..c24b1909aa8 100644 --- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java @@ -56,7 +56,7 @@ public class ProtobufTestHelper { (Row) DataFormatConverters.getConverterForDataType(rowDataType).toExternal(rowData); TypeInformation<Row> rowTypeInfo = (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(rowDataType); - DataStream<Row> rows = env.fromCollection(Collections.singletonList(row), rowTypeInfo); + DataStream<Row> rows = env.fromData(Collections.singletonList(row), rowTypeInfo); Table table = tableEnv.fromDataStream(rows); tableEnv.createTemporaryView("t", table); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java index 99f73fa524b..01f51ca997a 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java @@ -355,9 +355,7 @@ public class SavepointWriter { } DataStream<OperatorState> existingOperatorStates = - executionEnvironment - .fromCollection(existingOperators) - .name("existingOperatorStates"); + executionEnvironment.fromData(existingOperators).name("existingOperatorStates"); existingOperatorStates .flatMap(new StatePathExtractor()) diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java index f5fb6c16dbc..13e6c47ac75 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java @@ -113,12 +113,12 @@ public class SavepointWriterITCase extends AbstractTestBase { env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); StateBootstrapTransformation<Account> transformation = - OperatorTransformation.bootstrapWith(env.fromCollection(accounts)) + OperatorTransformation.bootstrapWith(env.fromData(accounts)) .keyBy(acc -> acc.id) .transform(new AccountBootstrapper()); StateBootstrapTransformation<CurrencyRate> broadcastTransformation = - OperatorTransformation.bootstrapWith(env.fromCollection(currencyRates)) + OperatorTransformation.bootstrapWith(env.fromData(currencyRates)) .transform(new CurrencyBootstrapFunction()); SavepointWriter writer = @@ -141,15 +141,15 @@ public class SavepointWriterITCase extends AbstractTestBase { } DataStream<Account> stream = - env.fromCollection(accounts) + env.fromData(accounts) .keyBy(acc -> acc.id) .flatMap(new UpdateAndGetAccount()) .uid(ACCOUNT_UID); final CloseableIterator<Account> results = stream.collectAsync(); - env.fromCollection(currencyRates) - .connect(env.fromCollection(currencyRates).broadcast(descriptor)) + env.fromData(currencyRates) + .connect(env.fromData(currencyRates).broadcast(descriptor)) .process(new CurrencyValidationFunction()) .uid(CURRENCY_UID) .sinkTo(new DiscardingSink<>()); @@ -192,7 +192,7 @@ public class SavepointWriterITCase extends AbstractTestBase { } DataStream<Account> stream = - sEnv.fromCollection(accounts) + sEnv.fromData(accounts) .keyBy(acc -> acc.id) .flatMap(new UpdateAndGetAccount()) .uid(ACCOUNT_UID); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java index e6c83ec7c30..1b6b665d687 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java @@ -161,7 +161,7 @@ public class SavepointWriterUidModificationITCase { private static StateBootstrapTransformation<Integer> bootstrap( StreamExecutionEnvironment env, Collection<Integer> data) { - return OperatorTransformation.bootstrapWith(env.fromCollection(data)) + return OperatorTransformation.bootstrapWith(env.fromData(data)) .keyBy(v -> v) .transform(new StateBootstrapper()); } @@ -194,7 +194,7 @@ public class SavepointWriterUidModificationITCase { final List<CloseableIterator<Integer>> iterators = new ArrayList<>(); for (Tuple2<Collection<Integer>, String> assertion : assertions) { iterators.add( - env.fromCollection(assertion.f0) + env.fromData(assertion.f0) .keyBy(v -> v) .map(new StateReader()) .uid(assertion.f1) diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java index 32345098aca..bc08d725d2f 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java @@ -146,7 +146,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase { env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<Tuple2<String, Integer>> bootstrapData = - env.fromCollection(WORDS) + env.fromData(WORDS) .map(word -> Tuple2.of(word, 1)) .returns(TUPLE_TYPE_INFO) .assignTimestampsAndWatermarks( @@ -190,7 +190,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase { env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<Tuple2<String, Integer>> bootstrapData = - env.fromCollection(WORDS) + env.fromData(WORDS) .map(word -> Tuple2.of(word, 1)) .returns(TUPLE_TYPE_INFO) .assignTimestampsAndWatermarks( @@ -236,7 +236,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase { env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<Tuple2<String, Integer>> bootstrapData = - env.fromCollection(WORDS) + env.fromData(WORDS) .map(word -> Tuple2.of(word, 1), TUPLE_TYPE_INFO) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<String, Integer>>noWatermarks() @@ -283,7 +283,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase { env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<Tuple2<String, Integer>> bootstrapData = - env.fromCollection(WORDS) + env.fromData(WORDS) .map(word -> Tuple2.of(word, 1)) .returns(TUPLE_TYPE_INFO) .assignTimestampsAndWatermarks( diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java index 1d386ef084d..e94ad57ee42 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java @@ -151,15 +151,15 @@ public class WritableSavepointITCase extends AbstractTestBase { sEnv.setStateBackend(backend); DataStream<Account> stream = - sEnv.fromCollection(accounts) + sEnv.fromData(accounts) .keyBy(acc -> acc.id) .flatMap(new UpdateAndGetAccount()) .uid(ACCOUNT_UID); CloseableIterator<Account> results = stream.collectAsync(); - sEnv.fromCollection(currencyRates) - .connect(sEnv.fromCollection(currencyRates).broadcast(descriptor)) + sEnv.fromData(currencyRates) + .connect(sEnv.fromData(currencyRates).broadcast(descriptor)) .process(new CurrencyValidationFunction()) .uid(CURRENCY_UID) .sinkTo(new DiscardingSink<>()); @@ -195,7 +195,7 @@ public class WritableSavepointITCase extends AbstractTestBase { sEnv.setStateBackend(backend); DataStream<Account> stream = - sEnv.fromCollection(accounts) + sEnv.fromData(accounts) .keyBy(acc -> acc.id) .flatMap(new UpdateAndGetAccount()) .uid(ACCOUNT_UID); diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java index f9e0362181b..322f21a34ba 100644 --- a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java +++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java @@ -39,7 +39,7 @@ class PythonConfigUtilTest { config.set(PipelineOptions.NAME, jobName); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); - env.fromCollection(Collections.singletonList("test")).sinkTo(new DiscardingSink<>()); + env.fromData(Collections.singletonList("test")).sinkTo(new DiscardingSink<>()); StreamGraph streamGraph = env.getStreamGraph(true); assertThat(streamGraph.getJobName()).isEqualTo(jobName); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 18dc49d3895..4b116805166 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.Utils; @@ -1474,6 +1475,8 @@ public class StreamExecutionEnvironment implements AutoCloseable { * @param data The collection of elements to create the data stream from. * @param <OUT> The generic type of the returned data stream. * @return The data stream representing the given collection + * @deprecated This method will be removed a future release, possibly as early as version 2.0. + * Use {@link #fromData(Collection)} instead. */ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) { TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data); @@ -1490,6 +1493,8 @@ public class StreamExecutionEnvironment implements AutoCloseable { * @param typeInfo The TypeInformation for the produced data stream * @param <OUT> The type of the returned data stream * @return The data stream representing the given collection + * @deprecated This method will be removed a future release, possibly as early as version 2.0. + * Use {@link #fromData(Collection, TypeInformation)} instead. */ public <OUT> DataStreamSource<OUT> fromCollection( Collection<OUT> data, TypeInformation<OUT> typeInfo) { @@ -1519,6 +1524,11 @@ public class StreamExecutionEnvironment implements AutoCloseable { * @return The data stream representing the elements in the iterator * @see #fromCollection(java.util.Iterator, * org.apache.flink.api.common.typeinfo.TypeInformation) + * @deprecated This method will be removed a future release, possibly as early as version 2.0. + * Use {@link #fromData(Collection, TypeInformation)} instead. For rate-limited data + * generation, use {@link DataGeneratorSource} with {@link RateLimiterStrategy}. If you need + * to use a fixed set of elements in such scenario, combine it with {@link + * FromElementsGeneratorFunction}. */ public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) { return fromCollection(data, TypeExtractor.getForClass(type)); @@ -1540,6 +1550,11 @@ public class StreamExecutionEnvironment implements AutoCloseable { * @param typeInfo The TypeInformation for the produced data stream * @param <OUT> The type of the returned data stream * @return The data stream representing the elements in the iterator + * @deprecated This method will be removed a future release, possibly as early as version 2.0. + * Use {@link #fromData(Collection, TypeInformation)} instead. For rate-limited data + * generation, use {@link DataGeneratorSource} with {@link RateLimiterStrategy}. If you need + * to use a fixed set of elements in such scenario, combine it with {@link + * FromElementsGeneratorFunction}. */ public <OUT> DataStreamSource<OUT> fromCollection( Iterator<OUT> data, TypeInformation<OUT> typeInfo) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java index 0f1aa2c7a08..b2be45482fb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java @@ -141,14 +141,6 @@ class StreamExecutionEnvironmentTest { TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStreamSource<Integer> dataStream1 = - env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); - - assertThatThrownBy(() -> dataStream1.setParallelism(4)) - .isInstanceOf(IllegalArgumentException.class); - - dataStream1.sinkTo(new DiscardingSink<>()); - DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(), typeInfo) .setParallelism(4); @@ -158,9 +150,6 @@ class StreamExecutionEnvironmentTest { final StreamGraph streamGraph = env.getStreamGraph(); streamGraph.getStreamingPlanAsJSON(); - assertThat(streamGraph.getStreamNode(dataStream1.getId()).getParallelism()) - .as("Parallelism of collection source must be 1.") - .isOne(); assertThat(streamGraph.getStreamNode(dataStream2.getId()).getParallelism()) .as("Parallelism of parallel collection source must be 4.") .isEqualTo(4); @@ -172,6 +161,8 @@ class StreamExecutionEnvironmentTest { @Test void testSources() { + // TODO: remove this test when SourceFunction API gets removed together with the deprecated + // StreamExecutionEnvironment generateSequence() and fromCollection() methods StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SourceFunction<Integer> srcFun = @@ -329,33 +320,25 @@ class StreamExecutionEnvironmentTest { @Test void testGetStreamGraph() { - try { - TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Integer> dataStream1 = - env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); - dataStream1.sinkTo(new DiscardingSink<>()); - assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStreamSource<Integer> dataStream2 = - env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); - dataStream2.sinkTo(new DiscardingSink<>()); - assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2); - - DataStreamSource<Integer> dataStream3 = - env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); - dataStream3.sinkTo(new DiscardingSink<>()); - // Does not clear the transformations. - env.getExecutionPlan(); - DataStreamSource<Integer> dataStream4 = - env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); - dataStream4.sinkTo(new DiscardingSink<>()); - assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(4); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + DataStreamSource<Integer> dataStream1 = env.fromData(1, 2, 3); + dataStream1.sinkTo(new DiscardingSink<>()); + assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2); + + DataStreamSource<Integer> dataStream2 = env.fromData(1, 2, 3); + dataStream2.sinkTo(new DiscardingSink<>()); + // Previous getStreamGraph() call cleaned dataStream1 transformations + assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2); + + DataStreamSource<Integer> dataStream3 = env.fromData(1, 2, 3); + dataStream3.sinkTo(new DiscardingSink<>()); + // Does not clear the transformations. + env.getExecutionPlan(); + DataStreamSource<Integer> dataStream4 = env.fromData(1, 2, 3); + dataStream4.sinkTo(new DiscardingSink<>()); + // dataStream3 are preserved + assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(4); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java index 516424be6f8..d71ccd343db 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java @@ -65,7 +65,7 @@ public class ExecutorDiscoveryAndJobClientTest { private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception { final StreamExecutionEnvironment env = new StreamExecutionEnvironment(configuration); - env.fromCollection(Collections.singletonList(42)).sinkTo(new DiscardingSink<>()); + env.fromData(Collections.singletonList(42)).sinkTo(new DiscardingSink<>()); return env.execute(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java index 6caf9a5a3aa..b976d0ab650 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.expressions.ApiExpressionUtils; @@ -39,7 +40,6 @@ import org.junit.jupiter.api.Test; import java.io.File; import java.io.IOException; -import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; @@ -125,8 +125,7 @@ class MultipleInputNodeCreationProcessorTest extends TableTestBase { } private void createNonChainableStream(TableTestUtil util) { - DataStreamSource<Integer> dataStream = - util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); + DataStreamSource<Integer> dataStream = util.getStreamEnv().addSource(new LegacySource()); TableTestUtil.createTemporaryView( util.tableEnv(), "nonChainableStream", @@ -172,4 +171,13 @@ class MultipleInputNodeCreationProcessorTest extends TableTestBase { + "'\n" + ")"); } + + private static class LegacySource implements SourceFunction<Integer> { + public void run(SourceContext<Integer> sourceContext) { + sourceContext.collect(1); + } + + @Override + public void cancel() {} + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java index a756210d768..f8ea2a19b9b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java @@ -226,7 +226,7 @@ class DataStreamJavaITCase { Row.of(null, Row.of(false, null), Collections.singletonMap("world", null)) }; - final DataStream<Row> dataStream = env.fromCollection(Arrays.asList(rows), typeInfo); + final DataStream<Row> dataStream = env.fromData(Arrays.asList(rows), typeInfo); final TableResult result = tableEnv.fromDataStream(dataStream).execute(); @@ -305,7 +305,7 @@ class DataStreamJavaITCase { Tuple2.of(DayOfWeek.MONDAY, ZoneOffset.UTC), Tuple2.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5))); - final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = env.fromCollection(rawRecords); + final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = env.fromData(rawRecords); // verify incoming type information assertThat(dataStream.getType()).isInstanceOf(TupleTypeInfo.class); @@ -945,7 +945,7 @@ class DataStreamJavaITCase { private DataStream<Tuple3<Long, Integer, String>> getWatermarkedDataStream() { final DataStream<Tuple3<Long, Integer, String>> dataStream = - env.fromCollection( + env.fromData( Arrays.asList( Tuple3.of(1L, 42, "a"), Tuple3.of(2L, 5, "a"), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala index 22a08755939..60628f53ef6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.table import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.scala.createTypeInformation -import org.apache.flink.core.testutils.FlinkMatchers.containsMessage +import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.table.api._ import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink} @@ -36,10 +36,17 @@ import java.time.{Duration, Instant, LocalDateTime, ZoneOffset} class TimeAttributesITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { @TestTemplate - def testMissingTimeAttributeThrowsCorrectException(): Unit = { - val data = List(1L -> "hello", 2L -> "world") - val stream = env.fromCollection[(Long, String)](data) + def testMissingTimeAttributeInLegacySourceThrowsCorrectException(): Unit = { + //TODO: this test can be removed when SourceFunction gets removed (FLIP-27 sources set TimestampAssigner.NO_TIMESTAMP value as event time when no timestamp is provided. See SourceOutputWithWatermarks#collect()) + val stream = env + .addSource(new SourceFunction[(Long, String)]() { + def run(ctx: SourceFunction.SourceContext[(Long, String)]) { + ctx.collect(1L -> "hello") + ctx.collect(2L -> "world") + } + def cancel() {} + }) tEnv.createTemporaryView("test", stream, $"event_time".rowtime(), $"data") val result = tEnv.sqlQuery("SELECT * FROM test") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index f0f6e622952..dee5eda6111 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -115,7 +115,7 @@ abstract class ExpressionTestBase(isStreaming: Boolean = true) { tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType) } if (containsLegacyTypes) { - val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo) + val ds = env.fromData(Collections.emptyList[Row](), typeInfo) tEnv.createTemporaryView(tableName, ds, typeInfo.getFieldNames.map(api.$): _*) functions.foreach(f => tEnv.createTemporarySystemFunction(f._1, f._2)) } else { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 26193a650f4..9d79525373f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -91,6 +91,8 @@ import java.nio.file.{Files, Path, Paths} import java.time.Duration import java.util.Collections +import scala.collection.JavaConverters._ + /** Test base for testing Table API / SQL plans. */ abstract class TableTestBase { @@ -1409,7 +1411,7 @@ class TestTableSource(override val isBounded: Boolean, schema: TableSchema) extends StreamTableSource[Row] { override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = { - execEnv.fromCollection(List[Row](), getReturnType) + execEnv.fromData(List[Row]().asJava, getReturnType) } override def getReturnType: TypeInformation[Row] = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala index dcd3f200a43..ed2c7811947 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala @@ -202,7 +202,7 @@ class TestTableSourceWithTime[T]( with DefinedFieldMapping { override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = { - val dataStream = execEnv.fromCollection(values, returnType) + val dataStream = execEnv.fromData(values.asJava, returnType) dataStream.getTransformation.setMaxParallelism(1) dataStream } @@ -528,7 +528,7 @@ class TestLegacyFilterableTableSource( } execEnv - .fromCollection[Row]( + .fromData[Row]( applyPredicatesToRows(records).asJava, fromDataTypeToTypeInfo(getProducedDataType).asInstanceOf[RowTypeInfo]) .setParallelism(1) @@ -929,7 +929,7 @@ class TestStreamTableSource(tableSchema: TableSchema, values: Seq[Row]) extends StreamTableSource[Row] { override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { - execEnv.fromCollection(values, tableSchema.toRowType) + execEnv.fromData(values.asJava, tableSchema.toRowType) } override def getProducedDataType: DataType = tableSchema.toRowDataType @@ -1087,7 +1087,7 @@ class TestPartitionableTableSource( data.values.flatten } - execEnv.fromCollection[Row](remainingData, getReturnType).setParallelism(1).setMaxParallelism(1) + execEnv.fromData[Row](remainingData, getReturnType).setParallelism(1).setMaxParallelism(1) } override def explainSource(): String = { @@ -1218,7 +1218,7 @@ class WithoutTimeAttributesTableSource(bounded: Boolean) extends StreamTableSour ) val dataStream = execEnv - .fromCollection(data) + .fromData(data.asJava) .returns(fromDataTypeToTypeInfo(getProducedDataType).asInstanceOf[RowTypeInfo]) dataStream.getTransformation.setMaxParallelism(1) dataStream diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java index 24bdf3ca17b..8420d70221f 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java @@ -53,7 +53,7 @@ import java.util.stream.Collectors; public class MultipleInputTestBase { protected Transformation<RowData> createSource(StreamExecutionEnvironment env, String... data) { - return env.fromCollection( + return env.fromData( Arrays.stream(data) .map(StringData::fromString) .map(GenericRowData::of) diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java index faeac88842a..461f8dfa432 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java @@ -141,7 +141,7 @@ public abstract class SinkTestSuiteBase<T extends Comparable<T>> { .build()); execEnv.enableCheckpointing(50); DataStream<T> dataStream = - execEnv.fromCollection(testRecords) + execEnv.fromData(testRecords) .name("sourceInSinkTest") .setParallelism(1) .returns(externalContext.getProducedType()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index d048048f8a2..c0b5e017865 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -133,7 +133,7 @@ public class AccumulatorLiveITCase extends TestLogger { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); - DataStream<Integer> input = env.fromCollection(inputData); + DataStream<Integer> input = env.fromData(inputData); input.flatMap(new NotifyingMapper()) .writeUsingOutputFormat(new DummyOutputFormat()) .disableChaining(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java index 01c0987a60d..e9822515434 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java @@ -85,7 +85,7 @@ public class BroadcastStateITCase extends AbstractTestBase { .keyBy((KeySelector<Long, Long>) value -> value); final DataStream<String> srcTwo = - env.fromCollection(expected.values()) + env.fromData(expected.values()) .assignTimestampsAndWatermarks( new CustomWmEmitter<String>() { @@ -145,7 +145,7 @@ public class BroadcastStateITCase extends AbstractTestBase { }); final DataStream<String> srcTwo = - env.fromCollection(expected.values()) + env.fromData(expected.values()) .assignTimestampsAndWatermarks( new CustomWmEmitter<String>() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java index 8c195ae854a..6accaf64034 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java @@ -53,7 +53,7 @@ public class DataStreamPojoITCase extends AbstractTestBase { see.getConfig().disableObjectReuse(); see.setParallelism(3); - DataStream<Data> dataStream = see.fromCollection(elements); + DataStream<Data> dataStream = see.fromData(elements); DataStream<Data> summedStream = dataStream @@ -105,7 +105,7 @@ public class DataStreamPojoITCase extends AbstractTestBase { see.getConfig().disableObjectReuse(); see.setParallelism(4); - DataStream<Data> dataStream = see.fromCollection(elements); + DataStream<Data> dataStream = see.fromData(elements); DataStream<Data> summedStream = dataStream @@ -160,7 +160,7 @@ public class DataStreamPojoITCase extends AbstractTestBase { see.getConfig().disableObjectReuse(); see.setParallelism(4); - DataStream<Data> dataStream = see.fromCollection(elements); + DataStream<Data> dataStream = see.fromData(elements); DataStream<Data> summedStream = dataStream @@ -198,7 +198,7 @@ public class DataStreamPojoITCase extends AbstractTestBase { public void testFailOnNestedPojoFieldAccessor() throws Exception { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream<Data> dataStream = see.fromCollection(elements); + DataStream<Data> dataStream = see.fromData(elements); dataStream.keyBy("aaa", "stats.count").sum("stats.nonExistingField"); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java index 6790436f778..147a17dc823 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java @@ -420,7 +420,7 @@ public class IterateITCase extends AbstractTestBase { iterated = new boolean[parallelism]; DataStream<Boolean> source = - env.fromCollection(Collections.nCopies(parallelism * 2, false)) + env.fromData(Collections.nCopies(parallelism * 2, false)) .map(noOpBoolMap) .name("ParallelizeMap"); @@ -693,7 +693,7 @@ public class IterateITCase extends AbstractTestBase { env.enableCheckpointing(); DataStream<Boolean> source = - env.fromCollection(Collections.nCopies(parallelism * 2, false)) + env.fromData(Collections.nCopies(parallelism * 2, false)) .map(noOpBoolMap) .name("ParallelizeMap"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java index 1340377a519..43f0a835a21 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -32,7 +31,6 @@ import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -55,13 +53,11 @@ public class LatencyMarkerITCase { List<Integer> broadcastData = IntStream.range(0, inputCount).boxed().collect(Collectors.toList()); - DataStream<Integer> broadcastDataStream = - env.fromCollection(broadcastData).setParallelism(1); + DataStream<Integer> broadcastDataStream = env.fromData(broadcastData).setParallelism(1); // broadcast the configurations and create the broadcast state - DataStream<String> streamWithoutData = - env.fromCollection(Collections.emptyList(), TypeInformation.of(String.class)); + DataStream<String> dataStream = env.fromData("test"); MapStateDescriptor<String, Integer> stateDescriptor = new MapStateDescriptor<>( @@ -70,7 +66,7 @@ public class LatencyMarkerITCase { BasicTypeInfo.INT_TYPE_INFO); SingleOutputStreamOperator<Integer> processor = - streamWithoutData + dataStream .connect(broadcastDataStream.broadcast(stateDescriptor)) .process( new BroadcastProcessFunction<String, Integer, Integer>() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java index 8489bf9b2aa..ae988039083 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java @@ -99,7 +99,7 @@ public class PartitionerITCase extends AbstractTestBase { env.setParallelism(PARALLELISM); DataStream<Tuple1<String>> src = - env.fromCollection(INPUT.stream().map(Tuple1::of).collect(Collectors.toList())); + env.fromData(INPUT.stream().map(Tuple1::of).collect(Collectors.toList())); // partition by hash src.keyBy(0).map(new SubtaskIndexAssigner()).addSink(hashPartitionResultSink); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java index c8d42a64280..db26f2e1e3e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java @@ -231,7 +231,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); - DataStream<Integer> dataStream = env.fromCollection(elements); + DataStream<Integer> dataStream = env.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = dataStream.process( @@ -273,7 +273,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { env.getConfig().enableObjectReuse(); env.setParallelism(3); - DataStream<Integer> dataStream = env.fromCollection(elements); + DataStream<Integer> dataStream = env.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = dataStream.process( @@ -316,7 +316,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { env.getConfig().enableObjectReuse(); env.setParallelism(3); - DataStream<Integer> dataStream = env.fromCollection(elements); + DataStream<Integer> dataStream = env.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = dataStream.process( @@ -356,7 +356,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = dataStream.process( @@ -390,7 +390,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = dataStream.process( @@ -427,8 +427,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> ds1 = see.fromCollection(elements); - DataStream<Integer> ds2 = see.fromCollection(elements); + DataStream<Integer> ds1 = see.fromData(elements); + DataStream<Integer> ds2 = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = ds1.connect(ds2) @@ -482,8 +482,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> ds1 = see.fromCollection(elements); - DataStream<Integer> ds2 = see.fromCollection(elements); + DataStream<Integer> ds1 = see.fromData(elements); + DataStream<Integer> ds2 = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = ds1.connect(ds2) @@ -538,7 +538,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = dataStream @@ -586,8 +586,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> ds1 = see.fromCollection(elements); - DataStream<Integer> ds2 = see.fromCollection(elements); + DataStream<Integer> ds1 = see.fromData(elements); + DataStream<Integer> ds2 = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = ds1.keyBy(i -> i) @@ -640,8 +640,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> ds1 = see.fromCollection(elements); - DataStream<Integer> ds2 = see.fromCollection(elements); + DataStream<Integer> ds1 = see.fromData(elements); + DataStream<Integer> ds2 = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = ds1.keyBy(i -> i) @@ -707,8 +707,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> ds1 = see.fromCollection(elements); - DataStream<Integer> ds2 = see.fromCollection(elements); + DataStream<Integer> ds1 = see.fromData(elements); + DataStream<Integer> ds2 = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = ds1.keyBy(i -> i) @@ -766,8 +766,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> ds1 = see.fromCollection(elements); - DataStream<Integer> ds2 = see.fromCollection(elements); + DataStream<Integer> ds1 = see.fromData(elements); + DataStream<Integer> ds2 = see.fromData(elements); SingleOutputStreamOperator<Integer> passThroughtStream = ds1.keyBy(i -> i) @@ -830,7 +830,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); dataStream .process( @@ -886,7 +886,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(1); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {}; @@ -939,7 +939,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {}; @@ -989,7 +989,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(3); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); OutputTag<String> sideOutputTag = new OutputTag<String>("side") {}; @@ -1036,7 +1036,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(1); - DataStream<Integer> dataStream = see.fromCollection(elements); + DataStream<Integer> dataStream = see.fromData(elements); OutputTag<String> sideOutputTag = new OutputTag<String>("side") {}; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java index fdc79986223..2eaa88b045c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java @@ -151,7 +151,7 @@ public class SinkITCase extends AbstractTestBase { public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); - env.fromCollection(SOURCE_DATA) + env.fromData(SOURCE_DATA) .sinkTo( TestSink.newBuilder() .setDefaultCommitter( @@ -193,7 +193,7 @@ public class SinkITCase extends AbstractTestBase { public void writerAndCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); - env.fromCollection(SOURCE_DATA) + env.fromData(SOURCE_DATA) .sinkTo( TestSink.newBuilder() .setDefaultCommitter( @@ -238,7 +238,7 @@ public class SinkITCase extends AbstractTestBase { public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); - env.fromCollection(SOURCE_DATA) + env.fromData(SOURCE_DATA) .sinkTo( TestSink.newBuilder() .setCommittableSerializer( diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java index 70d94004748..b23dc34ed74 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java @@ -107,7 +107,7 @@ public class SinkV2ITCase extends AbstractTestBase { public void writerAndCommitterExecuteInBatchMode() throws Exception { final StreamExecutionEnvironment env = buildBatchEnv(); - env.fromCollection(SOURCE_DATA) + env.fromData(SOURCE_DATA) .sinkTo( TestSinkV2.<Integer>newBuilder() .setDefaultCommitter( diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index f03fea8d578..f313c69eba0 100644 --- a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -61,7 +61,7 @@ This file is based on the checkstyle file of Apache Beam. --> <module name="FileLength"> - <property name="max" value="3000"/> + <property name="max" value="3100"/> </module> <!-- All Java AST specific tests live under TreeWalker module. -->