http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryImpl.java deleted file mode 100644 index 4238a6a..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryImpl.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton; - -public class ProtonFactoryImpl implements ProtonFactory -{ - @Override - public final ImplementationType getImplementationType() - { - return ImplementationType.PROTON_J; - } -}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryLoader.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryLoader.java b/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryLoader.java deleted file mode 100644 index 9ce1ad9..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactoryLoader.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton; - -import java.util.Iterator; -import java.util.ServiceLoader; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.qpid.proton.ProtonFactory.ImplementationType; - -/** - * A thin wrapper around {@link ServiceLoader} intended for loading Proton object factories. - * - * If system property {@value #IMPLEMENTATION_TYPE_PROPERTY} if set, loads a factory - * whose {@link ImplementationType} matches its value. 
- */ -public class ProtonFactoryLoader<C extends ProtonFactory> -{ - public static final String IMPLEMENTATION_TYPE_PROPERTY = "qpid.proton.implementationtype"; - - private static final Logger LOGGER = Logger.getLogger(ProtonFactoryLoader.class.getName()); - private final Class<C> _factoryInterface; - private final ImplementationType _implementationType; - - /** - * Use this constructor if you intend to explicitly provide factory interface later, - * i.e. by calling {@link #loadFactory(Class)}. This is useful if you want to use the same - * ProtonFactoryLoader instance for loading multiple factory types. - */ - public ProtonFactoryLoader() - { - this(null); - } - - public ProtonFactoryLoader(Class<C> factoryInterface) - { - this(factoryInterface, getImpliedImplementationType()); - } - - static ImplementationType getImpliedImplementationType() - { - String implementationTypeFromSystemProperty = System.getProperty(IMPLEMENTATION_TYPE_PROPERTY); - if(implementationTypeFromSystemProperty != null) - { - return ImplementationType.valueOf(implementationTypeFromSystemProperty); - } - else - { - return ImplementationType.ANY; - } - } - - /** - * @param factoryInterface will be used as the factory interface class in calls to {@link #loadFactory()}. - */ - public ProtonFactoryLoader(Class<C> factoryInterface, ImplementationType implementationType) - { - _factoryInterface = factoryInterface; - _implementationType = implementationType; - } - - /** - * Returns the Proton factory that implements the stored {@link ProtonFactoryLoader#_factoryInterface} class. 
- */ - public C loadFactory() - { - return loadFactory(_factoryInterface); - } - - public C loadFactory(Class<C> factoryInterface) - { - if(factoryInterface == null) - { - throw new IllegalStateException("factoryInterface has not been set."); - } - ServiceLoader<C> serviceLoader = ServiceLoader.load(factoryInterface); - Iterator<C> serviceLoaderIterator = serviceLoader.iterator(); - while(serviceLoaderIterator.hasNext()) - { - C factory = serviceLoaderIterator.next(); - if(_implementationType == ImplementationType.ANY || factory.getImplementationType() == _implementationType) - { - if(LOGGER.isLoggable(Level.FINE)) - { - LOGGER.fine("loadFactory returning " + factory + " for loader's implementation type " + _implementationType); - } - return factory; - } - } - throw new IllegalStateException("Can't find service loader for " + factoryInterface.getName() + - " for implementation type " + _implementationType); - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java index 8472c04..9ea5504 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java @@ -25,17 +25,18 @@ package org.apache.qpid.proton.amqp.messaging; import java.util.Map; -public final class DeliveryAnnotations - implements Section +import org.apache.qpid.proton.amqp.Symbol; + +public final class DeliveryAnnotations implements Section { - private final Map _value; + private final Map<Symbol, Object> _value; - public DeliveryAnnotations(Map value) + public DeliveryAnnotations(Map<Symbol, Object> value) { _value = value; } - 
public Map getValue() + public Map<Symbol, Object> getValue() { return _value; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java index 925093a..9bf82d6 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java @@ -23,20 +23,22 @@ package org.apache.qpid.proton.amqp.messaging; +import org.apache.qpid.proton.amqp.Symbol; + import java.util.Map; public final class MessageAnnotations implements Section { - private final Map _value; + private final Map<Symbol, Object> _value; - public MessageAnnotations(Map value) + public MessageAnnotations(Map<Symbol, Object> value) { _value = value; } - public Map getValue() + public Map<Symbol, Object> getValue() { return _value; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java new file mode 100644 index 0000000..1aa2143 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.codec; + +/** + * Codec + * + */ + +public final class Codec +{ + + private Codec() + { + } + + public static Data data(long capacity) + { + return Data.Factory.create(); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java index 1252542..4f42488 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java @@ -37,9 +37,19 @@ import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.UnsignedShort; +import org.apache.qpid.proton.codec.impl.DataImpl; + public interface Data { + public static final class Factory { + + public static Data create() { + return new DataImpl(); + } + + } + enum DataType { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/codec/DataFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/DataFactory.java 
b/proton-j/src/main/java/org/apache/qpid/proton/codec/DataFactory.java deleted file mode 100644 index 5a622e0..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/DataFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.codec; - -import org.apache.qpid.proton.ProtonFactory; - -public interface DataFactory extends ProtonFactory -{ - Data createData(long capacity); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/codec/impl/DataFactoryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/impl/DataFactoryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/impl/DataFactoryImpl.java deleted file mode 100644 index 480e7a0..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/impl/DataFactoryImpl.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.codec.impl; - -import org.apache.qpid.proton.ProtonFactoryImpl; -import org.apache.qpid.proton.ProtonUnsupportedOperationException; -import org.apache.qpid.proton.codec.Data; -import org.apache.qpid.proton.codec.DataFactory; - -public class DataFactoryImpl extends ProtonFactoryImpl implements DataFactory -{ - @Override - public Data createData(final long capacity) - { - return new DataImpl(); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java index c9eeeb3..2564f05 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java @@ -21,8 +21,10 @@ package org.apache.qpid.proton.driver; +import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.ServerSocketChannel; +import org.apache.qpid.proton.driver.impl.DriverImpl; /** * A driver for the proton engine. 
@@ -40,6 +42,14 @@ import java.nio.channels.ServerSocketChannel; */ public interface Driver { + + public static final class Factory + { + public static Driver create() throws IOException { + return new DriverImpl(); + } + } + /** * Force {@link #doWait(long)} to return. * http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/driver/DriverFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/DriverFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/DriverFactory.java deleted file mode 100644 index c78779c..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/driver/DriverFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -package org.apache.qpid.proton.driver; - -import java.io.IOException; - -import org.apache.qpid.proton.ProtonFactory; - -public interface DriverFactory extends ProtonFactory -{ - Driver createDriver() throws IOException; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java index e98317c..e3e43c4 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java @@ -32,7 +32,6 @@ import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; -import org.apache.qpid.proton.engine.impl.TransportFactory; class ConnectorImpl<C> implements Connector<C> { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverFactoryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverFactoryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverFactoryImpl.java deleted file mode 100644 index 45db9c8..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverFactoryImpl.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.driver.impl; - -import java.io.IOException; - -import org.apache.qpid.proton.ProtonFactoryImpl; -import org.apache.qpid.proton.driver.Driver; -import org.apache.qpid.proton.driver.DriverFactory; - -public class DriverFactoryImpl extends ProtonFactoryImpl implements DriverFactory -{ - @Override - public Driver createDriver() throws IOException - { - return new DriverImpl(); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java index ade992e..bfdd017 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java @@ -53,7 +53,7 @@ public class DriverImpl implements Driver private Queue<ConnectorImpl> _selectedConnectors = new ArrayDeque<ConnectorImpl>(); private Queue<ListenerImpl> _selectedListeners = new ArrayDeque<ListenerImpl>(); - DriverImpl() throws IOException + public DriverImpl() throws IOException { _selector = Selector.open(); } 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java index c2c9765..f9e6fe5 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.proton.engine; +import org.apache.qpid.proton.engine.impl.CollectorImpl; /** * Collector @@ -29,6 +30,13 @@ package org.apache.qpid.proton.engine; public interface Collector { + public static final class Factory + { + public static Collector create() { + return new CollectorImpl(); + } + } + Event peek(); void pop(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java index 3df17cf..5fe66d5 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java @@ -24,6 +24,8 @@ import java.util.EnumSet; import java.util.Map; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.engine.impl.ConnectionImpl; + /** * Maintains lists of sessions, links and deliveries in a state @@ -36,6 +38,13 @@ import org.apache.qpid.proton.amqp.Symbol; public interface Connection extends Endpoint { + public static final class Factory + { + public static Connection create() { + return new ConnectionImpl(); + } + } + /** * Returns a newly created session * @@ -81,6 +90,8 @@ public interface Connection extends 
Endpoint public void setHostname(String hostname); + public String getHostname(); + public String getRemoteContainer(); public String getRemoteHostname(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java new file mode 100644 index 0000000..3bd46ce --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.proton.engine; + +/** + * Engine + * + */ + +public final class Engine +{ + + private Engine() + { + } + + public static Collector collector() + { + return Collector.Factory.create(); + } + + public static Connection connection() + { + return Connection.Factory.create(); + } + + public static Transport transport() + { + return Transport.Factory.create(); + } + + public static SslDomain sslDomain() + { + return SslDomain.Factory.create(); + } + + public static SslPeerDetails sslPeerDetails(String hostname, int port) + { + return SslPeerDetails.Factory.create(hostname, port); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/EngineFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/EngineFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/EngineFactory.java deleted file mode 100644 index 5cb3ce9..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/EngineFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.qpid.proton.engine; - -import org.apache.qpid.proton.ProtonFactory; - -public interface EngineFactory extends ProtonFactory -{ - Connection createConnection(); - Transport createTransport(); - SslDomain createSslDomain(); - SslPeerDetails createSslPeerDetails(String hostname, int port); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java index 14c0dc7..75ba56c 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java @@ -29,19 +29,38 @@ package org.apache.qpid.proton.engine; public interface Event { public enum Category { - PROTOCOL; + CONNECTION, + SESSION, + LINK, + DELIVERY, + TRANSPORT; } public enum Type { - CONNECTION_REMOTE_STATE(Category.PROTOCOL, 1), - CONNECTION_LOCAL_STATE(Category.PROTOCOL, 2), - SESSION_REMOTE_STATE(Category.PROTOCOL, 3), - SESSION_LOCAL_STATE(Category.PROTOCOL, 4), - LINK_REMOTE_STATE(Category.PROTOCOL, 5), - LINK_LOCAL_STATE(Category.PROTOCOL, 6), - LINK_FLOW(Category.PROTOCOL, 7), - DELIVERY(Category.PROTOCOL, 8), - TRANSPORT(Category.PROTOCOL, 9); + CONNECTION_INIT(Category.CONNECTION, 1), + CONNECTION_OPEN(Category.CONNECTION, 2), + CONNECTION_REMOTE_OPEN(Category.CONNECTION, 3), + CONNECTION_CLOSE(Category.CONNECTION, 4), + CONNECTION_REMOTE_CLOSE(Category.CONNECTION, 5), + CONNECTION_FINAL(Category.CONNECTION, 6), + + SESSION_INIT(Category.SESSION, 1), + SESSION_OPEN(Category.SESSION, 2), + SESSION_REMOTE_OPEN(Category.SESSION, 3), + SESSION_CLOSE(Category.SESSION, 4), + SESSION_REMOTE_CLOSE(Category.SESSION, 5), + SESSION_FINAL(Category.SESSION, 6), + + LINK_INIT(Category.LINK, 1), + LINK_OPEN(Category.LINK, 2), + 
LINK_REMOTE_OPEN(Category.LINK, 3), + LINK_CLOSE(Category.LINK, 4), + LINK_REMOTE_CLOSE(Category.LINK, 5), + LINK_FLOW(Category.LINK, 6), + LINK_FINAL(Category.LINK, 7), + + DELIVERY(Category.DELIVERY, 1), + TRANSPORT(Category.TRANSPORT, 1); private int _opcode; private Category _category; @@ -72,4 +91,6 @@ public interface Event Transport getTransport(); + Event copy(); + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java index d4b74fc..dd63321 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java @@ -49,7 +49,8 @@ public interface Sasl PN_SASL_SYS((byte)2), /** failed due to unrecoverable error */ PN_SASL_PERM((byte)3), - PN_SASL_TEMP((byte)4); + PN_SASL_TEMP((byte)4), + PN_SASL_SKIPPED((byte)5); private final byte _code; @@ -72,6 +73,7 @@ public interface Sasl public static SaslOutcome PN_SASL_SYS = SaslOutcome.PN_SASL_SYS; public static SaslOutcome PN_SASL_PERM = SaslOutcome.PN_SASL_PERM; public static SaslOutcome PN_SASL_TEMP = SaslOutcome.PN_SASL_TEMP; + public static SaslOutcome PN_SASL_SKIPPED = SaslOutcome.PN_SASL_SKIPPED; /** * Access the current state of the layer. @@ -156,4 +158,10 @@ public interface Sasl void client(); void server(); + + /** + * Set whether servers may accept incoming connections + * that skip the SASL layer negotiation. 
+ */ + void allowSkip(boolean allowSkip); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java index 937d650..a908824 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java @@ -18,11 +18,21 @@ */ package org.apache.qpid.proton.engine; +import org.apache.qpid.proton.engine.impl.ssl.SslDomainImpl; + /** * I store the details used to create SSL sessions. */ public interface SslDomain { + + public static final class Factory + { + public static SslDomain create() { + return new SslDomainImpl(); + } + } + /** * Determines whether the endpoint acts as a client or server. */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java index 884fdae..5b318fe 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java @@ -18,6 +18,8 @@ */ package org.apache.qpid.proton.engine; +import org.apache.qpid.proton.engine.impl.ssl.SslPeerDetailsImpl; + /** * The details of the remote peer involved in an SSL session. 
* @@ -28,6 +30,14 @@ package org.apache.qpid.proton.engine; */ public interface SslPeerDetails { + + public static final class Factory + { + public static SslPeerDetails create(String hostname, int port) { + return new SslPeerDetailsImpl(hostname, port); + } + } + String getHostname(); int getPort(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java index 53da131..ba806bd 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java @@ -22,6 +22,8 @@ package org.apache.qpid.proton.engine; import java.nio.ByteBuffer; +import org.apache.qpid.proton.engine.impl.TransportImpl; + /** * <p> @@ -63,6 +65,19 @@ import java.nio.ByteBuffer; */ public interface Transport extends Endpoint { + + public static final class Factory + { + public static Transport create() { + return new TransportImpl(); + } + } + + public static final int TRACE_OFF = 0; + public static final int TRACE_RAW = 1; + public static final int TRACE_FRM = 2; + public static final int TRACE_DRV = 4; + public static final int DEFAULT_MAX_FRAME_SIZE = -1; /** the lower bound for the agreed maximum frame size (in bytes). */ @@ -70,7 +85,10 @@ public interface Transport extends Endpoint public int SESSION_WINDOW = 16*1024; public int END_OF_STREAM = -1; + public void trace(int levels); + public void bind(Connection connection); + public void unbind(); public int capacity(); public ByteBuffer tail(); @@ -83,6 +101,8 @@ public interface Transport extends Endpoint public void pop(int bytes); public void close_head(); + public boolean isClosed(); + /** * Processes the provided input. 
* @@ -156,7 +176,15 @@ public interface Transport extends Endpoint */ void outputConsumed(); - Sasl sasl(); + /** + * Signal the transport to expect SASL frames used to establish a SASL layer prior to + * performing the AMQP protocol version negotiation. This must first be performed before + * the transport is used for processing. Subsequent invocations will return the same + * {@link Sasl} object. + * + * @throws IllegalStateException if transport processing has already begun prior to initial invocation + */ + Sasl sasl() throws IllegalStateException; /** * Wrap this transport's output and input to apply SSL encryption and decryption respectively. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java index 344597c..e222819 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java @@ -35,25 +35,55 @@ import java.util.Queue; public class CollectorImpl implements Collector { - private Queue<Event> events = new LinkedList<Event>(); + private EventImpl head; + private EventImpl tail; + private EventImpl free; public CollectorImpl() {} public Event peek() { - return events.peek(); + return head; } public void pop() { - events.poll(); + if (head != null) { + EventImpl next = head.next; + head.next = free; + free = head; + head.clear(); + head = next; + } } - public EventImpl put(Event.Type type) + public EventImpl put(Event.Type type, Object context) { - EventImpl event = new EventImpl(type); - events.add(event); + if (tail != null && tail.getType() == type && + tail.getContext() == context) { + return null; + } + + EventImpl event; + 
if (free == null) { + event = new EventImpl(); + } else { + event = free; + free = free.next; + event.next = null; + } + + event.init(type, context); + + if (head == null) { + head = event; + tail = event; + } else { + tail.next = event; + tail = event; + } + return event; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java index 6a27103..736d93c 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java @@ -52,6 +52,7 @@ public class ConnectionImpl extends EndpointImpl implements ProtonJConnection private TransportImpl _transport; private DeliveryImpl _transportWorkHead; private DeliveryImpl _transportWorkTail; + private int _transportWorkSize = 0; private String _localContainerId = ""; private String _localHostname = ""; private String _remoteContainer; @@ -182,16 +183,30 @@ public class ConnectionImpl extends EndpointImpl implements ProtonJConnection return this; } - public void free() - { - super.free(); - for(Session session : _sessions) - { + @Override + void postFinal() { + put(Event.Type.CONNECTION_FINAL, this); + } + + @Override + void doFree() { + for(Session session : _sessions) { session.free(); } _sessions = null; } + void modifyEndpoints() { + if (_sessions != null) { + for (SessionImpl ssn: _sessions) { + ssn.modifyEndpoints(); + } + } + if (!freed) { + modified(); + } + } + void handleOpen(Open open) { // TODO - store state @@ -201,10 +216,7 @@ public class ConnectionImpl extends EndpointImpl implements ProtonJConnection setRemoteDesiredCapabilities(open.getDesiredCapabilities()); 
setRemoteOfferedCapabilities(open.getOfferedCapabilities()); setRemoteProperties(open.getProperties()); - EventImpl ev = put(Event.Type.CONNECTION_REMOTE_STATE); - if (ev != null) { - ev.init(this); - } + put(Event.Type.CONNECTION_REMOTE_OPEN, this); } @@ -489,6 +501,10 @@ public class ConnectionImpl extends EndpointImpl implements ProtonJConnection return _transportWorkHead; } + int getTransportWorkSize() { + return _transportWorkSize; + } + public void removeTransportWork(DeliveryImpl delivery) { if (!delivery._transportWork) return; @@ -517,6 +533,7 @@ public class ConnectionImpl extends EndpointImpl implements ProtonJConnection } delivery._transportWork = false; + _transportWorkSize--; } void addTransportWork(DeliveryImpl delivery) @@ -538,6 +555,7 @@ public class ConnectionImpl extends EndpointImpl implements ProtonJConnection } delivery._transportWork = true; + _transportWorkSize++; } void workUpdate(DeliveryImpl delivery) @@ -571,23 +589,40 @@ public class ConnectionImpl extends EndpointImpl implements ProtonJConnection public void collect(Collector collector) { _collector = (CollectorImpl) collector; + + put(Event.Type.CONNECTION_INIT, this); + + LinkNode<SessionImpl> ssn = _sessionHead; + while (ssn != null) { + put(Event.Type.SESSION_INIT, ssn.getValue()); + ssn = ssn.getNext(); + } + + LinkNode<LinkImpl> lnk = _linkHead; + while (lnk != null) { + put(Event.Type.LINK_INIT, lnk.getValue()); + lnk = lnk.getNext(); + } } - EventImpl put(Event.Type type) + EventImpl put(Event.Type type, Object context) { if (_collector != null) { - return _collector.put(type); + return _collector.put(type, context); } else { return null; } } @Override - protected void localStateChanged() + void localOpen() { - EventImpl ev = put(Event.Type.CONNECTION_LOCAL_STATE); - if (ev != null) { - ev.init(this); - } + put(Event.Type.CONNECTION_OPEN, this); + } + + @Override + void localClose() + { + put(Event.Type.CONNECTION_CLOSE, this); } } 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java index 72ae1a6..b97793a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java @@ -37,7 +37,27 @@ public abstract class EndpointImpl implements ProtonJEndpoint private EndpointImpl _transportPrev; private Object _context; - protected abstract void localStateChanged(); + private int refcount = 1; + boolean freed = false; + + void incref() { + refcount++; + } + + void decref() { + refcount--; + if (refcount == 0) { + postFinal(); + } else if (refcount < 0) { + throw new IllegalStateException(); + } + } + + abstract void postFinal(); + + abstract void localOpen(); + + abstract void localClose(); public void open() { @@ -49,7 +69,7 @@ public abstract class EndpointImpl implements ProtonJEndpoint // TODO case UNINITIALIZED: _localState = EndpointState.ACTIVE; - localStateChanged(); + localOpen(); } modified(); } @@ -65,7 +85,7 @@ public abstract class EndpointImpl implements ProtonJEndpoint // TODO case ACTIVE: _localState = EndpointState.CLOSED; - localStateChanged(); + localClose(); } modified(); } @@ -129,9 +149,9 @@ public abstract class EndpointImpl implements ProtonJEndpoint if (emit) { ConnectionImpl conn = getConnectionImpl(); - EventImpl ev = conn.put(Event.Type.TRANSPORT); - if (ev != null) { - ev.init(conn); + TransportImpl trans = conn.getTransport(); + if (trans != null) { + conn.put(Event.Type.TRANSPORT, trans); } } } @@ -162,16 +182,15 @@ public abstract class EndpointImpl implements ProtonJEndpoint return _transportPrev; } - public void free() + abstract void doFree(); 
+ + final public void free() { - if(_transportNext != null) - { - _transportNext.setTransportPrev(_transportPrev); - } - if(_transportPrev != null) - { - _transportPrev.setTransportNext(_transportNext); - } + if (freed) return; + freed = true; + + doFree(); + decref(); } void setTransportNext(EndpointImpl transportNext) @@ -194,9 +213,4 @@ public abstract class EndpointImpl implements ProtonJEndpoint _context = context; } - @Override - public String toString() - { - return "EndpointImpl(" + System.identityHashCode(this) + ") [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]"; - } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EngineFactoryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EngineFactoryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EngineFactoryImpl.java deleted file mode 100644 index 8ce9bba..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EngineFactoryImpl.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.engine.impl; - -import org.apache.qpid.proton.ProtonFactoryImpl; -import org.apache.qpid.proton.engine.EngineFactory; -import org.apache.qpid.proton.engine.ProtonJConnection; -import org.apache.qpid.proton.engine.ProtonJSslDomain; -import org.apache.qpid.proton.engine.ProtonJSslPeerDetails; -import org.apache.qpid.proton.engine.ProtonJTransport; -import org.apache.qpid.proton.engine.impl.ssl.SslDomainImpl; -import org.apache.qpid.proton.engine.impl.ssl.SslPeerDetailsImpl; - -public class EngineFactoryImpl extends ProtonFactoryImpl implements EngineFactory -{ - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public ProtonJConnection createConnection() - { - return new ConnectionImpl(); - } - - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public ProtonJTransport createTransport() - { - return new TransportImpl(); - } - - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public ProtonJSslDomain createSslDomain() - { - return new SslDomainImpl(); - } - - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public ProtonJSslPeerDetails createSslPeerDetails(String hostname, int port) - { - return new SslPeerDetailsImpl(hostname, port); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java index 7d57909..1dcebe4 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java @@ -36,15 +36,24 @@ class EventImpl implements Event { Type type; - Connection connection; - Session session; - Link link; - Delivery delivery; - Transport transport; + Object context; + EventImpl next; - EventImpl(Type type) + EventImpl() + { + this.type = null; + } + + void init(Event.Type type, Object context) { this.type = type; + this.context = context; + } + + void clear() + { + type = null; + context = null; } public Category getCategory() @@ -57,58 +66,88 @@ class EventImpl implements Event return type; } + public Object getContext() + { + return context; + } + public Connection getConnection() { - return connection; + switch (type.getCategory()) { + case CONNECTION: + return (Connection) context; + case TRANSPORT: + Transport transport = getTransport(); + if (transport == null) { + return null; + } + return ((TransportImpl) transport).getConnectionImpl(); + default: + Session ssn = getSession(); + if (ssn == null) { + return null; + } + return ssn.getConnection(); + } } public Session getSession() { - return session; + switch (type.getCategory()) { + case SESSION: + return (Session) context; + default: + Link link = getLink(); + if (link == null) { + return null; + } + return link.getSession(); + } } public Link getLink() { - return link; + switch (type.getCategory()) { + case LINK: + return (Link) context; + default: + Delivery dlv = getDelivery(); + if (dlv == null) { + return null; + } + return dlv.getLink(); + } } public Delivery getDelivery() { - return delivery; + switch (type.getCategory()) { + case DELIVERY: + return (Delivery) context; + default: + return null; + } } public Transport getTransport() { - return transport; - } - - void init(Transport transport) - { - this.transport = 
transport; - } - - void init(Connection connection) - { - this.connection = connection; - init(((ConnectionImpl) connection).getTransport()); + switch (type.getCategory()) { + case TRANSPORT: + return (Transport) context; + default: + return null; + } } - - void init(Session session) + public Event copy() { - this.session = session; - init(session.getConnection()); + EventImpl newEvent = new EventImpl(); + newEvent.init(type, context); + return newEvent; } - void init(Link link) + @Override + public String toString() { - this.link = link; - init(link.getSession()); + return "EventImpl{" + "type=" + type + ", context=" + context + '}'; } - - void init(Delivery delivery) - { - this.delivery = delivery; - init(delivery.getLink()); - } - } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java index 9b1413d..542466a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.proton.engine.impl; +import org.apache.qpid.proton.engine.TransportException; import org.apache.qpid.proton.framing.TransportFrame; public interface FrameHandler @@ -31,7 +32,7 @@ public interface FrameHandler */ boolean handleFrame(TransportFrame frame); - void closed(); + void closed(TransportException error); /** * Returns whether I am currently able to handle frames. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java index a83f888..849a2e7 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java @@ -40,6 +40,8 @@ class FrameParser implements TransportInput { private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace"); + private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0); + private enum State { HEADER0, @@ -62,8 +64,9 @@ class FrameParser implements TransportInput private final FrameHandler _frameHandler; private final ByteBufferDecoder _decoder; + private final int _maxFrameSize; - private final ByteBuffer _inputBuffer; + private ByteBuffer _inputBuffer = null; private boolean _tail_closed = false; private State _state = State.HEADER0; @@ -87,11 +90,7 @@ class FrameParser implements TransportInput { _frameHandler = frameHandler; _decoder = decoder; - if (maxFrameSize > 0) { - _inputBuffer = newWriteableBuffer(maxFrameSize); - } else { - _inputBuffer = newWriteableBuffer(4*1024); - } + _maxFrameSize = maxFrameSize > 0 ? 
maxFrameSize : 4*1024; } private void input(ByteBuffer in) throws TransportException @@ -372,6 +371,7 @@ class FrameParser implements TransportInput _decoder.setByteBuffer(in); Object val = _decoder.readObject(); + _decoder.setByteBuffer(null); Binary payload; @@ -447,7 +447,7 @@ class FrameParser implements TransportInput state = State.ERROR; frameParsingError = new TransportException("connection aborted"); } else { - _frameHandler.closed(); + _frameHandler.closed(null); } } @@ -460,7 +460,7 @@ class FrameParser implements TransportInput if(frameParsingError != null) { _parsingError = frameParsingError; - throw frameParsingError; + _frameHandler.closed(frameParsingError); } else { @@ -475,7 +475,11 @@ class FrameParser implements TransportInput if (_tail_closed) { return Transport.END_OF_STREAM; } else { - return _inputBuffer.remaining(); + if (_inputBuffer != null) { + return _inputBuffer.remaining(); + } else { + return _maxFrameSize; + } } } @@ -483,27 +487,41 @@ class FrameParser implements TransportInput public ByteBuffer tail() { if (_tail_closed) { - if (_parsingError != null) { - throw new TransportException(_parsingError.getMessage()); - } else { - throw new TransportException("tail closed"); - } + throw new TransportException("tail closed"); + } + + if (_inputBuffer == null) { + _inputBuffer = newWriteableBuffer(_maxFrameSize); } + return _inputBuffer; } @Override public void process() throws TransportException { - _inputBuffer.flip(); - - try + if (_inputBuffer != null) { - input(_inputBuffer); + _inputBuffer.flip(); + + try + { + input(_inputBuffer); + } + finally + { + if (_inputBuffer.hasRemaining()) { + _inputBuffer.compact(); + } else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) { + _inputBuffer = null; + } else { + _inputBuffer.clear(); + } + } } - finally + else { - _inputBuffer.compact(); + input(_emptyInputBuffer); } } 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java index bacd638..fce5f2d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java @@ -45,15 +45,15 @@ class FrameWriter private WritableBuffer _buffer; private int _maxFrameSize; private byte _frameType; - private ProtocolTracer _protocolTracer; - private Object _logCtx; + final private Ref<ProtocolTracer> _protocolTracer; + private TransportImpl _transport; private int _frameStart = 0; private int _payloadStart; private int _performativeSize; FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType, - ProtocolTracer protocolTracer, Object logCtx) + Ref<ProtocolTracer> protocolTracer, TransportImpl transport) { _encoder = encoder; _bbuf = ByteBuffer.allocate(1024); @@ -62,7 +62,7 @@ class FrameWriter _maxFrameSize = maxFrameSize; _frameType = frameType; _protocolTracer = protocolTracer; - _logCtx = logCtx; + _transport = transport; } void setMaxFrameSize(int maxFrameSize) @@ -155,11 +155,12 @@ class FrameWriter // code, further refactor will fix this if (_frameType == AMQP_FRAME_TYPE) { TransportFrame frame = new TransportFrame(channel, (FrameBody) frameBody, Binary.create(originalPayload)); - TransportImpl.log(_logCtx, TransportImpl.OUTGOING, frame); + _transport.log(TransportImpl.OUTGOING, frame); - if( _protocolTracer!=null ) + ProtocolTracer tracer = _protocolTracer.get(); + if(tracer != null) { - _protocolTracer.sentFrame(frame); + tracer.sentFrame(frame); } } @@ -191,6 +192,11 @@ class FrameWriter writeFrame(0, frameBody, null, null); } + boolean isFull() { + // XXX: this 
should probably be tunable + return _bbuf.position() > 64*1024; + } + int readBytes(ByteBuffer dst) { ByteBuffer src = _bbuf.duplicate(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java index dda2171..37433d5 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java @@ -53,14 +53,17 @@ public abstract class LinkImpl extends EndpointImpl implements Link private ReceiverSettleMode _remoteReceiverSettleMode; - private final LinkNode<LinkImpl> _node; + private LinkNode<LinkImpl> _node; private boolean _drain; LinkImpl(SessionImpl session, String name) { _session = session; + _session.incref(); _name = name; - _node = session.getConnectionImpl().addLinkEndpoint(this); + ConnectionImpl conn = session.getConnectionImpl(); + _node = conn.addLinkEndpoint(this); + conn.put(Event.Type.LINK_INIT, this); } @@ -103,11 +106,21 @@ public abstract class LinkImpl extends EndpointImpl implements Link } } - public void free() + @Override + void postFinal() { + _session.getConnectionImpl().put(Event.Type.LINK_FINAL, this); + _session.decref(); + } + + @Override + void doFree() { - super.free(); _session.getConnectionImpl().removeLinkEndpoint(_node); - //TODO. 
+ _node = null; + } + + void modifyEndpoints() { + modified(); } public void remove(DeliveryImpl delivery) @@ -375,11 +388,14 @@ public abstract class LinkImpl extends EndpointImpl implements Link } @Override - protected void localStateChanged() + void localOpen() { - EventImpl ev = getConnectionImpl().put(Event.Type.LINK_LOCAL_STATE); - if (ev != null) { - ev.init(this); - } + getConnectionImpl().put(Event.Type.LINK_OPEN, this); + } + + @Override + void localClose() + { + getConnectionImpl().put(Event.Type.LINK_CLOSE, this); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java index 89b6a87..defb78b 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java @@ -96,12 +96,11 @@ public class ReceiverImpl extends LinkImpl implements Receiver return consumed; } - public void free() + @Override + void doFree() { getSession().freeReceiver(this); - - super.free(); - //TODO. + super.doFree(); } boolean hasIncoming() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java new file mode 100644 index 0000000..01e3a35 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.engine.impl; + + +/** + * Ref + * + */ + +class Ref<T> +{ + + T value; + + public Ref(T initial) { + value = initial; + } + + public T get() { + return value; + } + + public void set(T value) { + this.value = value; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java index 700bdf6..d587abb 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.qpid.proton.ProtonUnsupportedOperationException; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.security.SaslChallenge; @@ -53,6 +54,8 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, private final DecoderImpl _decoder = new 
DecoderImpl(); private final EncoderImpl _encoder = new EncoderImpl(_decoder); + private final TransportImpl _transport; + private boolean _tail_closed = false; private final ByteBuffer _inputBuffer; private boolean _head_closed = false; @@ -86,14 +89,15 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, * returned by {@link SaslTransportWrapper#getInputBuffer()} and * {@link SaslTransportWrapper#getOutputBuffer()}. */ - SaslImpl(int maxFrameSize) + SaslImpl(TransportImpl transport, int maxFrameSize) { + _transport = transport; _inputBuffer = newWriteableBuffer(maxFrameSize); _outputBuffer = newWriteableBuffer(maxFrameSize); AMQPDefinedTypes.registerAllTypes(_decoder,_encoder); _frameParser = new SaslFrameParser(this, _decoder); - _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, this); + _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, _transport); } @Override @@ -460,6 +464,13 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, _role = Role.SERVER; } + @Override + public void allowSkip(boolean allowSkip) + { + //TODO: implement + throw new ProtonUnsupportedOperationException(); + } + public TransportWrapper wrap(final TransportInput input, final TransportOutput output) { return new SaslTransportWrapper(input, output); @@ -565,6 +576,9 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, _tail_closed = true; if (isInputInSaslMode()) { _head_closed = true; + _underlyingInput.close_tail(); + } else { + _underlyingInput.close_tail(); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java index 1ca8b52..38fb5f6 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java @@ -63,11 +63,11 @@ public class SenderImpl extends LinkImpl implements Sender //TODO. } - public void free() + @Override + void doFree() { getSession().freeSender(this); - super.free(); - + super.doFree(); } @Override http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java index f95df27..57010d2 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java @@ -45,7 +45,9 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession SessionImpl(ConnectionImpl connection) { _connection = connection; + _connection.incref(); _node = _connection.addSessionEndpoint(this); + _connection.put(Event.Type.SESSION_INIT, this); } public SenderImpl sender(String name) @@ -90,25 +92,38 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession return getConnectionImpl(); } - public void free() - { - super.free(); + @Override + void postFinal() { + _connection.put(Event.Type.SESSION_FINAL, this); + _connection.decref(); + } + @Override + void doFree() { _connection.removeSessionEndpoint(_node); _node = null; - for(SenderImpl sender : _senders.values()) - { + for(SenderImpl sender : _senders.values()) { sender.free(); } _senders.clear(); - for(ReceiverImpl receiver : _receivers.values()) - { + for(ReceiverImpl receiver : _receivers.values()) { receiver.free(); } 
_receivers.clear(); } + void modifyEndpoints() { + for (SenderImpl snd : _senders.values()) { + snd.modifyEndpoints(); + } + + for (ReceiverImpl rcv : _receivers.values()) { + rcv.modifyEndpoints(); + } + modified(); + } + TransportSession getTransportSession() { return _transportSession; @@ -184,11 +199,14 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession } @Override - protected void localStateChanged() + void localOpen() { - EventImpl ev = getConnectionImpl().put(Event.Type.SESSION_LOCAL_STATE); - if (ev != null) { - ev.init(this); - } + getConnectionImpl().put(Event.Type.SESSION_OPEN, this); + } + + @Override + void localClose() + { + getConnectionImpl().put(Event.Type.SESSION_CLOSE, this); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactory.java deleted file mode 100644 index ddf2170..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.engine.impl; - -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Transport; - -public abstract class TransportFactory -{ - public abstract Transport transport(Connection conn); - - public static TransportFactory getDefaultTransportFactory() - { - return new TransportFactoryImpl(); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactoryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactoryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactoryImpl.java deleted file mode 100644 index f685575..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportFactoryImpl.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -package org.apache.qpid.proton.engine.impl; - -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Transport; - -class TransportFactoryImpl extends TransportFactory -{ - TransportFactoryImpl() - { - } - - @Override - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - public Transport transport(Connection c) - { - TransportImpl t = new TransportImpl(); - t.bind(c); - return t; - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org
