mxm commented on code in PR #698:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/698#discussion_r1377403138
##########
flink-autoscaler/pom.xml:
##########
@@ -43,14 +43,44 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
- <scope>provided</scope>
+ </dependency>
+
+ <!-- Logging -->
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ <version>${log4j.version}</version>
Review Comment:
It would be nice to move these changes to a separate module.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggableEventHandler.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/** The loggable autoscaler event handler. */
+public class LoggableEventHandler<KEY, Context extends
JobAutoScalerContext<KEY>>
+ implements AutoScalerEventHandler<KEY, Context> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LoggableEventHandler.class);
+
+ @Override
+ public void handleEvent(
+ Context context,
+ Type type,
+ String reason,
+ String message,
+ @Nullable String messageKey,
+ @Nullable Duration interval) {
+ LOG.info(
+ "Handle autoscaler event, job key : {}, type : {}, reason :
{}, message : {}, messageKey : {}, interval : {}.",
Review Comment:
```suggestion
"Autoscaler event, job key : {}, type : {}, reason : {}, message : {}, messageKey : {}, interval : {}.",
```
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/RescaleApiScalingRealizer.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import
org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import
org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Rescaling job based on rescale api. */
Review Comment:
```suggestion
/** A ScalingRealizer which uses the Rescale API to apply parallelism changes. */
```
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggableEventHandler.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/** The loggable autoscaler event handler. */
Review Comment:
```suggestion
/** Autoscaler event handler which logs events. */
```
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/RescaleApiScalingRealizer.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import
org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import
org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Rescaling job based on rescale api. */
+public class RescaleApiScalingRealizer<KEY, Context extends
JobAutoScalerContext<KEY>>
+ implements ScalingRealizer<KEY, Context> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RescaleApiScalingRealizer.class);
+
+ private final AutoScalerEventHandler<KEY, JobAutoScalerContext<KEY>>
eventHandler;
+
+ public RescaleApiScalingRealizer(
+ AutoScalerEventHandler<KEY, JobAutoScalerContext<KEY>>
eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public void realize(Context context, Map<String, String>
parallelismOverrides) {
+ Configuration conf = context.getConfiguration();
+ if (!conf.get(JobManagerOptions.SCHEDULER)
+ .equals(JobManagerOptions.SchedulerType.Adaptive)) {
+ LOG.debug("In-place rescaling is only available with the adaptive
scheduler");
+ return;
+ }
+
+ JobStatus jobStatus = context.getJobStatus();
+ JobID jobID = context.getJobID();
+ if (jobID == null
+ || jobStatus == null
+ || jobStatus.isGloballyTerminalState()
+ || JobStatus.RECONCILING == jobStatus) {
+ LOG.info("Job in terminal or reconciling state cannot be scaled
in-place");
+ return;
+ }
+
+ Duration flinkRestClientTimeout =
conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT);
+
+ try (var client = context.getRestClusterClient()) {
+ var requirements =
+ new HashMap<>(getVertexResources(client, jobID,
flinkRestClientTimeout));
+ boolean parallelismUpdated = false;
+
+ for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
+ requirements.entrySet()) {
+ var jobVertexId = entry.getKey().toString();
+ var parallelism = entry.getValue().getParallelism();
+ var overrideStr = parallelismOverrides.get(jobVertexId);
+
+ // No overrides for this vertex
+ if (overrideStr == null) {
+ continue;
+ }
+
+ // We have an override for the vertex
+ int p = Integer.parseInt(overrideStr);
+ var newParallelism = new
JobVertexResourceRequirements.Parallelism(1, p);
+ // If the requirements changed we mark this as scaling
triggered
+ if (!parallelism.equals(newParallelism)) {
+ entry.setValue(new
JobVertexResourceRequirements(newParallelism));
+ parallelismUpdated = true;
+ }
+ }
+ if (parallelismUpdated) {
+ updateVertexResources(client, jobID, flinkRestClientTimeout,
requirements);
+ eventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Normal,
+ "Scaling",
+ String.format(
+ "In-place scaling triggered, the new
requirements is %s.",
+ requirements),
+ null,
+ null);
+ } else {
+ LOG.info("Vertex resources requirements already match target,
nothing to do...");
+ }
+ } catch (Exception e) {
+ LOG.warn("Scaling realize exception.", e);
Review Comment:
```suggestion
LOG.warn("Failed to apply parallelism overrides.", e);
```
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcherFactory.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import java.time.Duration;
+
+/** The factory for {@link JobListFetcher}. */
+@Experimental
+public interface JobListFetcherFactory<KEY> {
+
+ JobListFetcher<KEY> create(ParameterTool parameters, Duration
restClientTimeout);
Review Comment:
+1. We can construct the implementation directly.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/RescaleApiScalingRealizer.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import
org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import
org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Rescaling job based on rescale api. */
+public class RescaleApiScalingRealizer<KEY, Context extends
JobAutoScalerContext<KEY>>
+ implements ScalingRealizer<KEY, Context> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RescaleApiScalingRealizer.class);
+
+ private final AutoScalerEventHandler<KEY, JobAutoScalerContext<KEY>>
eventHandler;
+
+ public RescaleApiScalingRealizer(
+ AutoScalerEventHandler<KEY, JobAutoScalerContext<KEY>>
eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public void realize(Context context, Map<String, String>
parallelismOverrides) {
+ Configuration conf = context.getConfiguration();
+ if (!conf.get(JobManagerOptions.SCHEDULER)
+ .equals(JobManagerOptions.SchedulerType.Adaptive)) {
+ LOG.debug("In-place rescaling is only available with the adaptive
scheduler");
Review Comment:
Should this be logged at WARN? Users might be surprised that no scaling is
being performed when the adaptive scheduler is not enabled.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.List;
+
+/** The JobListFetcher will fetch the jobContext of all jobs. */
+@Experimental
+public interface JobListFetcher<KEY> {
+
+ List<JobAutoScalerContext<KEY>> fetch(MetricGroup metricGroup) throws
Exception;
Review Comment:
+1, let's just keep the implementation. There are no other consumers for
this interface.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/RescaleApiScalingRealizer.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import
org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import
org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Rescaling job based on rescale api. */
+public class RescaleApiScalingRealizer<KEY, Context extends
JobAutoScalerContext<KEY>>
+ implements ScalingRealizer<KEY, Context> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RescaleApiScalingRealizer.class);
+
+ private final AutoScalerEventHandler<KEY, JobAutoScalerContext<KEY>>
eventHandler;
+
+ public RescaleApiScalingRealizer(
+ AutoScalerEventHandler<KEY, JobAutoScalerContext<KEY>>
eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public void realize(Context context, Map<String, String>
parallelismOverrides) {
+ Configuration conf = context.getConfiguration();
+ if (!conf.get(JobManagerOptions.SCHEDULER)
+ .equals(JobManagerOptions.SchedulerType.Adaptive)) {
+ LOG.debug("In-place rescaling is only available with the adaptive
scheduler");
+ return;
+ }
+
+ JobStatus jobStatus = context.getJobStatus();
+ JobID jobID = context.getJobID();
+ if (jobID == null
+ || jobStatus == null
+ || jobStatus.isGloballyTerminalState()
+ || JobStatus.RECONCILING == jobStatus) {
+ LOG.info("Job in terminal or reconciling state cannot be scaled
in-place");
Review Comment:
Should we log at WARN? This is probably a race condition, because the
autoscaler wouldn't trigger scaling when the job is not in the RUNNING state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]