nlu90 commented on a change in pull request #2979: [healthmgr] refactor 
physicalplanprovider and topologyprovider
URL: https://github.com/apache/incubator-heron/pull/2979#discussion_r209443062
 
 

 ##########
 File path: 
heron/healthmgr/src/java/org/apache/heron/healthmgr/common/PhysicalPlanProvider.java
 ##########
 @@ -19,70 +19,129 @@
 
 package org.apache.heron.healthmgr.common;
 
+import java.net.HttpURLConnection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Provider;
 
-import com.microsoft.dhalion.events.EventHandler;
-import com.microsoft.dhalion.events.EventManager;
-
-import org.apache.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
-import org.apache.heron.healthmgr.common.HealthManagerEvents.TopologyUpdate;
+import org.apache.heron.api.generated.TopologyAPI;
 import org.apache.heron.proto.system.PhysicalPlans.PhysicalPlan;
+import org.apache.heron.proto.tmaster.TopologyMaster;
 import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import org.apache.heron.spi.utils.NetworkUtils;
 
 import static org.apache.heron.healthmgr.HealthPolicyConfig.CONF_TOPOLOGY_NAME;
 
 /**
- * A topology's physical plan may get updated after initial deployment. This 
provider is used to
- * fetch the latest version from the state manager and provide to any 
dependent components.
+ * A topology's physical plan may get updated at runtime. This provider is 
used to
+ * fetch the latest version from the tmaster and provide to any dependent 
components.
  */
 public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
   private static final Logger LOG = 
Logger.getLogger(PhysicalPlanProvider.class.getName());
 
   private final SchedulerStateManagerAdaptor stateManagerAdaptor;
   private final String topologyName;
 
-  private PhysicalPlan physicalPlan;
+  private PhysicalPlan cachedPhysicalPlan = null;
 
   @Inject
   public PhysicalPlanProvider(SchedulerStateManagerAdaptor stateManagerAdaptor,
-      EventManager eventManager, @Named(CONF_TOPOLOGY_NAME) String 
topologyName) {
+                              @Named(CONF_TOPOLOGY_NAME) String topologyName) {
     this.stateManagerAdaptor = stateManagerAdaptor;
     this.topologyName = topologyName;
-    eventManager.addEventListener(TopologyUpdate.class, new 
EventHandler<TopologyUpdate>() {
-      /**
-       * Invalidates cached physical plan on receiving topology update 
notification
-       */
-      @Override
-      public synchronized void onEvent(TopologyUpdate event) {
-        LOG.info(
-            "Received topology update event, invalidating cached PhysicalPlan: 
" + event.type());
-        physicalPlan = null;
-      }
-    });
-    eventManager.addEventListener(ContainerRestart.class, new 
EventHandler<ContainerRestart>() {
-      /**
-       * Invalidates cached physical plan on receiving container restart 
notification
-       */
-      @Override
-      public synchronized void onEvent(ContainerRestart event) {
-        LOG.info("Received container restart event, invalidating cached 
PhysicalPlan: "
-            + event.type());
-        physicalPlan = null;
-      }
-    });
   }
 
   @Override
   public synchronized PhysicalPlan get() {
-    physicalPlan = stateManagerAdaptor.getPhysicalPlan(topologyName);
-    if (physicalPlan == null) {
+    TopologyMaster.TMasterLocation tMasterLocation
+        = stateManagerAdaptor.getTMasterLocation(topologyName);
+    String host = tMasterLocation.getHost();
+    int port = tMasterLocation.getControllerPort();
+
+    // construct metric cache stat url
+    String url = "http://" + host + ":" + port + "/get_current_physical_plan";
+    LOG.fine("tmaster physical plan query endpoint: " + url);
+
+    // http communication
+    HttpURLConnection con = NetworkUtils.getHttpConnection(url);
+    NetworkUtils.sendHttpGetRequest(con);
+    byte[] responseData = NetworkUtils.readHttpResponse(con);
+    // byte to base64 string
+    String encodedString = new String(responseData);
+    LOG.fine("tmaster returns physical plan in base64 str: " + encodedString);
+    // base64 string to proto bytes
+    byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
+    // construct proto obj from bytes
+    PhysicalPlan physicalPlan = null;
+    try {
+      physicalPlan = PhysicalPlan.parseFrom(decodedBytes);
+    } catch (Exception e) {
       throw new InvalidStateException(topologyName, "Failed to fetch the 
physical plan");
     }
+
+    cachedPhysicalPlan = physicalPlan;
     return physicalPlan;
   }
 
+  public PhysicalPlan getCachedPhysicalPlan() {
+    try {
+      get();
+    } catch (InvalidStateException e) {
+      if (cachedPhysicalPlan == null) {
+        throw e;
+      }
+    }
+    return cachedPhysicalPlan;
+  }
+
+  /**
+   * A utility method to extract bolt component names from the topology.
+   *
+   * @return array of all bolt names
+   */
+  protected Collection<String> getBoltNames(PhysicalPlan pp) {
+    TopologyAPI.Topology localTopology = pp.getTopology();
+    ArrayList<String> boltNames = new ArrayList<>();
+    for (TopologyAPI.Bolt bolt : localTopology.getBoltsList()) {
+      boltNames.add(bolt.getComp().getName());
+    }
+
+    return boltNames;
+  }
+  public Collection<String> getBoltNames() {
+    getCachedPhysicalPlan();
+    return getBoltNames(cachedPhysicalPlan);
+  }
+
+  /**
+   * A utility method to extract spout component names from the topology.
+   *
+   * @return array of all spout names
+   */
+  protected Collection<String> getSpoutNames(PhysicalPlan pp) {
+    TopologyAPI.Topology localTopology = pp.getTopology();
+    ArrayList<String> spoutNames = new ArrayList<>();
+    for (TopologyAPI.Spout spout : localTopology.getSpoutsList()) {
+      spoutNames.add(spout.getComp().getName());
+    }
+
+    return spoutNames;
+  }
+  public Collection<String> getSpoutNames() {
+    getCachedPhysicalPlan();
+    return getSpoutNames(cachedPhysicalPlan);
+  }
+
+  public Collection<String> getSpoutBoltNames() {
 
 Review comment:
   Calling it `getComponentNames` should be good enough. One general question: 
why do we need such a method?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to