Repository: qpid-proton Updated Branches: refs/heads/master 9d88f823c -> 6ea002d27
NO-JIRA: Add reactor clients to C examples Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6ea002d2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6ea002d2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6ea002d2 Branch: refs/heads/master Commit: 6ea002d274692ac85e645041a988512c39c75c05 Parents: 9d88f82 Author: Ken Giusti <[email protected]> Authored: Mon Apr 11 11:42:06 2016 -0400 Committer: Ken Giusti <[email protected]> Committed: Tue Apr 12 09:54:57 2016 -0400 ---------------------------------------------------------------------- examples/c/CMakeLists.txt | 1 + examples/c/reactor/CMakeLists.txt | 27 ++++ examples/c/reactor/README | 30 ++++ examples/c/reactor/receiver.c | 237 +++++++++++++++++++++++++++ examples/c/reactor/sender.c | 286 +++++++++++++++++++++++++++++++++ 5 files changed, 581 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 2f74094..1612a86 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -19,3 +19,4 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) add_subdirectory(messenger) +add_subdirectory(reactor) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt new file mode 100644 index 0000000..99e5b4a --- /dev/null +++ b/examples/c/reactor/CMakeLists.txt @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +find_package(Proton REQUIRED) + +include_directories(${Proton_INCLUDE_DIRS}) +add_executable(sender sender.c) +add_executable(receiver receiver.c) +target_link_libraries(sender ${Proton_LIBRARIES}) +target_link_libraries(receiver ${Proton_LIBRARIES}) + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/README ---------------------------------------------------------------------- diff --git a/examples/c/reactor/README b/examples/c/reactor/README new file mode 100644 index 0000000..8d61893 --- /dev/null +++ b/examples/c/reactor/README @@ -0,0 +1,30 @@ +These example clients require a broker or similar intermediary that +supports the AMQP 1.0 protocol, allows anonymous connections and +accepts links to and from a node named 'examples'. + +------------------------------------------------------------------ + +sender.c + +A simple message sending client. This example sends all messages but +the last as pre-settled (no ack required). It then waits for +an ack for the last message sent before exiting. + +Use the '-h' command line option for a list of supported parameters. + +------------------------------------------------------------------ + +receiver.c + +A simple message consuming client. 
This example receives messages +from a source node (default 'examples'). Received messages are +acknowledged if they are sent un-settled. The client will try to +decode the message payload assuming it has been generated by the +sender example. + +Use the '-h' command line option for a list of supported parameters. + + + + + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/receiver.c ---------------------------------------------------------------------- diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c new file mode 100644 index 0000000..dd6a9e3 --- /dev/null +++ b/examples/c/reactor/receiver.c @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include "pncompat/misc_funcs.inc" + +#include "proton/reactor.h" +#include "proton/message.h" +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" +#include "proton/delivery.h" +#include "proton/event.h" +#include "proton/handlers.h" + +static int quiet = 0; + +// Credit batch if unlimited receive (-c 0) +static const int CAPACITY = 100; +#define MAX_SIZE 512 + +// Example application data. 
This data will be instantiated in the event +// handler, and is available during event processing. In this example it +// holds configuration and state information. +// +typedef struct { + int count; // # of messages to receive before exiting + char *source; // name of the source node to receive from + pn_message_t *message; // holds the received message +} app_data_t; + +// helper to pull pointer to app_data_t instance out of the pn_handler_t +// +#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler)) + +// Called when reactor exits to clean up app_data +// +static void delete_handler(pn_handler_t *handler) +{ + app_data_t *d = GET_APP_DATA(handler); + if (d->message) { + pn_decref(d->message); + d->message = NULL; + } +} + + +/* Process each event posted by the reactor. + */ +static void event_handler(pn_handler_t *handler, + pn_event_t *event, + pn_event_type_t type) +{ + app_data_t *data = GET_APP_DATA(handler); + + switch (type) { + + case PN_CONNECTION_INIT: { + // Create and open all the endpoints needed to send a message + // + pn_connection_t *conn = pn_event_connection(event); + pn_connection_open(conn); + pn_session_t *ssn = pn_session(conn); + pn_session_open(ssn); + pn_link_t *receiver = pn_receiver(ssn, "MyReceiver"); + pn_terminus_set_address(pn_link_source(receiver), data->source); + pn_link_open(receiver); + // cannot receive without granting credit: + pn_link_flow(receiver, data->count ? 
data->count : CAPACITY); + } break; + + case PN_DELIVERY: { + // A message has been received + // + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { + // A full message has arrived + if (!quiet && pn_delivery_pending(dlv) < MAX_SIZE) { + // try to decode the message body + pn_bytes_t bytes; + bool found = false; + static char buffer[MAX_SIZE]; + size_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); + pn_message_clear(data->message); + // decode the raw data into the message instance + if (pn_message_decode(data->message, buffer, len) == PN_OK) { + // Assuming the message came from the sender example, try + // to parse out a single string from the payload + // + int rc = pn_data_scan(pn_message_body(data->message) + , "?S", &found, &bytes); + if (!rc && found) { + fprintf(stdout, "Message: [%.*s]\n", + (int)bytes.size, bytes.start); + } else { + fprintf(stdout, "Message received!\n"); + } + } else { + fprintf(stdout, "Message received!\n"); + } + } + + pn_link_t *link = pn_delivery_link(dlv); + + if (!pn_delivery_settled(dlv)) { + // remote has not settled, so it is tracking the delivery. Ack + // it. 
+ pn_delivery_update(dlv, PN_ACCEPTED); + } + + // done with the delivery, move to the next and free it + pn_link_advance(link); + pn_delivery_settle(dlv); // dlv is now freed + + if (data->count == 0) { + // send forever - see if more credit is needed + if (pn_link_credit(link) < CAPACITY/2) { + // Grant enough credit to bring it up to CAPACITY: + pn_link_flow(link, CAPACITY - pn_link_credit(link)); + } + } else if (--data->count == 0) { + // done receiving, close the endpoints + pn_link_close(link); + pn_session_t *ssn = pn_link_session(link); + pn_session_close(ssn); + pn_connection_close(pn_session_connection(ssn)); + } + } + } break; + + default: + break; + } +} + +static void usage(void) +{ + printf("Usage: receiver <options>\n"); + printf("-a \tThe host address [localhost:5672]\n"); + printf("-c \t# of messages to receive, 0=receive forever [1]\n"); + printf("-s \tSource address [examples]\n"); + printf("-i \tContainer name [ReceiveExample]\n"); + printf("-q \tQuiet - turn off stdout\n"); + exit(1); +} + +int main(int argc, char** argv) +{ + char *address = "localhost"; + char *container = "ReceiveExample"; + + /* create a handler for the connection's events. + * event_handler will be called for each event. The handler will allocate + * a app_data_t instance which can be accessed when the event_handler is + * called. + */ + pn_handler_t *handler = pn_handler_new(event_handler, + sizeof(app_data_t), + delete_handler); + + /* set up the application data with defaults */ + app_data_t *app_data = GET_APP_DATA(handler); + memset(app_data, 0, sizeof(app_data_t)); + app_data->count = 1; + app_data->source = "examples"; + app_data->message = pn_message(); + + /* Attach the pn_handshaker() handler. This handler deals with endpoint + * events from the peer so we don't have to. 
+ */ + pn_handler_add(handler, pn_handshaker()); + + /* command line options */ + opterr = 0; + int c; + while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) { + switch(c) { + case 'h': usage(); break; + case 'a': address = optarg; break; + case 'c': + app_data->count = atoi(optarg); + if (app_data->count < 0) usage(); + break; + case 's': app_data->source = optarg; break; + case 'i': container = optarg; break; + case 'q': quiet = 1; break; + default: + usage(); + break; + } + } + + pn_reactor_t *reactor = pn_reactor(); + pn_connection_t *conn = pn_reactor_connection(reactor, handler); + + // the container name should be unique for each client + pn_connection_set_container(conn, container); + pn_connection_set_hostname(conn, address); // FIXME + + // wait up to 5 seconds for activity before returning from + // pn_reactor_process() + pn_reactor_set_timeout(reactor, 5000); + + pn_reactor_start(reactor); + + while (pn_reactor_process(reactor)) { + /* Returns 'true' until the connection is shut down. + * pn_reactor_process() will return true at least once every 5 seconds + * (due to the timeout). If no timeout was configured, + * pn_reactor_process() returns as soon as it finishes processing all + * pending I/O and events. Once the connection has closed, + * pn_reactor_process() will return false. + */ + } + + return 0; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/sender.c ---------------------------------------------------------------------- diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c new file mode 100644 index 0000000..9a07c9e --- /dev/null +++ b/examples/c/reactor/sender.c @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include "pncompat/misc_funcs.inc" + +#include "proton/reactor.h" +#include "proton/message.h" +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" +#include "proton/delivery.h" +#include "proton/event.h" +#include "proton/handlers.h" + +static int quiet = 0; + +// Example application data. This data will be instantiated in the event +// handler, and is available during event processing. In this example it +// holds configuration and state information. +// +typedef struct { + int count; // # messages to send + int anon; // use anonymous link if true + char *target; // name of destination target + char *msg_data; // pre-encoded outbound message + int msg_len; // bytes in msg_data +} app_data_t; + +// helper to pull pointer to app_data_t instance out of the pn_handler_t +// +#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler)) + +// Called when reactor exits to clean up app_data +// +static void delete_handler(pn_handler_t *handler) +{ + app_data_t *d = GET_APP_DATA(handler); + if (d->msg_data) { + free(d->msg_data); + d->msg_data = NULL; + } +} + +/* Process each event posted by the reactor. 
+ */ +static void event_handler(pn_handler_t *handler, + pn_event_t *event, + pn_event_type_t type) +{ + app_data_t *data = GET_APP_DATA(handler); + + switch (type) { + + case PN_CONNECTION_INIT: { + // Create and open all the endpoints needed to send a message + // + pn_connection_t *conn = pn_event_connection(event); + pn_connection_open(conn); + pn_session_t *ssn = pn_session(conn); + pn_session_open(ssn); + pn_link_t *sender = pn_sender(ssn, "MySender"); + // we do not wait for ack until the last message + pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); + if (!data->anon) { + pn_terminus_set_address(pn_link_target(sender), data->target); + } + pn_link_open(sender); + } break; + + case PN_LINK_FLOW: { + // the remote has given us some credit, now we can send messages + // + static long tag = 0; // a simple tag generator + pn_link_t *sender = pn_event_link(event); + int credit = pn_link_credit(sender); + while (credit > 0 && data->count > 0) { + --credit; + --data->count; + ++tag; + pn_delivery_t *delivery; + delivery = pn_delivery(sender, + pn_dtag((const char *)&tag, sizeof(tag))); + pn_link_send(sender, data->msg_data, data->msg_len); + pn_link_advance(sender); + if (data->count > 0) { + // send pre-settled until the last one, then wait for an ack on + // the last sent message. This allows the sender to send + // messages as fast as possible and then exit when the consumer + // has dealt with the last one. + // + pn_delivery_settle(delivery); + } + } + } break; + + case PN_DELIVERY: { + // Since the example sends all messages but the last pre-settled + // (pre-acked), only the last message's delivery will get updated with + // the remote state (acked/nacked). 
+ // + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_delivery_updated(dlv) && pn_delivery_remote_state(dlv)) { + uint64_t rs = pn_delivery_remote_state(dlv); + int done = 1; + switch (rs) { + case PN_RECEIVED: + // This is not a terminal state - it is informational, and the + // peer is still processing the message. + done = 0; + break; + case PN_ACCEPTED: + pn_delivery_settle(dlv); + if (!quiet) fprintf(stdout, "Send complete!\n"); + break; + case PN_REJECTED: + case PN_RELEASED: + case PN_MODIFIED: + pn_delivery_settle(dlv); + fprintf(stderr, "Message not accepted - code:%lu\n", (unsigned long)rs); + break; + default: + // ??? no other terminal states defined, so ignore anything else + pn_delivery_settle(dlv); + fprintf(stderr, "Unknown delivery failure - code=%lu\n", (unsigned long)rs); + break; + } + + if (done) { + // initiate clean shutdown of the endpoints + pn_link_t *link = pn_delivery_link(dlv); + pn_link_close(link); + pn_session_t *ssn = pn_link_session(link); + pn_session_close(ssn); + pn_connection_close(pn_session_connection(ssn)); + } + } + } break; + + default: + break; + } +} + +static void usage(void) +{ + printf("Usage: send <options> <message>\n"); + printf("-a \tThe host address [localhost:5672]\n"); + printf("-c \t# of messages to send [1]\n"); + printf("-t \tTarget address [examples]\n"); + printf("-n \tUse an anonymous link [off]\n"); + printf("-i \tContainer name [SendExample]\n"); + printf("-q \tQuiet - turn off stdout\n"); + printf("message \tA text string to send.\n"); + exit(1); +} + +int main(int argc, char** argv) +{ + char *address = "localhost"; + char *msgtext = "Hello World!"; + char *container = "SendExample"; + int anon = 0; + int c; + + /* Create a handler for the connection's events. event_handler() will be + * called for each event and delete_handler will be called when the + * connection is released. The handler will allocate an app_data_t + * instance which can be accessed when the event_handler is called. 
+ */ + pn_handler_t *handler = pn_handler_new(event_handler, + sizeof(app_data_t), + delete_handler); + + /* set up the application data with defaults */ + app_data_t *app_data = GET_APP_DATA(handler); + memset(app_data, 0, sizeof(app_data_t)); + app_data->count = 1; + app_data->target = "examples"; + + /* Attach the pn_handshaker() handler. This handler deals with endpoint + * events from the peer so we don't have to. + */ + pn_handler_add(handler, pn_handshaker()); + + /* command line options */ + opterr = 0; + while((c = getopt(argc, argv, "i:a:c:t:nhq")) != -1) { + switch(c) { + case 'h': usage(); break; + case 'a': address = optarg; break; + case 'c': + app_data->count = atoi(optarg); + if (app_data->count < 1) usage(); + break; + case 't': app_data->target = optarg; break; + case 'n': app_data->anon = 1; break; + case 'i': container = optarg; break; + case 'q': quiet = 1; break; + default: + usage(); + break; + } + } + if (optind < argc) msgtext = argv[optind]; + + + // create a single message and pre-encode it so we only have to do that + // once. All transmits will use the same pre-encoded message simply for + // speed. 
+ // + pn_message_t *message = pn_message(); + pn_message_set_address(message, app_data->target); + pn_data_t *body = pn_message_body(message); + pn_data_clear(body); + + // This message's body contains a single string + if (pn_data_fill(body, "S", msgtext)) { + fprintf(stderr, "Error building message!\n"); + exit(1); + } + pn_data_rewind(body); + { + // encode the message, expanding the encode buffer as needed + // + size_t len = 128; + char *buf = (char *)malloc(len); + int rc = 0; + do { + rc = pn_message_encode(message, buf, &len); + if (rc == PN_OVERFLOW) { + free(buf); + len *= 2; + buf = malloc(len); + } + } while (rc == PN_OVERFLOW); + app_data->msg_len = len; + app_data->msg_data = buf; + } + pn_decref(message); // message no longer needed + + pn_reactor_t *reactor = pn_reactor(); + pn_connection_t *conn = pn_reactor_connection(reactor, handler); + + // the container name should be unique for each client + pn_connection_set_container(conn, container); + pn_connection_set_hostname(conn, address); // FIXME + + // wait up to 5 seconds for activity before returning from + // pn_reactor_process() + pn_reactor_set_timeout(reactor, 5000); + + pn_reactor_start(reactor); + + while (pn_reactor_process(reactor)) { + /* Returns 'true' until the connection is shut down. + * pn_reactor_process() will return true at least once every 5 seconds + * (due to the timeout). If no timeout was configured, + * pn_reactor_process() returns as soon as it finishes processing all + * pending I/O and events. Once the connection has closed, + * pn_reactor_process() will return false. + */ + } + + return 0; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
