Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1461839&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java (added) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java Wed Mar 27 20:20:31 2013 @@ -0,0 +1,1084 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 + */ +package org.apache.activemq.apollo.mqtt; + +import org.apache.activemq.apollo.broker.*; +import org.apache.activemq.apollo.broker.protocol.RawMessage; +import org.apache.activemq.apollo.broker.protocol.RawMessageCodec$; +import org.apache.activemq.apollo.broker.security.SecurityContext; +import org.apache.activemq.apollo.broker.store.StoreUOW; +import org.apache.activemq.apollo.filter.FilterException; +import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState; +import org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState; +import org.apache.activemq.apollo.util.LRUCache; +import org.apache.activemq.apollo.util.LongCounter; +import org.apache.activemq.apollo.util.path.Path$; +import org.apache.activemq.apollo.util.path.PathMap; +import org.apache.activemq.apollo.util.path.PathParser; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.hawtdispatch.*; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.codec.*; +import scala.Option; +import scala.Tuple2; +import scala.runtime.BoxedUnit; + +import java.net.ProtocolException; +import java.util.*; + +import static org.fusesource.hawtdispatch.Dispatch.NOOP; +import static org.fusesource.hawtdispatch.Dispatch.createQueue; + +/** + * An MqttSession can be switched from one connection/protocol handler to another, + * but it will only be associated with one at a time. An MqttSession tracks + * the state of the communication with a client. 
+ * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class MqttSession { + + public static final ScalaSupport.Logger log = new ScalaSupport.Logger(MqttProtocolHandler$.MODULE$); + + public static <T> T received(T value) { + log.trace("received: %s", value); + return value; + } + + + public final HostState host_state; + public final UTF8Buffer client_id; + public final SessionState session_state; + public final DispatchQueue queue; + + public MqttSession(HostState host_state, UTF8Buffer client_id, SessionState session_state) { + this.host_state = host_state; + this.client_id = client_id; + this.queue = createQueue("mqtt: " + client_id); + this.session_state = session_state; + } + + public boolean manager_disconnected = false; + public MqttProtocolHandler handler; + public SecurityContext security_context; + public boolean clean_session = false; + public CONNECT connect_message; + public DestinationParser destination_parser = MqttProtocol.destination_parser; + boolean publish_body = false; + + public VirtualHost host() { + return host_state.host(); + } + + public void connect(final MqttProtocolHandler next) { + queue.execute(new Task() { + public void run() { + if (manager_disconnected) { + // we are not the assignment anymore.. go to the session manager + // again to setup a new session. + MqttSessionManager.attach(host(), client_id, next); + } else { + + // so that we don't switch again until this current switch completes + queue.suspend(); + if (handler != null) { + detach(); + handler = null; + } + queue.execute(new Task() { + public void run() { + handler = next; + attach(); + } + }); + + // switch the connection to the session queue.. 
+ next.connection()._set_dispatch_queue(queue, new Task() { + public void run() { + queue.resume(); + } + }); + } + + } + }); + } + + public void disconnect(final MqttProtocolHandler prev) { + queue.execute(new Task() { + @Override + public void run() { + if (handler == prev) { + MqttSessionManager.remove(host_state, client_id); + manager_disconnected = true; + detach(); + handler = null; + } + } + }); + } + + ///////////////////////////////////////////////////////////////////// + // + // Bits that deal with connections attaching/detaching from the session + // + ///////////////////////////////////////////////////////////////////// + public void attach() { + queue.assertExecuting(); + final MqttProtocolHandler h = handler; + clean_session = h.connect_message().cleanSession(); + security_context = h.security_context(); + h.command_handler_$eq(ScalaSupport.toScala(new UnitFn1<Object>() { + @Override + public void call(Object v1) { + on_transport_command(v1); + } + })); + + destination_parser = h.destination_parser(); + mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.some(h.sink_manager().open())); + + final Task ack_connect = new Task() { + @Override + public void run() { + queue.assertExecuting(); + connect_message = h.connect_message(); + CONNACK connack = new CONNACK(); + connack.code(CONNACK.Code.CONNECTION_ACCEPTED); + send(connack); + } + }; + + if (!clean_session) { + // Setup the previous subscriptions.. 
+ session_state.strategy().create(host().store(), client_id); + if (!session_state.subscriptions().isEmpty()) { + h._suspend_read("subscribing"); + ArrayList<Topic> topics = ScalaSupport.map(session_state.subscriptions().values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() { + @Override + public Topic apply(Tuple2<Topic, BindAddress> v1) { + return v1._1(); + } + }); + subscribe(topics, new Task() { + @Override + public void run() { + h.resume_read(); + h.queue().execute(ack_connect); + } + }); + } else { + ack_connect.run(); + } + } else { + // do we need to clear the received ids? + // durable_session_state.received_message_ids.clear() + session_state.subscriptions().clear(); + if (session_state.durable_sub() != null) { + final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub()}; + session_state.durable_sub_$eq(null); + host().dispatch_queue().execute(new Task() { + @Override + public void run() { + host().router().delete(addresses, security_context); + } + }); + } + + session_state.strategy().destroy(new Task() { + @Override + public void run() { + ack_connect.run(); + } + }); + } + + } + + public void detach() { + queue.assertExecuting(); + if (!producerRoutes.isEmpty()) { + final ArrayList<MqttProducerRoute> routes = new ArrayList<MqttProducerRoute>(producerRoutes.values()); + host().dispatch_queue().execute(new Task() { + @Override + public void run() { + for (MqttProducerRoute route : routes) { + host().router().disconnect(new ConnectAddress[]{route.address}, route); + } + } + }); + producerRoutes.clear(); + } + + if (clean_session) { + if (!mqtt_consumer().addresses.isEmpty()) { + final BindAddress[] addresses = mqtt_consumer().addresses.keySet().toArray(new BindAddress[mqtt_consumer().addresses.size()]); + host().dispatch_queue().execute(new Runnable() { + @Override + public void run() { + host().router().unbind(addresses, mqtt_consumer(), false, security_context); + } + }); + mqtt_consumer().addresses.clear(); + } + 
session_state.subscriptions().clear(); + } else { + if (session_state.durable_sub() != null) { + final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub()}; + host().dispatch_queue().execute(new Runnable() { + @Override + public void run() { + host().router().unbind(addresses, mqtt_consumer(), false, security_context); + } + }); + mqtt_consumer().addresses.clear(); + session_state.durable_sub_$eq(null); + } + } + + for (Request request : in_flight_publishes.values()) { + if (request.ack() != null) { + request.ack().apply( + request.delivered() ? Delivered$.MODULE$ : Undelivered$.MODULE$ + ); + } + } + in_flight_publishes.clear(); + + handler.sink_manager().close(mqtt_consumer().consumer_sink.downstream().get(), ScalaSupport.<Request>noopFn1()); + mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none()); + handler.on_transport_disconnected(); + } + + public SimpleAddress decode_destination(UTF8Buffer value) { + + SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), ScalaSupport.toScala(new Fn1<String, SimpleAddress>() { + public SimpleAddress apply(String name) { + return new SimpleAddress("topic", destination_parser.decode_path(name)); + } + })); + if (rc == null && handler != null) { + handler.die("Invalid mqtt destination name: " + value); + } + return rc; + } + + ///////////////////////////////////////////////////////////////////// + // + // Bits that deal with assigning message ids to QoS > 0 requests + // and tracking those requests so that they can get replayed on a + // reconnect. 
+ // + ///////////////////////////////////////////////////////////////////// + + final HashMap<Short, Request> in_flight_publishes = new HashMap<Short, Request>(); + + public void send(MessageSupport.Message message) { + queue.assertExecuting(); + handler.connection_sink().offer(new Request((short) 0, message, null)); + } + + public void publish_completed(short id) { + queue.assertExecuting(); + Request request = in_flight_publishes.remove(id); + if (request != null) { + if (request.ack() != null) { + request.ack().apply(Consumed$.MODULE$); + } + } else { + // It's possible that on a reconnect, we get an ACK + // in for message that was not dispatched yet. store + // a place holder so we ack it upon the dispatch + // attempt. + in_flight_publishes.put(id, new Request(id, null, null)); + } + } + + ///////////////////////////////////////////////////////////////////// + // + // Bits that deal with processing new messages from the client. + // + ///////////////////////////////////////////////////////////////////// + public void on_transport_command(Object o) { + try { + if (o.getClass() == MQTTFrame.class) { + MQTTFrame command = (MQTTFrame) o; + switch (command.messageType()) { + case PUBLISH.TYPE: { + on_mqtt_publish(received(new PUBLISH().decode(command))); + break; + } + // This follows a Publish with QoS EXACTLY_ONCE + case PUBREL.TYPE: { + final PUBREL ack = received(new PUBREL().decode(command)); + // TODO: perhaps persist the processed list.. otherwise + // we can't filter out dups after a broker restart. 
+ session_state.received_message_ids().remove(ack.messageId()); + session_state.strategy().update(new Task() { + @Override + public void run() { + send(new PUBCOMP().messageId(ack.messageId())); + } + }); + break; + } + case SUBSCRIBE.TYPE: { + on_mqtt_subscribe(received(new SUBSCRIBE().decode(command))); + break; + } + case UNSUBSCRIBE.TYPE: { + on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command))); + break; + } + // AT_LEAST_ONCE ack flow for a client subscription + case PUBACK.TYPE: { + PUBACK ack = received(new PUBACK().decode(command)); + publish_completed(ack.messageId()); + break; + } + // EXACTLY_ONCE ack flow for a client subscription + case PUBREC.TYPE: { + PUBREC ack = received(new PUBREC().decode(command)); + send(new PUBREL().messageId(ack.messageId())); + break; + } + case PUBCOMP.TYPE: { + PUBCOMP ack = received(new PUBCOMP().decode(command)); + publish_completed(ack.messageId()); + break; + } + case PINGREQ.TYPE: { + received(new PINGREQ().decode(command)); + send(new PINGRESP()); + break; + } + case DISCONNECT.TYPE: { + received(new DISCONNECT()); + MqttSessionManager.disconnect(host_state, client_id, handler); + break; + } + default: { + handler.die("Invalid MQTT message type: " + command.messageType()); + break; + } + } + } else if ("failure".equals(o)) { + // Publish the client's will + publish_will(new Task() { + @Override + public void run() { + // then disconnect him. 
+ MqttSessionManager.disconnect(host_state, client_id, handler); + + } + }); + } else { + handler.die("Internal Server Error: unexpected mqtt command: " + o.getClass()); + } + } catch (ProtocolException e) { + handler.die("Internal Server Error: " + e); + } + } + + ///////////////////////////////////////////////////////////////////// + // + // Bits that deal with processing PUBLISH messages + // + ///////////////////////////////////////////////////////////////////// + LRUCache<UTF8Buffer, MqttProducerRoute> producerRoutes = new LRUCache<UTF8Buffer, MqttProducerRoute>(10) { + @Override + protected void onCacheEviction(final Map.Entry<UTF8Buffer, MqttProducerRoute> eldest) { + host().dispatch_queue().execute(new Task() { + @Override + public void run() { + ConnectAddress[] array = new ConnectAddress[]{eldest.getValue().address}; + host().router().disconnect(array, eldest.getValue()); + } + }); + } + }; + + class MqttProducerRoute extends DeliveryProducerRoute { + + public final SimpleAddress address; + public final MqttProtocolHandler handler; + boolean suspended = false; + + public MqttProducerRoute(SimpleAddress address, MqttProtocolHandler h) { + super(host().router()); + this.address = address; + this.handler = h; + this.refiller_$eq(new Task() { + @Override + public void run() { + if (suspended) { + suspended = false; + handler.resume_read(); + } + } + }); + } + + @Override + public int send_buffer_size() { + return handler.codec().getReadBufferSize(); + } + + @Override + public Option<BrokerConnection> connection() { + return ScalaSupport.some(handler.connection()); + } + + @Override + public DispatchQueue dispatch_queue() { + return queue; + } + } + + public void on_mqtt_publish(final PUBLISH publish) { + + if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids().contains(publish.messageId())) { + PUBREC response = new PUBREC(); + response.messageId(publish.messageId()); + send(response); + return; + } + + 
handler.messages_received().incrementAndGet(); + + queue.assertExecuting(); + MqttProducerRoute route = producerRoutes.get(publish.topicName()); + if (route == null) { + // create the producer route... + + final SimpleAddress destination = decode_destination(publish.topicName()); + final MqttProducerRoute froute = route = new MqttProducerRoute(destination, handler); + + // don't process commands until producer is connected... + route.handler._suspend_read("route publish lookup"); + host().dispatch_queue().execute(new Task() { + + @Override + public void run() { + host().router().connect(new ConnectAddress[]{destination}, froute, security_context); + queue.execute(new Runnable() { + @Override + public void run() { + // We don't care if we are not allowed to send.. + if (!froute.handler.connection().stopped()) { + froute.handler.resume_read(); + producerRoutes.put(publish.topicName(), froute); + send_via_route(froute, publish); + } + } + }); + } + }); + } else { + // we can re-use the existing producer route + send_via_route(route, publish); + } + } + + class AtLeastOnceProducerAck extends UnitFn2<DeliveryResult, StoreUOW> { + public final PUBLISH publish; + + AtLeastOnceProducerAck(PUBLISH publish) { + this.publish = publish; + } + + public void call(final DeliveryResult r, final StoreUOW uow) { + queue.execute(new Task() { + @Override + public void run() { + PUBACK response = new PUBACK(); + response.messageId(publish.messageId()); + send(response); + } + }); + } + } + + class ExactlyOnceProducerAck extends AtLeastOnceProducerAck { + + ExactlyOnceProducerAck(PUBLISH publish) { + super(publish); + } + + public void call(final DeliveryResult r, final StoreUOW uow) { + queue.execute(new Task() { + @Override + public void run() { + // TODO: perhaps persist the processed list.. 
+ session_state.received_message_ids().add(publish.messageId()); + session_state.strategy().update(new Task() { + @Override + public void run() { + PUBREC response = new PUBREC(); + response.messageId(publish.messageId()); + send(response); + } + }); + } + }); + } + } + + public void send_via_route(MqttProducerRoute route, PUBLISH publish) { + queue.assertExecuting(); + + AtLeastOnceProducerAck ack = null; + if (publish.qos() == QoS.AT_LEAST_ONCE) { + ack = new AtLeastOnceProducerAck(publish); + } else if (publish.qos() == QoS.EXACTLY_ONCE) { + ack = new ExactlyOnceProducerAck(publish); + } + + if (!route.targets().isEmpty()) { + Delivery delivery = new Delivery(); + delivery.message_$eq(new RawMessage(publish.payload())); + delivery.persistent_$eq(publish.qos().ordinal() > 0); + delivery.size_$eq(publish.payload().length); + delivery.ack_$eq(ScalaSupport.toScala(ack)); + if (publish.retain()) { + if (delivery.size() == 0) { + delivery.retain_$eq(RetainRemove$.MODULE$); + } else { + delivery.retain_$eq(RetainSet$.MODULE$); + } + } + + // routes can always accept at least 1 delivery... + assert !route.full(); + route.offer(delivery); + if (route.full()) { + // but once it gets full.. suspend to flow control the producer. + route.suspended = true; + handler._suspend_read("blocked sending to: " + route.overflowSessions().mkString(", ")); + } + + } else { + if (ack != null) { + ack.apply(null, null); + } + } + } + + public void publish_will(final Task complete_close) { + if (connect_message != null) { + if (connect_message.willTopic() == null) { + complete_close.run(); + } else { + + final SimpleAddress destination = decode_destination(connect_message.willTopic()); + final DeliveryProducerRoute prodcuer = new DeliveryProducerRoute(host().router()) { + { + refiller_$eq(NOOP); + } + + @Override + public int send_buffer_size() { + return 1024 * 64; + } + + @Override + public Option<BrokerConnection> connection() { + return handler != null ? 
ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none(); + } + + @Override + public DispatchQueue dispatch_queue() { + return queue; + } + }; + + host().dispatch_queue().execute(new Task() { + @Override + public void run() { + host().router().connect(new ConnectAddress[]{destination}, prodcuer, security_context); + queue.execute(new Task() { + @Override + public void run() { + if (prodcuer.targets().isEmpty()) { + complete_close.run(); + } else { + Delivery delivery = new Delivery(); + delivery.message_$eq(new RawMessage(connect_message.willMessage())); + delivery.size_$eq(connect_message.willMessage().length); + delivery.persistent_$eq(connect_message.willQos().ordinal() > 0); + if (connect_message.willRetain()) { + if (delivery.size() == 0) { + delivery.retain_$eq(RetainRemove$.MODULE$); + } else { + delivery.retain_$eq(RetainSet$.MODULE$); + } + } + delivery.ack_$eq(ScalaSupport.toScala(new UnitFn2<DeliveryResult, StoreUOW>() { + @Override + public void call(DeliveryResult x, StoreUOW y) { + host().dispatch_queue().execute(new Task() { + @Override + public void run() { + host().router().disconnect(new ConnectAddress[]{destination}, prodcuer); + } + }); + complete_close.run(); + } + })); + handler.messages_received().incrementAndGet(); + prodcuer.offer(delivery); + } + } + }); + } + }); + } + } + } + ///////////////////////////////////////////////////////////////////// + // + // Bits that deal with subscriptions + // + ///////////////////////////////////////////////////////////////////// + + public void on_mqtt_subscribe(final SUBSCRIBE sub) { + subscribe(Arrays.asList(sub.topics()), new Task() { + @Override + public void run() { + queue.execute(new Task() { + @Override + public void run() { + session_state.strategy().update(new Task() { + @Override + public void run() { + SUBACK suback = new SUBACK(); + suback.messageId(sub.messageId()); + + byte[] granted = new byte[sub.topics().length]; + int i = 0; + for (Topic topic : sub.topics()) { 
+ granted[i] = (byte) topic.qos().ordinal(); + i++; + } + + suback.grantedQos(granted); + send(suback); + } + }); + } + }); + } + }); + } + + public void subscribe(Collection<Topic> topics, final Task on_subscribed) { + final ArrayList<BindAddress> addresses = ScalaSupport.map(topics, new Fn1<Topic, BindAddress>() { + @Override + public BindAddress apply(Topic topic) { + BindAddress address = decode_destination(topic.name()); + session_state.subscriptions().put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address)); + mqtt_consumer().addresses.put(address, topic.qos()); + if (PathParser.containsWildCards(address.path())) { + mqtt_consumer().wildcards.put(address.path(), topic.qos()); + } + return address; + } + }); + + + handler.subscription_count_$eq(mqtt_consumer().addresses.size()); + + if (!clean_session) { + Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet(); + SubscriptionAddress durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()])); + session_state.durable_sub_$eq(durable_sub); + addresses.clear(); + addresses.add(durable_sub); + } + + host().dispatch_queue().execute(new Task() { + @Override + public void run() { + for (BindAddress address : addresses) { + host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1()); + } + on_subscribed.run(); + } + }); + } + + public void on_mqtt_unsubscribe(final UNSUBSCRIBE unsubscribe) { + + ArrayList<BindAddress> addressesList = ScalaSupport.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() { + @Override + public Option<BindAddress> apply(UTF8Buffer topicName) { + Tuple2<Topic, BindAddress> removed = session_state.subscriptions().remove(topicName); + if (removed != null) { + Topic topic = removed._1(); + BindAddress address = removed._2(); + mqtt_consumer().addresses.remove(address); + if 
(PathParser.containsWildCards(address.path())) { + mqtt_consumer().wildcards.remove(address.path(), topic.qos()); + } + return ScalaSupport.some(address); + } else { + return ScalaSupport.none(); + } + + } + }); + final BindAddress[] addresses = addressesList.toArray(new BindAddress[addressesList.size()]); + + handler.subscription_count_$eq(mqtt_consumer().addresses.size()); + + if (!clean_session) { + Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet(); + session_state.durable_sub_$eq(new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]))); + } + + host().dispatch_queue().execute(new Task() { + @Override + public void run() { + if (clean_session) { + host().router().unbind(addresses, mqtt_consumer(), false, security_context); + } else { + if (mqtt_consumer().addresses.isEmpty()) { + host().router().unbind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), true, security_context); + session_state.durable_sub_$eq(null); + } else { + host().router().bind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1()); + } + } + queue.execute(new Task() { + @Override + public void run() { + session_state.strategy().update(new Task() { + @Override + public void run() { + UNSUBACK ack = new UNSUBACK(); + ack.messageId(unsubscribe.messageId()); + send(ack); + } + }); + } + }); + } + }); + + } + + + MqttConsumer _mqtt_consumer; + + MqttConsumer mqtt_consumer() { + if (_mqtt_consumer == null) { + _mqtt_consumer = new MqttConsumer(); + } + return _mqtt_consumer; + } + + class IntPair { + int _1; + int _2; + + IntPair(int int1, int int2) { + this._1 = int1; + this._2 = int2; + } + } + + class MqttConsumer extends AbstractRetainedDeliveryConsumer { + + @Override + public String toString() { + return "mqtt client:" + client_id + " remote address: " + security_context.remote_address(); + } + + public 
DispatchQueue dispatch_queue() { + return queue; + } + + public HashMap<BindAddress, QoS> addresses = new HashMap<BindAddress, QoS>(); + public PathMap wildcards = new PathMap<QoS>(); + + CustomDispatchSource<IntPair, IntPair> credit_window_source = Dispatch.createSource(new EventAggregator<IntPair, IntPair>() { + public IntPair mergeEvent(IntPair previous, IntPair event) { + if (previous == null) { + return event; + } else { + return new IntPair(previous._1 + event._1, previous._2 + event._2); + } + } + + public IntPair mergeEvents(IntPair previous, IntPair events) { + return mergeEvent(previous, events); + } + + }, queue); + + { + credit_window_source.setEventHandler(new Task() { + @Override + public void run() { + IntPair data = credit_window_source.getData(); + credit_window_filter.credit(data._1, data._2); + } + }); + credit_window_source.resume(); + } + + // + MutableSink<Request> consumer_sink = new MutableSink<Request>(); + + { + consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none()); + } + + public LongCounter next_seq_id = new LongCounter(0); + + public long get_next_seq_id() { + return next_seq_id.getAndIncrement(); + } + + short to_message_id(long value) { + return (short) + (0x8000 | // MQTT message ids cannot be zero, so we always set the highest bit. + (value & 0x7FFF)); // the lower 15 bits come for the original seq id. + } + + CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(ScalaSupport.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() { + public Option<Request> apply(Tuple2<Session<Delivery>, Delivery> event) { + queue.assertExecuting(); + Session<Delivery> session = event._1(); + final Delivery delivery = event._2(); + + session_manager.delivered(session, delivery.size()); + + // Look up which QoS we need to send this message with.. 
+ SimpleAddress topic = delivery.sender().head().simple(); + + QoS qos = addresses.get(topic); + if (qos == null) { + qos = ScalaSupport.<QoS>head(wildcards.get(topic.path())); + } + + if (qos == null) { + acked(delivery, Consumed$.MODULE$); + return ScalaSupport.none(); + } else { + PUBLISH publish = new PUBLISH(); + publish.topicName(new UTF8Buffer(destination_parser.encode_destination(delivery.sender().head()))); + if (delivery.redeliveries() > 0) { + publish.dup(true); + } + + if (delivery.message().codec() == RawMessageCodec$.MODULE$) { + publish.payload(((RawMessage) delivery.message()).payload()); + } else { + if (publish_body) { + try { + publish.payload(delivery.message().getBodyAs(Buffer.class)); + } catch (FilterException e) { + log.error(e, "Internal Server Error: Could not covert message body to a Buffer"); + } + } else { + publish.payload(delivery.message().encoded()); + } + } + + handler.messages_sent().incrementAndGet(); + + UnitFn1<DeliveryResult> ack = new UnitFn1<DeliveryResult>() { + @Override + public void call(DeliveryResult result) { + acked(delivery, result); + } + }; + + if (delivery.ack() != null && (qos != QoS.AT_MOST_ONCE)) { + publish.qos(qos); + short id = to_message_id(clean_session ? + get_next_seq_id() : // generate our own seq id. + delivery.seq() // use the durable sub's seq id.. + ); + + publish.messageId(id); + Request request = new Request(id, publish, ack); + Request prev = in_flight_publishes.put(id, request); + if (prev != null) { + + // A reconnecting client could have acked before + // we get dispatched by the durable sub. + if (prev.message() == null) { + in_flight_publishes.remove(id); + acked(delivery, Consumed$.MODULE$); + } else { + // Looks we sent out a msg with that id. This could only + // happen once we send out 0x7FFF message and the first + // one has not been acked. 
+ handler.async_die("Client not acking regularly.", null); + } + } + return ScalaSupport.some(request); + + } else { + // This callback gets executed once the message + // sent to the transport. + publish.qos(QoS.AT_MOST_ONCE); + return ScalaSupport.some(new Request((short) 0, publish, ack)); + } + + } + } + })), SessionDeliverySizer$.MODULE$); + + + public void acked(Delivery delivery, DeliveryResult result) { + queue.assertExecuting(); + credit_window_source.merge(new IntPair(delivery.size(), 1)); + if (delivery.ack() != null) { + delivery.ack().apply(result, null); + } + } + + { + credit_window_filter.credit(handler.codec().getWriteBufferSize() * 2, 1); + } + + SessionSinkMux<Delivery> session_manager = new SessionSinkMux<Delivery>(credit_window_filter, queue, Delivery$.MODULE$, Integer.MAX_VALUE / 2, receive_buffer_size()) { + @Override + public long time_stamp() { + return host().broker().now(); + } + }; + + private void super_dispose() { + super.dispose(); + } + + @Override + protected void dispose() { + queue.execute(new Task() { + @Override + public void run() { + super_dispose(); + } + }); + } + + @Override + public Option<BrokerConnection> connection() { + return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none(); + } + + @Override + public int receive_buffer_size() { + return 1024 * 64; + } + + @Override + public boolean is_persistent() { + return false; + } + + @Override + public boolean matches(Delivery message) { + return true; + } + + // + // Each destination we subscribe to will establish a session with us. 
+ // + class MqttConsumerSession extends AbstractSessionSinkFilter<Delivery> implements DeliverySession { + + final DeliveryProducer producer; + final SessionSink<Delivery> downstream; + + MqttConsumerSession(DeliveryProducer producer) { + producer.dispatch_queue().assertExecuting(); + this.producer = producer; + downstream = session_manager.open(producer.dispatch_queue()); + retain(); + } + + @Override + public SessionSink<Delivery> downstream_session_sink() { + return downstream; + } + + @Override + public DeliveryProducer producer() { + return producer; + } + + @Override + public String toString() { + if (handler == null) { + return "unconnected"; + } else { + return "connection to " + handler.connection().transport().getRemoteAddress(); + } + } + + public MqttConsumer consumer() { + return mqtt_consumer(); + } + + public boolean closed = false; + + public void close() { + producer.dispatch_queue().assertExecuting(); + if (!closed) { + closed = true; + dispose(); + } + } + + public void dispose() { + session_manager.close(downstream(), ScalaSupport.toScala(new UnitFn1<Delivery>() { + @Override + public void call(Delivery delivery) { + // We have been closed so we have to nak any deliveries. + if (delivery.ack() != null) { + delivery.ack().apply(Undelivered$.MODULE$, delivery.uow()); + } + } + })); + release(); + } + + @Override + public boolean offer(Delivery delivery) { + if (full()) { + return false; + } else { + delivery.message().retain(); + boolean rc = downstream().offer(delivery); + assert rc : "offer should be accepted since it was not full"; + return true; + } + } + } + + public MqttConsumerSession connect(DeliveryProducer p) { + return new MqttConsumerSession(p); + } + } + +}
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java?rev=1461839&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java (added) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java Wed Mar 27 20:20:31 2013 @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.mqtt; + +import org.apache.activemq.apollo.util.Log; +import scala.Function1; +import scala.Function2; +import scala.Option; +import scala.collection.immutable.List; +import scala.runtime.BoxedUnit; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class ScalaSupport { + private static ScalaSupportHelper$ helper = ScalaSupportHelper$.MODULE$; + + static final Function1<Object,BoxedUnit> NOOP_FN1 = helper.toScala(new UnitFn1<Object>() { + @Override + public void call(Object v1) { + } + }); + + public static <T1> Function1<T1, BoxedUnit> noopFn1() { + return (Function1<T1, BoxedUnit>) NOOP_FN1; + } + + + public static <T1, R> Function1<T1, R> toScala(Fn1<T1, R> func) { + if( func == null ) { + return null; + } + return helper.toScala(func); + } + + public static <T1, T2, R> Function2<T1, T2, R> toScala(Fn2<T1, T2, R> func) { + if( func == null ) { + return null; + } + return helper.toScala(func); + } + + public static <T> Option<T> none() { + return helper.none(); + } + + public static <T> Option<T> some(T t) { + return helper.some(t); + } + + public static <T> T head(Iterable<T> i) { + Iterator<T> iterator = i.iterator(); + if( iterator.hasNext() ) { + return iterator.next(); + } else { + return null; + } + } + + public static <T,R> ArrayList<R> map(Collection<T> values, Fn1<T, R> func) { + ArrayList rc = new ArrayList(values.size()); + for( T t: values) { + rc.add(func.apply(t)); + } + return rc; + } + + public static <T,R> ArrayList<R> flatMap(Collection<T> values, Fn1<T, Option<R>> func) { + ArrayList rc = new ArrayList(values.size()); + for( T t: values) { + Option<R> opt = func.apply(t); + if( opt.isDefined() ) { + rc.add(opt.get()); + } + } + return rc; + } + + public static List<String> toList(String ... 
s) { + return helper.toList(s); + } + + + public static class Logger { + final Log log; + + public Logger(Log log) { + this.log = log; + } + + public <T> void trace(String message, Object ...args) { + helper.trace(log, message, args); + } + public <T> void debug(String message, Object ...args) { + helper.debug(log, message, args); + } + public <T> void info(String message, Object ...args) { + helper.info(log, message, args); + } + public <T> void warn(String message, Object ...args) { + helper.warn(log, message, args); + } + public <T> void error(String message, Object ...args) { + helper.error(log, message, args); + } + + public <T> void trace(Throwable e, String message, Object ...args) { + helper.trace(log, e, message, args); + } + public <T> void debug(Throwable e, String message, Object ...args) { + helper.debug(log, e, message, args); + } + public <T> void info(Throwable e, String message, Object ...args) { + helper.info(log, e, message, args); + } + public <T> void warn(Throwable e, String message, Object ...args) { + helper.warn(log, e, message, args); + } + public <T> void error(Throwable e, String message, Object ...args) { + helper.error(log, e, message, args); + } + } +} Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala?rev=1461839&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala (added) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala Wed Mar 27 20:20:31 2013 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt
+
+import scala.Function1
+import org.apache.activemq.apollo.util.Log
+import scala.runtime.BoxedUnit
+
+/**
+ * One-argument callback type. ScalaSupportHelper.toScala converts an
+ * instance into a scala.Function1.
+ */
+abstract class Fn1[-T1,+R] {
+  def apply(v1: T1): R
+}
+
+/**
+ * Fn1 for callbacks that produce no value: subclasses override `call`
+ * (a no-op by default), and `apply` runs it and returns BoxedUnit.UNIT.
+ */
+class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] {
+  def call(v1: T1) = {}
+  def apply(v1: T1) = {
+    call(v1)
+    BoxedUnit.UNIT;
+  }
+}
+
+/**
+ * Two-argument callback type. ScalaSupportHelper.toScala converts an
+ * instance into a scala.Function2.
+ */
+abstract class Fn2[-T1,-T2,+R] {
+  def apply(v1: T1, v2: T2): R
+}
+
+/**
+ * Fn2 for callbacks that produce no value: subclasses override `call`
+ * (a no-op by default), and `apply` runs it and returns BoxedUnit.UNIT.
+ */
+class UnitFn2[-T1,-T2] extends Fn2[T1,T2, BoxedUnit] {
+  def call (v1: T1, v2: T2) = {}
+  def apply(v1: T1, v2: T2) = {
+    call(v1, v2)
+    BoxedUnit.UNIT;
+  }
+}
+
+/**
+ * Scala-side helpers for code that cannot use Scala syntax directly:
+ * wraps Fn1/Fn2 callbacks in Scala function values, builds Option and List
+ * instances, and forwards logging calls to a Log, expanding the argument
+ * array into varargs.
+ *
+ * Originally created by chirino on 3/26/13.
+ */
+object ScalaSupportHelper {
+  // Each wrapper is a function literal that delegates to the callback's apply.
+  def toScala[T1,R](func:Fn1[T1,R]):Function1[T1,R] = (v1:T1) => { func.apply(v1) }
+  def toScala[T1,T2,R](func:Fn2[T1,T2,R]):Function2[T1,T2,R] = (v1:T1, v2:T2) => { func.apply(v1,v2) }
+  def none[T]:Option[T] = None
+  def some[T](t:T):Option[T] = Some(t)
+  def toList[T](args:Array[T]):List[T] = List(args:_*)
+
+  // Message-only logging: `args:_*` passes the array elements as varargs.
+  def trace(log:Log, message:String, args:Array[Object]) = log.trace(message, args:_*)
+  def debug(log:Log, message:String, args:Array[Object]) = log.debug(message, args:_*)
+  def info (log:Log, message:String, args:Array[Object]) = log.info (message, args:_*)
+  def warn (log:Log, message:String, args:Array[Object]) = log.warn (message, args:_*)
+  def error(log:Log, message:String, args:Array[Object]) = log.error(message, args:_*)
+
+  // Same as above, with a Throwable passed ahead of the message.
+  def trace(log:Log, e:Throwable, message:String, args:Array[Object]) = log.trace(e, message, args:_*)
+  def debug(log:Log, e:Throwable, message:String, args:Array[Object]) = log.debug(e, message, args:_*)
+  def info (log:Log, e:Throwable, message:String, args:Array[Object]) = log.info (e, message, args:_*)
+  def warn (log:Log, e:Throwable, message:String, args:Array[Object]) = log.warn (e, message, args:_*)
+  def error(log:Log, e:Throwable, message:String, args:Array[Object]) = log.error(e, message, args:_*)
+}
Copied: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java (from r1459409, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java?p2=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala&r1=1459409&r2=1461839&rev=1461839&view=diff
============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala (original) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java Wed Mar 27 20:20:31 2013 @@ -14,15 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.mqtt.dto +package org.apache.activemq.apollo.mqtt.dto; -import org.apache.activemq.apollo.util.DtoModule +import org.apache.activemq.apollo.util.DtoModule; /** + * <p> + * </p> + * * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ -class Module extends DtoModule { - def dto_package = "org.apache.activemq.apollo.mqtt.dto" +public class Module implements DtoModule { - def extension_classes = Array(classOf[MqttDTO], classOf[MqttConnectionStatusDTO]) -} \ No newline at end of file + @Override + public String dto_package() { + return "org.apache.activemq.apollo.mqtt.dto"; + } + + @Override + public Class<?>[] extension_classes() { + return new Class<?>[]{ MqttDTO.class, MqttConnectionStatusDTO.class }; + } +} Modified: activemq/activemq-apollo/trunk/apollo-scala/pom.xml URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-scala/pom.xml?rev=1461839&r1=1461838&r2=1461839&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-scala/pom.xml (original) +++ activemq/activemq-apollo/trunk/apollo-scala/pom.xml Wed Mar 27 20:20:31 2013 @@ -32,12 +32,7 @@ <name>${project.artifactId}</name> <description>Holds the common Maven settings for Scala based modules.</description> - - <properties> - <scala-plugin-main-goal>compile</scala-plugin-main-goal> - <scala-plugin-test-goal>testCompile</scala-plugin-test-goal> - </properties> - + <dependencies> <!-- Scala Support: this needs to be 
copied into every scala project --> @@ -81,9 +76,10 @@ <executions> <execution> <id>compile</id> - <phase>compile</phase> + <phase>process-resources</phase> <goals> - <goal>${scala-plugin-main-goal}</goal> + <goal>add-source</goal> + <goal>compile</goal> </goals> <configuration> <testSourceDir>dontcompile</testSourceDir> @@ -106,7 +102,7 @@ <id>test</id> <phase>test-compile</phase> <goals> - <goal>${scala-plugin-test-goal}</goal> + <goal>testCompile</goal> </goals> <configuration> <!-- <displayCmd>true</displayCmd> --> Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala?rev=1461839&r1=1461838&r2=1461839&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala (original) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala Wed Mar 27 20:20:31 2013 @@ -18,6 +18,7 @@ package org.apache.activemq.apollo.util. object Path { def apply(value:String*):Path = Path(value.toList.map(LiteralPart(_))) + def create(value:String):Path = Path(LiteralPart(value)::Nil) } /**
