gyfora commented on code in PR #783:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/783#discussion_r1501754751


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -229,13 +229,14 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                                 var currentParallelism =
                                         (int) 
metrics.get(ScalingMetric.PARALLELISM).getCurrent();
 
-                                final boolean hasKeyBy =
-                                        
jobTopology.get(v).getInputs().containsValue(HASH);
+                                var inputs = jobTopology.get(v).getInputs();
+                                var adjustByMaxParallelism =
+                                        inputs.isEmpty() || 
inputs.containsValue(HASH);
                                 var newParallelism =
                                         
jobVertexScaler.computeScaleTargetParallelism(
                                                 context,
                                                 v,
-                                                hasKeyBy,
+                                                adjustByMaxParallelism,
                                                 metrics,

Review Comment:
   Can we please add a single unit test case to cover this core piece of the 
logic? Just to make sure we don't mess up sources in the future unintentionally 
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to