[ 
https://issues.apache.org/jira/browse/EAGLE-435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416507#comment-15416507
 ] 

ASF GitHub Bot commented on EAGLE-435:
--------------------------------------

Github user RalphSu commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/322#discussion_r74366625
  
    --- Diff: 
eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
 ---
    @@ -98,28 +103,58 @@ public Coordinator(Config config, ConfigBusProducer 
producer, IMetadataServiceCl
         }
     
         public synchronized ScheduleState schedule(ScheduleOption option) {
    -        Stopwatch watch = Stopwatch.createStarted();
    -        IScheduleContext context = new 
ScheduleContextBuilder(client).buildContext();
    -        TopologyMgmtService mgmtService = new TopologyMgmtService();
    -        IPolicyScheduler scheduler = 
PolicySchedulerFactory.createScheduler();
    -
    -        scheduler.init(context, mgmtService);
    -        ScheduleState state = scheduler.schedule(option);
    -
    -        long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
    -        state.setScheduleTimeMillis((int) scheduleTime);// hardcode to 
integer
    -        watch.reset();
    -        watch.start();
    -
    -        // persist & notify
    -        postSchedule(client, state, producer);
    -
    -        watch.stop();
    -        long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
    -        LOG.info("Schedule result, schedule time {} ms, post schedule time 
{} ms !", scheduleTime, postTime);
    -
    -        currentState = state;
    -        return state;
    +           ScheduleZkState scheduleZkState = new ScheduleZkState();
    +           ExclusiveExecutor.Runnable exclusiveRunnable = new 
ExclusiveExecutor.Runnable() {
    +                   @Override
    +                   public void run() throws Exception {
    +                           scheduleZkState.scheduleAcquired = true;
    +                           
    +                           while (!scheduleZkState.scheduleCompleted) {
    +                                   
Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
    +                           }
    +                   }
    +           };
    +           ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, 
exclusiveRunnable);
    +           int waitMaxTimes = 0;
    +           while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 
3 minutes waiting
    +                   if (!scheduleZkState.scheduleAcquired) {
    +                           waitMaxTimes ++;
    +                           try {
    +                                   
Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
    +                           } catch (InterruptedException e) {}
    +                           continue;
    +                   }
    +                   
    +                   ScheduleState state = null;
    +                   try {
    +                           Stopwatch watch = Stopwatch.createStarted();
    +                   IScheduleContext context = new 
ScheduleContextBuilder(client).buildContext();
    +                   TopologyMgmtService mgmtService = new 
TopologyMgmtService();
    +                   IPolicyScheduler scheduler = 
PolicySchedulerFactory.createScheduler();
    +           
    +                   scheduler.init(context, mgmtService);
    +                   state = scheduler.schedule(option);
    +                   
    +                   long scheduleTime = 
watch.elapsed(TimeUnit.MILLISECONDS);
    +                   state.setScheduleTimeMillis((int) scheduleTime);// 
hardcode to integer
    +                   watch.reset();
    +                   watch.start();
    +           
    +                   // persist & notify
    +                   postSchedule(client, state, producer);
    +           
    +                   watch.stop();
    +                   long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
    +                   LOG.info("Schedule result, schedule time {} ms, post 
schedule time {} ms !", scheduleTime, postTime);
    +           
    +                   currentState = state;
    +                   } finally {
    +                           //schedule completed
    +                           scheduleZkState.scheduleCompleted = true;
    +                   }
    +           return state;
    +           }
    +           throw new LockWebApplicationException("Acquire greedy scheduler 
lock failed, please retry later");
    --- End diff --
    
    Is the "scheduler lock" alone sufficient here?


> Coordinator schedule operation must be exclusive in distributed deployment
> --------------------------------------------------------------------------
>
>                 Key: EAGLE-435
>                 URL: https://issues.apache.org/jira/browse/EAGLE-435
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: v0.5.0
>            Reporter: Garrett Li
>            Assignee: Garrett Li
>              Labels: alert_engine
>             Fix For: v0.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to