Michael Kublin has posted comments on this change. Change subject: core: introducing scheduling framework ......................................................................
Patch Set 1: I would prefer that you didn't submit this (21 inline comments) This patch is look like another java.util.concurrent package. Also, maybe instead of writing it will take a look on http://code.google.com/p/guava-libraries/ project, maybe they already wrote what we need? .................................................... File backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/SimpleOperationScheduler.java Line 6: Line 7: public final class SimpleOperationScheduler implements Runnable, OperationScheduler { Line 8: Line 9: private class OperationEntry<T> { Line 10: The Calendar is not efficient package. Calendar has poor performance, use System. currenttimemillis() Line 11: final private Calendar _timeout; Line 12: final private WaitedFuture<T> _task; Line 13: final private Callback<T> _callback; Line 14: Line 36: } Line 37: } Line 38: final private ArrayList<OperationEntry<?>> _tasks; Line 39: final private Executor _executor; Line 40: Why you passing executor here? We have a central thread pool in the system Line 41: public SimpleOperationScheduler(Executor executor) { Line 42: _tasks = new ArrayList<OperationEntry<?>>(); Line 43: _executor = executor; Line 44: } Line 47: int timeoutSec) { Line 48: task.registerWaitObject(this); Line 49: synchronized (_tasks) { Line 50: Calendar timeout = Calendar.getInstance(); Line 51: timeout.add(Calendar.SECOND, timeoutSec); In Java 1.5 were introduce java.util.concurrent package it has a big number of collections, I am pretty sure that one of them will be suitable here: for example DelayQueue or PriorityBlockingQueue Line 52: _tasks.add(new OperationEntry<T>(task, callback, timeout)); Line 53: } Line 54: /* Do a run in case the task finished before we registered Line 55: * for notifications. Line 52: _tasks.add(new OperationEntry<T>(task, callback, timeout)); Line 53: } Line 54: /* Do a run in case the task finished before we registered Line 55: * for notifications. Line 56: */ This code is useless. You can send notify before another thread acquired a lock, so it will not help Line 57: synchronized (this) { Line 58: this.notify(); Line 59: } Line 60: Line 63: public void run() { Line 64: long closestExpirationSec = Integer.MAX_VALUE; Line 65: while (true) { Line 66: if (closestExpirationSec > 0) { Line 67: try { Why not use Lock, Lock.newCondition ? Line 68: synchronized (this) { Line 69: this.wait(closestExpirationSec * 1000); Line 70: } Line 71: } catch (InterruptedException e) { Line 64: long closestExpirationSec = Integer.MAX_VALUE; Line 65: while (true) { Line 66: if (closestExpirationSec > 0) { Line 67: try { Line 68: synchronized (this) { TimeUnit? Line 69: this.wait(closestExpirationSec * 1000); Line 70: } Line 71: } catch (InterruptedException e) { Line 72: // ignore Line 71: } catch (InterruptedException e) { Line 72: // ignore Line 73: } Line 74: } Line 75: no need for this line here Line 76: closestExpirationSec = Integer.MAX_VALUE; Line 77: Line 78: synchronized (_tasks) { Line 79: for (int i = 0; i < _tasks.size(); i++) { Line 73: } Line 74: } Line 75: Line 76: closestExpirationSec = Integer.MAX_VALUE; Line 77: Instead of all this code: ExecutorService.invokeAll() , ExecutorService.invokeAny(), ExecutorService.invoke() -> Get future ? Line 78: synchronized (_tasks) { Line 79: for (int i = 0; i < _tasks.size(); i++) { Line 80: final OperationEntry<?> task = _tasks.get(i); Line 81: if (task.isExpired() || task.isDone()) { Line 76: closestExpirationSec = Integer.MAX_VALUE; Line 77: Line 78: synchronized (_tasks) { Line 79: for (int i = 0; i < _tasks.size(); i++) { Line 80: final OperationEntry<?> task = _tasks.get(i); Not exactly understood this code, if task is expired or done why I will run it? Line 81: if (task.isExpired() || task.isDone()) { Line 82: _tasks.remove(i); Line 83: i--; Line 84: _executor.execute(new Runnable() { Line 77: Line 78: synchronized (_tasks) { Line 79: for (int i = 0; i < _tasks.size(); i++) { Line 80: final OperationEntry<?> task = _tasks.get(i); Line 81: if (task.isExpired() || task.isDone()) { why remove it? is not efficient. will cause to system.arraycopy. A more efficient way should be used - actually should be used queue as data structure Line 82: _tasks.remove(i); Line 83: i--; Line 84: _executor.execute(new Runnable() { Line 85: public void run() { .................................................... File backend/manager/modules/taskmgr/src/main/java/org/ovirt/engine/core/taskmgr/scheduler/WaitedFutureMixin.java Line 3: import java.util.concurrent.CountDownLatch; Line 4: import java.util.concurrent.ExecutionException; Line 5: import java.util.concurrent.TimeUnit; Line 6: import java.util.concurrent.TimeoutException; Line 7: This class is look likes a copy of Future class from java.util.concurrent . Why we are writing some code that already written inside java? Line 8: public final class WaitedFutureMixin<V> implements WaitedFuture<V> { Line 9: Line 10: public interface Parent { Line 11: Line 25: _result = null; Line 26: _waitObj = null; Line 27: _parent = parent; Line 28: } Line 29: @Override? Line 30: public boolean cancel(boolean mayInterruptIfRunning) { Line 31: synchronized (this) { Line 32: if (!_cancelled) { Line 33: _cancelled = _parent.cancel(mayInterruptIfRunning); Line 28: } Line 29: Line 30: public boolean cancel(boolean mayInterruptIfRunning) { Line 31: synchronized (this) { Line 32: if (!_cancelled) { You know, that cancel is not promising anything. Read javadoc: Attempts to cancel execution of this task. Line 33: _cancelled = _parent.cancel(mayInterruptIfRunning); Line 34: } Line 35: Line 36: return _cancelled; Line 35: Line 36: return _cancelled; Line 37: } Line 38: } Line 39: @Override ? Line 40: public V get() throws InterruptedException, ExecutionException { Line 41: _flag.await(); Line 42: return _result; Line 43: } Line 40: public V get() throws InterruptedException, ExecutionException { Line 41: _flag.await(); Line 42: return _result; Line 43: } Line 44: This is wrong implementation of API: Javadoc: Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available. Parameters: timeout - the maximum time to wait unit - the time unit of the timeout argument Returns: the computed result Throws: CancellationException - if the computation was cancelled ExecutionException - if the computation threw an exception InterruptedException - if the current thread was interrupted while waiting TimeoutException - if the wait timed out Line 45: public V get(long timeout, TimeUnit unit) throws InterruptedException, Line 46: ExecutionException, TimeoutException { Line 47: _flag.await(timeout, unit); Line 48: return _result; Line 46: ExecutionException, TimeoutException { Line 47: _flag.await(timeout, unit); Line 48: return _result; Line 49: } Line 50: This code is look like try to make settable future. I think we have better solution over internet Line 51: public void setResult(V result) { Line 52: assert (isDone() == false); Line 53: _result = result; Line 54: _flag.countDown(); Line 47: _flag.await(timeout, unit); Line 48: return _result; Line 49: } Line 50: Line 51: public void setResult(V result) { This line is wrong due happen-before actions. Line 52: assert (isDone() == false); Line 53: _result = result; Line 54: _flag.countDown(); Line 55: _done = true; Line 54: _flag.countDown(); Line 55: _done = true; Line 56: synchronized (this) { Line 57: if (_waitObj != null) { Line 58: synchronized (_waitObj) { why notify, no wait is used. Signal is expensive Line 59: _waitObj.notify(); Line 60: } Line 61: } Line 62: } Line 60: } Line 61: } Line 62: } Line 63: } Line 64: @Override ? happen-before? Line 65: public boolean isCancelled() { Line 66: return _cancelled; Line 67: } Line 68: Line 64: Line 65: public boolean isCancelled() { Line 66: return _cancelled; Line 67: } Line 68: same here? Line 69: public boolean isDone() { Line 70: return (_done || _cancelled); Line 71: } Line 72: Line 70: return (_done || _cancelled); Line 71: } Line 72: Line 73: public void registerWaitObject(Object object) { Line 74: synchronized (this) { AtomicReference ? Line 75: _waitObj = object; Line 76: } Line 77: } -- To view, visit http://gerrit.ovirt.org/13795 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: comment Gerrit-Change-Id: I07253e8465776482bbdade21d78e9b40eeaadbb5 Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: master Gerrit-Owner: Yair Zaslavsky <[email protected]> Gerrit-Reviewer: Michael Kublin <[email protected]> Gerrit-Reviewer: Ravi Nori <[email protected]> Gerrit-Reviewer: Saggi Mizrahi <[email protected]> Gerrit-Reviewer: Yaniv Bronhaim <[email protected]> _______________________________________________ Engine-patches mailing list [email protected] http://lists.ovirt.org/mailman/listinfo/engine-patches
