afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##########
@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
         int maxParallelism = 42;
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
-        DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+        DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128);
+        DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129);

Review Comment:
   Yes, this fixes test failures that arise because of this change:
   https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307
   The `SingleOutputStreamOperator` caps max parallelism to 1.
   Since this PR is already pretty sizable, it seemed appropriate to postpone dealing with this until we work on `fromSequence`.
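   To illustrate the capping behavior mentioned above, here is a minimal plain-Java sketch (hypothetical class and field names, not Flink's actual API): a non-parallel source such as `fromElements` runs as a single subtask, so any requested max parallelism is forced to 1, while a parallel source such as `fromSequence` keeps the requested value.

   ```java
   // Hypothetical model of the max-parallelism cap described above;
   // names are illustrative and do not mirror Flink's real classes.
   class SourceSketch {
       private final boolean parallel; // can the source run with more than one subtask?
       private int maxParallelism = -1;

       SourceSketch(boolean parallel) {
           this.parallel = parallel;
       }

       SourceSketch setMaxParallelism(int requested) {
           // a non-parallel source executes as a single subtask,
           // so its max parallelism is capped at 1 regardless of the request
           this.maxParallelism = parallel ? requested : 1;
           return this;
       }

       int getMaxParallelism() {
           return maxParallelism;
       }
   }
   ```

   Under this model, requesting `setMaxParallelism(128)` on a non-parallel source yields 1, which is exactly the mismatch the switch to `fromSequence` in the test sidesteps.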



##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5:
##########
@@ -19,8 +19,8 @@ Method <org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTr
 Method <org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.closeAsync(long)> calls method <org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.quiesce()> in (RecreateOnResetOperatorCoordinator.java:361)
 Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerConfiguration.java:244)
 Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerConfiguration.java:246)
-Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerServices.java:433)
-Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerServices.java:431)

Review Comment:
   Indeed. I had to enable refreeze to add the missing datagen source violations, but how exactly freezing is supposed to work in ArchUnit is, to be honest, still a bit of a mystery to me.
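   For anyone else puzzled by this, my understanding of the workflow (a sketch based on ArchUnit's documented `archunit.properties` keys; the store path here is illustrative): frozen violations are kept in a violation store on disk, and setting `freeze.refreeze=true` lets one build rewrite the stored entries before the flag is reverted.

   ```properties
   # archunit.properties — assumed refreeze workflow, not this repo's exact config
   # location of the frozen violation store (illustrative path)
   freeze.store.default.path=archunit-violations
   # allow this run to overwrite stored violations; set back to false afterwards
   freeze.refreeze=true
   ```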



##########
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##########
@@ -64,6 +64,19 @@ Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.
 Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.<init>(int, org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, org.apache.flink.connector.base.source.reader.splitreader.SplitReader, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, boolean)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (SplitFetcher.java:97)
 Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.<init>(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, java.util.function.Supplier, org.apache.flink.configuration.Configuration, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitFetcherManager.java:0)
 Constructor <org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.<init>(int)> calls method <org.apache.flink.util.Preconditions.checkArgument(boolean, java.lang.Object)> in (FutureCompletingBlockingQueue.java:114)
+Constructor <org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, java.lang.Iterable)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (FromElementsGeneratorFunction.java:85)

Review Comment:
   That's what I thought too. As also tracked in the wiki, the DataGen connector will not be externalized:
   https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
