[
https://issues.apache.org/jira/browse/EAGLE-435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416507#comment-15416507
]
ASF GitHub Bot commented on EAGLE-435:
--------------------------------------
Github user RalphSu commented on a diff in the pull request:
https://github.com/apache/incubator-eagle/pull/322#discussion_r74366625
--- Diff: eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java ---
@@ -98,28 +103,58 @@ public Coordinator(Config config, ConfigBusProducer producer, IMetadataServiceCl
     }
     public synchronized ScheduleState schedule(ScheduleOption option) {
-        Stopwatch watch = Stopwatch.createStarted();
-        IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
-        TopologyMgmtService mgmtService = new TopologyMgmtService();
-        IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-
-        scheduler.init(context, mgmtService);
-        ScheduleState state = scheduler.schedule(option);
-
-        long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
-        state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
-        watch.reset();
-        watch.start();
-
-        // persist & notify
-        postSchedule(client, state, producer);
-
-        watch.stop();
-        long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
-        LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
-
-        currentState = state;
-        return state;
+        ScheduleZkState scheduleZkState = new ScheduleZkState();
+        ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
+            @Override
+            public void run() throws Exception {
+                scheduleZkState.scheduleAcquired = true;
+
+                while (!scheduleZkState.scheduleCompleted) {
+                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+                }
+            }
+        };
+        ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable);
+        int waitMaxTimes = 0;
+        while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 minutes waiting
+            if (!scheduleZkState.scheduleAcquired) {
+                waitMaxTimes ++;
+                try {
+                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+                } catch (InterruptedException e) {}
+                continue;
+            }
+
+            ScheduleState state = null;
+            try {
+                Stopwatch watch = Stopwatch.createStarted();
+                IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+                TopologyMgmtService mgmtService = new TopologyMgmtService();
+                IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
+
+                scheduler.init(context, mgmtService);
+                state = scheduler.schedule(option);
+
+                long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
+                state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
+                watch.reset();
+                watch.start();
+
+                // persist & notify
+                postSchedule(client, state, producer);
+
+                watch.stop();
+                long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
+                LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
+
+                currentState = state;
+            } finally {
+                //schedule completed
+                scheduleZkState.scheduleCompleted = true;
+            }
+            return state;
+        }
+        throw new LockWebApplicationException("Acquire greedy scheduler lock failed, please retry later");
--- End diff --
Would just "scheduler lock" be enough here?
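
For reference, the acquire / run / release-in-finally / fail-after-timeout shape in the diff above can be sketched with a bounded `tryLock` rather than two sleep loops over shared flags. This is only an illustration under stated assumptions, not the project's code: `ExclusiveScheduleSketch` and `runExclusively` are hypothetical names, the in-JVM `ReentrantLock` merely stands in for a ZooKeeper-backed lock (so it gives no cross-process exclusion by itself), and `IllegalStateException` stands in for `LockWebApplicationException`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Eagle: run a task under a lock with a bounded wait. */
final class ExclusiveScheduleSketch {

    private ExclusiveScheduleSketch() {}

    /**
     * Waits up to the given timeout for the lock, runs the work while holding it,
     * and always releases the lock afterwards. Throws IllegalStateException if the
     * lock cannot be acquired in time (a stand-in for LockWebApplicationException).
     */
    static <T> T runExclusively(Lock lock, long timeout, TimeUnit unit, Supplier<T> work) {
        boolean acquired;
        try {
            // Block for at most `timeout` instead of polling a flag in a sleep loop.
            acquired = lock.tryLock(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("Interrupted while waiting for scheduler lock", e);
        }
        if (!acquired) {
            throw new IllegalStateException("Acquire scheduler lock failed, please retry later");
        }
        try {
            return work.get();
        } finally {
            lock.unlock(); // released even if the work throws
        }
    }

    public static void main(String[] args) {
        // Assumption: a local lock standing in for the distributed one.
        Lock schedulerLock = new ReentrantLock();
        String state = runExclusively(schedulerLock, 3, TimeUnit.MINUTES, () -> "scheduled");
        System.out.println(state);
    }
}
```

In a distributed deployment the `Lock` argument would have to be replaced by a lock that is actually shared across coordinator instances; the sketch only shows the control flow.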
> Coordinator schedule operation must be exclusive in distributed deployment
> --------------------------------------------------------------------------
>
> Key: EAGLE-435
> URL: https://issues.apache.org/jira/browse/EAGLE-435
> Project: Eagle
> Issue Type: Improvement
> Affects Versions: v0.5.0
> Reporter: Garrett Li
> Assignee: Garrett Li
> Labels: alert_engine
> Fix For: v0.5.0
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
--
This message was sent by Atlassian JIRA (v6.3.4#6332)