Update with merge of latest proton codebase and checked against latest emscripten incoming branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/fadams-javascript-binding@1622849 13f79535-47bb-0310-9956-ffa450edef68 Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1c2f4894 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1c2f4894 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1c2f4894 Branch: refs/heads/master Commit: 1c2f4894270775269abb6e8303e75684331a3212 Parents: 4a78327 Author: fadams <fadams@unknown> Authored: Sat Sep 6 11:23:10 2014 +0000 Committer: fadams <fadams@unknown> Committed: Sat Sep 6 11:23:10 2014 +0000 ---------------------------------------------------------------------- CMakeLists.txt | 14 + DEVELOPERS | 25 +- config.bat.in | 66 + config.sh | 79 -- config.sh.in | 61 + contrib/proton-hawtdispatch/pom.xml | 61 + .../hawtdispatch/api/AmqpConnectOptions.java | 228 ++++ .../proton/hawtdispatch/api/AmqpConnection.java | 201 ++++ .../hawtdispatch/api/AmqpDeliveryListener.java | 32 + .../hawtdispatch/api/AmqpEndpointBase.java | 158 +++ .../qpid/proton/hawtdispatch/api/AmqpLink.java | 27 + .../proton/hawtdispatch/api/AmqpReceiver.java | 141 +++ .../proton/hawtdispatch/api/AmqpSender.java | 227 ++++ .../proton/hawtdispatch/api/AmqpSession.java | 141 +++ .../qpid/proton/hawtdispatch/api/Callback.java | 29 + .../hawtdispatch/api/ChainedCallback.java | 37 + .../hawtdispatch/api/DeliveryAttachment.java | 27 + .../qpid/proton/hawtdispatch/api/Future.java | 31 + .../hawtdispatch/api/MessageDelivery.java | 226 ++++ .../qpid/proton/hawtdispatch/api/Promise.java | 107 ++ .../qpid/proton/hawtdispatch/api/QoS.java | 26 + .../proton/hawtdispatch/api/TransportState.java | 29 + .../proton/hawtdispatch/impl/AmqpHeader.java | 85 ++ .../proton/hawtdispatch/impl/AmqpListener.java | 71 ++ .../hawtdispatch/impl/AmqpProtocolCodec.java | 109 ++ .../proton/hawtdispatch/impl/AmqpTransport.java | 587 +++++++++ 
.../qpid/proton/hawtdispatch/impl/Defer.java | 27 + .../hawtdispatch/impl/EndpointContext.java | 76 ++ .../qpid/proton/hawtdispatch/impl/Support.java | 41 + .../qpid/proton/hawtdispatch/impl/Watch.java | 26 + .../proton/hawtdispatch/impl/WatchBase.java | 54 + .../proton/hawtdispatch/api/SampleTest.java | 292 +++++ .../hawtdispatch/test/MessengerServer.java | 135 +++ contrib/proton-jms/pom.xml | 50 + .../jms/AMQPNativeInboundTransformer.java | 40 + .../jms/AMQPNativeOutboundTransformer.java | 103 ++ .../proton/jms/AMQPRawInboundTransformer.java | 47 + .../proton/jms/AutoOutboundTransformer.java | 46 + .../apache/qpid/proton/jms/EncodedMessage.java | 75 ++ .../qpid/proton/jms/InboundTransformer.java | 314 +++++ .../jms/JMSMappingInboundTransformer.java | 102 ++ .../jms/JMSMappingOutboundTransformer.java | 246 ++++ .../org/apache/qpid/proton/jms/JMSVendor.java | 45 + .../qpid/proton/jms/OutboundTransformer.java | 52 + examples/CMakeLists.txt | 1 + examples/messenger/java/recv | 2 +- examples/messenger/java/send | 2 +- .../org/apache/qpid/proton/example/Send.java | 10 +- examples/messenger/javascript/send.html | 12 +- examples/messenger/perl/recv.pl | 34 +- examples/messenger/perl/send.pl | 16 +- examples/messenger/perl/server.pl | 6 +- proton-c/CMakeLists.txt | 49 +- proton-c/bindings/CMakeLists.txt | 2 + proton-c/bindings/javascript/CMakeLists.txt | 10 +- proton-c/bindings/perl/CMakeLists.txt | 4 +- .../bindings/perl/lib/qpid/proton/Constants.pm | 7 +- proton-c/bindings/perl/lib/qpid/proton/Data.pm | 93 +- .../bindings/perl/lib/qpid/proton/Message.pm | 28 +- proton-c/bindings/perl/lib/qpid/proton/utils.pm | 31 + proton-c/bindings/perl/lib/qpid_proton.pm | 1 + proton-c/bindings/perl/perl.i | 7 - proton-c/bindings/php/CMakeLists.txt | 3 + proton-c/bindings/php/proton.php | 2 +- proton-c/bindings/python/CMakeLists.txt | 90 +- proton-c/bindings/python/cproton.i | 379 ++++++ proton-c/bindings/python/proton.py | 242 ++-- proton-c/bindings/python/python.i | 378 ------ 
proton-c/bindings/python/setup.py.in | 107 ++ proton-c/bindings/ruby/CMakeLists.txt | 4 +- proton-c/bindings/ruby/lib/qpid_proton.rb | 1 + .../bindings/ruby/lib/qpid_proton/version.rb | 29 + proton-c/bindings/ruby/qpid_proton.gemspec | 1 + proton-c/bindings/ruby/ruby.i | 1 - proton-c/docs/man/CMakeLists.txt | 2 +- proton-c/docs/man/proton-dump.1 | 19 + proton-c/include/proton/buffer.h | 6 + proton-c/include/proton/cproton.i | 28 +- proton-c/include/proton/event.h | 153 ++- proton-c/include/proton/io.h | 2 + proton-c/include/proton/object.h | 11 +- proton-c/include/proton/sasl.h | 15 +- proton-c/include/proton/selector.h | 4 +- proton-c/include/proton/terminus.h | 8 +- proton-c/include/proton/transport.h | 13 +- proton-c/include/proton/types.h | 6 +- proton-c/src/buffer.c | 12 + proton-c/src/codec/codec.c | 48 +- proton-c/src/codec/data.h | 34 +- proton-c/src/codec/decoder.c | 2 +- proton-c/src/codec/encoder.c | 2 +- proton-c/src/dispatch_actions.h | 45 + proton-c/src/dispatcher/dispatcher.c | 63 +- proton-c/src/dispatcher/dispatcher.h | 21 +- proton-c/src/engine/engine-internal.h | 77 +- proton-c/src/engine/engine.c | 186 +-- proton-c/src/engine/event.c | 249 ++-- proton-c/src/engine/event.h | 9 +- proton-c/src/error.c | 2 +- proton-c/src/message/message.c | 25 +- proton-c/src/messenger/messenger.c | 137 ++- proton-c/src/messenger/store.c | 22 +- proton-c/src/messenger/subscription.c | 2 +- proton-c/src/messenger/transform.c | 4 +- proton-c/src/object/object.c | 104 +- proton-c/src/parser.c | 2 +- proton-c/src/platform.h | 2 + proton-c/src/posix/driver.c | 24 +- proton-c/src/posix/io.c | 12 +- proton-c/src/posix/selector.c | 4 +- proton-c/src/protocol.h.py | 98 +- proton-c/src/proton-dump.c | 28 + proton-c/src/proton.c | 4 +- proton-c/src/sasl/sasl.c | 91 +- proton-c/src/selectable.c | 2 +- proton-c/src/ssl/openssl.c | 57 +- proton-c/src/tests/object.c | 53 +- proton-c/src/tests/parse-url.c | 7 + proton-c/src/transport/transport.c | 366 +++--- proton-c/src/types.c 
| 14 +- proton-c/src/util.c | 22 +- proton-c/src/windows/io.c | 226 ++-- proton-c/src/windows/iocp.c | 1138 ++++++++++++++++++ proton-c/src/windows/iocp.h | 141 +++ proton-c/src/windows/selector.c | 325 +++-- proton-c/src/windows/write_pipeline.c | 312 +++++ .../java/org/apache/qpid/proton/Proton.java | 168 +-- .../org/apache/qpid/proton/ProtonFactory.java | 31 - .../apache/qpid/proton/ProtonFactoryImpl.java | 28 - .../apache/qpid/proton/ProtonFactoryLoader.java | 111 -- .../amqp/messaging/DeliveryAnnotations.java | 11 +- .../amqp/messaging/MessageAnnotations.java | 8 +- .../org/apache/qpid/proton/codec/Codec.java | 40 + .../java/org/apache/qpid/proton/codec/Data.java | 10 + .../apache/qpid/proton/codec/DataFactory.java | 28 - .../qpid/proton/codec/impl/DataFactoryImpl.java | 35 - .../org/apache/qpid/proton/driver/Driver.java | 10 + .../qpid/proton/driver/DriverFactory.java | 29 - .../qpid/proton/driver/impl/ConnectorImpl.java | 1 - .../proton/driver/impl/DriverFactoryImpl.java | 35 - .../qpid/proton/driver/impl/DriverImpl.java | 2 +- .../apache/qpid/proton/engine/Collector.java | 8 + .../apache/qpid/proton/engine/Connection.java | 11 + .../org/apache/qpid/proton/engine/Engine.java | 60 + .../qpid/proton/engine/EngineFactory.java | 29 - .../org/apache/qpid/proton/engine/Event.java | 41 +- .../org/apache/qpid/proton/engine/Sasl.java | 10 +- .../apache/qpid/proton/engine/SslDomain.java | 10 + .../qpid/proton/engine/SslPeerDetails.java | 12 +- .../apache/qpid/proton/engine/Transport.java | 30 +- .../qpid/proton/engine/impl/CollectorImpl.java | 42 +- .../qpid/proton/engine/impl/ConnectionImpl.java | 67 +- .../qpid/proton/engine/impl/EndpointImpl.java | 54 +- .../proton/engine/impl/EngineFactoryImpl.java | 59 - .../qpid/proton/engine/impl/EventImpl.java | 111 +- .../qpid/proton/engine/impl/FrameHandler.java | 3 +- .../qpid/proton/engine/impl/FrameParser.java | 58 +- .../qpid/proton/engine/impl/FrameWriter.java | 20 +- .../qpid/proton/engine/impl/LinkImpl.java | 36 +- 
.../qpid/proton/engine/impl/ReceiverImpl.java | 7 +- .../org/apache/qpid/proton/engine/impl/Ref.java | 46 + .../qpid/proton/engine/impl/SaslImpl.java | 18 +- .../qpid/proton/engine/impl/SenderImpl.java | 6 +- .../qpid/proton/engine/impl/SessionImpl.java | 42 +- .../proton/engine/impl/TransportFactory.java | 34 - .../engine/impl/TransportFactoryImpl.java | 41 - .../qpid/proton/engine/impl/TransportImpl.java | 298 +++-- .../qpid/proton/engine/impl/TransportLink.java | 28 +- .../engine/impl/TransportOutputAdaptor.java | 66 +- .../engine/impl/TransportOutputWriter.java | 4 +- .../proton/engine/impl/TransportSession.java | 32 +- .../impl/ssl/SimpleSslTransportWrapper.java | 33 +- .../proton/engine/impl/ssl/SslDomainImpl.java | 1 - .../engine/impl/ssl/SslPeerDetailsImpl.java | 1 - .../org/apache/qpid/proton/message/Message.java | 23 + .../qpid/proton/message/MessageFactory.java | 37 - .../proton/message/impl/MessageFactoryImpl.java | 54 - .../qpid/proton/message/impl/MessageImpl.java | 12 +- .../apache/qpid/proton/messenger/Messenger.java | 14 + .../qpid/proton/messenger/MessengerFactory.java | 28 - .../messenger/impl/MessengerFactoryImpl.java | 42 - .../proton/messenger/impl/MessengerImpl.java | 9 +- proton-j/src/main/resources/cengine.py | 140 ++- proton-j/src/main/resources/csasl.py | 10 +- .../qpid/proton/ProtonFactoryLoaderTest.java | 129 -- .../proton/engine/impl/FrameParserTest.java | 16 +- .../proton/engine/impl/TransportImplTest.java | 15 + .../engine/impl/TransportOutputAdaptorTest.java | 4 +- .../impl/ssl/SimpleSslTransportWrapperTest.java | 13 +- .../DummyProtonCFactory.java | 29 - .../DummyProtonFactory.java | 25 - .../DummyProtonJFactory.java | 29 - .../systemtests/ProtonEngineExampleTest.java | 34 +- .../proton/systemtests/ProtonFactoryTest.java | 60 - .../qpid/proton/systemtests/SimpleTest.java | 13 +- .../systemtests/engine/ConnectionTest.java | 31 +- .../engine/ProtonFactoryTestFixture.java | 54 - .../systemtests/engine/TransportTest.java | 6 +- 
.../org/apache/qpid/proton/InteropTest.java | 5 +- tests/python/proton_tests/common.py | 81 +- tests/python/proton_tests/engine.py | 242 +++- tests/python/proton_tests/messenger.py | 98 +- tests/python/proton_tests/sasl.py | 77 +- tests/python/proton_tests/ssl.py | 72 +- tests/python/proton_tests/transport.py | 117 +- tests/tools/apps/c/CMakeLists.txt | 2 + tests/tools/apps/c/msgr-common.h | 4 + version.txt | 2 +- 208 files changed, 10388 insertions(+), 3391 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index a503467..7917258 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,6 +18,14 @@ # cmake_minimum_required (VERSION 2.6) +# Set default build type. Must come before project() which sets default to "" +set (CMAKE_BUILD_TYPE RelWithDebInfo CACHE string + "Build type: Debug, Release, RelWithDebInfo or MinSizeRel (default RelWithDebInfo)") +if (CMAKE_BUILD_TYPE MATCHES "Deb") + set (has_debug_symbols " (has debug symbols)") +endif (CMAKE_BUILD_TYPE MATCHES "Deb") +message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}") + option(BUILD_WITH_CXX "Compile Proton using C++" OFF) if ("${CMAKE_GENERATOR}" MATCHES "^Visual Studio") # No C99 capability, use C++ @@ -152,3 +160,9 @@ if (JAVA_FOUND AND MAVEN_EXE) else (JAVA_FOUND AND MAVEN_EXE) message (STATUS "Cannot find both Java and Maven: testing disabled for Proton-J") endif (JAVA_FOUND AND MAVEN_EXE) + +# Generate test environment settings +configure_file(${CMAKE_SOURCE_DIR}/config.sh.in + ${CMAKE_BINARY_DIR}/config.sh @ONLY) +configure_file(${CMAKE_SOURCE_DIR}/config.bat.in + ${CMAKE_BINARY_DIR}/config.bat @ONLY) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/DEVELOPERS ---------------------------------------------------------------------- diff --git 
a/DEVELOPERS b/DEVELOPERS index 97cfdaf..b0e0015 100644 --- a/DEVELOPERS +++ b/DEVELOPERS @@ -5,32 +5,15 @@ DEVELOPMENT ENVIRONMENT ======================= To setup the variables for your development environment, simply source -the file $REPO/config.sh: +the file $BLDDIR/config.sh [$BLDDIR points to the proton build directory]: $ source config.sh This file sets the needed environment variables for all supported dynamic -languages (Python, Perl, Ruby, PHP) as well as for Java and for testing. It, -by default, assumes that you're building Proton in the directory: - - $REPO/build - -where $REPO points the location where the Proton Subversion or Git repo has -been checked out. - -If, however, you use a different location for your build files, then you'll want -to set the environment variable CPROTON_BUILD to point to it first. So, for -example, if you're building in: - - /home/yourname/devel/proton/cmake - -then you would have the following environment variable set: - - $ export CPROTON_BUILD=/hojme/yourname/devel/proton/cmake/proton-c - -NOTE: You need to point the environment variable to the proton-c directory under -where your build is done. +languages (Python, Perl, Ruby, PHP) as well as for Java and for testing. +You will need to have set up the build directory first with cmake before the file +will exist (see the instructions in README). MAILING LIST http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.bat.in ---------------------------------------------------------------------- diff --git a/config.bat.in b/config.bat.in new file mode 100644 index 0000000..a73a88e --- /dev/null +++ b/config.bat.in @@ -0,0 +1,66 @@ +REM +REM Licensed to the Apache Software Foundation (ASF) under one +REM or more contributor license agreements. See the NOTICE file +REM distributed with this work for additional information +REM regarding copyright ownership. 
The ASF licenses this file +REM to you under the Apache License, Version 2.0 (the +REM "License"); you may not use this file except in compliance +REM with the License. You may obtain a copy of the License at +REM +REM http://www.apache.org/licenses/LICENSE-2.0 +REM +REM Unless required by applicable law or agreed to in writing, +REM software distributed under the License is distributed on an +REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +REM KIND, either express or implied. See the License for the +REM specific language governing permissions and limitations +REM under the License. +REM + +REM This is a generated file and will be overwritten the next +REM time that cmake is run. + +REM This build may be one of @CMAKE_CONFIGURATION_TYPES@ +REM Choose the configuration this script should reference: +SET PROTON_BUILD_CONFIGURATION=relwithdebinfo + +REM PROTON_HOME is the root of the proton checkout +REM PROTON_BUILD is where cmake was run + +set PROTON_HOME=@CMAKE_SOURCE_DIR@ +set PROTON_BUILD=@CMAKE_BINARY_DIR@ + +set PROTON_HOME=%PROTON_HOME:/=\% +set PROTON_BUILD=%PROTON_BUILD:/=\% + +set PROTON_BINDINGS=%PROTON_BUILD%\proton-c\bindings +set PROTON_JARS=%PROTON_BUILD%\proton-j\proton-j.jar + +REM Python & Jython +set PYTHON_BINDINGS=%PROTON_BINDINGS%\python +set COMMON_PYPATH=%PROTON_HOME%\tests\python;%PROTON_HOME%\proton-c\bindings\python +set PYTHONPATH=%COMMON_PYPATH%;%PYTHON_BINDINGS% +set JYTHONPATH=%COMMON_PYPATH%;%PROTON_HOME%\proton-j\src\main\resources;%PROTON_JARS% +set CLASSPATH=%PROTON_JARS% + +REM PHP +set PHP_BINDINGS=%PROTON_BINDINGS%\php +if EXIST %PHP_BINDINGS% ( + echo include_path="%PHP_BINDINGS%;%PROTON_HOME%\proton-c\bindings\php" > %PHP_BINDINGS%\php.ini + echo extension="%PHP_BINDINGS%\cproton.so" >> %PHP_BINDINGS%\php.ini + set PHPRC=%PHP_BINDINGS%\php.ini +) + +REM Ruby +set RUBY_BINDINGS=%PROTON_BINDINGS%\ruby +set RUBYLIB=%RUBY_BINDINGS%;%PROTON_HOME%\proton-c\bindings\ruby\lib;%PROTON_HOME%\tests\ruby + +REM Perl +set 
PERL_BINDINGS=%PROTON_BINDINGS%\perl +set PERL5LIB=%PERL5LIB%;%PERL_BINDINGS%;%PROTON_HOME%\proton-c\bindings\perl\lib + +REM test applications +set PATH=%PATH%;%PROTON_BUILD%\tests\tools\apps\c +set PATH=%PATH%;%PROTON_HOME%\tests\tools\apps\python +set PATH=%PATH%;%PROTON_HOME%\tests\python +set PATH=%PATH%;%PROTON_BUILD%\proton-c\%PROTON_BUILD_CONFIGURATION% http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.sh ---------------------------------------------------------------------- diff --git a/config.sh b/config.sh deleted file mode 100755 index 90ad707..0000000 --- a/config.sh +++ /dev/null @@ -1,79 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -cd $(dirname ${BASH_SOURCE[0]}) > /dev/null -export PROTON_HOME=$(pwd) -cd - > /dev/null - -if [ -z "$CPROTON_BUILD" ]; then - if [ -d $PROTON_HOME/build/proton-c ]; then - PROTON_BINDINGS=$PROTON_HOME/build/proton-c/bindings - else - PROTON_BINDINGS=$PROTON_HOME/proton-c/bindings - fi - if [ -d $PROTON_HOME/build/proton-j ]; then - PROTON_JARS=$PROTON_HOME/build/proton-j/proton-api/proton-api.jar:$PROTON_HOME/build/proton-j/proton/proton-j-impl.jar - else - PROTON_JARS=$PROTON_HOME/proton-j/proton-api/proton-api.jar:$PROTON_HOME/proton-j/proton/proton-j-impl.jar - fi -else - PROTON_BINDINGS=$CPROTON_BUILD/bindings -fi - -# Python & Jython -export PYTHON_BINDINGS=$PROTON_BINDINGS/python -export COMMON_PYPATH=$PROTON_HOME/tests/python -export PYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-c/bindings/python:$PYTHON_BINDINGS -export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/proton-api/src/main/resources:$PROTON_JARS -export CLASSPATH=$PROTON_JARS - -# PHP -export PHP_BINDINGS=$PROTON_BINDINGS/php -if [ -d $PHP_BINDINGS ]; then - cat <<EOF > $PHP_BINDINGS/php.ini -include_path="$PHP_BINDINGS:$PROTON_HOME/proton-c/bindings/php" -extension="$PHP_BINDINGS/cproton.so" -EOF - export PHPRC=$PHP_BINDINGS/php.ini -fi - -# Ruby -export RUBY_BINDINGS=$PROTON_BINDINGS/ruby -export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HOME/tests/ruby - -# Perl -export PERL_BINDINGS=$PROTON_BINDINGS/perl -export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib - -# test applications -if [ -d $PROTON_HOME/build/tests/tools/apps/c ]; then - export PATH="$PATH:$PROTON_HOME/build/tests/tools/apps/c" -fi -if [ -d $PROTON_HOME/tests/tools/apps/python ]; then - export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python" -fi - -# test applications -export PATH="$PATH:$PROTON_HOME/tests/python" - -# can the test harness use valgrind? 
-if [[ -x "$(type -p valgrind)" ]] ; then - export VALGRIND=1 -fi http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.sh.in ---------------------------------------------------------------------- diff --git a/config.sh.in b/config.sh.in new file mode 100755 index 0000000..4b60b2f --- /dev/null +++ b/config.sh.in @@ -0,0 +1,61 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +PROTON_HOME=@CMAKE_SOURCE_DIR@ +PROTON_BUILD=@CMAKE_BINARY_DIR@ + +PROTON_BINDINGS=$PROTON_BUILD/proton-c/bindings +PROTON_JARS=$PROTON_BUILD/proton-j/proton-j.jar + +PYTHON_BINDINGS=$PROTON_BINDINGS/python +PHP_BINDINGS=$PROTON_BINDINGS/php +RUBY_BINDINGS=$PROTON_BINDINGS/ruby +PERL_BINDINGS=$PROTON_BINDINGS/perl + +# Python & Jython +COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python +export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS +export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/resources:$PROTON_JARS +export CLASSPATH=$PROTON_JARS + +# PHP +if [ -d $PHP_BINDINGS ]; then + cat <<EOF > $PHP_BINDINGS/php.ini +include_path="$PHP_BINDINGS:$PROTON_HOME/proton-c/bindings/php" +extension="$PHP_BINDINGS/cproton.so" +EOF + export PHPRC=$PHP_BINDINGS/php.ini +fi + +# Ruby +export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HOME/tests/ruby + +# Perl +export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib + +# test applications +export PATH="$PATH:$PROTON_BUILD/tests/tools/apps/c" +export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python" +export PATH="$PATH:$PROTON_HOME/tests/python" + +# can the test harness use valgrind? +if [[ -x "$(type -p valgrind)" ]] ; then + export VALGRIND=$(type -p valgrind) +fi http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/pom.xml b/contrib/proton-hawtdispatch/pom.xml new file mode 100644 index 0000000..0eaa171 --- /dev/null +++ b/contrib/proton-hawtdispatch/pom.xml @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-project</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>../..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>proton-hawtdispatch</artifactId> + <name>proton-hawtdispatch</name> + + <properties> + <hawtbuf-version>1.9</hawtbuf-version> + <hawtdispatch-version>1.18</hawtdispatch-version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.fusesource.hawtdispatch</groupId> + <artifactId>hawtdispatch-transport</artifactId> + <version>${hawtdispatch-version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.fusesource.hawtbuf</groupId> + <artifactId>hawtbuf</artifactId> + <version>${hawtbuf-version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + </build> + <scm> + <url>http://svn.apache.org/viewvc/qpid/proton/</url> + </scm> + +</project> 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java new file mode 100644 index 0000000..3c3543d --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.proton.hawtdispatch.api; + +import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.transport.TcpTransport; + +import javax.net.ssl.SSLContext; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpConnectOptions implements Cloneable { + + private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000)); + private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512)); + private static ThreadPoolExecutor blockingThreadPool; + + public synchronized static ThreadPoolExecutor getBlockingThreadPool() { + if( blockingThreadPool == null ) { + blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE); + rc.setDaemon(true); + return rc; + } + }) { + + @Override + public void shutdown() { + // we don't ever shutdown since we are shared.. + } + + @Override + public List<Runnable> shutdownNow() { + // we don't ever shutdown since we are shared.. 
+ return Collections.emptyList(); + } + }; + } + return blockingThreadPool; + } + public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) { + blockingThreadPool = pool; + } + + private static final URI DEFAULT_HOST; + static { + try { + DEFAULT_HOST = new URI("tcp://localhost"); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + URI host = DEFAULT_HOST; + URI localAddress; + SSLContext sslContext; + DispatchQueue dispatchQueue; + Executor blockingExecutor; + int maxReadRate; + int maxWriteRate; + int trafficClass = TcpTransport.IPTOS_THROUGHPUT; + boolean useLocalHost; + int receiveBufferSize = 1024*64; + int sendBufferSize = 1024*64; + String localContainerId; + String remoteContainerId; + String user; + String password; + + + @Override + public AmqpConnectOptions clone() { + try { + return (AmqpConnectOptions) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + } + + public String getLocalContainerId() { + return localContainerId; + } + + public void setLocalContainerId(String localContainerId) { + this.localContainerId = localContainerId; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getRemoteContainerId() { + return remoteContainerId; + } + + public void setRemoteContainerId(String remoteContainerId) { + this.remoteContainerId = remoteContainerId; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public Executor getBlockingExecutor() { + return blockingExecutor; + } + + public void setBlockingExecutor(Executor blockingExecutor) { + this.blockingExecutor = blockingExecutor; + } + + public DispatchQueue getDispatchQueue() { + return dispatchQueue; + } + + public void setDispatchQueue(DispatchQueue dispatchQueue) { + this.dispatchQueue = dispatchQueue; + } + + public URI getLocalAddress() { 
+ return localAddress; + } + + public void setLocalAddress(String localAddress) throws URISyntaxException { + this.setLocalAddress(new URI(localAddress)); + } + public void setLocalAddress(URI localAddress) { + this.localAddress = localAddress; + } + + public int getMaxReadRate() { + return maxReadRate; + } + + public void setMaxReadRate(int maxReadRate) { + this.maxReadRate = maxReadRate; + } + + public int getMaxWriteRate() { + return maxWriteRate; + } + + public void setMaxWriteRate(int maxWriteRate) { + this.maxWriteRate = maxWriteRate; + } + + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(int receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public URI getHost() { + return host; + } + public void setHost(String host, int port) throws URISyntaxException { + this.setHost(new URI("tcp://"+host+":"+port)); + } + public void setHost(String host) throws URISyntaxException { + this.setHost(new URI(host)); + } + public void setHost(URI host) { + this.host = host; + } + + public int getSendBufferSize() { + return sendBufferSize; + } + + public void setSendBufferSize(int sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + + public SSLContext getSslContext() { + return sslContext; + } + + public void setSslContext(SSLContext sslContext) { + this.sslContext = sslContext; + } + + public int getTrafficClass() { + return trafficClass; + } + + public void setTrafficClass(int trafficClass) { + this.trafficClass = trafficClass; + } + + public boolean isUseLocalHost() { + return useLocalHost; + } + + public void setUseLocalHost(boolean useLocalHost) { + this.useLocalHost = useLocalHost; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java ---------------------------------------------------------------------- diff --git 
a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java new file mode 100644 index 0000000..b308209 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener; +import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.ProtonJConnection; +import org.apache.qpid.proton.engine.ProtonJSession; +import org.apache.qpid.proton.engine.impl.ProtocolTracer; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.Task; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpConnection extends AmqpEndpointBase { + + AmqpTransport transport; + ProtonJConnection connection; + HashSet<AmqpSender> senders = new HashSet<AmqpSender>(); + boolean closing = false; + + public static AmqpConnection connect(AmqpConnectOptions options) { + return new AmqpConnection(options); + } + + private AmqpConnection(AmqpConnectOptions options) { + transport = AmqpTransport.connect(options); + transport.setListener(new AmqpListener() { + @Override + public void processDelivery(Delivery delivery) { + Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment(); + AmqpLink link = (AmqpLink) attachment.endpoint(); + link.processDelivery(delivery); + } + + @Override + public void processRefill() { + for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) { + sender.pumpDeliveries(); + } + pumpOut(); + } + + public void processTransportFailure(final IOException e) { + } + }); + connection = transport.connection(); + connection.open(); + attach(); + } + + public void waitForConnected() throws Exception { + assertNotOnDispatchQueue(); + getConnectedFuture().await(); + } + + public Future<Void> getConnectedFuture() { + final Promise<Void> rc = new 
Promise<Void>(); + queue().execute(new Task() { + @Override + public void run() { + onConnected(rc); + } + }); + return rc; + } + + public void onConnected(Callback<Void> cb) { + transport.onTransportConnected(cb); + } + + @Override + protected Endpoint getEndpoint() { + return connection; + } + + @Override + protected AmqpConnection getConnection() { + return this; + } + + @Override + protected AmqpEndpointBase getParent() { + return null; + } + + public AmqpSession createSession() { + assertExecuting(); + ProtonJSession session = connection.session(); + session.open(); + pumpOut(); + return new AmqpSession(this, session); + } + + public int getMaxSessions() { + return connection.getMaxChannels(); + } + + public void disconnect() { + closing = true; + transport.disconnect(); + } + + public void waitForDisconnected() throws Exception { + assertNotOnDispatchQueue(); + getDisconnectedFuture().await(); + } + + public Future<Void> getDisconnectedFuture() { + final Promise<Void> rc = new Promise<Void>(); + queue().execute(new Task() { + @Override + public void run() { + onDisconnected(rc); + } + }); + return rc; + } + + public void onDisconnected(Callback<Void> cb) { + transport.onTransportDisconnected(cb); + } + + public TransportState getTransportState() { + return transport.getState(); + } + + public Throwable getTransportFailure() { + return transport.getFailure(); + } + + public Future<Throwable> getTransportFailureFuture() { + final Promise<Throwable> rc = new Promise<Throwable>(); + queue().execute(new Task() { + @Override + public void run() { + onTransportFailure(rc); + } + }); + return rc; + } + + public void onTransportFailure(Callback<Throwable> cb) { + transport.onTransportFailure(cb); + } + + @Override + public DispatchQueue queue() { + return super.queue(); + } + + public void setProtocolTracer(ProtocolTracer protocolTracer) { + transport.setProtocolTracer(protocolTracer); + } + + public ProtocolTracer getProtocolTracer() { + return 
transport.getProtocolTracer(); + } + + /** + * Once the remote end, closes the transport is disconnected. + */ + @Override + public void close() { + super.close(); + onRemoteClose(new Callback<ErrorCondition>() { + @Override + public void onSuccess(ErrorCondition value) { + disconnect(); + } + + @Override + public void onFailure(Throwable value) { + disconnect(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java new file mode 100644 index 0000000..1e9f4e2 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.qpid.proton.hawtdispatch.api; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public interface AmqpDeliveryListener { + + /** + * Caller should suspend/resume the AmqpReceiver to + * flow control the delivery of messages. + * + * @param delivery + */ + void onMessageDelivery(MessageDelivery delivery); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java new file mode 100644 index 0000000..4ebd8e2 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.hawtdispatch.impl.*; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.Task; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract class AmqpEndpointBase extends WatchBase { + abstract protected Endpoint getEndpoint(); + abstract protected AmqpEndpointBase getParent(); + + protected AmqpConnection getConnection() { + return getParent().getConnection(); + } + + protected AmqpTransport getTransport() { + return getConnection().transport; + } + + protected DispatchQueue queue() { + return getTransport().queue(); + } + + protected void assertExecuting() { + getTransport().assertExecuting(); + } + + public void waitForRemoteOpen() throws Exception { + assertNotOnDispatchQueue(); + getRemoteOpenFuture().await(); + } + + public Future<Void> getRemoteOpenFuture() { + final Promise<Void> rc = new Promise<Void>(); + queue().execute(new Task() { + @Override + public void run() { + onRemoteOpen(rc); + } + }); + return rc; + } + + public void onRemoteOpen(final Callback<Void> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + switch (getEndpoint().getRemoteState()) { + case ACTIVE: + cb.onSuccess(null); + return true; + case CLOSED: + cb.onFailure(Support.illegalState("closed")); + return true; + } + return false; + } + }); + } + + public ErrorCondition waitForRemoteClose() throws Exception { + assertNotOnDispatchQueue(); + return getRemoteCloseFuture().await(); + } + + public Future<ErrorCondition> getRemoteCloseFuture() { + final Promise<ErrorCondition> rc = new Promise<ErrorCondition>(); + queue().execute(new Task() { + @Override + public void 
run() { + onRemoteClose(rc); + } + }); + return rc; + } + + public void onRemoteClose(final Callback<ErrorCondition> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if (getEndpoint().getRemoteState() == EndpointState.CLOSED) { + cb.onSuccess(getEndpoint().getRemoteCondition()); + return true; + } + return false; + } + }); + } + + public void close() { + getEndpoint().close(); + pumpOut(); + } + + public EndpointState getRemoteState() { + return getEndpoint().getRemoteState(); + } + + public ErrorCondition getRemoteError() { + return getEndpoint().getRemoteCondition(); + } + + static protected ErrorCondition toError(Throwable value) { + return new ErrorCondition(Symbol.valueOf("error"), value.toString()); + } + + class Attachment extends Task { + AmqpEndpointBase endpoint() { + return AmqpEndpointBase.this; + } + + @Override + public void run() { + fireWatches(); + } + } + + protected void attach() { + getTransport().context(getEndpoint()).setAttachment(new Attachment()); + } + + protected void defer(Defer defer) { + getTransport().defer(defer); + } + + protected void pumpOut() { + getTransport().pumpOut(); + } + + static protected void assertNotOnDispatchQueue() { + assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue"; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java new file mode 100644 index 0000000..dd6f32e --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.engine.Delivery; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract public class AmqpLink extends AmqpEndpointBase { + abstract protected void processDelivery(Delivery delivery); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java new file mode 100644 index 0000000..644f72a --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.hawtdispatch.impl.Defer; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.ByteArrayOutputStream; + +import java.util.LinkedList; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpReceiver extends AmqpLink { + + final AmqpSession parent; + final Receiver receiver; + + public AmqpReceiver(AmqpSession parent, Receiver receiver2, QoS qos) { + this.parent = parent; + this.receiver = receiver2; + attach(); + } + + @Override + protected Receiver getEndpoint() { + return receiver; + } + @Override + protected AmqpSession getParent() { + return parent; + } + + ByteArrayOutputStream current = new ByteArrayOutputStream(); + + @Override + protected void processDelivery(Delivery delivery) { + if( !delivery.isReadable() ) { + System.out.println("it was not readable!"); + return; + } + + if( current==null ) { + current = new ByteArrayOutputStream(); + } + + int count; + byte data[] = new byte[1024*4]; + while( (count = receiver.recv(data, 0, data.length)) > 0 ) { + current.write(data, 0, count); + } + + // Expecting more deliveries.. 
+ if( count == 0 ) { + return; + } + + receiver.advance(); + Buffer buffer = current.toBuffer(); + current = null; + onMessage(delivery, buffer); + + } + + LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>(); + + protected void onMessage(Delivery delivery, Buffer buffer) { + MessageDelivery md = new MessageDelivery(buffer) { + @Override + AmqpLink link() { + return AmqpReceiver.this; + } + + @Override + public void settle() { + if( !delivery.isSettled() ) { + delivery.disposition(new Accepted()); + delivery.settle(); + } + drain(); + } + }; + md.delivery = delivery; + delivery.setContext(md); + inbound.add(md); + drainInbound(); + } + + public void drain() { + defer(deferedDrain); + } + + Defer deferedDrain = new Defer(){ + public void run() { + drainInbound(); + } + }; + int resumed = 0; + + public void resume() { + resumed++; + } + public void suspend() { + resumed--; + } + + AmqpDeliveryListener deliveryListener; + private void drainInbound() { + while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) { + deliveryListener.onMessageDelivery(inbound.removeFirst()); + receiver.flow(1); + } + } + + public AmqpDeliveryListener getDeliveryListener() { + return deliveryListener; + } + + public void setDeliveryListener(AmqpDeliveryListener deliveryListener) { + this.deliveryListener = deliveryListener; + drainInbound(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java new file mode 100644 index 0000000..9a672d5 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java @@ -0,0 +1,227 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.hawtdispatch.impl.Defer; +import org.apache.qpid.proton.hawtdispatch.impl.Watch; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.fusesource.hawtbuf.Buffer; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpSender extends AmqpLink { + + private byte[] EMPTY_BYTE_ARRAY = new byte[]{}; + long nextTagId = 0; + HashSet<byte[]> tagCache = new HashSet<byte[]>(); + + final AmqpSession parent; + private final QoS qos; + final Sender sender; + + public AmqpSender(AmqpSession parent, Sender sender2, QoS qos) { + this.parent = parent; + 
this.sender = sender2; + this.qos = qos; + attach(); + getConnection().senders.add(this); + } + + @Override + public void close() { + super.close(); + getConnection().senders.remove(this); + } + + @Override + protected Sender getEndpoint() { + return sender; + } + + @Override + protected AmqpSession getParent() { + return parent; + } + + final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>(); + long outboundBufferSize; + + public MessageDelivery send(Message message) { + assertExecuting(); + MessageDelivery rc = new MessageDelivery(message) { + @Override + AmqpLink link() { + return AmqpSender.this; + } + + @Override + public void redeliver(boolean incrementDeliveryCounter) { + super.redeliver(incrementDeliveryCounter); + outbound.add(this); + outboundBufferSize += initialSize; + defer(deferedPumpDeliveries); + } + }; + outbound.add(rc); + outboundBufferSize += rc.initialSize; + pumpDeliveries(); + pumpOut(); + return rc; + } + + Buffer currentBuffer; + Delivery currentDelivery; + + Defer deferedPumpDeliveries = new Defer() { + public void run() { + pumpDeliveries(); + } + }; + + public long getOverflowBufferSize() { + return outboundBufferSize; + } + + protected void pumpDeliveries() { + assertExecuting(); + try { + while(true) { + while( currentBuffer !=null ) { + if( sender.getCredit() > 0 ) { + int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length); + currentBuffer.moveHead(sent); + if( currentBuffer.length == 0 ) { + Delivery current = currentDelivery; + MessageDelivery md = (MessageDelivery) current.getContext(); + currentBuffer = null; + currentDelivery = null; + if( qos == QoS.AT_MOST_ONCE ) { + current.settle(); + } else { + sender.advance(); + } + md.fireWatches(); + } + } else { + return; + } + } + + if( outbound.isEmpty() ) { + return; + } + + final MessageDelivery md = outbound.removeFirst(); + outboundBufferSize -= md.initialSize; + currentBuffer = md.encoded(); + if( qos == QoS.AT_MOST_ONCE ) { 
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0); + } else { + final byte[] tag = nextTag(); + currentDelivery = sender.delivery(tag, 0, tag.length); + } + md.delivery = currentDelivery; + currentDelivery.setContext(md); + } + } finally { + fireWatches(); + } + } + + @Override + protected void processDelivery(Delivery delivery) { + final MessageDelivery md = (MessageDelivery) delivery.getContext(); + if( delivery.remotelySettled() ) { + if( delivery.getTag().length > 0 ) { + checkinTag(delivery.getTag()); + } + + final DeliveryState state = delivery.getRemoteState(); + if( state==null || state instanceof Accepted) { + if( !delivery.remotelySettled() ) { + delivery.disposition(new Accepted()); + } + } else if( state instanceof Rejected) { + // re-deliver /w incremented delivery counter. + md.delivery = null; + md.incrementDeliveryCount(); + outbound.addLast(md); + } else if( state instanceof Released) { + // re-deliver && don't increment the counter. + md.delivery = null; + outbound.addLast(md); + } else if( state instanceof Modified) { + Modified modified = (Modified) state; + if ( modified.getDeliveryFailed() ) { + // increment delivery counter.. 
+ md.incrementDeliveryCount(); + } + } + delivery.settle(); + } + md.fireWatches(); + } + + byte[] nextTag() { + byte[] rc; + if (tagCache != null && !tagCache.isEmpty()) { + final Iterator<byte[]> iterator = tagCache.iterator(); + rc = iterator.next(); + iterator.remove(); + } else { + try { + rc = Long.toHexString(nextTagId++).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + return rc; + } + + void checkinTag(byte[] data) { + if( tagCache.size() < 1024 ) { + tagCache.add(data); + } + } + + public void onOverflowBufferDrained(final Callback<Void> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if (outboundBufferSize==0) { + cb.onSuccess(null); + return true; + } + return false; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java new file mode 100644 index 0000000..b25a1b7 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.ProtonJSession; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.*; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; + +import java.util.UUID; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpSession extends AmqpEndpointBase { + + final AmqpConnection parent; + final ProtonJSession session; + + + public AmqpSession(AmqpConnection parent, ProtonJSession session) { + this.parent = parent; + this.session = session; + attach(); + } + + @Override + protected Endpoint getEndpoint() { + return session; + } + + @Override + protected AmqpConnection getParent() { + return parent; + } + + public AmqpSender createSender(Target target) { + return createSender(target, QoS.AT_LEAST_ONCE); + } + + public AmqpSender createSender(Target target, QoS qos) { + return createSender(target, qos, UUID.randomUUID().toString()); + } + + public AmqpSender createSender(Target target, QoS qos, String name) { + assertExecuting(); + Sender sender = session.sender(name); + attach(); +// Source source = new Source(); +// 
source.setAddress(UUID.randomUUID().toString()); +// sender.setSource(source); + sender.setTarget(target); + configureQos(sender, qos); + sender.open(); + pumpOut(); + return new AmqpSender(this, sender, qos); + } + + public AmqpReceiver createReceiver(Source source) { + return createReceiver(source, QoS.AT_LEAST_ONCE); + } + + public AmqpReceiver createReceiver(Source source, QoS qos) { + return createReceiver(source, qos, 100); + } + + public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) { + return createReceiver(source, qos, prefetch, UUID.randomUUID().toString()); + } + + public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) { + assertExecuting(); + Receiver receiver = session.receiver(name); + receiver.setSource(source); +// Target target = new Target(); +// target.setAddress(UUID.randomUUID().toString()); +// receiver.setTarget(target); + receiver.flow(prefetch); + configureQos(receiver, qos); + receiver.open(); + pumpOut(); + return new AmqpReceiver(this, receiver, qos); + } + + private void configureQos(Link link, QoS qos) { + switch (qos) { + case AT_MOST_ONCE: + link.setSenderSettleMode(SenderSettleMode.SETTLED); + link.setReceiverSettleMode(ReceiverSettleMode.FIRST); + break; + case AT_LEAST_ONCE: + link.setSenderSettleMode(SenderSettleMode.UNSETTLED); + link.setReceiverSettleMode(ReceiverSettleMode.FIRST); + break; + case EXACTLY_ONCE: + link.setSenderSettleMode(SenderSettleMode.UNSETTLED); + link.setReceiverSettleMode(ReceiverSettleMode.SECOND); + break; + } + } + + public Message createTextMessage(String value) { + Message msg = Message.Factory.create(); + Section body = new AmqpValue(value); + msg.setBody(body); + return msg; + } + + public Message createBinaryMessage(byte value[]) { + return createBinaryMessage(value, 0, value.length); + } + + public Message createBinaryMessage(byte value[], int offset, int len) { + Message msg = Message.Factory.create(); + Data body = new Data(new Binary(value, 
offset,len)); + msg.setBody(body); + return msg; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java new file mode 100644 index 0000000..89fbdd1 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.hawtdispatch.api; + +/** + * <p> + * Function Result that carries one value. 
/**
 * <p>
 * Receives the outcome of an operation: either a single result value or the
 * failure that prevented it.
 * </p>
 *
 * @param <T> type of the result value
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
public interface Callback<T> {

    /**
     * Called when the operation completed.
     *
     * @param value the result of the operation
     */
    void onSuccess(T value);

    /**
     * Called when the operation failed.
     *
     * @param value the cause of the failure
     */
    void onFailure(Throwable value);
}
+ * </p> + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract public class ChainedCallback<In,Out> implements Callback<In> { + + public final Callback<Out> next; + + public ChainedCallback(Callback<Out> next) { + this.next = next; + } + + public void onFailure(Throwable value) { + next.onFailure(value); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java new file mode 100644 index 0000000..290076f --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.engine.Delivery; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract public class DeliveryAttachment { + abstract void processDelivery(Delivery delivery); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java new file mode 100644 index 0000000..4a9eb5e --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.proton.hawtdispatch.api; + +import java.util.concurrent.TimeUnit; + +/** + * <p>A simplified Future function results interface.</p> + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public interface Future<T> { + T await() throws Exception; + T await(long amount, TimeUnit unit) throws Exception; + void then(Callback<T> callback); + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java new file mode 100644 index 0000000..b115557 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.qpid.proton.hawtdispatch.api; + +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.hawtdispatch.impl.Watch; +import org.apache.qpid.proton.hawtdispatch.impl.WatchBase; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtdispatch.Task; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public abstract class MessageDelivery extends WatchBase { + + final int initialSize; + private Message message; + private Buffer encoded; + public Delivery delivery; + private int sizeHint = 32; + + static Buffer encode(Message message, int sizeHint) { + byte[] buffer = new byte[sizeHint]; + int size = ((ProtonJMessage)message).encode2(buffer, 0, sizeHint); + if( size > sizeHint ) { + buffer = new byte[size]; + size = message.encode(buffer, 0, size); + } + return new Buffer(buffer, 0, size); + } + + static Message decode(Buffer buffer) { + Message msg = Message.Factory.create(); + int offset = buffer.offset; + int len = buffer.length; + while( len > 0 ) { + int decoded = msg.decode(buffer.data, offset, len); + assert decoded > 0: "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + return msg; + } + + public MessageDelivery(Message message) { + this(message, encode(message, 32)); + } + + public MessageDelivery(Buffer encoded) { + this(null, encoded); + } + + public MessageDelivery(Message message, Buffer encoded) { + this.message = message; + this.encoded = encoded; + sizeHint = this.encoded.length; + initialSize = sizeHint; + } + + public Message getMessage() { + if( message == null ) { + message = decode(encoded); + } + return message; + } + + public Buffer encoded() { + if( encoded == null ) { + encoded = encode(message, sizeHint); + sizeHint = encoded.length; + } + return encoded; + } + + public 
boolean isSettled() { + return delivery!=null && delivery.isSettled(); + } + + public DeliveryState getRemoteState() { + return delivery==null ? null : delivery.getRemoteState(); + } + + public DeliveryState getLocalState() { + return delivery==null ? null : delivery.getLocalState(); + } + + public void onEncoded(final Callback<Void> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( delivery!=null ) { + cb.onSuccess(null); + return true; + } + return false; + } + }); + } + + /** + * @return the remote delivery state when it changes. + * @throws Exception + */ + public DeliveryState getRemoteStateChange() throws Exception { + AmqpEndpointBase.assertNotOnDispatchQueue(); + return getRemoteStateChangeFuture().await(); + } + + /** + * @return the future remote delivery state when it changes. + */ + public Future<DeliveryState> getRemoteStateChangeFuture() { + final Promise<DeliveryState> rc = new Promise<DeliveryState>(); + link().queue().execute(new Task() { + @Override + public void run() { + onRemoteStateChange(rc); + } + }); + return rc; + } + + abstract AmqpLink link(); + + boolean watchingRemoteStateChange; + public void onRemoteStateChange(final Callback<DeliveryState> cb) { + watchingRemoteStateChange = true; + final DeliveryState original = delivery.getRemoteState(); + addWatch(new Watch() { + @Override + public boolean execute() { + if (original == null) { + if( delivery.getRemoteState()!=null ) { + cb.onSuccess(delivery.getRemoteState()); + watchingRemoteStateChange = false; + return true; + } + } else { + if( !original.equals(delivery.getRemoteState()) ) { + cb.onSuccess(delivery.getRemoteState()); + watchingRemoteStateChange = false; + return true; + } + } + return false; + } + }); + } + + /** + * @return the remote delivery state once settled. 
+ * @throws Exception + */ + public DeliveryState getSettle() throws Exception { + AmqpEndpointBase.assertNotOnDispatchQueue(); + return getSettleFuture().await(); + } + + /** + * @return the future remote delivery state once the delivery is settled. + */ + public Future<DeliveryState> getSettleFuture() { + final Promise<DeliveryState> rc = new Promise<DeliveryState>(); + link().queue().execute(new Task() { + @Override + public void run() { + onSettle(rc); + } + }); + return rc; + } + + public void onSettle(final Callback<DeliveryState> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( delivery!=null && delivery.isSettled() ) { + cb.onSuccess(delivery.getRemoteState()); + return true; + } + return false; + } + }); + } + + @Override + protected void fireWatches() { + super.fireWatches(); + } + + void incrementDeliveryCount() { + Message msg = getMessage(); + msg.setDeliveryCount(msg.getDeliveryCount()+1); + encoded = null; + } + + public void redeliver(boolean incrementDeliveryCounter) { + if( incrementDeliveryCounter ) { + incrementDeliveryCount(); + } + } + + public void settle() { + if( !delivery.isSettled() ) { + delivery.settle(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java new file mode 100644 index 0000000..b914b44 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.hawtdispatch.api; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * <p> + * </p> + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class Promise<T> implements Callback<T>, Future<T> { + + private final CountDownLatch latch = new CountDownLatch(1); + Callback<T> next; + Throwable error; + T value; + + public void onFailure(Throwable value) { + Callback<T> callback = null; + synchronized(this) { + error = value; + latch.countDown(); + callback = next; + } + if( callback!=null ) { + callback.onFailure(value); + } + } + + public void onSuccess(T value) { + Callback<T> callback = null; + synchronized(this) { + this.value = value; + latch.countDown(); + callback = next; + } + if( callback!=null ) { + callback.onSuccess(value); + } + } + + public void then(Callback<T> callback) { + boolean fire = false; + synchronized(this) { + next = callback; + if( latch.getCount() == 0 ) { + fire = true; + } + } + if( fire ) { + if( error!=null ) { + callback.onFailure(error); + } else { + callback.onSuccess(value); + } + } + } + + public T await(long amount, TimeUnit unit) throws Exception { + if( latch.await(amount, unit) ) { + return get(); 
+ } else { + throw new TimeoutException(); + } + } + + public T await() throws Exception { + latch.await(); + return get(); + } + + private T get() throws Exception { + Throwable e = error; + if( e !=null ) { + if( e instanceof RuntimeException ) { + throw (RuntimeException) e; + } else if( e instanceof Exception) { + throw (Exception) e; + } else if( e instanceof Error) { + throw (Error) e; + } else { + // don't expect to hit this case. + throw new RuntimeException(e); + } + } + return value; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java new file mode 100644 index 0000000..5b4a8dc --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.proton.hawtdispatch.api; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public enum QoS { + AT_MOST_ONCE, + AT_LEAST_ONCE, + EXACTLY_ONCE +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org
