Yair Zaslavsky has uploaded a new change for review. Change subject: core: introducing scheduling framework ......................................................................
core: introducing scheduling framework Introducing scheduling framework. Motivation: a. Provide mechanism for waiting for completion of long flows b. Separating polling from scheduling Change-Id: I07253e8465776482bbdade21d78e9b40eeaadbb5 Signed-off-by: Yair Zaslavsky <[email protected]> --- A backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java A backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java A backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java A backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java 4 files changed, 197 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/95/13795/1 diff --git a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java new file mode 100644 index 0000000..c86f463 --- /dev/null +++ b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java @@ -0,0 +1,12 @@ +package org.ovirt.engine.core.taskmgr.scheduler; + +public interface OperationScheduler { + + public interface Callback<V> { + + public void call(WaitedFuture<V> task); + } + + public <T> void registerTask(WaitedFuture<T> task, Callback<T> callback, + int timeoutSec); +} diff --git a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java new file mode 100644 index 0000000..caf5112 --- /dev/null +++ b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java @@ -0,0 
+1,99 @@ +package org.ovirt.engine.core.taskmgr.scheduler; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.concurrent.Executor; + +public final class SimpleOperationScheduler implements Runnable, OperationScheduler { + + private class OperationEntry<T> { + + final private Calendar _timeout; + final private WaitedFuture<T> _task; + final private Callback<T> _callback; + + public OperationEntry(WaitedFuture<T> task, Callback<T> callback, + Calendar timeout) { + _task = task; + _callback = callback; + _timeout = timeout; + } + + public boolean isDone() { + return _task.isDone(); + } + + public void invoke() { + _callback.call(_task); + } + + public boolean isExpired() { + return Calendar.getInstance().after(_timeout); + } + + public int expiresInSec() { + return _timeout.compareTo(Calendar.getInstance()) / 1000; + } + } + final private ArrayList<OperationEntry<?>> _tasks; + final private Executor _executor; + + public SimpleOperationScheduler(Executor executor) { + _tasks = new ArrayList<OperationEntry<?>>(); + _executor = executor; + } + + public <T> void registerTask(WaitedFuture<T> task, Callback<T> callback, + int timeoutSec) { + task.registerWaitObject(this); + synchronized (_tasks) { + Calendar timeout = Calendar.getInstance(); + timeout.add(Calendar.SECOND, timeoutSec); + _tasks.add(new OperationEntry<T>(task, callback, timeout)); + } + /* Do a run in case the task finished before we registered + * for notifications. 
+ */ + synchronized (this) { + this.notify(); + } + + } + + public void run() { + long closestExpirationSec = Integer.MAX_VALUE; + while (true) { + if (closestExpirationSec > 0) { + try { + synchronized (this) { + this.wait(closestExpirationSec * 1000); + } + } catch (InterruptedException e) { + // ignore + } + } + + closestExpirationSec = Integer.MAX_VALUE; + + synchronized (_tasks) { + for (int i = 0; i < _tasks.size(); i++) { + final OperationEntry<?> task = _tasks.get(i); + if (task.isExpired() || task.isDone()) { + _tasks.remove(i); + i--; + _executor.execute(new Runnable() { + public void run() { + task.invoke(); + } + }); + } else { + int exp = task.expiresInSec(); + if (exp < closestExpirationSec) { + closestExpirationSec = exp; + } + } + } + } + } + } +} diff --git a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java new file mode 100644 index 0000000..49373a0 --- /dev/null +++ b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java @@ -0,0 +1,8 @@ +package org.ovirt.engine.core.taskmgr.scheduler; + +import java.util.concurrent.Future; + +public interface WaitedFuture<V> extends Future<V> { + + public void registerWaitObject(Object object); +} diff --git a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java new file mode 100644 index 0000000..d097289 --- /dev/null +++ b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java @@ -0,0 +1,78 @@ +package org.ovirt.engine.core.taskmgr.scheduler; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; + +public final class WaitedFutureMixin<V> implements WaitedFuture<V> { + + public interface Parent { + + public boolean cancel(boolean mayInterruptIfRunning); + } + final private CountDownLatch _flag; + private boolean _cancelled; + private boolean _done; + private V _result; + private Object _waitObj; + final private Parent _parent; + + public WaitedFutureMixin(Parent parent) { + _flag = new CountDownLatch(1); + _cancelled = false; + _done = false; + _result = null; + _waitObj = null; + _parent = parent; + } + + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (this) { + if (!_cancelled) { + _cancelled = _parent.cancel(mayInterruptIfRunning); + } + + return _cancelled; + } + } + + public V get() throws InterruptedException, ExecutionException { + _flag.await(); + return _result; + } + + public V get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + _flag.await(timeout, unit); + return _result; + } + + public void setResult(V result) { + assert (isDone() == false); + _result = result; + _flag.countDown(); + _done = true; + synchronized (this) { + if (_waitObj != null) { + synchronized (_waitObj) { + _waitObj.notify(); + } + } + } + } + + public boolean isCancelled() { + return _cancelled; + } + + public boolean isDone() { + return (_done || _cancelled); + } + + public void registerWaitObject(Object object) { + synchronized (this) { + _waitObj = object; + } + } +} -- To view, visit http://gerrit.ovirt.org/13795 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I07253e8465776482bbdade21d78e9b40eeaadbb5 Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: master Gerrit-Owner: Yair Zaslavsky <[email protected]> _______________________________________________ Engine-patches mailing list [email protected] http://lists.ovirt.org/mailman/listinfo/engine-patches
