afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##########

@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
         int maxParallelism = 42;

         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

-        DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
-        DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+        DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128);
+        DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129);

Review Comment:
   Yes, this fixes test failures that arise because of this:
   https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307
   The `SingleOutputStreamOperator` caps the max parallelism to 1. Since this PR is already fairly sizable, it seemed appropriate to postpone dealing with this until we work on `fromSequence`.
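The failure mode described in the review comment can be sketched in isolation. The snippet below is a simplified, self-contained model, not Flink's actual validation code (the method name and error messages are illustrative): a source whose max parallelism is capped to 1, as with the new `fromElements`, rejects any attempt to set a higher max parallelism, which is why the test inputs were switched to the parallel `fromSequence`.

```java
// Simplified model (not Flink's actual code) of max-parallelism validation:
// a non-parallel operator cannot accept a max parallelism greater than 1.
public class MaxParallelismCheck {

    static void validateMaxParallelism(int maxParallelism, boolean canBeParallel) {
        if (maxParallelism < 1) {
            throw new IllegalArgumentException("Max parallelism must be at least 1.");
        }
        if (!canBeParallel && maxParallelism > 1) {
            throw new IllegalArgumentException(
                    "Cannot set max parallelism > 1 on a non-parallel operator.");
        }
    }

    public static void main(String[] args) {
        // A parallel source (e.g. fromSequence) accepts a higher cap.
        validateMaxParallelism(128, true);
        System.out.println("parallel source: ok");

        // A non-parallel source (max parallelism capped to 1) does not.
        try {
            validateMaxParallelism(128, false);
        } catch (IllegalArgumentException e) {
            System.out.println("non-parallel source: rejected");
        }
    }
}
```

This sketch models the stricter "reject" variant; whether a given Flink operator throws or silently caps the value is exactly the kind of detail the comment defers to the later `fromSequence` work.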
##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5:
##########

@@ -19,8 +19,8 @@ Method <org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTr
 Method <org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.closeAsync(long)> calls method <org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.quiesce()> in (RecreateOnResetOperatorCoordinator.java:361)
 Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerConfiguration.java:244)
 Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerConfiguration.java:246)
-Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerServices.java:433)
-Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerServices.java:431)

Review Comment:
   Indeed. I had to enable refreeze to add the missing datagen source violations, but how exactly it is supposed to work in ArchUnit is, to be honest, still a bit of a mystery to me.

##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##########

@@ -64,6 +64,19 @@ Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.
 Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.<init>(int, org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, org.apache.flink.connector.base.source.reader.splitreader.SplitReader, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, boolean)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (SplitFetcher.java:97)
 Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.<init>(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, java.util.function.Supplier, org.apache.flink.configuration.Configuration, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitFetcherManager.java:0)
 Constructor <org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.<init>(int)> calls method <org.apache.flink.util.Preconditions.checkArgument(boolean, java.lang.Object)> in (FutureCompletingBlockingQueue.java:114)
+Constructor <org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, java.lang.Iterable)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (FromElementsGeneratorFunction.java:85)

Review Comment:
   That's what I thought too. As also tracked in the wiki, the DataGen connector will not be externalized:
   https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
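For context on the "refreeze" mentioned in the first review comment: ArchUnit's frozen-rule mechanism is configured through an `archunit.properties` file on the test classpath. A minimal sketch of the relevant properties (assuming the default text-file violation store that the `archunit-violations` files above belong to):

```properties
# Let the default violation store write updated snapshots for frozen rules.
freeze.store.default.allowStoreUpdate=true

# Re-snapshot ("refreeze") every frozen rule on the next test run, replacing
# the stored violations with the ones currently reported.
freeze.refreeze=true
```

With `freeze.refreeze` enabled, a run regenerates the stored violation files, which is how new entries such as the datagen-source violations end up in them.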
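The `FromElementsGeneratorFunction` constructor recorded in the violation file above null-checks its arguments via `Preconditions.checkNotNull`. The snippet below is a simplified, self-contained model of that idea, not Flink's API (the `GeneratorFunction` interface and `fromElements` factory here are illustrative stand-ins): validate the element collection up front, then map each generated sequence index to an element.

```java
import java.util.List;
import java.util.Objects;

// Illustrative model (names are hypothetical, not Flink's API) of a
// from-elements generator function: fail fast on a null element collection,
// then map a generated index to the corresponding element.
public class FromElementsModel {

    interface GeneratorFunction<T> {
        T map(long index);
    }

    static <T> GeneratorFunction<T> fromElements(List<T> elements) {
        // Mirrors the Preconditions.checkNotNull call recorded in the
        // archunit violation file above.
        Objects.requireNonNull(elements, "elements must not be null");
        return index -> elements.get((int) (index % elements.size()));
    }

    public static void main(String[] args) {
        GeneratorFunction<Integer> gen = fromElements(List.of(1, 2, 3, 4));
        System.out.println(gen.map(0)); // prints 1
        System.out.println(gen.map(3)); // prints 4
    }
}
```

The eager null check is the design choice at issue: it surfaces a bad argument at graph-construction time rather than at runtime inside the generated source, at the cost of the `Preconditions` dependency that archunit flags.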