http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Socket.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/Socket.c b/thirdparty/paho.mqtt.c/src/Socket.c new file mode 100644 index 0000000..939dbab --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/Socket.c @@ -0,0 +1,898 @@ +/******************************************************************************* + * Copyright (c) 2009, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial implementation and documentation + * Ian Craggs - async client updates + * Ian Craggs - fix for bug 484496 + * Juergen Kosel, Ian Craggs - fix for issue #135 + * Ian Craggs - issue #217 + * Ian Craggs - fix for issue #186 + *******************************************************************************/ + +/** + * @file + * \brief Socket related functions + * + * Some other related functions are in the SocketBuffer module + */ + + +#include "Socket.h" +#include "Log.h" +#include "SocketBuffer.h" +#include "Messages.h" +#include "StackTrace.h" +#if defined(OPENSSL) +#include "SSLSocket.h" +#endif + +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <ctype.h> + +#include "Heap.h" + +int Socket_setnonblocking(int sock); +int Socket_error(char* aString, int sock); +int Socket_addSocket(int newSd); +int isReady(int socket, fd_set* read_set, fd_set* write_set); +int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes); +int Socket_close_only(int socket); +int 
Socket_continueWrite(int socket); +int Socket_continueWrites(fd_set* pwset); +char* Socket_getaddrname(struct sockaddr* sa, int sock); + +#if defined(WIN32) || defined(WIN64) +#define iov_len len +#define iov_base buf +#endif + +/** + * Structure to hold all socket data for the module + */ +Sockets s; +static fd_set wset; + +/** + * Set a socket non-blocking, OS independently + * @param sock the socket to set non-blocking + * @return TCP call error code + */ +int Socket_setnonblocking(int sock) +{ + int rc; +#if defined(WIN32) || defined(WIN64) + u_long flag = 1L; + + FUNC_ENTRY; + rc = ioctl(sock, FIONBIO, &flag); +#else + int flags; + + FUNC_ENTRY; + if ((flags = fcntl(sock, F_GETFL, 0))) + flags = 0; + rc = fcntl(sock, F_SETFL, flags | O_NONBLOCK); +#endif + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Gets the specific error corresponding to SOCKET_ERROR + * @param aString the function that was being used when the error occurred + * @param sock the socket on which the error occurred + * @return the specific TCP error code + */ +int Socket_error(char* aString, int sock) +{ +#if defined(WIN32) || defined(WIN64) + int errno; +#endif + + FUNC_ENTRY; +#if defined(WIN32) || defined(WIN64) + errno = WSAGetLastError(); +#endif + if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK) + { + if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET)) + Log(TRACE_MINIMUM, -1, "Socket error %s(%d) in %s for socket %d", strerror(errno), errno, aString, sock); + } + FUNC_EXIT_RC(errno); + return errno; +} + + +/** + * Initialize the socket module + */ +void Socket_outInitialize(void) +{ +#if defined(WIN32) || defined(WIN64) + WORD winsockVer = 0x0202; + WSADATA wsd; + + FUNC_ENTRY; + WSAStartup(winsockVer, &wsd); +#else + FUNC_ENTRY; + signal(SIGPIPE, SIG_IGN); +#endif + + SocketBuffer_initialize(); + s.clientsds = ListInitialize(); + s.connect_pending = ListInitialize(); + s.write_pending = ListInitialize(); + 
s.cur_clientsds = NULL; + FD_ZERO(&(s.rset)); /* Initialize the descriptor set */ + FD_ZERO(&(s.pending_wset)); + s.maxfdp1 = 0; + memcpy((void*)&(s.rset_saved), (void*)&(s.rset), sizeof(s.rset_saved)); + FUNC_EXIT; +} + + +/** + * Terminate the socket module + */ +void Socket_outTerminate(void) +{ + FUNC_ENTRY; + ListFree(s.connect_pending); + ListFree(s.write_pending); + ListFree(s.clientsds); + SocketBuffer_terminate(); +#if defined(WIN32) || defined(WIN64) + WSACleanup(); +#endif + FUNC_EXIT; +} + + +/** + * Add a socket to the list of socket to check with select + * @param newSd the new socket to add + */ +int Socket_addSocket(int newSd) +{ + int rc = 0; + + FUNC_ENTRY; + if (ListFindItem(s.clientsds, &newSd, intcompare) == NULL) /* make sure we don't add the same socket twice */ + { + if (s.clientsds->count >= FD_SETSIZE) + { + Log(LOG_ERROR, -1, "addSocket: exceeded FD_SETSIZE %d", FD_SETSIZE); + rc = SOCKET_ERROR; + } + else + { + int* pnewSd = (int*)malloc(sizeof(newSd)); + *pnewSd = newSd; + ListAppend(s.clientsds, pnewSd, sizeof(newSd)); + FD_SET(newSd, &(s.rset_saved)); + s.maxfdp1 = max(s.maxfdp1, newSd + 1); + rc = Socket_setnonblocking(newSd); + if (rc == SOCKET_ERROR) + Log(LOG_ERROR, -1, "addSocket: setnonblocking"); + } + } + else + Log(LOG_ERROR, -1, "addSocket: socket %d already in the list", newSd); + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Don't accept work from a client unless it is accepting work back, i.e. its socket is writeable + * this seems like a reasonable form of flow control, and practically, seems to work. + * @param socket the socket to check + * @param read_set the socket read set (see select doc) + * @param write_set the socket write set (see select doc) + * @return boolean - is the socket ready to go? 
+ */ +int isReady(int socket, fd_set* read_set, fd_set* write_set) +{ + int rc = 1; + + FUNC_ENTRY; + if (ListFindItem(s.connect_pending, &socket, intcompare) && FD_ISSET(socket, write_set)) + ListRemoveItem(s.connect_pending, &socket, intcompare); + else + rc = FD_ISSET(socket, read_set) && FD_ISSET(socket, write_set) && Socket_noPendingWrites(socket); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Returns the next socket ready for communications as indicated by select + * @param more_work flag to indicate more work is waiting, and thus a timeout value of 0 should + * be used for the select + * @param tp the timeout to be used for the select, unless overridden + * @return the socket next ready, or 0 if none is ready + */ +int Socket_getReadySocket(int more_work, struct timeval *tp) +{ + int rc = 0; + static struct timeval zero = {0L, 0L}; /* 0 seconds */ + static struct timeval one = {1L, 0L}; /* 1 second */ + struct timeval timeout = one; + + FUNC_ENTRY; + if (s.clientsds->count == 0) + goto exit; + + if (more_work) + timeout = zero; + else if (tp) + timeout = *tp; + + while (s.cur_clientsds != NULL) + { + if (isReady(*((int*)(s.cur_clientsds->content)), &(s.rset), &wset)) + break; + ListNextElement(s.clientsds, &s.cur_clientsds); + } + + if (s.cur_clientsds == NULL) + { + int rc1; + fd_set pwset; + + memcpy((void*)&(s.rset), (void*)&(s.rset_saved), sizeof(s.rset)); + memcpy((void*)&(pwset), (void*)&(s.pending_wset), sizeof(pwset)); + if ((rc = select(s.maxfdp1, &(s.rset), &pwset, NULL, &timeout)) == SOCKET_ERROR) + { + Socket_error("read select", 0); + goto exit; + } + Log(TRACE_MAX, -1, "Return code %d from read select", rc); + + if (Socket_continueWrites(&pwset) == SOCKET_ERROR) + { + rc = 0; + goto exit; + } + + memcpy((void*)&wset, (void*)&(s.rset_saved), sizeof(wset)); + if ((rc1 = select(s.maxfdp1, NULL, &(wset), NULL, &zero)) == SOCKET_ERROR) + { + Socket_error("write select", 0); + rc = rc1; + goto exit; + } + Log(TRACE_MAX, -1, "Return code %d from 
write select", rc1); + + if (rc == 0 && rc1 == 0) + goto exit; /* no work to do */ + + s.cur_clientsds = s.clientsds->first; + while (s.cur_clientsds != NULL) + { + int cursock = *((int*)(s.cur_clientsds->content)); + if (isReady(cursock, &(s.rset), &wset)) + break; + ListNextElement(s.clientsds, &s.cur_clientsds); + } + } + + if (s.cur_clientsds == NULL) + rc = 0; + else + { + rc = *((int*)(s.cur_clientsds->content)); + ListNextElement(s.clientsds, &s.cur_clientsds); + } +exit: + FUNC_EXIT_RC(rc); + return rc; +} /* end getReadySocket */ + + +/** + * Reads one byte from a socket + * @param socket the socket to read from + * @param c the character read, returned + * @return completion code + */ +int Socket_getch(int socket, char* c) +{ + int rc = SOCKET_ERROR; + + FUNC_ENTRY; + if ((rc = SocketBuffer_getQueuedChar(socket, c)) != SOCKETBUFFER_INTERRUPTED) + goto exit; + + if ((rc = recv(socket, c, (size_t)1, 0)) == SOCKET_ERROR) + { + int err = Socket_error("recv - getch", socket); + if (err == EWOULDBLOCK || err == EAGAIN) + { + rc = TCPSOCKET_INTERRUPTED; + SocketBuffer_interrupted(socket, 0); + } + } + else if (rc == 0) + rc = SOCKET_ERROR; /* The return value from recv is 0 when the peer has performed an orderly shutdown. */ + else if (rc == 1) + { + SocketBuffer_queueChar(socket, *c); + rc = TCPSOCKET_COMPLETE; + } +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Attempts to read a number of bytes from a socket, non-blocking. If a previous read did not + * finish, then retrieve that data. 
+ * @param socket the socket to read from + * @param bytes the number of bytes to read + * @param actual_len the actual number of bytes read + * @return completion code + */ +char *Socket_getdata(int socket, size_t bytes, size_t* actual_len) +{ + int rc; + char* buf; + + FUNC_ENTRY; + if (bytes == 0) + { + buf = SocketBuffer_complete(socket); + goto exit; + } + + buf = SocketBuffer_getQueuedData(socket, bytes, actual_len); + + if ((rc = recv(socket, buf + (*actual_len), (int)(bytes - (*actual_len)), 0)) == SOCKET_ERROR) + { + rc = Socket_error("recv - getdata", socket); + if (rc != EAGAIN && rc != EWOULDBLOCK) + { + buf = NULL; + goto exit; + } + } + else if (rc == 0) /* rc 0 means the other end closed the socket, albeit "gracefully" */ + { + buf = NULL; + goto exit; + } + else + *actual_len += rc; + + if (*actual_len == bytes) + SocketBuffer_complete(socket); + else /* we didn't read the whole packet */ + { + SocketBuffer_interrupted(socket, *actual_len); + Log(TRACE_MAX, -1, "%d bytes expected but %d bytes now received", bytes, *actual_len); + } +exit: + FUNC_EXIT; + return buf; +} + + +/** + * Indicate whether any data is pending outbound for a socket. + * @return boolean - true == data pending. + */ +int Socket_noPendingWrites(int socket) +{ + int cursock = socket; + return ListFindItem(s.write_pending, &cursock, intcompare) == NULL; +} + + +/** + * Attempts to write a series of iovec buffers to a socket in *one* system call so that + * they are sent as one packet. 
+ * @param socket the socket to write to + * @param iovecs an array of buffers to write + * @param count number of buffers in iovecs + * @param bytes number of bytes actually written returned + * @return completion code, especially TCPSOCKET_INTERRUPTED + */ +int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes) +{ + int rc; + + FUNC_ENTRY; +#if defined(WIN32) || defined(WIN64) + rc = WSASend(socket, iovecs, count, (LPDWORD)bytes, 0, NULL, NULL); + if (rc == SOCKET_ERROR) + { + int err = Socket_error("WSASend - putdatas", socket); + if (err == EWOULDBLOCK || err == EAGAIN) + rc = TCPSOCKET_INTERRUPTED; + } +#else + *bytes = 0L; + rc = writev(socket, iovecs, count); + if (rc == SOCKET_ERROR) + { + int err = Socket_error("writev - putdatas", socket); + if (err == EWOULDBLOCK || err == EAGAIN) + rc = TCPSOCKET_INTERRUPTED; + } + else + *bytes = rc; +#endif + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Attempts to write a series of buffers to a socket in *one* system call so that they are + * sent as one packet. 
+ * @param socket the socket to write to + * @param buf0 the first buffer + * @param buf0len the length of data in the first buffer + * @param count number of buffers + * @param buffers an array of buffers to write + * @param buflens an array of corresponding buffer lengths + * @return completion code, especially TCPSOCKET_INTERRUPTED + */ +int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees) +{ + unsigned long bytes = 0L; + iobuf iovecs[5]; + int frees1[5]; + int rc = TCPSOCKET_INTERRUPTED, i; + size_t total = buf0len; + + FUNC_ENTRY; + if (!Socket_noPendingWrites(socket)) + { + Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket); + rc = SOCKET_ERROR; + goto exit; + } + + for (i = 0; i < count; i++) + total += buflens[i]; + + iovecs[0].iov_base = buf0; + iovecs[0].iov_len = (ULONG)buf0len; + frees1[0] = 1; + for (i = 0; i < count; i++) + { + iovecs[i+1].iov_base = buffers[i]; + iovecs[i+1].iov_len = (ULONG)buflens[i]; + frees1[i+1] = frees[i]; + } + + if ((rc = Socket_writev(socket, iovecs, count+1, &bytes)) != SOCKET_ERROR) + { + if (bytes == total) + rc = TCPSOCKET_COMPLETE; + else + { + int* sockmem = (int*)malloc(sizeof(int)); + Log(TRACE_MIN, -1, "Partial write: %ld bytes of %d actually written on socket %d", + bytes, total, socket); +#if defined(OPENSSL) + SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes); +#else + SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes); +#endif + *sockmem = socket; + ListAppend(s.write_pending, sockmem, sizeof(int)); + FD_SET(socket, &(s.pending_wset)); + rc = TCPSOCKET_INTERRUPTED; + } + } +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Add a socket to the pending write list, so that it is checked for writing in select. 
This is used + * in connect processing when the TCP connect is incomplete, as we need to check the socket for both + * ready to read and write states. + * @param socket the socket to add + */ +void Socket_addPendingWrite(int socket) +{ + FD_SET(socket, &(s.pending_wset)); +} + + +/** + * Clear a socket from the pending write list - if one was added with Socket_addPendingWrite + * @param socket the socket to remove + */ +void Socket_clearPendingWrite(int socket) +{ + if (FD_ISSET(socket, &(s.pending_wset))) + FD_CLR(socket, &(s.pending_wset)); +} + + +/** + * Close a socket without removing it from the select list. + * @param socket the socket to close + * @return completion code + */ +int Socket_close_only(int socket) +{ + int rc; + + FUNC_ENTRY; +#if defined(WIN32) || defined(WIN64) + if (shutdown(socket, SD_BOTH) == SOCKET_ERROR) + Socket_error("shutdown", socket); + if ((rc = closesocket(socket)) == SOCKET_ERROR) + Socket_error("close", socket); +#else + if (shutdown(socket, SHUT_WR) == SOCKET_ERROR) + Socket_error("shutdown", socket); + if ((rc = recv(socket, NULL, (size_t)0, 0)) == SOCKET_ERROR) + Socket_error("shutdown", socket); + if ((rc = close(socket)) == SOCKET_ERROR) + Socket_error("close", socket); +#endif + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Close a socket and remove it from the select list. 
+ * @param socket the socket to close + * @return completion code + */ +void Socket_close(int socket) +{ + FUNC_ENTRY; + Socket_close_only(socket); + FD_CLR(socket, &(s.rset_saved)); + if (FD_ISSET(socket, &(s.pending_wset))) + FD_CLR(socket, &(s.pending_wset)); + if (s.cur_clientsds != NULL && *(int*)(s.cur_clientsds->content) == socket) + s.cur_clientsds = s.cur_clientsds->next; + ListRemoveItem(s.connect_pending, &socket, intcompare); + ListRemoveItem(s.write_pending, &socket, intcompare); + SocketBuffer_cleanup(socket); + + if (ListRemoveItem(s.clientsds, &socket, intcompare)) + Log(TRACE_MIN, -1, "Removed socket %d", socket); + else + Log(LOG_ERROR, -1, "Failed to remove socket %d", socket); + if (socket + 1 >= s.maxfdp1) + { + /* now we have to reset s.maxfdp1 */ + ListElement* cur_clientsds = NULL; + + s.maxfdp1 = 0; + while (ListNextElement(s.clientsds, &cur_clientsds)) + s.maxfdp1 = max(*((int*)(cur_clientsds->content)), s.maxfdp1); + ++(s.maxfdp1); + Log(TRACE_MAX, -1, "Reset max fdp1 to %d", s.maxfdp1); + } + FUNC_EXIT; +} + + +/** + * Create a new socket and TCP connect to an address/port + * @param addr the address string + * @param port the TCP port + * @param sock returns the new socket + * @return completion code + */ +int Socket_new(char* addr, int port, int* sock) +{ + int type = SOCK_STREAM; + struct sockaddr_in address; +#if defined(AF_INET6) + struct sockaddr_in6 address6; +#endif + int rc = SOCKET_ERROR; +#if defined(WIN32) || defined(WIN64) + short family; +#else + sa_family_t family = AF_INET; +#endif + struct addrinfo *result = NULL; + struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; + + FUNC_ENTRY; + *sock = -1; + memset(&address6, '\0', sizeof(address6)); + + if (addr[0] == '[') + ++addr; + + if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0) + { + struct addrinfo* res = result; + + while (res) + { /* prefer ip4 addresses */ + if (res->ai_family == AF_INET || res->ai_next == NULL) + break; + 
res = res->ai_next; + } + + if (res == NULL) + rc = -1; + else +#if defined(AF_INET6) + if (res->ai_family == AF_INET6) + { + address6.sin6_port = htons(port); + address6.sin6_family = family = AF_INET6; + memcpy(&address6.sin6_addr, &((struct sockaddr_in6*)(res->ai_addr))->sin6_addr, sizeof(address6.sin6_addr)); + } + else +#endif + if (res->ai_family == AF_INET) + { + address.sin_port = htons(port); + address.sin_family = family = AF_INET; + address.sin_addr = ((struct sockaddr_in*)(res->ai_addr))->sin_addr; + } + else + rc = -1; + + freeaddrinfo(result); + } + else + Log(LOG_ERROR, -1, "getaddrinfo failed for addr %s with rc %d", addr, rc); + + if (rc != 0) + Log(LOG_ERROR, -1, "%s is not a valid IP address", addr); + else + { + *sock = (int)socket(family, type, 0); + if (*sock == INVALID_SOCKET) + rc = Socket_error("socket", *sock); + else + { +#if defined(NOSIGPIPE) + int opt = 1; + + if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0) + Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock); +#endif + + Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port); + if (Socket_addSocket(*sock) == SOCKET_ERROR) + rc = Socket_error("addSocket", *sock); + else + { + /* this could complete immmediately, even though we are non-blocking */ + if (family == AF_INET) + rc = connect(*sock, (struct sockaddr*)&address, sizeof(address)); + #if defined(AF_INET6) + else + rc = connect(*sock, (struct sockaddr*)&address6, sizeof(address6)); + #endif + if (rc == SOCKET_ERROR) + rc = Socket_error("connect", *sock); + if (rc == EINPROGRESS || rc == EWOULDBLOCK) + { + int* pnewSd = (int*)malloc(sizeof(int)); + *pnewSd = *sock; + ListAppend(s.connect_pending, pnewSd, sizeof(int)); + Log(TRACE_MIN, 15, "Connect pending"); + } + } + /* Prevent socket leak by closing unusable sockets, + * as reported in + * https://github.com/eclipse/paho.mqtt.c/issues/135 + */ + if (rc != 0 && (rc != EINPROGRESS) && (rc != EWOULDBLOCK)) + { + 
Socket_close(*sock); /* close socket and remove from our list of sockets */ + *sock = -1; /* as initialized before */ + } + } + } + FUNC_EXIT_RC(rc); + return rc; +} + + +static Socket_writeComplete* writecomplete = NULL; + +void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete) +{ + writecomplete = mywritecomplete; +} + +/** + * Continue an outstanding write for a particular socket + * @param socket that socket + * @return completion code + */ +int Socket_continueWrite(int socket) +{ + int rc = 0; + pending_writes* pw; + unsigned long curbuflen = 0L, /* cumulative total of buffer lengths */ + bytes; + int curbuf = -1, i; + iobuf iovecs1[5]; + + FUNC_ENTRY; + pw = SocketBuffer_getWrite(socket); + +#if defined(OPENSSL) + if (pw->ssl) + { + rc = SSLSocket_continueWrite(pw); + goto exit; + } +#endif + + for (i = 0; i < pw->count; ++i) + { + if (pw->bytes <= curbuflen) + { /* if previously written length is less than the buffer we are currently looking at, + add the whole buffer */ + iovecs1[++curbuf].iov_len = pw->iovecs[i].iov_len; + iovecs1[curbuf].iov_base = pw->iovecs[i].iov_base; + } + else if (pw->bytes < curbuflen + pw->iovecs[i].iov_len) + { /* if previously written length is in the middle of the buffer we are currently looking at, + add some of the buffer */ + size_t offset = pw->bytes - curbuflen; + iovecs1[++curbuf].iov_len = pw->iovecs[i].iov_len - (ULONG)offset; + iovecs1[curbuf].iov_base = (char*)pw->iovecs[i].iov_base + offset; + break; + } + curbuflen += pw->iovecs[i].iov_len; + } + + if ((rc = Socket_writev(socket, iovecs1, curbuf+1, &bytes)) != SOCKET_ERROR) + { + pw->bytes += bytes; + if ((rc = (pw->bytes == pw->total))) + { /* topic and payload buffers are freed elsewhere, when all references to them have been removed */ + for (i = 0; i < pw->count; i++) + { + if (pw->frees[i]) + free(pw->iovecs[i].iov_base); + } + Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket); + } + else + Log(TRACE_MIN, 
-1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket); + } +#if defined(OPENSSL) +exit: +#endif + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Continue any outstanding writes for a socket set + * @param pwset the set of sockets + * @return completion code + */ +int Socket_continueWrites(fd_set* pwset) +{ + int rc1 = 0; + ListElement* curpending = s.write_pending->first; + + FUNC_ENTRY; + while (curpending) + { + int socket = *(int*)(curpending->content); + if (FD_ISSET(socket, pwset) && Socket_continueWrite(socket)) + { + if (!SocketBuffer_writeComplete(socket)) + Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list"); + FD_CLR(socket, &(s.pending_wset)); + if (!ListRemove(s.write_pending, curpending->content)) + { + Log(LOG_SEVERE, -1, "Failed to remove pending write from list"); + ListNextElement(s.write_pending, &curpending); + } + curpending = s.write_pending->current; + + if (writecomplete) + (*writecomplete)(socket); + } + else + ListNextElement(s.write_pending, &curpending); + } + FUNC_EXIT_RC(rc1); + return rc1; +} + + +/** + * Convert a numeric address to character string + * @param sa socket numerical address + * @param sock socket + * @return the peer information + */ +char* Socket_getaddrname(struct sockaddr* sa, int sock) +{ +/** + * maximum length of the address string + */ +#define ADDRLEN INET6_ADDRSTRLEN+1 +/** + * maximum length of the port string + */ +#define PORTLEN 10 + static char addr_string[ADDRLEN + PORTLEN]; + +#if defined(WIN32) || defined(WIN64) + int buflen = ADDRLEN*2; + wchar_t buf[ADDRLEN*2]; + if (WSAAddressToStringW(sa, sizeof(struct sockaddr_in6), NULL, buf, (LPDWORD)&buflen) == SOCKET_ERROR) + Socket_error("WSAAddressToString", sock); + else + wcstombs(addr_string, buf, sizeof(addr_string)); + /* TODO: append the port information - format: [00:00:00::]:port */ + /* strcpy(&addr_string[strlen(addr_string)], "what?"); */ +#else + struct sockaddr_in *sin = (struct sockaddr_in *)sa; + 
inet_ntop(sin->sin_family, &sin->sin_addr, addr_string, ADDRLEN); + sprintf(&addr_string[strlen(addr_string)], ":%d", ntohs(sin->sin_port)); +#endif + return addr_string; +} + + +/** + * Get information about the other end connected to a socket + * @param sock the socket to inquire on + * @return the peer information + */ +char* Socket_getpeer(int sock) +{ + struct sockaddr_in6 sa; + socklen_t sal = sizeof(sa); + int rc; + + if ((rc = getpeername(sock, (struct sockaddr*)&sa, &sal)) == SOCKET_ERROR) + { + Socket_error("getpeername", sock); + return "unknown"; + } + + return Socket_getaddrname((struct sockaddr*)&sa, sock); +} + + +#if defined(Socket_TEST) + +int main(int argc, char *argv[]) +{ + Socket_connect("127.0.0.1", 1883); + Socket_connect("localhost", 1883); + Socket_connect("loadsadsacalhost", 1883); +} + +#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Socket.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/Socket.h b/thirdparty/paho.mqtt.c/src/Socket.h new file mode 100644 index 0000000..dbf21b4 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/Socket.h @@ -0,0 +1,143 @@ +/******************************************************************************* + * Copyright (c) 2009, 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial implementation and documentation + * Ian Craggs - async client updates + *******************************************************************************/ + +#if !defined(SOCKET_H) +#define SOCKET_H + +#include <sys/types.h> + +#if defined(WIN32) || defined(WIN64) +#include <winsock2.h> +#include <ws2tcpip.h> +#define MAXHOSTNAMELEN 256 +#if !defined(SSLSOCKET_H) +#undef EAGAIN +#define EAGAIN WSAEWOULDBLOCK +#undef EINTR +#define EINTR WSAEINTR +#undef EINPROGRESS +#define EINPROGRESS WSAEINPROGRESS +#undef EWOULDBLOCK +#define EWOULDBLOCK WSAEWOULDBLOCK +#undef ENOTCONN +#define ENOTCONN WSAENOTCONN +#undef ECONNRESET +#define ECONNRESET WSAECONNRESET +#undef ETIMEDOUT +#define ETIMEDOUT WAIT_TIMEOUT +#endif +#define ioctl ioctlsocket +#define socklen_t int +#else +#define INVALID_SOCKET SOCKET_ERROR +#include <sys/socket.h> +#if !defined(_WRS_KERNEL) +#include <sys/param.h> +#include <sys/time.h> +#include <sys/select.h> +#include <sys/uio.h> +#else +#include <selectLib.h> +#endif +#include 
<netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <unistd.h> +#define ULONG size_t +#endif + +/** socket operation completed successfully */ +#define TCPSOCKET_COMPLETE 0 +#if !defined(SOCKET_ERROR) + /** error in socket operation */ + #define SOCKET_ERROR -1 +#endif +/** must be the same as SOCKETBUFFER_INTERRUPTED */ +#define TCPSOCKET_INTERRUPTED -22 +#define SSL_FATAL -3 + +#if !defined(INET6_ADDRSTRLEN) +#define INET6_ADDRSTRLEN 46 /** only needed for gcc/cygwin on windows */ +#endif + + +#if !defined(max) +#define max(A,B) ( (A) > (B) ? (A):(B)) +#endif + +#include "LinkedList.h" + +/*BE +def FD_SET +{ + 128 n8 "data" +} + +def SOCKETS +{ + FD_SET "rset" + FD_SET "rset_saved" + n32 dec "maxfdp1" + n32 ptr INTList "clientsds" + n32 ptr INTItem "cur_clientsds" + n32 ptr INTList "connect_pending" + n32 ptr INTList "write_pending" + FD_SET "pending_wset" +} +BE*/ + + +/** + * Structure to hold all socket data for the module. + * + * rset_saved accumulates every socket registered with the module and is copied + * into rset before each select call, because select overwrites the set it is given. + * pending_wset holds the sockets that have an unfinished write, or that were added + * explicitly with Socket_addPendingWrite. + */ +typedef struct +{ + fd_set rset, /**< socket read set (see select doc) */ + rset_saved; /**< saved socket read set */ + int maxfdp1; /**< max descriptor used +1 (again see select doc) */ + List* clientsds; /**< list of client socket descriptors */ + ListElement* cur_clientsds; /**< current client socket descriptor (iterator) */ + List* connect_pending; /**< list of sockets for which a connect is pending */ + List* write_pending; /**< list of sockets for which a write is pending */ + fd_set pending_wset; /**< socket pending write set for select */ +} Sockets; + + +void Socket_outInitialize(void); +void Socket_outTerminate(void); +int Socket_getReadySocket(int more_work, struct timeval *tp); +int Socket_getch(int socket, char* c); +char *Socket_getdata(int socket, size_t bytes, size_t* actual_len); +int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees); 
+void Socket_close(int socket); +int Socket_new(char* addr, int port, int* socket); + +int Socket_noPendingWrites(int socket); +char* Socket_getpeer(int sock); + +void Socket_addPendingWrite(int socket); +void Socket_clearPendingWrite(int socket); + +typedef void Socket_writeComplete(int socket); +void Socket_setWriteCompleteCallback(Socket_writeComplete*); + +#endif /* SOCKET_H */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/SocketBuffer.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/SocketBuffer.c b/thirdparty/paho.mqtt.c/src/SocketBuffer.c new file mode 100644 index 0000000..ba640f1 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/SocketBuffer.c @@ -0,0 +1,413 @@ +/******************************************************************************* + * Copyright (c) 2009, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs, Allan Stockdill-Mander - SSL updates + * Ian Craggs - fix for issue #244, issue #20 + *******************************************************************************/ + +/** + * @file + * \brief Socket buffering related functions + * + * Some other related functions are in the Socket module + */ +#include "SocketBuffer.h" +#include "LinkedList.h" +#include "Log.h" +#include "Messages.h" +#include "StackTrace.h" + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include "Heap.h" + +#if defined(WIN32) || defined(WIN64) +#define iov_len len +#define iov_base buf +#endif + +/** + * Default input queue buffer + */ +static socket_queue* def_queue; + +/** + * List of queued input buffers + */ +static List* queues; + +/** + * List of queued write buffers + */ +static List writes; + + +int socketcompare(void* a, void* b); +void SocketBuffer_newDefQ(void); +void SocketBuffer_freeDefQ(void); +int pending_socketcompare(void* a, void* b); + + +/** + * List callback function for comparing socket_queues by socket + * @param a first integer value + * @param b second integer value + * @return boolean indicating whether a and b are equal + */ +int socketcompare(void* a, void* b) +{ + return ((socket_queue*)a)->socket == *(int*)b; +} + + +/** + * Create a new default queue when one has just been used. 
+ */ +void SocketBuffer_newDefQ(void) +{ + def_queue = malloc(sizeof(socket_queue)); + def_queue->buflen = 1000; + def_queue->buf = malloc(def_queue->buflen); + def_queue->socket = def_queue->index = 0; + def_queue->buflen = def_queue->datalen = 0; +} + + +/** + * Initialize the socketBuffer module + */ +void SocketBuffer_initialize(void) +{ + FUNC_ENTRY; + SocketBuffer_newDefQ(); + queues = ListInitialize(); + ListZero(&writes); + FUNC_EXIT; +} + + +/** + * Free the default queue memory + */ +void SocketBuffer_freeDefQ(void) +{ + free(def_queue->buf); + free(def_queue); +} + + +/** + * Terminate the socketBuffer module + */ +void SocketBuffer_terminate(void) +{ + ListElement* cur = NULL; + ListEmpty(&writes); + + FUNC_ENTRY; + while (ListNextElement(queues, &cur)) + free(((socket_queue*)(cur->content))->buf); + ListFree(queues); + SocketBuffer_freeDefQ(); + FUNC_EXIT; +} + + +/** + * Cleanup any buffers for a specific socket + * @param socket the socket to clean up + */ +void SocketBuffer_cleanup(int socket) +{ + FUNC_ENTRY; + SocketBuffer_writeComplete(socket); /* clean up write buffers */ + if (ListFindItem(queues, &socket, socketcompare)) + { + free(((socket_queue*)(queues->current->content))->buf); + ListRemove(queues, queues->current->content); + } + if (def_queue->socket == socket) + { + def_queue->socket = def_queue->index = 0; + def_queue->headerlen = def_queue->datalen = 0; + } + FUNC_EXIT; +} + + +/** + * Get any queued data for a specific socket + * @param socket the socket to get queued data for + * @param bytes the number of bytes of data to retrieve + * @param actual_len the actual length returned + * @return the actual data + */ +char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len) +{ + socket_queue* queue = NULL; + + FUNC_ENTRY; + if (ListFindItem(queues, &socket, socketcompare)) + { /* if there is queued data for this socket, add any data read to it */ + queue = (socket_queue*)(queues->current->content); + *actual_len = 
queue->datalen; + } + else + { + *actual_len = 0; + queue = def_queue; + } + if (bytes > queue->buflen) + { + if (queue->datalen > 0) + { + void* newmem = malloc(bytes); + memcpy(newmem, queue->buf, queue->datalen); + free(queue->buf); + queue->buf = newmem; + } + else + queue->buf = realloc(queue->buf, bytes); + queue->buflen = bytes; + } + + FUNC_EXIT; + return queue->buf; +} + + +/** + * Get any queued character for a specific socket + * @param socket the socket to get queued data for + * @param c the character returned if any + * @return completion code + */ +int SocketBuffer_getQueuedChar(int socket, char* c) +{ + int rc = SOCKETBUFFER_INTERRUPTED; + + FUNC_ENTRY; + if (ListFindItem(queues, &socket, socketcompare)) + { /* if there is queued data for this socket, read that first */ + socket_queue* queue = (socket_queue*)(queues->current->content); + if (queue->index < queue->headerlen) + { + *c = queue->fixed_header[(queue->index)++]; + Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, queue->headerlen); + rc = SOCKETBUFFER_COMPLETE; + goto exit; + } + else if (queue->index > 4) + { + Log(LOG_FATAL, -1, "header is already at full length"); + rc = SOCKET_ERROR; + goto exit; + } + } +exit: + FUNC_EXIT_RC(rc); + return rc; /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/ +} + + +/** + * A socket read was interrupted so we need to queue data + * @param socket the socket to get queued data for + * @param actual_len the actual length of data that was read + */ +void SocketBuffer_interrupted(int socket, size_t actual_len) +{ + socket_queue* queue = NULL; + + FUNC_ENTRY; + if (ListFindItem(queues, &socket, socketcompare)) + queue = (socket_queue*)(queues->current->content); + else /* new saved queue */ + { + queue = def_queue; + /* if SocketBuffer_queueChar() has not yet been called, then the socket number + in def_queue will not have been set. Issue #244. 
+ If actual_len == 0 then we may not need to do anything - I'll leave that + optimization for another time. */ + queue->socket = socket; + ListAppend(queues, def_queue, sizeof(socket_queue)+def_queue->buflen); + SocketBuffer_newDefQ(); + } + queue->index = 0; + queue->datalen = actual_len; + FUNC_EXIT; +} + + +/** + * A socket read has now completed so we can get rid of the queue + * @param socket the socket for which the operation is now complete + * @return pointer to the default queue data + */ +char* SocketBuffer_complete(int socket) +{ + FUNC_ENTRY; + if (ListFindItem(queues, &socket, socketcompare)) + { + socket_queue* queue = (socket_queue*)(queues->current->content); + SocketBuffer_freeDefQ(); + def_queue = queue; + ListDetach(queues, queue); + } + def_queue->socket = def_queue->index = 0; + def_queue->headerlen = def_queue->datalen = 0; + FUNC_EXIT; + return def_queue->buf; +} + + +/** + * A socket operation had now completed so we can get rid of the queue + * @param socket the socket for which the operation is now complete + * @param c the character to queue + */ +void SocketBuffer_queueChar(int socket, char c) +{ + int error = 0; + socket_queue* curq = def_queue; + + FUNC_ENTRY; + if (ListFindItem(queues, &socket, socketcompare)) + curq = (socket_queue*)(queues->current->content); + else if (def_queue->socket == 0) + { + def_queue->socket = socket; + def_queue->index = 0; + def_queue->datalen = 0; + } + else if (def_queue->socket != socket) + { + Log(LOG_FATAL, -1, "attempt to reuse socket queue"); + error = 1; + } + if (curq->index > 4) + { + Log(LOG_FATAL, -1, "socket queue fixed_header field full"); + error = 1; + } + if (!error) + { + curq->fixed_header[(curq->index)++] = c; + curq->headerlen = curq->index; + } + Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, curq->headerlen); + FUNC_EXIT; +} + + +/** + * A socket write was interrupted so store the remaining data + * @param socket the socket for which the write was 
interrupted + * @param count the number of iovec buffers + * @param iovecs buffer array + * @param total total data length to be written + * @param bytes actual data length that was written + */ +#if defined(OPENSSL) +void SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes) +#else +void SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes) +#endif +{ + int i = 0; + pending_writes* pw = NULL; + + FUNC_ENTRY; + /* store the buffers until the whole packet is written */ + pw = malloc(sizeof(pending_writes)); + pw->socket = socket; +#if defined(OPENSSL) + pw->ssl = ssl; +#endif + pw->bytes = bytes; + pw->total = total; + pw->count = count; + for (i = 0; i < count; i++) + { + pw->iovecs[i] = iovecs[i]; + pw->frees[i] = frees[i]; + } + ListAppend(&writes, pw, sizeof(pw) + total); + FUNC_EXIT; +} + + +/** + * List callback function for comparing pending_writes by socket + * @param a first integer value + * @param b second integer value + * @return boolean indicating whether a and b are equal + */ +int pending_socketcompare(void* a, void* b) +{ + return ((pending_writes*)a)->socket == *(int*)b; +} + + +/** + * Get any queued write data for a specific socket + * @param socket the socket to get queued data for + * @return pointer to the queued data or NULL + */ +pending_writes* SocketBuffer_getWrite(int socket) +{ + ListElement* le = ListFindItem(&writes, &socket, pending_socketcompare); + return (le) ? (pending_writes*)(le->content) : NULL; +} + + +/** + * A socket write has now completed so we can get rid of the queue + * @param socket the socket for which the operation is now complete + * @return completion code, boolean - was the queue removed? + */ +int SocketBuffer_writeComplete(int socket) +{ + return ListRemoveItem(&writes, &socket, pending_socketcompare); +} + + +/** + * Update the queued write data for a socket in the case of QoS 0 messages. 
+ * @param socket the socket for which the operation is now complete + * @param topic the topic of the QoS 0 write + * @param payload the payload of the QoS 0 write + * @return pointer to the updated queued data structure, or NULL + */ +pending_writes* SocketBuffer_updateWrite(int socket, char* topic, char* payload) +{ + pending_writes* pw = NULL; + ListElement* le = NULL; + + FUNC_ENTRY; + if ((le = ListFindItem(&writes, &socket, pending_socketcompare)) != NULL) + { + pw = (pending_writes*)(le->content); + if (pw->count == 4) + { + pw->iovecs[2].iov_base = topic; + pw->iovecs[3].iov_base = payload; + } + } + + FUNC_EXIT; + return pw; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/SocketBuffer.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/SocketBuffer.h b/thirdparty/paho.mqtt.c/src/SocketBuffer.h new file mode 100644 index 0000000..f7702dc --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/SocketBuffer.h @@ -0,0 +1,84 @@ +/******************************************************************************* + * Copyright (c) 2009, 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs, Allan Stockdill-Mander - SSL updates + *******************************************************************************/ + +#if !defined(SOCKETBUFFER_H) +#define SOCKETBUFFER_H + +#if defined(WIN32) || defined(WIN64) +#include <winsock2.h> +#else +#include <sys/socket.h> +#endif + +#if defined(OPENSSL) +#include <openssl/ssl.h> +#endif + +#if defined(WIN32) || defined(WIN64) + typedef WSABUF iobuf; +#else + typedef struct iovec iobuf; +#endif + /** Saved state of a partially read packet for one socket */ +typedef struct +{ + int socket; /**< socket the queue belongs to; 0 while the queue is unassigned */ + unsigned int index; /**< next position in fixed_header to read or write */ + size_t headerlen; /**< number of fixed_header bytes queued so far */ + char fixed_header[5]; /**< header plus up to 4 length bytes */ + size_t buflen, /**< total length of the buffer */ + datalen; /**< current length of data in buf */ + char* buf; /**< separately allocated data buffer */ +} socket_queue; + /** Buffers of a packet whose write was interrupted, kept until the write completes */ +typedef struct +{ + int socket, count; /**< owning socket; number of entries used in iovecs and frees */ + size_t total; /**< total data length to be written */ +#if defined(OPENSSL) + SSL* ssl; +#endif + size_t bytes; /**< data length that was actually written */ + iobuf iovecs[5]; /**< the buffers making up the packet */ + int frees[5]; /**< per-buffer values supplied by the caller - presumably "free this buffer" flags; confirm against Socket.c */ +} pending_writes; + +#define SOCKETBUFFER_COMPLETE 0 +#if !defined(SOCKET_ERROR) + #define SOCKET_ERROR -1 +#endif +#define SOCKETBUFFER_INTERRUPTED -22 /* must be the same value as TCPSOCKET_INTERRUPTED */ + +void SocketBuffer_initialize(void); +void SocketBuffer_terminate(void); +void SocketBuffer_cleanup(int socket); +char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len); +int SocketBuffer_getQueuedChar(int socket, char* c); +void SocketBuffer_interrupted(int socket, size_t actual_len); +char* SocketBuffer_complete(int socket); +void SocketBuffer_queueChar(int socket, char c); + +#if defined(OPENSSL) +void SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes); +#else +void SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes); +#endif +pending_writes* SocketBuffer_getWrite(int socket); +int SocketBuffer_writeComplete(int socket); +pending_writes* 
SocketBuffer_updateWrite(int socket, char* topic, char* payload); + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/StackTrace.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/StackTrace.c b/thirdparty/paho.mqtt.c/src/StackTrace.c new file mode 100644 index 0000000..dd55f71 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/StackTrace.c @@ -0,0 +1,207 @@ +/******************************************************************************* + * Copyright (c) 2009, 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "StackTrace.h" +#include "Log.h" +#include "LinkedList.h" + +#include "Clients.h" +#include "Thread.h" + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> + +#if defined(WIN32) || defined(WIN64) +#define snprintf _snprintf +#endif + +/*BE +def STACKENTRY +{ + n32 ptr STRING open "name" + n32 dec "line" +} + +defList(STACKENTRY) +BE*/ + +#define MAX_STACK_DEPTH 50 +#define MAX_FUNCTION_NAME_LENGTH 30 +#define MAX_THREADS 255 + +typedef struct +{ + thread_id_type threadid; + char name[MAX_FUNCTION_NAME_LENGTH]; + int line; +} stackEntry; + +typedef struct +{ + thread_id_type id; + int maxdepth; + int current_depth; + stackEntry callstack[MAX_STACK_DEPTH]; +} threadEntry; + +#include "StackTrace.h" + +static int thread_count = 0; +static threadEntry threads[MAX_THREADS]; +static threadEntry *cur_thread = NULL; + +#if defined(WIN32) || defined(WIN64) +mutex_type stack_mutex; +#else +static pthread_mutex_t stack_mutex_store = PTHREAD_MUTEX_INITIALIZER; +static mutex_type stack_mutex = &stack_mutex_store; +#endif + + +int setStack(int create); + + +int setStack(int create) +{ + int i = -1; + thread_id_type curid = Thread_getid(); + + cur_thread = NULL; + for (i = 0; i < MAX_THREADS && i < thread_count; ++i) + { + if (threads[i].id == curid) + { + cur_thread = &threads[i]; + break; + } + } + + if (cur_thread == NULL && create && thread_count < MAX_THREADS) + { + cur_thread = &threads[thread_count]; + cur_thread->id = curid; + cur_thread->maxdepth = 0; + cur_thread->current_depth = 0; + ++thread_count; + } + return cur_thread != NULL; /* good == 1 */ +} + +void StackTrace_entry(const char* name, int line, enum LOG_LEVELS trace_level) +{ + Thread_lock_mutex(stack_mutex); + if (!setStack(1)) + goto exit; + if (trace_level != -1) + Log_stackTrace(trace_level, 9, 
(int)cur_thread->id, cur_thread->current_depth, name, line, NULL); + strncpy(cur_thread->callstack[cur_thread->current_depth].name, name, sizeof(cur_thread->callstack[0].name)-1); + cur_thread->callstack[(cur_thread->current_depth)++].line = line; + if (cur_thread->current_depth > cur_thread->maxdepth) + cur_thread->maxdepth = cur_thread->current_depth; + if (cur_thread->current_depth >= MAX_STACK_DEPTH) + Log(LOG_FATAL, -1, "Max stack depth exceeded"); +exit: + Thread_unlock_mutex(stack_mutex); +} + + +void StackTrace_exit(const char* name, int line, void* rc, enum LOG_LEVELS trace_level) +{ + Thread_lock_mutex(stack_mutex); + if (!setStack(0)) + goto exit; + if (--(cur_thread->current_depth) < 0) + Log(LOG_FATAL, -1, "Minimum stack depth exceeded for thread %lu", cur_thread->id); + if (strncmp(cur_thread->callstack[cur_thread->current_depth].name, name, sizeof(cur_thread->callstack[0].name)-1) != 0) + Log(LOG_FATAL, -1, "Stack mismatch. Entry:%s Exit:%s\n", cur_thread->callstack[cur_thread->current_depth].name, name); + if (trace_level != -1) + { + if (rc == NULL) + Log_stackTrace(trace_level, 10, (int)cur_thread->id, cur_thread->current_depth, name, line, NULL); + else + Log_stackTrace(trace_level, 11, (int)cur_thread->id, cur_thread->current_depth, name, line, (int*)rc); + } +exit: + Thread_unlock_mutex(stack_mutex); +} + + +void StackTrace_printStack(FILE* dest) +{ + FILE* file = stdout; + int t = 0; + + if (dest) + file = dest; + for (t = 0; t < thread_count; ++t) + { + threadEntry *cur_thread = &threads[t]; + + if (cur_thread->id > 0) + { + int i = cur_thread->current_depth - 1; + + fprintf(file, "=========== Start of stack trace for thread %lu ==========\n", (unsigned long)cur_thread->id); + if (i >= 0) + { + fprintf(file, "%s (%d)\n", cur_thread->callstack[i].name, cur_thread->callstack[i].line); + while (--i >= 0) + fprintf(file, " at %s (%d)\n", cur_thread->callstack[i].name, cur_thread->callstack[i].line); + } + fprintf(file, "=========== End of stack 
trace for thread %lu ==========\n\n", (unsigned long)cur_thread->id); + } + } + if (file != stdout && file != stderr && file != NULL) + fclose(file); +} + + +char* StackTrace_get(thread_id_type threadid) +{ + int bufsize = 256; + char* buf = NULL; + int t = 0; + + if ((buf = malloc(bufsize)) == NULL) + goto exit; + buf[0] = '\0'; + for (t = 0; t < thread_count; ++t) + { + threadEntry *cur_thread = &threads[t]; + + if (cur_thread->id == threadid) + { + int i = cur_thread->current_depth - 1; + int curpos = 0; + + if (i >= 0) + { + curpos += snprintf(&buf[curpos], bufsize - curpos -1, + "%s (%d)\n", cur_thread->callstack[i].name, cur_thread->callstack[i].line); + while (--i >= 0) + curpos += snprintf(&buf[curpos], bufsize - curpos -1, + " at %s (%d)\n", cur_thread->callstack[i].name, cur_thread->callstack[i].line); + if (buf[--curpos] == '\n') + buf[curpos] = '\0'; + } + break; + } + } +exit: + return buf; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/StackTrace.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/StackTrace.h b/thirdparty/paho.mqtt.c/src/StackTrace.h new file mode 100644 index 0000000..f4ebba2 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/StackTrace.h @@ -0,0 +1,71 @@ +/******************************************************************************* + * Copyright (c) 2009, 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#ifndef STACKTRACE_H_ +#define STACKTRACE_H_ + +#include <stdio.h> +#include "Log.h" +#include "Thread.h" + +#if defined(NOSTACKTRACE) +#define FUNC_ENTRY +#define FUNC_ENTRY_NOLOG +#define FUNC_ENTRY_MED +#define FUNC_ENTRY_MAX +#define FUNC_EXIT +#define FUNC_EXIT_NOLOG +#define FUNC_EXIT_MED +#define FUNC_EXIT_MAX +#define FUNC_EXIT_RC(x) +#define FUNC_EXIT_MED_RC(x) +#define FUNC_EXIT_MAX_RC(x) +#else +#if defined(WIN32) || defined(WIN64) +#define inline __inline +#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM) +#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1) +#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM) +#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM) +#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM) +#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, -1) +#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM) +#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM) +#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM) +#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM) +#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM) +#else +#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM) +#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1) +#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM) +#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM) +#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM) +#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1) +#define 
FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM) +#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM) +#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MINIMUM) +#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM) +#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM) +#endif +#endif + +void StackTrace_entry(const char* name, int line, enum LOG_LEVELS trace); +void StackTrace_exit(const char* name, int line, void* return_value, enum LOG_LEVELS trace); + +void StackTrace_printStack(FILE* dest); +char* StackTrace_get(thread_id_type); + +#endif /* STACKTRACE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Thread.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/Thread.c b/thirdparty/paho.mqtt.c/src/Thread.c new file mode 100644 index 0000000..37aaa58 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/Thread.c @@ -0,0 +1,462 @@ +/******************************************************************************* + * Copyright (c) 2009, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * Contributors: + * Ian Craggs - initial implementation + * Ian Craggs, Allan Stockdill-Mander - async client updates + * Ian Craggs - bug #415042 - start Linux thread as disconnected + * Ian Craggs - fix for bug #420851 + * Ian Craggs - change MacOS semaphore implementation + *******************************************************************************/ + +/** + * @file + * \brief Threading related functions + * + * Used to create platform independent threading functions + */ + + +#include "Thread.h" +#if defined(THREAD_UNIT_TESTS) +#define NOSTACKTRACE +#endif +#include "StackTrace.h" + +#undef malloc +#undef realloc +#undef free + +#if !defined(WIN32) && !defined(WIN64) +#include <errno.h> +#include <unistd.h> +#include <sys/time.h> +#include <fcntl.h> +#include <stdio.h> +#include <sys/stat.h> +#include <limits.h> +#endif +#include <stdlib.h> + +#include "OsWrapper.h" + +/** + * Start a new thread + * @param fn the function to run, must be of the correct signature + * @param parameter pointer to the function parameter, can be NULL + * @return the new thread + */ +thread_type Thread_start(thread_fn fn, void* parameter) +{ +#if defined(WIN32) || defined(WIN64) + thread_type thread = NULL; +#else + thread_type thread = 0; + pthread_attr_t attr; +#endif + + FUNC_ENTRY; +#if defined(WIN32) || defined(WIN64) + thread = CreateThread(NULL, 0, fn, parameter, 0, NULL); +#else + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (pthread_create(&thread, &attr, fn, parameter) != 0) + thread = 0; + pthread_attr_destroy(&attr); +#endif + FUNC_EXIT; + return thread; +} + + +/** + * Create a new mutex + * @return the new mutex + */ +mutex_type Thread_create_mutex(void) +{ + mutex_type mutex = NULL; + int rc = 0; + + FUNC_ENTRY; + #if defined(WIN32) || defined(WIN64) + mutex = CreateMutex(NULL, 0, NULL); + if (mutex == NULL) + rc = GetLastError(); + #else + mutex = malloc(sizeof(pthread_mutex_t)); + rc = 
pthread_mutex_init(mutex, NULL); + #endif + FUNC_EXIT_RC(rc); + return mutex; +} + + +/** + * Lock a mutex which has alrea + * @return completion code, 0 is success + */ +int Thread_lock_mutex(mutex_type mutex) +{ + int rc = -1; + + /* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */ + #if defined(WIN32) || defined(WIN64) + /* WaitForSingleObject returns WAIT_OBJECT_0 (0), on success */ + rc = WaitForSingleObject(mutex, INFINITE); + #else + rc = pthread_mutex_lock(mutex); + #endif + + return rc; +} + + +/** + * Unlock a mutex which has already been locked + * @param mutex the mutex + * @return completion code, 0 is success + */ +int Thread_unlock_mutex(mutex_type mutex) +{ + int rc = -1; + + /* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */ + #if defined(WIN32) || defined(WIN64) + /* if ReleaseMutex fails, the return value is 0 */ + if (ReleaseMutex(mutex) == 0) + rc = GetLastError(); + else + rc = 0; + #else + rc = pthread_mutex_unlock(mutex); + #endif + + return rc; +} + + +/** + * Destroy a mutex which has already been created + * @param mutex the mutex + */ +void Thread_destroy_mutex(mutex_type mutex) +{ + int rc = 0; + + FUNC_ENTRY; + #if defined(WIN32) || defined(WIN64) + rc = CloseHandle(mutex); + #else + rc = pthread_mutex_destroy(mutex); + free(mutex); + #endif + FUNC_EXIT_RC(rc); +} + + +/** + * Get the thread id of the thread from which this function is called + * @return thread id, type varying according to OS + */ +thread_id_type Thread_getid(void) +{ + #if defined(WIN32) || defined(WIN64) + return GetCurrentThreadId(); + #else + return pthread_self(); + #endif +} + + +/** + * Create a new semaphore + * @return the new condition variable + */ +sem_type Thread_create_sem(void) +{ + sem_type sem = NULL; + int rc = 0; + + FUNC_ENTRY; + #if defined(WIN32) || defined(WIN64) + sem = CreateEvent( + NULL, /* default security attributes */ + FALSE, /* manual-reset event? 
*/ + FALSE, /* initial state is nonsignaled */ + NULL /* object name */ + ); + #elif defined(OSX) + sem = dispatch_semaphore_create(0L); + rc = (sem == NULL) ? -1 : 0; + #else + sem = malloc(sizeof(sem_t)); + rc = sem_init(sem, 0, 0); + #endif + FUNC_EXIT_RC(rc); + return sem; +} + + +/** + * Wait for a semaphore to be posted, or timeout. + * @param sem the semaphore + * @param timeout the maximum time to wait, in milliseconds + * @return completion code + */ +int Thread_wait_sem(sem_type sem, int timeout) +{ +/* sem_timedwait is the obvious call to use, but seemed not to work on the Viper, + * so I've used trywait in a loop instead. Ian Craggs 23/7/2010 + */ + int rc = -1; +#if !defined(WIN32) && !defined(WIN64) && !defined(OSX) +#define USE_TRYWAIT +#if defined(USE_TRYWAIT) + int i = 0; + int interval = 10000; /* 10000 microseconds: 10 milliseconds */ + int count = (1000 * timeout) / interval; /* how many intervals in timeout period */ +#else + struct timespec ts; +#endif +#endif + + FUNC_ENTRY; + #if defined(WIN32) || defined(WIN64) + rc = WaitForSingleObject(sem, timeout < 0 ? 0 : timeout); + #elif defined(OSX) + rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, (int64_t)timeout*1000000L)); + #elif defined(USE_TRYWAIT) + while (++i < count && (rc = sem_trywait(sem)) != 0) + { + if (rc == -1 && ((rc = errno) != EAGAIN)) + { + rc = 0; + break; + } + usleep(interval); /* microseconds - .1 of a second */ + } + #else + if (clock_gettime(CLOCK_REALTIME, &ts) != -1) + { + ts.tv_sec += timeout; + rc = sem_timedwait(sem, &ts); + } + #endif + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Check to see if a semaphore has been posted, without waiting. 
+ * @param sem the semaphore + * @return 0 (false) or 1 (true) + */ +int Thread_check_sem(sem_type sem) +{ +#if defined(WIN32) || defined(WIN64) + return WaitForSingleObject(sem, 0) == WAIT_OBJECT_0; +#elif defined(OSX) + return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0; +#else + int semval = -1; + sem_getvalue(sem, &semval); + return semval > 0; +#endif +} + + +/** + * Post a semaphore + * @param sem the semaphore + * @return completion code + */ +int Thread_post_sem(sem_type sem) +{ + int rc = 0; + + FUNC_ENTRY; + #if defined(WIN32) || defined(WIN64) + if (SetEvent(sem) == 0) + rc = GetLastError(); + #elif defined(OSX) + rc = (int)dispatch_semaphore_signal(sem); + #else + if (sem_post(sem) == -1) + rc = errno; + #endif + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Destroy a semaphore which has already been created + * @param sem the semaphore + */ +int Thread_destroy_sem(sem_type sem) +{ + int rc = 0; + + FUNC_ENTRY; + #if defined(WIN32) || defined(WIN64) + rc = CloseHandle(sem); + #elif defined(OSX) + dispatch_release(sem); + #else + rc = sem_destroy(sem); + free(sem); + #endif + FUNC_EXIT_RC(rc); + return rc; +} + + +#if !defined(WIN32) && !defined(WIN64) +/** + * Create a new condition variable + * @return the condition variable struct + */ +cond_type Thread_create_cond(void) +{ + cond_type condvar = NULL; + int rc = 0; + + FUNC_ENTRY; + condvar = malloc(sizeof(cond_type_struct)); + rc = pthread_cond_init(&condvar->cond, NULL); + rc = pthread_mutex_init(&condvar->mutex, NULL); + + FUNC_EXIT_RC(rc); + return condvar; +} + +/** + * Signal a condition variable + * @return completion code + */ +int Thread_signal_cond(cond_type condvar) +{ + int rc = 0; + + pthread_mutex_lock(&condvar->mutex); + rc = pthread_cond_signal(&condvar->cond); + pthread_mutex_unlock(&condvar->mutex); + + return rc; +} + +/** + * Wait with a timeout (seconds) for condition variable + * @return completion code + */ +int Thread_wait_cond(cond_type condvar, int timeout) +{ + 
FUNC_ENTRY; + int rc = 0; + struct timespec cond_timeout; + struct timeval cur_time; + + gettimeofday(&cur_time, NULL); + + cond_timeout.tv_sec = cur_time.tv_sec + timeout; + cond_timeout.tv_nsec = cur_time.tv_usec * 1000; + + pthread_mutex_lock(&condvar->mutex); + rc = pthread_cond_timedwait(&condvar->cond, &condvar->mutex, &cond_timeout); + pthread_mutex_unlock(&condvar->mutex); + + FUNC_EXIT_RC(rc); + return rc; +} + +/** + * Destroy a condition variable + * @return completion code + */ +int Thread_destroy_cond(cond_type condvar) +{ + int rc = 0; + + rc = pthread_mutex_destroy(&condvar->mutex); + rc = pthread_cond_destroy(&condvar->cond); + free(condvar); + + return rc; +} +#endif + + +#if defined(THREAD_UNIT_TESTS) + +#include <stdio.h> + +thread_return_type secondary(void* n) +{ + int rc = 0; + + /* + cond_type cond = n; + + printf("Secondary thread about to wait\n"); + rc = Thread_wait_cond(cond); + printf("Secondary thread returned from wait %d\n", rc);*/ + + sem_type sem = n; + + printf("Secondary thread about to wait\n"); + rc = Thread_wait_sem(sem); + printf("Secondary thread returned from wait %d\n", rc); + + printf("Secondary thread about to wait\n"); + rc = Thread_wait_sem(sem); + printf("Secondary thread returned from wait %d\n", rc); + printf("Secondary check sem %d\n", Thread_check_sem(sem)); + + return 0; +} + + +int main(int argc, char *argv[]) +{ + int rc = 0; + + sem_type sem = Thread_create_sem(); + + printf("check sem %d\n", Thread_check_sem(sem)); + + printf("post secondary\n"); + rc = Thread_post_sem(sem); + printf("posted secondary %d\n", rc); + + printf("check sem %d\n", Thread_check_sem(sem)); + + printf("Starting secondary thread\n"); + Thread_start(secondary, (void*)sem); + + sleep(3); + printf("check sem %d\n", Thread_check_sem(sem)); + + printf("post secondary\n"); + rc = Thread_post_sem(sem); + printf("posted secondary %d\n", rc); + + sleep(3); + + printf("Main thread ending\n"); +} + +#endif 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Thread.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/Thread.h b/thirdparty/paho.mqtt.c/src/Thread.h new file mode 100644 index 0000000..995e221 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/Thread.h @@ -0,0 +1,73 @@ +/******************************************************************************* + * Copyright (c) 2009, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial implementation + * Ian Craggs, Allan Stockdill-Mander - async client updates + * Ian Craggs - fix for bug #420851 + * Ian Craggs - change MacOS semaphore implementation + *******************************************************************************/ +#include "MQTTClient.h" + +#if !defined(THREAD_H) +#define THREAD_H + +#if defined(WIN32) || defined(WIN64) + #include <windows.h> + #define thread_type HANDLE + #define thread_id_type DWORD + #define thread_return_type DWORD + #define thread_fn LPTHREAD_START_ROUTINE + #define mutex_type HANDLE + #define cond_type HANDLE + #define sem_type HANDLE +#else + #include <pthread.h> + + #define thread_type pthread_t + #define thread_id_type pthread_t + #define thread_return_type void* + typedef thread_return_type (*thread_fn)(void*); + #define mutex_type pthread_mutex_t* + typedef struct { pthread_cond_t cond; pthread_mutex_t mutex; } cond_type_struct; + typedef cond_type_struct *cond_type; + #if defined(OSX) + #include <dispatch/dispatch.h> + typedef 
dispatch_semaphore_t sem_type; + #else + #include <semaphore.h> + typedef sem_t *sem_type; + #endif + + cond_type Thread_create_cond(void); + int Thread_signal_cond(cond_type); + int Thread_wait_cond(cond_type condvar, int timeout); + int Thread_destroy_cond(cond_type); +#endif + +DLLExport thread_type Thread_start(thread_fn, void*); + +DLLExport mutex_type Thread_create_mutex(); +DLLExport int Thread_lock_mutex(mutex_type); +DLLExport int Thread_unlock_mutex(mutex_type); +void Thread_destroy_mutex(mutex_type); + +DLLExport thread_id_type Thread_getid(); + +sem_type Thread_create_sem(void); +int Thread_wait_sem(sem_type sem, int timeout); +int Thread_check_sem(sem_type sem); +int Thread_post_sem(sem_type sem); +int Thread_destroy_sem(sem_type sem); + + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Tree.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/Tree.c b/thirdparty/paho.mqtt.c/src/Tree.c new file mode 100644 index 0000000..13134d6 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/Tree.c @@ -0,0 +1,724 @@ +/******************************************************************************* + * Copyright (c) 2009, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial implementation and documentation + *******************************************************************************/ + +/** @file + * \brief functions which apply to tree structures. 
+ * + * These trees can hold data of any sort, pointed to by the content pointer of the + * Node structure. + * */ + +#define NO_HEAP_TRACKING 1 + +#include "Tree.h" + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include "Heap.h" + + +int isRed(Node* aNode); +int isBlack(Node* aNode); +int TreeWalk(Node* curnode, int depth); +int TreeMaxDepth(Tree *aTree); +void TreeRotate(Tree* aTree, Node* curnode, int direction, int index); +Node* TreeBAASub(Tree* aTree, Node* curnode, int which, int index); +void TreeBalanceAfterAdd(Tree* aTree, Node* curnode, int index); +void* TreeAddByIndex(Tree* aTree, void* content, size_t size, int index); +Node* TreeFindIndex1(Tree* aTree, void* key, int index, int value); +Node* TreeFindContentIndex(Tree* aTree, void* key, int index); +Node* TreeMinimum(Node* curnode); +Node* TreeSuccessor(Node* curnode); +Node* TreeNextElementIndex(Tree* aTree, Node* curnode, int index); +Node* TreeBARSub(Tree* aTree, Node* curnode, int which, int index); +void TreeBalanceAfterRemove(Tree* aTree, Node* curnode, int index); +void* TreeRemoveIndex(Tree* aTree, void* content, int index); + + +void TreeInitializeNoMalloc(Tree* aTree, int(*compare)(void*, void*, int)) +{ + memset(aTree, '\0', sizeof(Tree)); + aTree->heap_tracking = 1; + aTree->index[0].compare = compare; + aTree->indexes = 1; +} + +/** + * Allocates and initializes a new tree structure. + * @return a pointer to the new tree structure + */ +Tree* TreeInitialize(int(*compare)(void*, void*, int)) +{ +#if defined(UNIT_TESTS) + Tree* newt = malloc(sizeof(Tree)); +#else + Tree* newt = mymalloc(__FILE__, __LINE__, sizeof(Tree)); +#endif + TreeInitializeNoMalloc(newt, compare); + return newt; +} + + +void TreeAddIndex(Tree* aTree, int(*compare)(void*, void*, int)) +{ + aTree->index[aTree->indexes].compare = compare; + ++(aTree->indexes); +} + + +void TreeFree(Tree* aTree) +{ +#if defined(UNIT_TESTS) + free(aTree); +#else + (aTree->heap_tracking) ? 
myfree(__FILE__, __LINE__, aTree) : free(aTree); +#endif +} + + +#define LEFT 0 +#define RIGHT 1 +#if !defined(max) +#define max(a, b) (a > b) ? a : b; +#endif + + + +int isRed(Node* aNode) +{ + return (aNode != NULL) && (aNode->red); +} + + +int isBlack(Node* aNode) +{ + return (aNode == NULL) || (aNode->red == 0); +} + + +int TreeWalk(Node* curnode, int depth) +{ + if (curnode) + { + int left = TreeWalk(curnode->child[LEFT], depth+1); + int right = TreeWalk(curnode->child[RIGHT], depth+1); + depth = max(left, right); + if (curnode->red) + { + /*if (isRed(curnode->child[LEFT]) || isRed(curnode->child[RIGHT])) + { + printf("red/black tree violation %p\n", curnode->content); + exit(-99); + }*/; + } + } + return depth; +} + + +int TreeMaxDepth(Tree *aTree) +{ + int rc = TreeWalk(aTree->index[0].root, 0); + /*if (aTree->root->red) + { + printf("root node should not be red %p\n", aTree->root->content); + exit(-99); + }*/ + return rc; +} + + +void TreeRotate(Tree* aTree, Node* curnode, int direction, int index) +{ + Node* other = curnode->child[!direction]; + + curnode->child[!direction] = other->child[direction]; + if (other->child[direction] != NULL) + other->child[direction]->parent = curnode; + other->parent = curnode->parent; + if (curnode->parent == NULL) + aTree->index[index].root = other; + else if (curnode == curnode->parent->child[direction]) + curnode->parent->child[direction] = other; + else + curnode->parent->child[!direction] = other; + other->child[direction] = curnode; + curnode->parent = other; +} + + +Node* TreeBAASub(Tree* aTree, Node* curnode, int which, int index) +{ + Node* uncle = curnode->parent->parent->child[which]; + + if (isRed(uncle)) + { + curnode->parent->red = uncle->red = 0; + curnode = curnode->parent->parent; + curnode->red = 1; + } + else + { + if (curnode == curnode->parent->child[which]) + { + curnode = curnode->parent; + TreeRotate(aTree, curnode, !which, index); + } + curnode->parent->red = 0; + curnode->parent->parent->red = 1; + 
TreeRotate(aTree, curnode->parent->parent, which, index); + } + return curnode; +} + + +void TreeBalanceAfterAdd(Tree* aTree, Node* curnode, int index) +{ + while (curnode && isRed(curnode->parent) && curnode->parent->parent) + { + if (curnode->parent == curnode->parent->parent->child[LEFT]) + curnode = TreeBAASub(aTree, curnode, RIGHT, index); + else + curnode = TreeBAASub(aTree, curnode, LEFT, index); + } + aTree->index[index].root->red = 0; +} + + +/** + * Add an item to a tree + * @param aTree the list to which the item is to be added + * @param content the list item content itself + * @param size the size of the element + */ +void* TreeAddByIndex(Tree* aTree, void* content, size_t size, int index) +{ + Node* curparent = NULL; + Node* curnode = aTree->index[index].root; + Node* newel = NULL; + int left = 0; + int result = 1; + void* rc = NULL; + + while (curnode) + { + result = aTree->index[index].compare(curnode->content, content, 1); + left = (result > 0); + if (result == 0) + break; + else + { + curparent = curnode; + curnode = curnode->child[left]; + } + } + + if (result == 0) + { + if (aTree->allow_duplicates) + exit(-99); + { + newel = curnode; + rc = newel->content; + if (index == 0) + aTree->size += (size - curnode->size); + } + } + else + { + #if defined(UNIT_TESTS) + newel = malloc(sizeof(Node)); + #else + newel = (aTree->heap_tracking) ? 
mymalloc(__FILE__, __LINE__, sizeof(Node)) : malloc(sizeof(Node)); + #endif + memset(newel, '\0', sizeof(Node)); + if (curparent) + curparent->child[left] = newel; + else + aTree->index[index].root = newel; + newel->parent = curparent; + newel->red = 1; + if (index == 0) + { + ++(aTree->count); + aTree->size += size; + } + } + newel->content = content; + newel->size = size; + TreeBalanceAfterAdd(aTree, newel, index); + return rc; +} + + +void* TreeAdd(Tree* aTree, void* content, size_t size) +{ + void* rc = NULL; + int i; + + for (i = 0; i < aTree->indexes; ++i) + rc = TreeAddByIndex(aTree, content, size, i); + + return rc; +} + + +Node* TreeFindIndex1(Tree* aTree, void* key, int index, int value) +{ + int result = 0; + Node* curnode = aTree->index[index].root; + + while (curnode) + { + result = aTree->index[index].compare(curnode->content, key, value); + if (result == 0) + break; + else + curnode = curnode->child[result > 0]; + } + return curnode; +} + + +Node* TreeFindIndex(Tree* aTree, void* key, int index) +{ + return TreeFindIndex1(aTree, key, index, 0); +} + + +Node* TreeFindContentIndex(Tree* aTree, void* key, int index) +{ + return TreeFindIndex1(aTree, key, index, 1); +} + + +Node* TreeFind(Tree* aTree, void* key) +{ + return TreeFindIndex(aTree, key, 0); +} + + +Node* TreeMinimum(Node* curnode) +{ + if (curnode) + while (curnode->child[LEFT]) + curnode = curnode->child[LEFT]; + return curnode; +} + + +Node* TreeSuccessor(Node* curnode) +{ + if (curnode->child[RIGHT]) + curnode = TreeMinimum(curnode->child[RIGHT]); + else + { + Node* curparent = curnode->parent; + while (curparent && curnode == curparent->child[RIGHT]) + { + curnode = curparent; + curparent = curparent->parent; + } + curnode = curparent; + } + return curnode; +} + + +Node* TreeNextElementIndex(Tree* aTree, Node* curnode, int index) +{ + if (curnode == NULL) + curnode = TreeMinimum(aTree->index[index].root); + else + curnode = TreeSuccessor(curnode); + return curnode; +} + + +Node* 
TreeNextElement(Tree* aTree, Node* curnode) +{ + return TreeNextElementIndex(aTree, curnode, 0); +} + + +Node* TreeBARSub(Tree* aTree, Node* curnode, int which, int index) +{ + Node* sibling = curnode->parent->child[which]; + + if (isRed(sibling)) + { + sibling->red = 0; + curnode->parent->red = 1; + TreeRotate(aTree, curnode->parent, !which, index); + sibling = curnode->parent->child[which]; + } + if (!sibling) + curnode = curnode->parent; + else if (isBlack(sibling->child[!which]) && isBlack(sibling->child[which])) + { + sibling->red = 1; + curnode = curnode->parent; + } + else + { + if (isBlack(sibling->child[which])) + { + sibling->child[!which]->red = 0; + sibling->red = 1; + TreeRotate(aTree, sibling, which, index); + sibling = curnode->parent->child[which]; + } + sibling->red = curnode->parent->red; + curnode->parent->red = 0; + sibling->child[which]->red = 0; + TreeRotate(aTree, curnode->parent, !which, index); + curnode = aTree->index[index].root; + } + return curnode; +} + + +void TreeBalanceAfterRemove(Tree* aTree, Node* curnode, int index) +{ + while (curnode != aTree->index[index].root && isBlack(curnode)) + { + /* curnode->content == NULL must equal curnode == NULL */ + if (((curnode->content) ? 
curnode : NULL) == curnode->parent->child[LEFT]) + curnode = TreeBARSub(aTree, curnode, RIGHT, index); + else + curnode = TreeBARSub(aTree, curnode, LEFT, index); + } + curnode->red = 0; +} + + +/** + * Remove an item from a tree + * @param aTree the list to which the item is to be added + * @param curnode the list item content itself + */ +void* TreeRemoveNodeIndex(Tree* aTree, Node* curnode, int index) +{ + Node* redundant = curnode; + Node* curchild = NULL; + size_t size = curnode->size; + void* content = curnode->content; + + /* if the node to remove has 0 or 1 children, it can be removed without involving another node */ + if (curnode->child[LEFT] && curnode->child[RIGHT]) /* 2 children */ + redundant = TreeSuccessor(curnode); /* now redundant must have at most one child */ + + curchild = redundant->child[(redundant->child[LEFT] != NULL) ? LEFT : RIGHT]; + if (curchild) /* we could have no children at all */ + curchild->parent = redundant->parent; + + if (redundant->parent == NULL) + aTree->index[index].root = curchild; + else + { + if (redundant == redundant->parent->child[LEFT]) + redundant->parent->child[LEFT] = curchild; + else + redundant->parent->child[RIGHT] = curchild; + } + + if (redundant != curnode) + { + curnode->content = redundant->content; + curnode->size = redundant->size; + } + + if (isBlack(redundant)) + { + if (curchild == NULL) + { + if (redundant->parent) + { + Node temp; + memset(&temp, '\0', sizeof(Node)); + temp.parent = (redundant) ? redundant->parent : NULL; + temp.red = 0; + TreeBalanceAfterRemove(aTree, &temp, index); + } + } + else + TreeBalanceAfterRemove(aTree, curchild, index); + } + +#if defined(UNIT_TESTS) + free(redundant); +#else + (aTree->heap_tracking) ? 
myfree(__FILE__, __LINE__, redundant) : free(redundant); +#endif + if (index == 0) + { + aTree->size -= size; + --(aTree->count); + } + return content; +} + + +/** + * Remove an item from a tree + * @param aTree the list to which the item is to be added + * @param curnode the list item content itself + */ +void* TreeRemoveIndex(Tree* aTree, void* content, int index) +{ + Node* curnode = TreeFindContentIndex(aTree, content, index); + + if (curnode == NULL) + return NULL; + + return TreeRemoveNodeIndex(aTree, curnode, index); +} + + +void* TreeRemove(Tree* aTree, void* content) +{ + int i; + void* rc = NULL; + + for (i = 0; i < aTree->indexes; ++i) + rc = TreeRemoveIndex(aTree, content, i); + + return rc; +} + + +void* TreeRemoveKeyIndex(Tree* aTree, void* key, int index) +{ + Node* curnode = TreeFindIndex(aTree, key, index); + void* content = NULL; + int i; + + if (curnode == NULL) + return NULL; + + content = TreeRemoveNodeIndex(aTree, curnode, index); + for (i = 0; i < aTree->indexes; ++i) + { + if (i != index) + content = TreeRemoveIndex(aTree, content, i); + } + return content; +} + + +void* TreeRemoveKey(Tree* aTree, void* key) +{ + return TreeRemoveKeyIndex(aTree, key, 0); +} + + +int TreeIntCompare(void* a, void* b, int content) +{ + int i = *((int*)a); + int j = *((int*)b); + + /* printf("comparing %d %d\n", *((int*)a), *((int*)b)); */ + return (i > j) ? -1 : (i == j) ? 0 : 1; +} + + +int TreePtrCompare(void* a, void* b, int content) +{ + return (a > b) ? -1 : (a == b) ? 
0 : 1; +} + + +int TreeStringCompare(void* a, void* b, int content) +{ + return strcmp((char*)a, (char*)b); +} + + +#if defined(UNIT_TESTS) + +int check(Tree *t) +{ + Node* curnode = NULL; + int rc = 0; + + curnode = TreeNextElement(t, curnode); + while (curnode) + { + Node* prevnode = curnode; + + curnode = TreeNextElement(t, curnode); + + if (prevnode && curnode && (*(int*)(curnode->content) < *(int*)(prevnode->content))) + { + printf("out of order %d < %d\n", *(int*)(curnode->content), *(int*)(prevnode->content)); + rc = 99; + } + } + return rc; +} + + +int traverse(Tree *t, int lookfor) +{ + Node* curnode = NULL; + int rc = 0; + + printf("Traversing\n"); + curnode = TreeNextElement(t, curnode); + /* printf("content int %d\n", *(int*)(curnode->content)); */ + while (curnode) + { + Node* prevnode = curnode; + + curnode = TreeNextElement(t, curnode); + /* if (curnode) + printf("content int %d\n", *(int*)(curnode->content)); */ + if (prevnode && curnode && (*(int*)(curnode->content) < *(int*)(prevnode->content))) + { + printf("out of order %d < %d\n", *(int*)(curnode->content), *(int*)(prevnode->content)); + } + if (curnode && (lookfor == *(int*)(curnode->content))) + printf("missing item %d actually found\n", lookfor); + } + printf("End traverse %d\n", rc); + return rc; +} + + +int test(int limit) +{ + int i, *ip, *todelete; + Node* current = NULL; + Tree* t = TreeInitialize(TreeIntCompare); + int rc = 0; + + printf("Tree initialized\n"); + + srand(time(NULL)); + + ip = malloc(sizeof(int)); + *ip = 2; + TreeAdd(t, (void*)ip, sizeof(int)); + + check(t); + + i = 2; + void* result = TreeRemove(t, (void*)&i); + if (result) + free(result); + + int actual[limit]; + for (i = 0; i < limit; i++) + { + void* replaced = NULL; + + ip = malloc(sizeof(int)); + *ip = rand(); + replaced = TreeAdd(t, (void*)ip, sizeof(int)); + if (replaced) /* duplicate */ + { + free(replaced); + actual[i] = -1; + } + else + actual[i] = *ip; + if (i==5) + todelete = ip; + printf("Tree element 
added %d\n", *ip); + if (1 % 1000 == 0) + { + rc = check(t); + printf("%d elements, check result %d\n", i+1, rc); + if (rc != 0) + return 88; + } + } + + check(t); + + for (i = 0; i < limit; i++) + { + int parm = actual[i]; + + if (parm == -1) + continue; + + Node* found = TreeFind(t, (void*)&parm); + if (found) + printf("Tree find %d %d\n", parm, *(int*)(found->content)); + else + { + printf("%d not found\n", parm); + traverse(t, parm); + return -2; + } + } + + check(t); + + for (i = limit -1; i >= 0; i--) + { + int parm = actual[i]; + void *found; + + if (parm == -1) /* skip duplicate */ + continue; + + found = TreeRemove(t, (void*)&parm); + if (found) + { + printf("%d Tree remove %d %d\n", i, parm, *(int*)(found)); + free(found); + } + else + { + int count = 0; + printf("%d %d not found\n", i, parm); + traverse(t, parm); + for (i = 0; i < limit; i++) + if (actual[i] == parm) + ++count; + printf("%d occurs %d times\n", parm, count); + return -2; + } + if (i % 1000 == 0) + { + rc = check(t); + printf("%d elements, check result %d\n", i+1, rc); + if (rc != 0) + return 88; + } + } + printf("finished\n"); + return 0; +} + +int main(int argc, char *argv[]) +{ + int rc = 0; + + while (rc == 0) + rc = test(999999); +} + +#endif + + + + + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Tree.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/Tree.h b/thirdparty/paho.mqtt.c/src/Tree.h new file mode 100644 index 0000000..bbbd014 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/Tree.h @@ -0,0 +1,115 @@ +/******************************************************************************* + * Copyright (c) 2009, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial implementation and documentation + *******************************************************************************/ + + +#if !defined(TREE_H) +#define TREE_H + +#include <stdlib.h> /* for size_t definition */ + +/*BE +defm defTree(T) // macro to define a tree + +def T concat Node +{ + n32 ptr T concat Node "parent" + n32 ptr T concat Node "left" + n32 ptr T concat Node "right" + n32 ptr T id2str(T) + n32 suppress "size" +} + + +def T concat Tree +{ + struct + { + n32 ptr T concat Node suppress "root" + n32 ptr DATA suppress "compare" + } + struct + { + n32 ptr T concat Node suppress "root" + n32 ptr DATA suppress "compare" + } + n32 dec "count" + n32 dec suppress "size" +} + +endm + +defTree(INT) +defTree(STRING) +defTree(TMP) + +BE*/ + +/** + * Structure to hold all data for one list element + */ +typedef struct NodeStruct +{ + struct NodeStruct *parent, /**< pointer to parent tree node, in case we need it */ + *child[2]; /**< pointers to child tree nodes 0 = left, 1 = right */ + void* content; /**< pointer to element content */ + size_t size; /**< size of content */ + unsigned int red : 1; +} Node; + + +/** + * Structure to hold all data for one tree + */ +typedef struct +{ + struct + { + Node *root; /**< root node pointer */ + int (*compare)(void*, void*, int); /**< comparison function */ + } index[2]; + int indexes, /**< no of indexes into tree */ + count; /**< no of items */ + size_t size; /**< heap storage used */ + unsigned int heap_tracking : 1; /**< switch on heap tracking for this tree? 
*/ + unsigned int allow_duplicates : 1; /**< switch to allow duplicate entries */ +} Tree; + + +Tree* TreeInitialize(int(*compare)(void*, void*, int)); +void TreeInitializeNoMalloc(Tree* aTree, int(*compare)(void*, void*, int)); +void TreeAddIndex(Tree* aTree, int(*compare)(void*, void*, int)); + +void* TreeAdd(Tree* aTree, void* content, size_t size); + +void* TreeRemove(Tree* aTree, void* content); + +void* TreeRemoveKey(Tree* aTree, void* key); +void* TreeRemoveKeyIndex(Tree* aTree, void* key, int index); + +void* TreeRemoveNodeIndex(Tree* aTree, Node* aNode, int index); + +void TreeFree(Tree* aTree); + +Node* TreeFind(Tree* aTree, void* key); +Node* TreeFindIndex(Tree* aTree, void* key, int index); + +Node* TreeNextElement(Tree* aTree, Node* curnode); + +int TreeIntCompare(void* a, void* b, int); +int TreePtrCompare(void* a, void* b, int); +int TreeStringCompare(void* a, void* b, int); + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/VersionInfo.h.in ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/VersionInfo.h.in b/thirdparty/paho.mqtt.c/src/VersionInfo.h.in new file mode 100644 index 0000000..5b91bf3 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/VersionInfo.h.in @@ -0,0 +1,7 @@ +#ifndef VERSIONINFO_H +#define VERSIONINFO_H + +#define BUILD_TIMESTAMP "@BUILD_TIMESTAMP@" +#define CLIENT_VERSION "@CLIENT_VERSION@" + +#endif /* VERSIONINFO_H */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt b/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt new file mode 100644 index 0000000..79ea886 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt @@ -0,0 +1,65 @@ 
#*******************************************************************************
#  Copyright (c) 2015, 2017 logi.cals GmbH and others
#
#  All rights reserved. This program and the accompanying materials
#  are made available under the terms of the Eclipse Public License v1.0
#  and Eclipse Distribution License v1.0 which accompany this distribution.
#
#  The Eclipse Public License is available at
#     http://www.eclipse.org/legal/epl-v10.html
#  and the Eclipse Distribution License is available at
#    http://www.eclipse.org/org/documents/edl-v10.php.
#
#  Contributors:
#     Rainer Poisel - initial version
#     Ian Craggs - update sample names
#*******************************************************************************/

# Note: on OS X you should install XCode and the associated command-line tools

## compilation/linkage settings
include_directories(
    .
    ${CMAKE_SOURCE_DIR}/src
    ${CMAKE_BINARY_DIR}
    )

if(WIN32)
    add_definitions(/DCMAKE_BUILD /D_CRT_SECURE_NO_DEPRECATE)
endif()

# Each sample is a single source file named after its target.
# Samples linked against paho-mqtt3a
set(PAHO_SAMPLES_MQTT3A
    paho_c_sub
    paho_c_pub
    MQTTAsync_subscribe
    MQTTAsync_publish
    )

# Samples linked against paho-mqtt3c
set(PAHO_SAMPLES_MQTT3C
    paho_cs_sub
    paho_cs_pub
    MQTTClient_subscribe
    MQTTClient_publish
    MQTTClient_publish_async
    )

foreach(sample ${PAHO_SAMPLES_MQTT3A})
    add_executable(${sample} ${sample}.c)
    target_link_libraries(${sample} paho-mqtt3a)
endforeach()

foreach(sample ${PAHO_SAMPLES_MQTT3C})
    add_executable(${sample} ${sample}.c)
    target_link_libraries(${sample} paho-mqtt3c)
endforeach()

install(TARGETS ${PAHO_SAMPLES_MQTT3A} ${PAHO_SAMPLES_MQTT3C}
    RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
