Yair Zaslavsky has uploaded a new change for review.

Change subject: core: introducing scheduling framework
......................................................................

core: introducing scheduling framework

Introducing scheduling framework.
Motivation:
a. Provide mechanism for waiting for completion of long flows
b. Separating polling from scheduling

Change-Id: I07253e8465776482bbdade21d78e9b40eeaadbb5
Signed-off-by: Yair Zaslavsky <[email protected]>
---
A 
backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java
A 
backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java
A 
backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java
A 
backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java
4 files changed, 197 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/95/13795/1

diff --git 
a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java
 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java
new file mode 100644
index 0000000..c86f463
--- /dev/null
+++ 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/OperationScheduler.java
@@ -0,0 +1,12 @@
+package org.ovirt.engine.core.taskmgr.scheduler;
+
+/**
+ * Accepts {@link WaitedFuture} tasks together with a callback and a timeout.
+ *
+ * SimpleOperationScheduler, the implementation added by this change, hands
+ * the callback to an executor once the task reports done or the timeout has
+ * elapsed.
+ */
+public interface OperationScheduler {
+
+    /**
+     * Receiver for a registered task.
+     *
+     * @param <V> result type of the task passed to {@link #call}
+     */
+    interface Callback<V> {
+
+        /**
+         * Called with the task this callback was registered for.
+         *
+         * @param task the task that was registered together with this callback
+         */
+        void call(WaitedFuture<V> task);
+    }
+
+    /**
+     * Registers a task with the scheduler.
+     *
+     * @param <T>        result type of the task
+     * @param task       the task to track
+     * @param callback   the callback to pair with the task
+     * @param timeoutSec timeout for the task, in seconds
+     */
+    <T> void registerTask(WaitedFuture<T> task, Callback<T> callback, int timeoutSec);
+}
diff --git 
a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java
 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java
new file mode 100644
index 0000000..caf5112
--- /dev/null
+++ 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java
@@ -0,0 +1,99 @@
+package org.ovirt.engine.core.taskmgr.scheduler;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.concurrent.Executor;
+
+public final class SimpleOperationScheduler implements Runnable, 
OperationScheduler {
+
+    private class OperationEntry<T> {
+
+        final private Calendar _timeout;
+        final private WaitedFuture<T> _task;
+        final private Callback<T> _callback;
+
+        public OperationEntry(WaitedFuture<T> task, Callback<T> callback,
+                Calendar timeout) {
+            _task = task;
+            _callback = callback;
+            _timeout = timeout;
+        }
+
+        public boolean isDone() {
+            return _task.isDone();
+        }
+
+        public void invoke() {
+            _callback.call(_task);
+        }
+
+        public boolean isExpired() {
+            return Calendar.getInstance().after(_timeout);
+        }
+
+        public int expiresInSec() {
+            return _timeout.compareTo(Calendar.getInstance()) / 1000;
+        }
+    }
+    final private ArrayList<OperationEntry<?>> _tasks;
+    final private Executor _executor;
+
+    public SimpleOperationScheduler(Executor executor) {
+        _tasks = new ArrayList<OperationEntry<?>>();
+        _executor = executor;
+    }
+
+    public <T> void registerTask(WaitedFuture<T> task, Callback<T> callback,
+            int timeoutSec) {
+        task.registerWaitObject(this);
+        synchronized (_tasks) {
+            Calendar timeout = Calendar.getInstance();
+            timeout.add(Calendar.SECOND, timeoutSec);
+            _tasks.add(new OperationEntry<T>(task, callback, timeout));
+        }
+        /* Do a run in case the task finished before we registered
+         * for notifications.
+         */
+        synchronized (this) {
+            this.notify();
+        }
+
+    }
+
+    public void run() {
+        long closestExpirationSec = Integer.MAX_VALUE;
+        while (true) {
+            if (closestExpirationSec > 0) {
+                try {
+                    synchronized (this) {
+                        this.wait(closestExpirationSec * 1000);
+                    }
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+
+            closestExpirationSec = Integer.MAX_VALUE;
+
+            synchronized (_tasks) {
+                for (int i = 0; i < _tasks.size(); i++) {
+                    final OperationEntry<?> task = _tasks.get(i);
+                    if (task.isExpired() || task.isDone()) {
+                        _tasks.remove(i);
+                        i--;
+                        _executor.execute(new Runnable() {
+                            public void run() {
+                                task.invoke();
+                            }
+                        });
+                    } else {
+                        int exp = task.expiresInSec();
+                        if (exp < closestExpirationSec) {
+                            closestExpirationSec = exp;
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java
 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java
new file mode 100644
index 0000000..49373a0
--- /dev/null
+++ 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFuture.java
@@ -0,0 +1,8 @@
+package org.ovirt.engine.core.taskmgr.scheduler;
+
+import java.util.concurrent.Future;
+
+/**
+ * A {@link Future} that can additionally be given a wait object.
+ *
+ * WaitedFutureMixin, the implementation added by this change, calls
+ * {@code notify()} on the registered object when its result is set.
+ *
+ * @param <V> result type of the future
+ */
+public interface WaitedFuture<V> extends Future<V> {
+
+    /**
+     * Registers the wait object for this future.
+     *
+     * @param object the object to register
+     */
+    void registerWaitObject(Object object);
+}
diff --git 
a/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java
 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java
new file mode 100644
index 0000000..d097289
--- /dev/null
+++ 
b/backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java
@@ -0,0 +1,78 @@
+package org.ovirt.engine.core.taskmgr.scheduler;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public final class WaitedFutureMixin<V> implements WaitedFuture<V> {
+
+    public interface Parent {
+
+        public boolean cancel(boolean mayInterruptIfRunning);
+    }
+    final private CountDownLatch _flag;
+    private boolean _cancelled;
+    private boolean _done;
+    private V _result;
+    private Object _waitObj;
+    final private Parent _parent;
+
+    public WaitedFutureMixin(Parent parent) {
+        _flag = new CountDownLatch(1);
+        _cancelled = false;
+        _done = false;
+        _result = null;
+        _waitObj = null;
+        _parent = parent;
+    }
+
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        synchronized (this) {
+            if (!_cancelled) {
+                _cancelled = _parent.cancel(mayInterruptIfRunning);
+            }
+
+            return _cancelled;
+        }
+    }
+
+    public V get() throws InterruptedException, ExecutionException {
+        _flag.await();
+        return _result;
+    }
+
+    public V get(long timeout, TimeUnit unit) throws InterruptedException,
+            ExecutionException, TimeoutException {
+        _flag.await(timeout, unit);
+        return _result;
+    }
+
+    public void setResult(V result) {
+        assert (isDone() == false);
+        _result = result;
+        _flag.countDown();
+        _done = true;
+        synchronized (this) {
+            if (_waitObj != null) {
+                synchronized (_waitObj) {
+                    _waitObj.notify();
+                }
+            }
+        }
+    }
+
+    public boolean isCancelled() {
+        return _cancelled;
+    }
+
+    public boolean isDone() {
+        return (_done || _cancelled);
+    }
+
+    public void registerWaitObject(Object object) {
+        synchronized (this) {
+            _waitObj = object;
+        }
+    }
+}


--
To view, visit http://gerrit.ovirt.org/13795
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I07253e8465776482bbdade21d78e9b40eeaadbb5
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Yair Zaslavsky <[email protected]>
_______________________________________________
Engine-patches mailing list
[email protected]
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to