Repository: qpid-proton Updated Branches: refs/heads/proton-j-reactor bfb5dcd37 -> 06c451f0a
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java new file mode 100644 index 0000000..08aca1f --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -0,0 +1,333 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.TransportImpl; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; +import org.apache.qpid.proton.reactor.Selectable.Callback; +import org.apache.qpid.proton.reactor.Selector; + +public class IOHandler extends BaseHandler { + + // pni_handle_quiesced from connection.c + private void handleQuiesced(Reactor reactor, Selector selector) throws IOException { + // check if we are still quiesced, other handlers of + // PN_REACTOR_QUIESCED could have produced more events to process + if (!reactor.quiesced()) return; + selector.select(reactor.getTimeout()); + reactor.mark(); + Iterator<Selectable> selectables = selector.readable(); + while(selectables.hasNext()) { + selectables.next().readable(); + } + selectables = selector.writeable(); + while(selectables.hasNext()) { + selectables.next().writeable(); + } + selectables = selector.expired(); + while(selectables.hasNext()) { + selectables.next().expired(); + } + selectables = selector.error(); + while(selectables.hasNext()) { + selectables.next().error(); + } + reactor.yield(); + } + + // pni_handle_open(...) from connection.c + private void handleOpen(Event event) { + Connection connection = event.getConnection(); + if (connection.getRemoteState() != EndpointState.UNINITIALIZED) { + return; + } + Transport transport = Proton.transport(); + Sasl sasl = transport.sasl(); + sasl.setMechanisms("ANONYMOUS"); + transport.bind(connection); + } + + // pni_handle_bound(...) from connection.c + private void handleBound(Reactor reactor, Event event) { + Connection connection = event.getConnection(); + String hostname = connection.getHostname(); + if (hostname == null) { + return; + } + + int colonIndex = hostname.indexOf(':'); + int port = 5672; + if (colonIndex >= 0) { + port = Integer.parseInt(hostname.substring(colonIndex+1)); // TODO: this can throw NumberFormatException on malformed input! + hostname = hostname.substring(0, colonIndex); + } + + Transport transport = event.getTransport(); + Socket socket = null; // TODO: null is our equivalent of PN_INVALID_SOCKET + try { + socket = new Socket(hostname, port); + } catch(IOException ioException) { + ErrorCondition condition = transport.getCondition(); + condition.setCondition(Symbol.getSymbol("proton:io")); + condition.setDescription(ioException.getMessage()); + transport.close_tail(); + transport.close_head(); + } + selectableTransport(reactor, socket, transport); + } + + // pni_connection_capacity from connection.c + private int capacity(Selectable selectable) { + Transport transport = selectable.getTransport(); + int capacity = transport.capacity(); + if (capacity < 0) { + if (transport.isClosed()) { + selectable.terminate(); + } + } + return capacity; + } + + // pni_connection_pending from connection.c + private int pending(Selectable selectable) { + Transport transport = selectable.getTransport(); + int pending = transport.pending(); + if (pending < 0) { + if (transport.isClosed()) { + selectable.terminate(); + } + } + return pending; + } + + // pni_connection_deadline from connection.c + private long deadline(Selectable selectable) { + Reactor reactor = selectable.getReactor(); + Transport transport = selectable.getTransport(); + long deadline = transport.tick(reactor.now()); + return deadline; + } + + // pni_connection_update from connection.c + private void update(Selectable selectable) { + int c = capacity(selectable); + int p = pending(selectable); + selectable.setReading(c > 0); + selectable.setWriting(p > 0); + selectable.setDeadline(deadline(selectable)); + } + + // pni_connection_readable from connection.c + private class ConnectionReadable implements Callback { + @Override + public void run(Selectable selectable) { + Reactor reactor = selectable.getReactor(); + Transport transport = selectable.getTransport(); + int capacity = transport.capacity(); + if (capacity > 0) { + // TODO: we shouldn't be doing this cast. Instead - selectable should return an + // object with 1) a getter for the SelectableChannel, 2) read/write methods. + SocketChannel socketChannel = (SocketChannel)selectable.getChannel(); + try { + int n = socketChannel.read(transport.tail()); + if (n == -1) { + transport.close_tail(); + } else { + transport.process(); + } + } catch (IOException e) { + ErrorCondition condition = transport.getCondition(); + condition.setCondition(Symbol.getSymbol("proton:io")); + condition.setDescription(e.getMessage()); + transport.close_tail(); + } + } + // TODO: comment from C code... + // occasionally transport events aren't generated when expected, so + // the following hack ensures we always update the selector + update(selectable); + reactor.update(selectable); + } + } + + // pni_connection_writable from connection.c + private class ConnectionWritable implements Callback { + @Override + public void run(Selectable selectable) { + Reactor reactor = selectable.getReactor(); + Transport transport = selectable.getTransport(); + int pending = transport.pending(); + if (pending > 0) { + SocketChannel channel = (SocketChannel)selectable.getChannel(); // TODO: can't rely on this cast always working! + try { + int n = channel.write(transport.head()); + if (n < 0) { + transport.close_head(); + } else { + transport.pop(n); + } + } catch(IOException ioException) { + ErrorCondition condition = transport.getCondition(); + condition.setCondition(Symbol.getSymbol("proton:io")); + condition.setDescription(ioException.getMessage()); + transport.close_head(); + } + } + + int newPending = transport.pending(); + if (newPending != pending) { + update(selectable); + reactor.update(selectable); + } + } + } + + // pni_connection_error from connection.c + private class ConnectionError implements Callback { + @Override + public void run(Selectable selectable) { + Reactor reactor = selectable.getReactor(); + selectable.terminate(); + reactor.update(selectable); + } + + } + + // pni_connection_expired from connection.c + private class ConnectionExpired implements Callback { + @Override + public void run(Selectable selectable) { + Reactor reactor = selectable.getReactor(); + Transport transport = selectable.getTransport(); + long deadline = transport.tick(reactor.now()); + selectable.setDeadline(deadline); + int c = capacity(selectable); + int p = pending(selectable); + selectable.setReading(c > 0); + selectable.setWriting(p > 0); + reactor.update(selectable); + } + } + + private class ConnectionFinalize implements Callback { + @Override + public void run(Selectable selectable) { + try { + selectable.getChannel().close(); + } catch(IOException ioException) { + ioException.printStackTrace(); + // TODO: what now? + } + } + } + + // pn_reactor_selectable_transport + private Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) { + // TODO: this code needs to be able to deal with a null socket (this is our equivalent of PN_INVALID_SOCKET) + Selectable selectable = reactor.selectable(); + selectable.setChannel(socket.getChannel()); + selectable.onReadable(new ConnectionReadable()); + selectable.onWritable(new ConnectionWritable()); + selectable.onError(new ConnectionError()); + selectable.onExpired(new ConnectionExpired()); + selectable.onFinalize(new ConnectionFinalize()); // TODO: the corresponding selectable._finalize method is never called anywhere in the C codebase! + selectable.setTransport(transport); + ((TransportImpl)transport).setSelectable(selectable); + ((TransportImpl)transport).setReactor(reactor); + update(selectable); + reactor.update(selectable); + return selectable; + } + + private void handleTransport(Reactor reactor, Event event) { + TransportImpl transport = (TransportImpl)event.getTransport(); + Selectable selectable = transport.getSelectable(); + if (selectable != null && !selectable.isTerminal()) { + update(selectable); + reactor.update(selectable); + } + } + + @Override + public void onUnhandled(Event event) { + try { + ReactorImpl reactor = (ReactorImpl)event.getReactor(); + Selector selector = reactor.getSelector(); + if (selector == null) { + selector = new SelectorImpl(); // TODO: the C code supplies the reactor's pn_io object here... + reactor.setSelector(selector); + } + + Selectable selectable; + switch(event.getType()) { + case SELECTABLE_INIT: + selectable = event.getSelectable(); + selector.add(selectable); + break; + case SELECTABLE_UPDATED: + selectable = event.getSelectable(); + selector.update(selectable); + break; + case SELECTABLE_FINAL: + selectable = event.getSelectable(); + selector.remove(selectable); + selectable.release(); + break; + case CONNECTION_LOCAL_OPEN: + handleOpen(event); + break; + case CONNECTION_BOUND: + handleBound(reactor, event); + break; + case TRANSPORT: + handleTransport(reactor, event); + break; + case TRANSPORT_CLOSED: + event.getTransport().unbind(); + break; + case REACTOR_QUIESCED: + handleQuiesced(reactor, selector); + break; + default: + break; + } + } catch(IOException e) { + e.printStackTrace(); + // TODO: not clear what to do with this! + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java new file mode 100644 index 0000000..5072958 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -0,0 +1,445 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Pipe; +import java.util.HashSet; +import java.util.Set; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Event.Type; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.apache.qpid.proton.engine.impl.HandlerEndpointImpl; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; +import org.apache.qpid.proton.reactor.Selectable.Callback; +import org.apache.qpid.proton.reactor.Selectable.RecordKeyType; +import org.apache.qpid.proton.reactor.Selectable.RecordValueType; +import org.apache.qpid.proton.reactor.Selector; +import org.apache.qpid.proton.reactor.Task; + +public class ReactorImpl implements Reactor { + + /* + * pn_record_t *attachments; + 41 pn_io_t *io; + 42 pn_collector_t *collector; + 43 pn_handler_t *global; + 44 pn_handler_t *handler; + 45 pn_list_t *children; + 46 pn_timer_t *timer; + 47 pn_socket_t wakeup[2]; + 48 pn_selectable_t *selectable; + 49 pn_event_type_t previous; + 50 pn_timestamp_t now; + 51 int selectables; + 52 int timeout; + 53 bool yield; + */ + + private Object attachment; + private CollectorImpl collector; + private long now; + private long timeout; + private Handler global; + private Handler handler; + private Set<Selectable> children; + private int selectables; + private boolean yield; + private Selectable selectable; + private Type previous; + private Timer timer; + private final Pipe wakeup; + + @Override + public long mark() { + now = System.currentTimeMillis(); + return now; + } + + @Override + public long now() { + return now; + } +/* + * tatic void pn_reactor_initialize(pn_reactor_t *reactor) { + 68 reactor->attachments = pn_record(); + 69 reactor->io = pn_io(); TODO: pn_io most literally translates to SocketFactory (and possibly also ServerSocketFactory...) + 70 reactor->collector = pn_collector(); + 71 reactor->global = pn_iohandler(); + 72 reactor->handler = pn_handler(NULL); + 73 reactor->children = pn_list(PN_OBJECT, 0); + 74 reactor->timer = pn_timer(reactor->collector); + 75 reactor->wakeup[0] = PN_INVALID_SOCKET; + 76 reactor->wakeup[1] = PN_INVALID_SOCKET; + 77 reactor->selectable = NULL; + 78 reactor->previous = PN_EVENT_NONE; + 79 reactor->selectables = 0; + 80 reactor->timeout = 0; + 81 reactor->yield = false; + 82 pn_reactor_mark(reactor); + 83 } + 84 */ + public ReactorImpl() throws IOException { + collector = (CollectorImpl)Proton.collector(); + global = new IOHandler(); + handler = new BaseHandler(); + children = new HashSet<Selectable>(); + selectables = 0; + timer = new Timer(collector); + wakeup = Pipe.open(); + mark(); + } + /* + 85 static void pn_reactor_finalize(pn_reactor_t *reactor) { + 86 for (int i = 0; i < 2; i++) { + 87 if (reactor->wakeup[i] != PN_INVALID_SOCKET) { + 88 pn_close(reactor->io, reactor->wakeup[i]); + 89 } + 90 } + 91 pn_decref(reactor->attachments); + 92 pn_decref(reactor->collector); + 93 pn_decref(reactor->global); + 94 pn_decref(reactor->handler); + 95 pn_decref(reactor->children); + 96 pn_decref(reactor->timer); + 97 pn_decref(reactor->io); + 98 } + */ + + @Override + public void attach(Object attachment) { + this.attachment = attachment; + } + + @Override + public Object attachment() { + return attachment; + } + + @Override + public long getTimeout() { + return timeout; + } + + @Override + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + @Override + public Handler getGlobalHandler() { + return global; + } + + @Override + public void setGlobalHandler(Handler handler) { + global = handler; + } + + @Override + public Handler getHandler() { + return handler; + } + + @Override + public void setHandler(Handler handler) { + this.handler = handler; + } + +/* TODO + * pn_io_t *pn_reactor_io(pn_reactor_t *reactor) { +166 assert(reactor); +167 return reactor->io; +168 } +169 + + */ + + @Override + public Set<Selectable> children() { + return children; + } + + @Override + public Collector collector() { + return collector; + } + + private class ReleaseCallback implements Callback { + private final ReactorImpl reactor; + public ReleaseCallback(ReactorImpl reactor) { + this.reactor = reactor; + } + @Override + public void run(Selectable selectable) { + if (reactor.children.remove(selectable)) { + --reactor.selectables; + } + } + } + + @Override + public Selectable selectable() { + Selectable result = new SelectableImpl(); + result.setCollector(collector); + collector.put(Type.SELECTABLE_INIT, result); + result.setReactor(this); + children.add(result); + result.onRelease(new ReleaseCallback(this)); + ++selectables; + return result; + } + + + + @Override + public void update(Selectable selectable) { + if (!selectable.hasRecord(RecordKeyType.PNI_TERMINATED)) { + if (selectable.isTerminal()) { + selectable.setRecord(RecordKeyType.PNI_TERMINATED, RecordValueType.PN_VOID); + collector.put(Type.SELECTABLE_FINAL, selectable); + } else { + collector.put(Type.SELECTABLE_UPDATED, selectable); + } + } + } + + // TODO: pn_record_get_handler + // TODO: pn_record_set_handler + // TODO: pn_class_reactor + // TODO: pn_object_reactor + // TODO: pn_event_reactor + + // pn_event_handler - TODO: this is copied from the Reactor.java code, so might need some tweaks... + private Handler eventHandler(Event event) { + Handler result; + if (event.getLink() != null) { + result = ((HandlerEndpointImpl)event.getLink()).getHandler(); + if (result != null) return result; + } + if (event.getSession() != null) { + result = ((HandlerEndpointImpl)event.getSession()).getHandler(); + if (result != null) return result; + } + if (event.getConnection() != null) { + result = ((HandlerEndpointImpl)event.getConnection()).getHandler(); + if (result != null) return result; + } +// if (event.getTransport() != null) { // TODO: do we want to allow handlers to be added to the Transport object? +// result = ((EndpointImpl)event.getTransport()).getHandlers(); +// if (result.hasNext()) return result; +// } + + if (event.getTask() != null) { + result = event.getTask().getHandler(); + if (result != null) return result; + } + + if (event.getSelectable() != null) { + result = event.getSelectable().getHandler(); + if (result != null) return result; + } + + return handler; + } + + + @Override + public void yield() { + yield = true; + } + + @Override + public boolean quiesced() { + Event event = collector.peek(); + if (event == null) return true; + if (collector.more()) return false; + return event.getType() == Type.REACTOR_QUIESCED; + } + + @Override + public boolean process() { + mark(); + Type previous = null; + while (true) { + Event event = collector.peek(); + if (event != null) { + if (yield) { + yield = false; + return true; + } + yield = false; // TODO: is this required? + Handler handler = eventHandler(event); + event.dispatch(handler); + event.dispatch(global); + + if (event.getType() == Type.CONNECTION_FINAL) { // TODO: this should be the same as the pni_reactor_dispatch_post logic... + children.remove(event.getConnection()); + } + this.previous = event.getType(); + previous = this.previous; + collector.pop(); + + } else { + if (more()) { + if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL) { + collector.put(Type.REACTOR_QUIESCED, this); + } else { + return true; + } + } else { + if (selectable != null) { + selectable.terminate(); + update(selectable); + selectable = null; + } else { + return false; + } + } + } + } + } + + + @Override + public void wakeup() throws IOException { + //selector.wakeup(); + wakeup.sink().write(ByteBuffer.allocate(1)); // TODO: c version returns a value! + } + + @Override + public void start() { + collector.put(Type.REACTOR_INIT, this); + selectable = timerSelectable(); + //selectable.setDeadline(now + timeout); // TODO: this isn't in the C code... + } + + @Override + public void stop() { + collector.put(Type.REACTOR_FINAL, this); + // (Comment from C code) XXX: should consider removing this fron stop to avoid reentrance + process(); + collector = null; + } + + private boolean more() { + return timer.tasks() > 0 || selectables > 1; + } + + @Override + public void run() { + setTimeout(3141); // TODO: eh? + start(); + while(process()) {} + stop(); + } + + // pn_reactor_schedule from reactor.c + @Override + public Task schedule(int delay, Handler handler) { + Task task = timer.schedule(now + delay); + task.setReactor(this); + task.setHandler(handler); + if (selectable != null) { + selectable.setDeadline(timer.deadline()); + update(selectable); + } + return task; + } + // TODO: acceptor + // TODO: connection + // TODO: acceptorClose + + private class TimerReadable implements Callback { + + @Override + public void run(Selectable selectable) { + // TODO: the implication is that this will be called when the selectable is woken-up +/* + 434 static void pni_timer_readable(pn_selectable_t *sel) { + 435 char buf[64]; + 436 pn_reactor_t *reactor = pni_reactor(sel); + 437 pn_socket_t fd = pn_selectable_get_fd(sel); + 438 pn_read(reactor->io, fd, buf, 64); + 439 pni_timer_expired(sel); + 440 } + */ + // TODO: this could be more elegant... + new TimerExpired().run(selectable); + } + + } + + private class TimerExpired implements Callback { + @Override + public void run(Selectable selectable) { + ReactorImpl reactor = (ReactorImpl) selectable.getReactor(); + reactor.timer.tick(reactor.now); + selectable.setDeadline(reactor.timer.deadline()); + reactor.update(selectable); + } + } + + + // pni_timer_finalize from reactor.c + private class TimerFinalize implements Callback { + @Override + public void run(Selectable selectable) { + try { + selectable.getChannel().close(); + } catch(IOException e) { + e.printStackTrace(); + // TODO: no idea what to do here... + } + } + } + + private Selectable timerSelectable() { + Selectable sel = selectable(); + sel.setChannel(wakeup.source()); + sel.onReadable(new TimerReadable()); + sel.onExpired(new TimerExpired()); + sel.onFinalize(new TimerFinalize()); // TODO: not sure the corresponding sel._finalize() gets called anywhere... + sel.setReading(true); + sel.setDeadline(timer.deadline()); + update(sel); + return sel; + } + + // TODO: the C code allows records to be associated with a Reactor and the Selector is get/set using that capability. + private Selector selector; + + protected Selector getSelector() { + return selector; + } + + protected void setSelector(Selector selector) { + this.selector = selector; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java new file mode 100644 index 0000000..cf3839d --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java @@ -0,0 +1,272 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.nio.channels.SelectableChannel; +import java.util.HashMap; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Event.Type; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; + +public class SelectableImpl implements Selectable { + + private Callback readable; + private Callback writable; + private Callback error; + private Callback expire; + private Callback release; + private Callback finalize; + + private boolean reading = false; + private boolean writing = false; + private long deadline = 0; + private SelectableChannel channel; + private Object attachment; + private boolean registered; + private Reactor reactor; + private Transport transport; + private boolean terminal; + + @Override + public boolean isReading() { + return reading; + } + + @Override + public boolean isWriting() { + return writing; + } + + @Override + public long getDeadline() { + return deadline; + } + + @Override + public void setReading(boolean reading) { + this.reading = reading; + } + + @Override + public void setWriting(boolean writing) { + this.writing = writing; + } + + @Override + public void setDeadline(long deadline) { + this.deadline = deadline; + } + + @Override + public void onReadable(Callback runnable) { + this.readable = runnable; + } + + @Override + public void onWritable(Callback runnable) { + this.writable = runnable; + } + + @Override + public void onExpired(Callback runnable) { + this.expire = runnable; + } + + @Override + public void onError(Callback runnable) { + this.error = runnable; + } + + @Override + public void onRelease(Callback runnable) { + this.release = runnable; + } + + @Override + public void onFinalize(Callback runnable) { + this.finalize = runnable; + } + + @Override + public void readable() { + if (readable != null) { + readable.run(this); + } + } + + @Override + public void writeable() { + if (writable != null) { + writable.run(this); + } + } + + @Override + public void expired() { + if (expire != null) { + expire.run(this); + } + } + + @Override + public void error() { + if (error != null) { + error.run(this); + } + } + + @Override + public void release() { + if (release != null) { + release.run(this); + } + } + + @Override + public void _finalize() { + if (finalize != null) { + finalize.run(this); + } + } + + // These are equivalent to the C code's set/get file descritor functions. + @Override + public void setChannel(SelectableChannel channel) { + this.channel = channel; + } + + @Override + public SelectableChannel getChannel() { + return channel; + } + + @Override + public void setAttachment(Object attachment) { + this.attachment = attachment; + } + + @Override + public Object getAttachment() { + return attachment; + } + + @Override + public boolean isRegistered() { + return registered; + } + + @Override + public void setRegistered(boolean registered) { + this.registered = registered; + } + + @Override + public void setCollector(final Collector collector) { + final CollectorImpl collectorImpl = (CollectorImpl)collector; + + onReadable(new Callback() { + @Override + public void run(Selectable selectable) { + collectorImpl.put(Type.SELECTABLE_READABLE, selectable); + } + }); + onWritable(new Callback() { + @Override + public void run(Selectable selectable) { + collectorImpl.put(Type.SELECTABLE_WRITABLE, selectable); + } + }); + onExpired(new Callback() { + @Override + public void run(Selectable selectable) { + collectorImpl.put(Type.SELECTABLE_EXPIRED, selectable); + } + }); + onError(new Callback() { + @Override + public void run(Selectable selectable) { + collectorImpl.put(Type.SELECTABLE_ERROR, selectable); + } + }); + } + + @Override + public Reactor getReactor() { // TODO: the C version uses set/getContext for this - should we do the same? + return reactor; + } + + @Override + public void terminate() { + terminal = true; + } + + private final HashMap<RecordKeyType, RecordValueType> records = new HashMap<RecordKeyType, RecordValueType>(); + + @Override + public boolean hasRecord(RecordKeyType type) { + return records.containsKey(type); + } + + @Override + public void setRecord(RecordKeyType key, RecordValueType value) { + records.put(key, value); + } + + @Override + public boolean isTerminal() { + return terminal; + } + + + @Override + public Transport getTransport() { + return transport; + } + + @Override + public void setTransport(Transport transport) { + this.transport = transport; + } + + @Override + public void setReactor(Reactor reactor) { + this.reactor = reactor; + } + + // TODO: all this gets stuffed into records in the C code... + private BaseHandler _handler = new BaseHandler(); + @Override + public void add(Handler handler) { + _handler.add(handler); + } + + @Override + public Handler getHandler() { + return _handler; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java new file mode 100644 index 0000000..c74853e --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.qpid.proton.reactor.Selectable; +import org.apache.qpid.proton.reactor.Selector; + +public class SelectorImpl implements Selector { + + private final java.nio.channels.Selector selector; + private final HashSet<Selectable> selectables = new HashSet<Selectable>(); + private final HashSet<Selectable> readable = new HashSet<Selectable>(); + private final HashSet<Selectable> writeable = new HashSet<Selectable>(); + private final HashSet<Selectable> expired = new HashSet<Selectable>(); + private final HashSet<Selectable> error = new HashSet<Selectable>(); + + public SelectorImpl() throws IOException { + selector = java.nio.channels.Selector.open(); + } + + @Override + public void add(Selectable selectable) throws IOException { + selectable.getChannel().configureBlocking(false); + SelectionKey key = selectable.getChannel().register(selector, 0); + key.attach(selectable); + selectables.add(selectable); + update(selectable); + } + + @Override + public void update(Selectable selectable) { + int interestedOps = 0; + if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ; + if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE; + SelectionKey key = selectable.getChannel().keyFor(selector); + key.interestOps(interestedOps); + } + + @Override + public void remove(Selectable selectable) { + SelectionKey key = selectable.getChannel().keyFor(selector); + key.cancel(); + key.attach(null); + selectables.remove(selectable); + } + + @Override + public void select(long timeout) throws IOException { + if (timeout > 0) { + long deadline = 0; + for (Selectable selectable : selectables) { // TODO: this differs from the C code which requires a call to update() to make deadline changes take affect + long d = selectable.getDeadline(); + if (d > 0) { + deadline = (deadline == 0) ? d : Math.min(deadline, d); + } + } + + if (deadline > 0) { + long now = System.currentTimeMillis(); + long delta = deadline - now; + if (delta < 0) { + timeout = 0; + } else if (delta < timeout) { + timeout = delta; + } + } + } + + if (timeout > 0) { + selector.select(timeout); + } else { + selector.selectNow(); + } + long awoken = System.currentTimeMillis(); + + readable.clear(); + writeable.clear(); + expired.clear(); + error.clear(); // TODO: nothing ever gets put in here... + for (SelectionKey key : selector.selectedKeys()) { + Selectable selectable = (Selectable)key.attachment(); + if (key.isReadable()) readable.add(selectable); + if (key.isWritable()) writeable.add(selectable); + } + for (Selectable selectable : selectables) { // TODO: this is different to the C code which evaluates expiry at the point the selectable is iterated over. + long deadline = selectable.getDeadline(); + if (deadline > 0 && awoken >= deadline) { + expired.add(selectable); + } + } + } + + @Override + public Iterator<Selectable> readable() { + return readable.iterator(); + } + + @Override + public Iterator<Selectable> writeable() { + return writeable.iterator(); + } + + @Override + public Iterator<Selectable> expired() { + return expired.iterator(); + } + + @Override + public Iterator<Selectable> error() { + return error.iterator(); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java new file mode 100644 index 0000000..3f650c2 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Task; + +public class TaskImpl implements Task, Comparable<TaskImpl> { + private final long deadline; + private final int counter; + private final AtomicInteger count = new AtomicInteger(); + + public TaskImpl(long deadline) { + this.deadline = deadline; + this.counter = count.getAndIncrement(); + } + @Override + public int compareTo(TaskImpl other) { + int result; + if (deadline < other.deadline) { + result = -1; + } else if (deadline > other.deadline) { + result = 1; + } else { + result = counter - other.counter; + } + return result; + } + @Override + public long deadline() { + return deadline; + } + private Reactor reactor; + @Override + public void setReactor(Reactor reactor) { + this.reactor = reactor; + + } + @Override + public Reactor getReactor() { + return reactor; + } + private Handler handler; + @Override + public void setHandler(Handler handler) { + this.handler = handler; + } + @Override + public Handler getHandler() { + return handler; + } + private Object attachment; + @Override + public Object getAttachment() { + return attachment; + } + @Override + public void setAttachment(Object attachment) { + this.attachment = attachment; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java new file mode 100644 index 0000000..32bb4f6 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.util.PriorityQueue; + +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Event.Type; +import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.apache.qpid.proton.reactor.Task; + +public class Timer { + + private CollectorImpl collector; + private PriorityQueue<Task> tasks = new PriorityQueue<Task>(); + + public Timer(Collector collector) { + this.collector = (CollectorImpl)collector; + } + + Task schedule(long deadline) { + TaskImpl task = new TaskImpl(deadline); + tasks.add(task); + return task; + } + + long deadline() { + if (tasks.size() > 0) { + Task task = tasks.peek(); + return task.deadline(); + } else { + return 0; + } + } + + void tick(long now) { + while(!tasks.isEmpty()) { + Task task = tasks.peek(); + if (now >= task.deadline()) { + tasks.poll(); + collector.put(Type.TIMER_TASK, task); + } else { + break; + } + } + } + + int tasks() { + return tasks.size(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
