XComp commented on a change in pull request #13313:
URL: https://github.com/apache/flink/pull/13313#discussion_r488604849



##########
File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
##########
@@ -41,15 +47,32 @@ protected AbstractMesosServices(ActorSystem actorSystem, 
MesosArtifactServer art
        }
 
        @Override
-       public ActorSystem getLocalActorSystem() {
-               return actorSystem;
+       public MesosResourceManagerActorFactory 
getMesosResourceManagerActorFactory() {
+               return new MesosResourceManagerActorFactoryImpl(actorSystem);
        }
 
        @Override
        public MesosArtifactServer getArtifactServer() {
                return artifactServer;
        }
 
+       @Override
+       public SchedulerDriver createMesosSchedulerDriver(
+                       MesosConfiguration mesosConfig, Scheduler scheduler, 
boolean implicitAcknowledgements) {
+               MesosSchedulerDriver schedulerDriver;
+               if (mesosConfig.credential().isDefined()) {
+                       schedulerDriver =
+                                       new MesosSchedulerDriver(scheduler, 
mesosConfig.frameworkInfo().build(), mesosConfig.masterUrl(),
+                                                       
implicitAcknowledgements, mesosConfig.credential().get().build());

Review comment:
       ```suggestion
                        schedulerDriver = new MesosSchedulerDriver(
                                scheduler,
                                mesosConfig.frameworkInfo().build(),
                                mesosConfig.masterUrl(),
                                implicitAcknowledgements,
                                mesosConfig.credential().get().build());
   ```
   
   I'd suggest reformatting these lines for readability purposes.

##########
File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -739,172 +591,123 @@ static ResourceID extractResourceID(Protos.TaskID 
taskId) {
                }
        }
 
-       /**
-        * Creates the Fenzo optimizer (builder).
-        * The builder is an indirection to facilitate unit testing of the 
Launch Coordinator.
-        */
-       private static TaskSchedulerBuilder createOptimizer() {
-               return new TaskSchedulerBuilder() {
-                       TaskScheduler.Builder builder = new 
TaskScheduler.Builder();
-
-                       @Override
-                       public TaskSchedulerBuilder 
withLeaseRejectAction(Action1<VirtualMachineLease> action) {
-                               builder.withLeaseRejectAction(action);
-                               return this;
-                       }
-
-                       @Override
-                       public TaskSchedulerBuilder 
withRejectAllExpiredOffers() {
-                               builder.withRejectAllExpiredOffers();
-                               return this;
-                       }
+       private static String extractTerminatedDiagnostics(ResourceID id, 
Protos.TaskStatus status) {
+               return String.format("Worker %s terminated with status: %s, 
reason: %s, message: %s.",
+                               id, status.getState(), status.getReason(), 
status.getMessage());
+       }
 
-                       @Override
-                       public TaskSchedulerBuilder 
withLeaseOfferExpirySecs(long leaseOfferExpirySecs) {
-                               
builder.withLeaseOfferExpirySecs(leaseOfferExpirySecs);
-                               return this;
-                       }
+       private static TaskSchedulerBuilder createOptimizer() {
+               return new TaskSchedulerBuilderImpl();
+       }
 
-                       @Override
-                       public TaskScheduler build() {
-                               return builder.build();
-                       }
-               };
+       @VisibleForTesting
+       void assertStateCleared() {
+               assert(workersInNew.isEmpty());
+               assert(requestResourceFutures.isEmpty());

Review comment:
       Would it make sense to also check that the actors are cleared (i.e. 
`MesosResourceManagerDriver::stopSupportingActorsAsync` was performed properly)?

##########
File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
##########
@@ -36,28 +38,34 @@
         * mode.
         *
         * @param configuration to be used
-        * @param executor to run asynchronous tasks
         * @return a mesos worker store
         * @throws Exception if the mesos worker store could not be created
         */
        MesosWorkerStore createMesosWorkerStore(
-               Configuration configuration,
-               Executor executor) throws Exception;
+               Configuration configuration) throws Exception;
 
        /**
-        * Gets a local {@link ActorSystem} which is used for child actors 
within
-        * {@link 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager}.
-        *
-        * @return a reference to an actor system.
+        * Gets a {@link MesosResourceManagerActorFactory} which creates child 
actors within
+        * {@link 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerDriver} in 
a local {@link ActorSystem}.
+        * @return the factory.
         */
-       ActorSystem getLocalActorSystem();
+       MesosResourceManagerActorFactory getMesosResourceManagerActorFactory();

Review comment:
       Shouldn't we call that one something like 
`createMesosResourceManagerActoryFactory`? ...since it's instantiating a new 
factory.

##########
File path: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/services/TestingMesosServices.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link MesosServices}.
+ */
+public class TestingMesosServices implements MesosServices{

Review comment:
       Tiny thing: there's a space missing between `MesosServices` and the 
opening curly bracket.

##########
File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
##########
@@ -41,15 +47,32 @@ protected AbstractMesosServices(ActorSystem actorSystem, 
MesosArtifactServer art
        }
 
        @Override
-       public ActorSystem getLocalActorSystem() {
-               return actorSystem;
+       public MesosResourceManagerActorFactory 
getMesosResourceManagerActorFactory() {
+               return new MesosResourceManagerActorFactoryImpl(actorSystem);
        }
 
        @Override
        public MesosArtifactServer getArtifactServer() {
                return artifactServer;
        }
 
+       @Override
+       public SchedulerDriver createMesosSchedulerDriver(
+                       MesosConfiguration mesosConfig, Scheduler scheduler, 
boolean implicitAcknowledgements) {
+               MesosSchedulerDriver schedulerDriver;
+               if (mesosConfig.credential().isDefined()) {
+                       schedulerDriver =
+                                       new MesosSchedulerDriver(scheduler, 
mesosConfig.frameworkInfo().build(), mesosConfig.masterUrl(),
+                                                       
implicitAcknowledgements, mesosConfig.credential().get().build());
+               }
+               else {
+                       schedulerDriver =
+                                       new MesosSchedulerDriver(scheduler, 
mesosConfig.frameworkInfo().build(), mesosConfig.masterUrl(),
+                                                       
implicitAcknowledgements);

Review comment:
       ```suggestion
                        schedulerDriver = new MesosSchedulerDriver(
                                scheduler, 
                                mesosConfig.frameworkInfo().build(), 
                                mesosConfig.masterUrl(),
                                implicitAcknowledgements);
   ```
   Same here, since it's a long line with dots and commas.

##########
File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
##########
@@ -41,15 +47,32 @@ protected AbstractMesosServices(ActorSystem actorSystem, 
MesosArtifactServer art
        }
 
        @Override
-       public ActorSystem getLocalActorSystem() {
-               return actorSystem;
+       public MesosResourceManagerActorFactory 
getMesosResourceManagerActorFactory() {
+               return new MesosResourceManagerActorFactoryImpl(actorSystem);
        }
 
        @Override
        public MesosArtifactServer getArtifactServer() {
                return artifactServer;
        }
 
+       @Override
+       public SchedulerDriver createMesosSchedulerDriver(
+                       MesosConfiguration mesosConfig, Scheduler scheduler, 
boolean implicitAcknowledgements) {
+               MesosSchedulerDriver schedulerDriver;

Review comment:
       We don't need this variable as we can return the newly instantiated 
`MesosSchedulerDriver`s right away.

##########
File path: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriverTest.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import 
org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactoryImpl;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.TestingMesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.TestingMesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.TestingSchedulerDriver;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.TestingMesosArtifactServer;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.japi.JavaPartialFunction;
+import akka.testkit.TestActor;
+import akka.testkit.TestProbe;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link MesosResourceManagerDriver}.
+ */
+public class MesosResourceManagerDriverTest extends 
ResourceManagerDriverTestBase<RegisteredMesosWorkerNode> {
+
+       private static final Duration TIMEOUT_DURATION = 
Duration.create(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+       private static final Protos.SlaveID SLAVE_ID = 
Protos.SlaveID.newBuilder().setValue("slave-id").build();
+       private static final String SLAVE_HOST = "slave-host";
+
+       @Test
+       public void testAcceptOffers() throws Exception {

Review comment:
       Out of curiosity: What's the difference between 
`MesosResourceManagerDriverTest::testAcceptOffers` and 
`ResourceManagerDriverTestBase::testRequestResource`. Based on the code it 
looks like both are doing the same thing. 🤔 
   
   Just trying to answer my questions myself: Is 
`MesosResourceManagerDriverTest::testAcceptOffers` testing whether 
`requestResource` is coming back successfully whereas 
`ResourceManagerDriverTestBase::testRequestResource` tests whether we reach the 
Mesos backend via `requestResource`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to