Dennis-Mircea commented on code in PR #1088: URL: https://github.com/apache/flink-kubernetes-operator/pull/1088#discussion_r3491908950
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/ParallelismAlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ *   current │── within-range ──│ target │── above-target ──│ upperAlignLimit
+ * scale-down:
+ *   lowerAlignLimit │── below-target ──│ target │── within-range ──│ current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's {@code PluginManager}
+ * in the operator) and selected by name through {@code scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it, and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
+
+    /**
+     * Whether this mode applies to the vertex described by {@code ctx}. When it returns {@code
+     * false} the autoscaler keeps the computed target parallelism and {@link #align(Context)} is
+     * not called. Defaults to keyBy (hash) vertices and to partitioned sources that report a
+     * partition count (Kafka and Pulsar do by default). A custom mode can widen this, for example
+     * to align custom partitioned vertices.
+     */
+    default boolean isApplicable(Context ctx) {
+        return ctx.getNumSourcePartitions() > 0
+                || ctx.getInputShipStrategies().contains(ShipStrategy.HASH);
+    }
+
+    /**
+     * Returns the parallelism to apply for the vertex described by {@code ctx}. Called only when
+     * {@link #isApplicable(Context)} returned {@code true}.
+     */
+    int align(Context ctx);
+
+    /**
+     * Immutable inputs to a single per-vertex parallelism alignment, handed to {@link
+     * AlignmentMode#align(Context)}. Besides the parallelism inputs it exposes the full autoscaler
+     * context, the per-vertex evaluated metrics, the job topology, and a prefix-stripped
+     * configuration for the selected mode, so custom modes have what they need.
+     */
+    @Value
+    class Context {
+        /** The vertex being aligned. */
+        JobVertexID vertex;
+
+        /** The current parallelism of the vertex. */
+        int currentParallelism;
+
+        /** The clamped computed target parallelism, before alignment. */
+        int newParallelism;
+
+        /** The number of source partitions, or a non-positive value when not a source. */
+        int numSourcePartitions;
+
+        /** The vertex max parallelism (number of key groups). */
+        int maxParallelism;

Review Comment:
   I looked into combining `numSourcePartitions` and `maxParallelism` into a single value, but I don't think it makes sense, because both are needed:

   - `isApplicable` distinguishes a partitioned source (`numSourcePartitions > 0`) from a keyBy (`HASH`). A single combined value can't express that: every applicable vertex would just have a positive divisor, with no way to tell the two apart.
   - The alignment cap is `min(numKeyGroupsOrPartitions, min(maxParallelism, parallelismUpperLimit))`. For a source, `numKeyGroupsOrPartitions` is the partition count, which can differ from `maxParallelism`. With only the combined value you'd compute `min(partitions, upperLimit)` and lose the `maxParallelism` cap, letting a source align above its key-group limit (which Flink won't allow). You need both numbers to compute the cap.

   `ParallelismAligner.numKeyGroupsOrPartitions` is a derived quantity used internally where only the divisor matters; it's not a substitute for the two raw inputs the `Context` has to carry.
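
   To make the second point concrete, here is a minimal, standalone sketch of the cap arithmetic. The class and method names (`AlignmentCapSketch`, `capWithBothInputs`, `capWithCombinedValue`) and the numbers are hypothetical and not part of the PR. The way the divisor is derived from the two raw inputs is my assumption about what `ParallelismAligner.numKeyGroupsOrPartitions` does, based on the formula quoted above.

```java
// Illustrative only: hypothetical names, not code from the PR.
final class AlignmentCapSketch {

    /**
     * Cap computed from both raw inputs, following the formula
     * min(numKeyGroupsOrPartitions, min(maxParallelism, parallelismUpperLimit)).
     */
    static int capWithBothInputs(
            int numSourcePartitions, int maxParallelism, int parallelismUpperLimit) {
        // Assumption: a source uses its partition count as the divisor,
        // any other vertex uses the number of key groups (maxParallelism).
        int numKeyGroupsOrPartitions =
                numSourcePartitions > 0 ? numSourcePartitions : maxParallelism;
        return Math.min(
                numKeyGroupsOrPartitions, Math.min(maxParallelism, parallelismUpperLimit));
    }

    /** Cap computed when the Context only carries one combined value. */
    static int capWithCombinedValue(int combined, int parallelismUpperLimit) {
        // maxParallelism is no longer available here, so its cap is silently dropped.
        return Math.min(combined, parallelismUpperLimit);
    }

    public static void main(String[] args) {
        // Hypothetical source: more partitions than key groups.
        int partitions = 256;
        int maxParallelism = 128;
        int upperLimit = 200;

        // Both inputs: min(256, min(128, 200))
        System.out.println(capWithBothInputs(partitions, maxParallelism, upperLimit));
        // Combined value only: min(256, 200)
        System.out.println(capWithCombinedValue(partitions, upperLimit));
    }
}
```

   With these example numbers the two-input version stays at 128, while the combined-value version yields 200, which is above the vertex's `maxParallelism`. That is the case the separate fields are meant to prevent.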
