1996fanrui commented on code in PR #762:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1482361348
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -250,6 +251,31 @@ private static ConfigOptions.OptionBuilder
autoScalerConfig(String key) {
.withDescription(
"Max allowed percentage of heap usage during
scaling operations. Autoscaling will be paused if the heap usage exceeds this
threshold.");
+ public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+ autoScalerConfig("memory.tuning.enabled")
+ .booleanType()
+ .defaultValue(false)
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+ .withDescription(
+ "If enabled, the initial amount of memory
specified for TaskManagers will be reduced according to the observed needs.");
+
+ public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
+ autoScalerConfig("memory.tuning.heap.min")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(512L))
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
+ .withDescription(
+ "The minimum amount of TaskManager memory, if
memory tuning is enabled.");
+
+ public static final ConfigOption<Boolean>
MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED =
+ autoScalerConfig("memory.tuning.heap.transfer-to-managed")
+ .booleanType()
+ .defaultValue(false)
+ .withFallbackKeys(
+
oldOperatorConfigKey("memory.tuning.heap.transfer-to-managed"))
+ .withDescription(
+ "If enabled, any reduction of heap memory will
increase the managed memory for RocksDB. RocksDB needs to be enabled.");
Review Comment:
```suggestion
"If enabled, any reduction of heap memory will
increase the managed memory for RocksDB when rocksdb state backend is used.");
```
How about this?
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##########
@@ -52,19 +55,25 @@ public class JobAutoScalerContext<KEY> {
@Nullable @Getter private final JobStatus jobStatus;
+ /** The configuration derived from the current spec. */
Review Comment:
```suggestion
/** The configuration derived from the user-specified spec instead of
actual spec. */
```
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java:
##########
@@ -32,7 +32,7 @@
class ResourceCheckUtilsTest {
@Test
- void testEstimateNumTaskSlotsAfterRescale() {
+ void estimateNumTaskSlotsAfterRescale() {
Review Comment:
Why remove the test prefix? In general, the prefix of all test methods are
`test`.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -65,7 +65,7 @@ public KubernetesJobAutoScalerContext
getJobAutoScalerContext() {
}
private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
- Configuration conf = new Configuration(getObserveConfig());
+ Configuration conf = new
Configuration(getDeployConfig(resource.getSpec()));
Review Comment:
Using `getDeployConfig` here makes sense to me. And I left a comment about
the code comment of `JobAutoScalerContext#configuration`.
It's better to remind others it's user-specified spec instead of actual spec.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java:
##########
@@ -43,6 +53,34 @@ public void realize(
getOverrideString(context, parallelismOverrides));
}
+ @Override
+ public void realizeConfigOverrides(
+ KubernetesJobAutoScalerContext context, Configuration
configOverrides) {
+ if (!(context.getResource() instanceof FlinkDeployment)) {
+ // We can't adjust the configuration of non-job deployments.
+ return;
+ }
+ FlinkDeployment flinkDeployment = ((FlinkDeployment)
context.getResource());
+ // Apply config overrides
+
flinkDeployment.getSpec().getFlinkConfiguration().putAll(configOverrides.toMap());
+
+ // Update total memory in spec
+ var totalMemoryOverride =
MemoryTuningUtils.getTotalMemory(configOverrides, context);
+ if (totalMemoryOverride.compareTo(MemorySize.ZERO) <= 0) {
+ LOG.warn("Memory override {} is not valid", totalMemoryOverride);
Review Comment:
nit:
```suggestion
LOG.warn("Total memory override {} is not valid",
totalMemoryOverride);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]