[
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361021#comment-16361021
]
ASF GitHub Bot commented on FLINK-8608:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5431#discussion_r167601541
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link MiniDispatcher}.
+ */
+@Category(Flip6.class)
+public class MiniDispatcherTest extends TestLogger {
+
+ private static final JobGraph jobGraph = new JobGraph();
+
+ private static final Time timeout = Time.seconds(10L);
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static TestingRpcService rpcService;
+
+ private static Configuration configuration;
+
+ private static BlobServer blobServer;
+
+ private MiniDispatcher miniDispatcher;
+
+ private CompletableFuture<JobGraph> jobGraphFuture;
+
+ private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+ @BeforeClass
+ public static void setupClass() throws IOException {
+ rpcService = new TestingRpcService();
+ configuration = new Configuration();
+
+ configuration.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
+
+ blobServer = new BlobServer(configuration, new VoidBlobStore());
+ }
+
+ @Before
+ public void setup() throws Exception {
+ dispatcherLeaderElectionService = new
TestingLeaderElectionService();
+ final TestingHighAvailabilityServices highAvailabilityServices
= new TestingHighAvailabilityServices();
+ final TestingResourceManagerGateway resourceManagerGateway =
new TestingResourceManagerGateway();
+ final HeartbeatServices heartbeatServices = new
HeartbeatServices(1000L, 1000L);
+ final ArchivedExecutionGraphStore archivedExecutionGraphStore =
new MemoryArchivedExecutionGraphStore();
+ final TestingFatalErrorHandler testingFatalErrorHandler = new
TestingFatalErrorHandler();
+
+
highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+
+ jobGraphFuture = new CompletableFuture<>();
+ final TestingJobManagerRunnerFactory
testingJobManagerRunnerFactory = new
TestingJobManagerRunnerFactory(jobGraphFuture);
+
+ miniDispatcher = new MiniDispatcher(
+ rpcService,
+ UUID.randomUUID().toString(),
+ configuration,
+ highAvailabilityServices,
+ resourceManagerGateway,
+ blobServer,
+ heartbeatServices,
+ NoOpMetricRegistry.INSTANCE,
+ archivedExecutionGraphStore,
+ testingJobManagerRunnerFactory,
+ testingFatalErrorHandler,
+ null,
+ jobGraph,
+ ClusterEntrypoint.ExecutionMode.DETACHED);
+
+ miniDispatcher.start();
+ }
+
+ @After
+ public void teardown() throws InterruptedException, ExecutionException,
TimeoutException {
+ if (miniDispatcher != null) {
+ RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
+ miniDispatcher = null;
+ }
+ }
+
+ @AfterClass
+ public static void teardownClass() throws IOException {
+ blobServer.close();
+ rpcService.stopService();
+ }
+
+ /**
+ * Tests that the {@link MiniDispatcher} recovers the single job with
which it
+ * was started.
+ */
+ @Test
+ public void testSingleJobRecovery() throws Exception {
+ // wait until the Dispatcher is the leader
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ final JobGraph actualJobGraph = jobGraphFuture.get();
+
+ assertThat(actualJobGraph.getJobID(), is(jobGraph.getJobID()));
+ }
+
+ /**
+ * Tests that in detached mode, the {@link MiniDispatcher} will
terminate after the job
+ * has completed.
+ */
+ @Test
+ public void testTerminationAfterJobCompletion() throws Exception {
--- End diff --
It seems that we are only testing the absence of exceptions here. There are
assertions.
> Add MiniDispatcher for job mode
> -------------------------------
>
> Key: FLINK-8608
> URL: https://issues.apache.org/jira/browse/FLINK-8608
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which
> is started with a pre initialized {{JobGraph}} and launches a single
> {{JobManagerRunner}} with this job. Once the job is completed and if the
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should
> terminate.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)