Yitian-Zhang commented on issue #2952: SchedulerStateManagerAdaptor failed to 
fetch data from Zookeeper path in Heron Cluster
URL: 
https://github.com/apache/incubator-heron/issues/2952#issuecomment-403124959
 
 
   @nwangtw  Thanks for your comment. Let me describe my problem in detail. 
First, I created a CustomScheduler and deployed it on Heron. In the 
CustomScheduler, besides the main thread that Heron runs, I start a new 
thread after Heron has created the job. The new thread is responsible for 
running my own code. The problem occurs in this thread.
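The setup described above, where the scheduler finishes its normal work on the main thread and then hands extra work to a second thread, can be sketched in plain Java. This is only an illustration under stated assumptions: `BackgroundTaskSketch`, `onSchedule`, `awaitWorker`, and the task body are hypothetical stand-ins, not Heron classes or methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a custom scheduler; not a Heron class.
public class BackgroundTaskSketch {
    // Result written by the worker thread and read by the caller.
    private final AtomicReference<String> lastResult = new AtomicReference<>("not run");
    // Released once the worker thread has finished its task.
    private final CountDownLatch done = new CountDownLatch(1);

    // Mimics the main-thread scheduling call, then starts the extra thread.
    public boolean onSchedule(String packingPlanId) {
        // ... the normal scheduling work would run here on the main thread ...
        Thread worker = new Thread(() -> {
            try {
                // Placeholder for the user's own logic (assumption).
                lastResult.set("processed " + packingPlanId);
            } finally {
                done.countDown();
            }
        }, "custom-scheduler-worker");
        worker.setDaemon(true); // do not keep the JVM alive for this thread
        worker.start();
        return true;
    }

    // Waits for the worker; returns false on timeout or interruption.
    public boolean awaitWorker(long timeoutMs) {
        try {
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public String lastResult() {
        return lastResult.get();
    }

    public static void main(String[] args) {
        BackgroundTaskSketch sketch = new BackgroundTaskSketch();
        sketch.onSchedule("plan-1");
        sketch.awaitWorker(1000);
        System.out.println(sketch.lastResult());
    }
}
```

In a sketch like this, any exception thrown inside the worker is invisible to the main thread unless it is caught and reported there, which may be one reason only a single WARNING shows up in the logs.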
   Second, I can submit topologies and activate them normally using the 
CustomScheduler, so my CustomScheduler is deployed correctly and 
the main thread works as expected.
   For reference, my submit command is:
   `heron submit aurora/yitian/devel --config-path ~/.heron/conf 
~/aurora-topolgoies/heron-with-dependencies.jar 
zyt.custom.topology.aurora.SentenceWordCountTopology SentenceWordCountTopology 
--deploy-deactivated --verbose`
   The problem occurs in the new thread I mentioned above. In this thread, 
I want to update a running topology by using UpdateTopologyManager with a 
new PackingPlan, so that it has the same effect as updating the topology 
with the update command. To do this, I tried to create an ISchedulerClient 
and a new runtime Config, and then update the topology by invoking 
ISchedulerClient.updateTopology. That is when the problem happened. 
   I am sorry that I cannot give more information about the WARNING; it 
is the only information I could find. Here is my code:
   ```
   public void doSchedule(PackingPlan packingPlan) {
       // Get the state manager class name and create an instance via reflection
       String stateMgrClass = Context.stateManagerClass(this.config);
       IStateManager stateMgr = null;
       try {
           stateMgr = ReflectionUtils.newInstance(stateMgrClass);
           FileUtils.writeToFile(filename, "Create IStateManager object success...");
       } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
           e.printStackTrace();
       }

       try {
           stateMgr.initialize(this.config);
           SchedulerStateManagerAdaptor stateManagerAdaptor =
                   new SchedulerStateManagerAdaptor(stateMgr, 5000);

           // The new packing plan (newPackingPlan) is created here. It is omitted.
           PackingPlans.PackingPlan currentPackingPlan = serializer.toProto(packingPlan);
           PackingPlans.PackingPlan proposedPackingPlan = serializer.toProto(newPackingPlan);

           // Build the UpdateTopologyRequest object used to update the topology
           Scheduler.UpdateTopologyRequest updateTopologyRequest =
                   Scheduler.UpdateTopologyRequest.newBuilder()
                           .setCurrentPackingPlan(currentPackingPlan)
                           .setProposedPackingPlan(proposedPackingPlan)
                           .build();

           // Create the runtime config using the state manager adaptor. This adaptor
           // does not include topology information; just add the topology name and
           // the adaptor to build the ISchedulerClient object.
           Config primaryRuntime = LauncherUtils.getInstance().createPrimaryRuntime(topology);
           Config newRuntime = Config.newBuilder()
                   .putAll(primaryRuntime)
                   .put(Key.TOPOLOGY_NAME, Context.topologyName(config))
                   .put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, stateManagerAdaptor)
                   .build();

           // Create an ISchedulerClient based on the config
           ISchedulerClient schedulerClient = getSchedulerClient(newRuntime);

           // In testing, I cannot get the physical plan at this point. I don't know why.
           // TopologyAPI.Topology topology3 =
           //         stateManagerAdaptor.getPhysicalPlan(topologyName).getTopology();

           if (!schedulerClient.updateTopology(updateTopologyRequest)) {
               throw new TopologyRuntimeManagementException(String.format(
                       "Failed to update " + topology.getName()
                               + " with Scheduler, updateTopologyRequest="
                               + updateTopologyRequest));
           }

       } finally {
           // Close the ZooKeeper client connection
           SysUtils.closeIgnoringExceptions(stateMgr);
       }
   }
   ```
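
One detail of the snippet above: if the reflective instantiation fails, the exception is only printed and `stateMgr` stays `null`, so the later `stateMgr.initialize(...)` call would throw a `NullPointerException`. A minimal sketch of failing fast instead is shown below. It assumes nothing about Heron: `StateManagerGuardSketch`, `StateManager`, `InMemoryStateManager`, `newStateManager`, and `runWithStateManager` are hypothetical stand-ins and are not the Heron `IStateManager` or `ReflectionUtils` classes.

```java
// Hypothetical stand-in types; these are NOT Heron classes.
public class StateManagerGuardSketch {

    // Minimal state manager contract for the sketch (assumption).
    public interface StateManager extends AutoCloseable {
        void initialize(String config);

        @Override
        void close(); // narrowed so that it throws no checked exception
    }

    // Trivial implementation so the sketch can run on its own.
    public static class InMemoryStateManager implements StateManager {
        private boolean initialized;

        @Override
        public void initialize(String config) {
            initialized = true;
        }

        public boolean isInitialized() {
            return initialized;
        }

        @Override
        public void close() {
            initialized = false;
        }
    }

    // Creates the instance by class name and throws instead of returning null.
    public static StateManager newStateManager(String className) {
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            return (StateManager) instance;
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot create state manager " + className, e);
        }
    }

    // Initializes the manager and always closes it, even if a later step fails.
    public static boolean runWithStateManager(String className, String config) {
        try (StateManager mgr = newStateManager(className)) {
            mgr.initialize(config);
            // ... build the request and call the scheduler client here ...
            return true;
        }
    }

    public static void main(String[] args) {
        String name = InMemoryStateManager.class.getName();
        System.out.println(runWithStateManager(name, "cfg"));
    }
}
```

With this shape, a failed instantiation surfaces as an exception with the original cause attached, which tends to be easier to diagnose than a later `NullPointerException`.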

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
