[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627385#comment-16627385
 ] 

ASF GitHub Bot commented on FLINK-10411:
----------------------------------------

StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220128418
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
 ##########
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class ClusterComponent<T extends Dispatcher> implements 
AutoCloseableAsync {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ClusterComponent.class);
+
+       private final Object lock = new Object();
+
+       private final DispatcherFactory<T> dispatcherFactory;
+
+       private final ResourceManagerFactory<?> resourceManagerFactory;
+
+       private final RestEndpointFactory<?> restEndpointFactory;
+
+       private final CompletableFuture<Void> terminationFuture;
+
+       private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+       @GuardedBy("lock")
+       private State state;
+
+       @GuardedBy("lock")
+       private ResourceManager<?> resourceManager;
+
+       @GuardedBy("lock")
+       private T dispatcher;
+
+       @GuardedBy("lock")
+       private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+       @GuardedBy("lock")
+       private LeaderRetrievalService resourceManagerRetrievalService;
+
+       @GuardedBy("lock")
+       private WebMonitorEndpoint<?> webMonitorEndpoint;
+
+       @GuardedBy("lock")
+       private JobManagerMetricGroup jobManagerMetricGroup;
+
+       public ClusterComponent(DispatcherFactory<T> dispatcherFactory, 
ResourceManagerFactory<?> resourceManagerFactory, RestEndpointFactory<?> 
restEndpointFactory) {
 
 Review comment:
   linebreaks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> -----------------------------------
>
>                 Key: FLINK-10411
>                 URL: https://issues.apache.org/jira/browse/FLINK-10411
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it 
> cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} 
> with a {{WebMonitorRestEndpoint}}). The problem is that the 
> {{ClusterEntrypoint}} combines too many responsibilities (creating the 
> cluster services, starting the cluster components and deciding on when to 
> terminate the JVM process).
> I suggest to make the structure more compositional, meaning to split up the 
> service generation from the cluster component start up. That way we could 
> also remove code duplication between the different {{ClusterEntrypoint}} 
> implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to