XComp commented on a change in pull request #13004:
URL: https://github.com/apache/flink/pull/13004#discussion_r469728536
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemory.java
##########
@@ -69,4 +73,23 @@ public MemorySize getJvmDirectMemorySize() {
public MemorySize getTotalFlinkMemorySize() {
return jvmHeap.add(offHeapMemory);
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof JobManagerFlinkMemory) {
+ JobManagerFlinkMemory that = (JobManagerFlinkMemory)
obj;
+ return Objects.equals(this.jvmHeap, that.jvmHeap) &&
+ Objects.equals(this. offHeapMemory,
that.offHeapMemory);
Review comment:
There is also a formatting problem here: the space between `this.` and
`offHeapMemory` should be removed. Alternatively, `this.` could be removed
entirely to be on par with the `hashCode()` implementation.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/JvmMetaspaceAndOverhead.java
##########
@@ -49,4 +50,23 @@ public MemorySize getMetaspace() {
public MemorySize getOverhead() {
return overhead;
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof JvmMetaspaceAndOverhead ) {
+ JvmMetaspaceAndOverhead that =
(JvmMetaspaceAndOverhead) obj;
+ return Objects.equals(this.metaspace, that.metaspace) &&
+ Objects.equals(this. overhead,
that.overhead);
Review comment:
Here as well: space between `this.` and `overhead` shouldn't be there.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/CommonProcessMemorySpec.java
##########
@@ -94,4 +96,23 @@ public MemorySize getTotalFlinkMemorySize() {
public MemorySize getTotalProcessMemorySize() {
return
flinkMemory.getTotalFlinkMemorySize().add(getJvmMetaspaceSize()).add(getJvmOverheadSize());
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof CommonProcessMemorySpec<?> ) {
+ CommonProcessMemorySpec<?> that =
(CommonProcessMemorySpec<?>) obj;
+ return Objects.equals(this.flinkMemory,
that.flinkMemory) &&
+ Objects.equals(this.
jvmMetaspaceAndOverhead, that.jvmMetaspaceAndOverhead);
Review comment:
Here as well: Space between `this.` and `jvmMetaspaceAndOverhead`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -1064,6 +1076,13 @@ public void handleError(final Exception exception) {
*/
protected abstract void initialize() throws ResourceManagerException;
+ /**
+ * Terminates the framework specific components.
+ *
+ * @throws Exception with occurs during termination.
+ */
+ protected abstract void terminate() throws Exception;
Review comment:
Just as a side note: As discussed with @tillrohrmann, making `terminate()`
non-blocking would be a follow-up feature we might consider later on.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemory.java
##########
@@ -128,4 +130,31 @@ public MemorySize getJvmDirectMemorySize() {
public MemorySize getTotalFlinkMemorySize() {
return
frameworkHeap.add(frameworkOffHeap).add(taskHeap).add(taskOffHeap).add(network).add(managed);
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof TaskExecutorFlinkMemory) {
+ TaskExecutorFlinkMemory that =
(TaskExecutorFlinkMemory) obj;
+ return Objects.equals(this.frameworkHeap,
that.frameworkHeap) &&
+ Objects.equals(this. frameworkOffHeap,
that.frameworkOffHeap) &&
+ Objects.equals(this. taskHeap,
that.taskHeap)&&
+ Objects.equals(this. taskOffHeap,
that.taskOffHeap)&&
+ Objects.equals(this. network,
that.network)&&
+ Objects.equals(this. managed,
that.managed);
Review comment:
Interesting: I would have thought that this would cause a compilation
error. But it looks like the compiler can deal with spaces between `this.` and
the member. I learned something new. :-) Anyway, the formatting should be fixed
here by removing the spaces between `this.` and the member variable and adding a
space before the `&&` operators.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceManagerDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link ResourceManagerDriver}.
+ */
+public class TestingResourceManagerDriver implements
ResourceManagerDriver<ResourceID> {
+
+ private final BiConsumerWithException<ResourceEventHandler<ResourceID>,
Executor, Exception> initializeConsumer;
+ private final Supplier<CompletableFuture<Void>> terminateSupplier;
+ private final BiConsumerWithException<ApplicationStatus, String,
Exception> deregisterApplicationConsumer;
+ private final Function<TaskExecutorProcessSpec,
CompletableFuture<ResourceID>> requestResourceFunction;
+ private final Consumer<ResourceID> releaseResourceConsumer;
+
+ private TestingResourceManagerDriver(
+ final
BiConsumerWithException<ResourceEventHandler<ResourceID>, Executor, Exception>
initializeConsumer,
+ final Supplier<CompletableFuture<Void>>
terminateSupplier,
+ final BiConsumerWithException<ApplicationStatus,
String, Exception> deregisterApplicationConsumer,
+ final Function<TaskExecutorProcessSpec,
CompletableFuture<ResourceID>> requestResourceFunction,
+ final Consumer<ResourceID> releaseResourceConsumer) {
+ this.initializeConsumer =
Preconditions.checkNotNull(initializeConsumer);
+ this.terminateSupplier =
Preconditions.checkNotNull(terminateSupplier);
+ this.deregisterApplicationConsumer =
Preconditions.checkNotNull(deregisterApplicationConsumer);
+ this.requestResourceFunction =
Preconditions.checkNotNull(requestResourceFunction);
+ this.releaseResourceConsumer =
Preconditions.checkNotNull(releaseResourceConsumer);
+ }
+
+ @Override
+ public void initialize(ResourceEventHandler<ResourceID>
resourceEventHandler, Executor mainThreadExecutor) throws Exception {
+ initializeConsumer.accept(resourceEventHandler,
mainThreadExecutor);
+ }
+
+ @Override
+ public CompletableFuture<Void> terminate() {
+ return terminateSupplier.get();
+ }
+
+ @Override
+ public void deregisterApplication(ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) throws Exception {
+ deregisterApplicationConsumer.accept(finalStatus,
optionalDiagnostics);
+ }
+
+ @Override
+ public CompletableFuture<ResourceID>
requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+ return requestResourceFunction.apply(taskExecutorProcessSpec);
+ }
+
+ @Override
+ public void releaseResource(ResourceID worker) {
+ releaseResourceConsumer.accept(worker);
+ }
+
+ public static class Builder {
+ private
BiConsumerWithException<ResourceEventHandler<ResourceID>, Executor, Exception>
initializeConsumer =
+ (ignore1, ignore2) -> {};
+
+ private Supplier<CompletableFuture<Void>> terminateSupplier =
+ () -> CompletableFuture.completedFuture(null);
+
+ private BiConsumerWithException<ApplicationStatus, String,
Exception> deregisterApplicationConsumer =
+ (ignore1, ignore2) -> {};
+
+ private Function<TaskExecutorProcessSpec,
CompletableFuture<ResourceID>> requestResourceFunction =
+ (ignore) ->
CompletableFuture.completedFuture(ResourceID.generate());
+
+ private Consumer<ResourceID> releaseResourceConsumer =
+ (ignore) -> {};
+
+ public Builder
setInitializeConsumer(BiConsumerWithException<ResourceEventHandler<ResourceID>,
Executor, Exception> initializeConsumer) {
Review comment:
CheckStyle complains about missing JavaDoc here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing
resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends
ResourceIDRetrievable> {
+
+ /**
+ * Initialize the deployment specific components.
+ *
+ * @param resourceEventHandler Handler that handles resource events.
+ * @param mainThreadExecutor Rpc main thread executor.
+ */
+ void initialize(ResourceEventHandler<WorkerType> resourceEventHandler,
Executor mainThreadExecutor) throws Exception;
+
+ /**
+ * Terminate the deployment specific components.
+ *
+ * @return A future that will be completed successfully when the driver
is terminated, or exceptionally if cannot be
Review comment:
```suggestion
* @return A future that will be completed successfully when the driver
is terminated, or exceptionally if it cannot be
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]