use a scheduled executor for request timeout
Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/50d27d9e Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/50d27d9e Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/50d27d9e Branch: refs/heads/trunk Commit: 50d27d9e60838e9a4906ac797305e88247470f5c Parents: a3a8bcc Author: jvermillard <[email protected]> Authored: Tue Jun 18 21:52:47 2013 +0200 Committer: jvermillard <[email protected]> Committed: Tue Jun 18 21:52:47 2013 +0200 ---------------------------------------------------------------------- .../apache/mina/filter/query/RequestFilter.java | 37 +++++--------------- .../apache/mina/filter/query/RequestFuture.java | 30 +++++++++++----- 2 files changed, 30 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/50d27d9e/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java index 5539b89..77f8827 100644 --- a/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java +++ b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java @@ -21,6 +21,9 @@ package org.apache.mina.filter.query; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.mina.api.AbstractIoFilter; import org.apache.mina.api.IoFuture; @@ -63,13 +66,15 @@ public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> e * @param session the session where to write the request * @param request the request to be issued * @param timeoutInMs the timeout in milli-seconds (doesn't work Work-in-progress). - * @return + * @return the {@link IoFuture} for waiting or listening the completion of this request. */ @SuppressWarnings({ "rawtypes", "unchecked" }) public IoFuture<RESPONSE> request(IoSession session, REQUEST request, long timeoutInMs) { Map inFlight = session.getAttribute(IN_FLIGHT_REQUESTS); - IoFuture<RESPONSE> future = new RequestFuture<REQUEST, RESPONSE>(session, System.currentTimeMillis() - + timeoutInMs, request.requestId()); + RequestFuture<REQUEST, RESPONSE> future = new RequestFuture<REQUEST, RESPONSE>(session, request.requestId()); + + // schedule a timeout task + future.setTimeoutFuture(schedExec.schedule(future.timeout, timeoutInMs, TimeUnit.MILLISECONDS)); // save the future for completion inFlight.put(request.requestId(), future); @@ -80,8 +85,7 @@ public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> e @SuppressWarnings("rawtypes") static final AttributeKey<Map> IN_FLIGHT_REQUESTS = new AttributeKey<Map>(Map.class, "request.in.flight"); - // last time we checked the timeouts - private long lastTimeoutCheck = 0; + private ScheduledExecutorService schedExec = Executors.newScheduledThreadPool(1); @SuppressWarnings("rawtypes") @Override @@ -104,32 +108,9 @@ public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> e } } - // // check for timeout - // long now = System.currentTimeMillis(); - // if (lastTimeoutCheck + 1000 < now) { - // lastTimeoutCheck = now; - // Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS); - // for (Object v : inFlight.values()) { - // ((RequestFuture<?, ?>) v).timeoutIfNeeded(now); - // } - // } - // trigger the next filter super.messageReceived(session, message, controller); } - @Override - public void messageSent(IoSession session, Object message) { - // check for timeout - // long now = System.currentTimeMillis(); - // if (lastTimeoutCheck + 1000 < now) { - // lastTimeoutCheck = now; - // Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS); - // for (Object v : inFlight.values()) { - // ((RequestFuture<?, ?>) v).timeoutIfNeeded(now); - // } - // } - } - /** * {@inheritDoc} cancel remaining requests */ http://git-wip-us.apache.org/repos/asf/mina/blob/50d27d9e/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java index de8564f..7b15443 100644 --- a/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java +++ b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java @@ -20,6 +20,7 @@ package org.apache.mina.filter.query; import java.util.Map; +import java.util.concurrent.ScheduledFuture; import org.apache.mina.api.IoSession; import org.apache.mina.util.AbstractIoFuture; @@ -36,13 +37,12 @@ class RequestFuture<REQUEST extends Request, RESPONSE extends Response> extends private final IoSession session; - private final long timeout; - private final Object id; - public RequestFuture(IoSession session, long timeout, Object id) { + private ScheduledFuture<?> schedFuture; + + public RequestFuture(IoSession session, Object id) { this.session = session; - this.timeout = timeout; this.id = id; } @@ -52,15 +52,27 @@ class RequestFuture<REQUEST extends Request, RESPONSE extends Response> extends } void complete(RESPONSE response) { + if (schedFuture != null) { + schedFuture.cancel(true); + } setResult(response); } - @SuppressWarnings("rawtypes") - void timeoutIfNeeded(long time) { - if (timeout < time) { + void setTimeoutFuture(ScheduledFuture<?> schedFuture) { + this.schedFuture = schedFuture; + } + + Runnable timeout = new Runnable() { + + @SuppressWarnings("rawtypes") + @Override + public void run() { Map inFlight = session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS); - inFlight.remove(id); + if (inFlight != null) { + inFlight.remove(id); + } setException(new RequestTimeoutException()); + } - } + }; } \ No newline at end of file
