XComp commented on a change in pull request #13313:
URL: https://github.com/apache/flink/pull/13313#discussion_r488604849
##########
File path:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
##########
@@ -41,15 +47,32 @@ protected AbstractMesosServices(ActorSystem actorSystem, MesosArtifactServer art
}
@Override
- public ActorSystem getLocalActorSystem() {
- return actorSystem;
+ public MesosResourceManagerActorFactory getMesosResourceManagerActorFactory() {
+ return new MesosResourceManagerActorFactoryImpl(actorSystem);
}
@Override
public MesosArtifactServer getArtifactServer() {
return artifactServer;
}
+ @Override
+ public SchedulerDriver createMesosSchedulerDriver(
+ MesosConfiguration mesosConfig, Scheduler scheduler, boolean implicitAcknowledgements) {
+ MesosSchedulerDriver schedulerDriver;
+ if (mesosConfig.credential().isDefined()) {
+ schedulerDriver =
+ new MesosSchedulerDriver(scheduler, mesosConfig.frameworkInfo().build(), mesosConfig.masterUrl(),
+ implicitAcknowledgements, mesosConfig.credential().get().build());
Review comment:
```suggestion
schedulerDriver = new MesosSchedulerDriver(
    scheduler,
    mesosConfig.frameworkInfo().build(),
    mesosConfig.masterUrl(),
    implicitAcknowledgements,
    mesosConfig.credential().get().build());
```
I'd suggest reformatting these lines for readability.
##########
File path:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -739,172 +591,123 @@ static ResourceID extractResourceID(Protos.TaskID taskId) {
}
}
- /**
- * Creates the Fenzo optimizer (builder).
- * The builder is an indirection to facilitate unit testing of the Launch Coordinator.
- */
- private static TaskSchedulerBuilder createOptimizer() {
- return new TaskSchedulerBuilder() {
- TaskScheduler.Builder builder = new TaskScheduler.Builder();
-
- @Override
- public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) {
- builder.withLeaseRejectAction(action);
- return this;
- }
-
- @Override
- public TaskSchedulerBuilder withRejectAllExpiredOffers() {
- builder.withRejectAllExpiredOffers();
- return this;
- }
+ private static String extractTerminatedDiagnostics(ResourceID id, Protos.TaskStatus status) {
+ return String.format("Worker %s terminated with status: %s, reason: %s, message: %s.",
+ id, status.getState(), status.getReason(), status.getMessage());
+ }
- @Override
- public TaskSchedulerBuilder withLeaseOfferExpirySecs(long leaseOfferExpirySecs) {
- builder.withLeaseOfferExpirySecs(leaseOfferExpirySecs);
- return this;
- }
+ private static TaskSchedulerBuilder createOptimizer() {
+ return new TaskSchedulerBuilderImpl();
+ }
- @Override
- public TaskScheduler build() {
- return builder.build();
- }
- };
+ @VisibleForTesting
+ void assertStateCleared() {
+ assert(workersInNew.isEmpty());
+ assert(requestResourceFutures.isEmpty());
Review comment:
Would it make sense to also check that the actors are cleared (i.e.
`MesosResourceManagerDriver::stopSupportingActorsAsync` was performed properly)?
##########
File path:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
##########
@@ -36,28 +38,34 @@
* mode.
*
* @param configuration to be used
- * @param executor to run asynchronous tasks
* @return a mesos worker store
* @throws Exception if the mesos worker store could not be created
*/
MesosWorkerStore createMesosWorkerStore(
- Configuration configuration,
- Executor executor) throws Exception;
+ Configuration configuration) throws Exception;
/**
- * Gets a local {@link ActorSystem} which is used for child actors within
- * {@link org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager}.
- *
- * @return a reference to an actor system.
+ * Gets a {@link MesosResourceManagerActorFactory} which creates child actors within
+ * {@link org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerDriver} in a local {@link ActorSystem}.
+ * @return the factory.
*/
- ActorSystem getLocalActorSystem();
+ MesosResourceManagerActorFactory getMesosResourceManagerActorFactory();
Review comment:
Shouldn't we call this something like
`createMesosResourceManagerActorFactory`, since it instantiates a new
factory on every call?
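To illustrate the naming point, here is a minimal sketch with stand-in types. `ActorFactory` and `Services` are hypothetical placeholders, not the real Flink classes; the sketch only shows what a caller would typically expect from a `get` prefix versus a `create` prefix.

```java
final class FactoryNamingSketch {

    /** Hypothetical stand-in for MesosResourceManagerActorFactory. */
    static final class ActorFactory { }

    /** Hypothetical stand-in for the services implementation. */
    static final class Services {
        private final ActorFactory cached = new ActorFactory();

        // "get": callers usually expect the same instance on every call.
        ActorFactory getActorFactory() {
            return cached;
        }

        // "create": callers expect a fresh instance on every call,
        // which matches what the method in this PR does.
        ActorFactory createActorFactory() {
            return new ActorFactory();
        }
    }

    public static void main(String[] args) {
        Services services = new Services();
        System.out.println(services.getActorFactory() == services.getActorFactory());
        System.out.println(services.createActorFactory() == services.createActorFactory());
    }
}
```

Either keeping the `get` name and caching the factory, or renaming to `create`, would make the name match the behavior.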
##########
File path:
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/services/TestingMesosServices.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link MesosServices}.
+ */
+public class TestingMesosServices implements MesosServices{
Review comment:
Tiny thing: there's a space missing between `MesosServices` and the
opening curly bracket.
##########
File path:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
##########
@@ -41,15 +47,32 @@ protected AbstractMesosServices(ActorSystem actorSystem, MesosArtifactServer art
}
@Override
- public ActorSystem getLocalActorSystem() {
- return actorSystem;
+ public MesosResourceManagerActorFactory getMesosResourceManagerActorFactory() {
+ return new MesosResourceManagerActorFactoryImpl(actorSystem);
}
@Override
public MesosArtifactServer getArtifactServer() {
return artifactServer;
}
+ @Override
+ public SchedulerDriver createMesosSchedulerDriver(
+ MesosConfiguration mesosConfig, Scheduler scheduler, boolean implicitAcknowledgements) {
+ MesosSchedulerDriver schedulerDriver;
+ if (mesosConfig.credential().isDefined()) {
+ schedulerDriver =
+ new MesosSchedulerDriver(scheduler, mesosConfig.frameworkInfo().build(), mesosConfig.masterUrl(),
+ implicitAcknowledgements, mesosConfig.credential().get().build());
+ }
+ else {
+ schedulerDriver =
+ new MesosSchedulerDriver(scheduler, mesosConfig.frameworkInfo().build(), mesosConfig.masterUrl(),
+ implicitAcknowledgements);
Review comment:
```suggestion
schedulerDriver = new MesosSchedulerDriver(
    scheduler,
    mesosConfig.frameworkInfo().build(),
    mesosConfig.masterUrl(),
    implicitAcknowledgements);
```
Same here, since it's a long line with dots and commas.
##########
File path:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
##########
@@ -41,15 +47,32 @@ protected AbstractMesosServices(ActorSystem actorSystem, MesosArtifactServer art
}
@Override
- public ActorSystem getLocalActorSystem() {
- return actorSystem;
+ public MesosResourceManagerActorFactory getMesosResourceManagerActorFactory() {
+ return new MesosResourceManagerActorFactoryImpl(actorSystem);
}
@Override
public MesosArtifactServer getArtifactServer() {
return artifactServer;
}
+ @Override
+ public SchedulerDriver createMesosSchedulerDriver(
+ MesosConfiguration mesosConfig, Scheduler scheduler, boolean implicitAcknowledgements) {
+ MesosSchedulerDriver schedulerDriver;
Review comment:
We don't need this variable as we can return the newly instantiated
`MesosSchedulerDriver`s right away.
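A minimal sketch of what I mean, using stand-in types so it compiles without Mesos on the classpath. `Driver` and `createDriver` are hypothetical and only mimic the shape of `MesosSchedulerDriver` and `createMesosSchedulerDriver`; the point is that each branch returns directly and no local variable is needed.

```java
import java.util.Optional;

final class DirectReturnSketch {

    /** Hypothetical stand-in for MesosSchedulerDriver with its two constructor shapes. */
    static final class Driver {
        final String masterUrl;
        final boolean implicitAcknowledgements;
        final Optional<String> credential;

        Driver(String masterUrl, boolean implicitAcknowledgements) {
            this.masterUrl = masterUrl;
            this.implicitAcknowledgements = implicitAcknowledgements;
            this.credential = Optional.empty();
        }

        Driver(String masterUrl, boolean implicitAcknowledgements, String credential) {
            this.masterUrl = masterUrl;
            this.implicitAcknowledgements = implicitAcknowledgements;
            this.credential = Optional.of(credential);
        }
    }

    // Each branch returns the new instance right away; no temporary variable.
    static Driver createDriver(
            String masterUrl,
            Optional<String> credential,
            boolean implicitAcknowledgements) {
        if (credential.isPresent()) {
            return new Driver(masterUrl, implicitAcknowledgements, credential.get());
        }
        return new Driver(masterUrl, implicitAcknowledgements);
    }

    public static void main(String[] args) {
        Driver driver = createDriver("zk://host/mesos", Optional.of("secret"), true);
        System.out.println(driver.credential.isPresent());
    }
}
```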
##########
File path:
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriverTest.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactoryImpl;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.TestingMesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.TestingMesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.TestingSchedulerDriver;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.TestingMesosArtifactServer;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.japi.JavaPartialFunction;
+import akka.testkit.TestActor;
+import akka.testkit.TestProbe;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link MesosResourceManagerDriver}.
+ */
+public class MesosResourceManagerDriverTest extends ResourceManagerDriverTestBase<RegisteredMesosWorkerNode> {
+
+ private static final Duration TIMEOUT_DURATION = Duration.create(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ private static final Protos.SlaveID SLAVE_ID = Protos.SlaveID.newBuilder().setValue("slave-id").build();
+ private static final String SLAVE_HOST = "slave-host";
+
+ @Test
+ public void testAcceptOffers() throws Exception {
Review comment:
Out of curiosity: what's the difference between
`MesosResourceManagerDriverTest::testAcceptOffers` and
`ResourceManagerDriverTestBase::testRequestResource`? Based on the code, both
appear to do the same thing. 🤔
Trying to answer my own question: does
`MesosResourceManagerDriverTest::testAcceptOffers` test whether
`requestResource` completes successfully, whereas
`ResourceManagerDriverTestBase::testRequestResource` tests whether we reach the
Mesos backend via `requestResource`?