http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Socket.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Socket.c 
b/thirdparty/paho.mqtt.c/src/Socket.c
new file mode 100644
index 0000000..939dbab
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Socket.c
@@ -0,0 +1,898 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial implementation and documentation
+ *    Ian Craggs - async client updates
+ *    Ian Craggs - fix for bug 484496
+ *    Juergen Kosel, Ian Craggs - fix for issue #135
+ *    Ian Craggs - issue #217
+ *    Ian Craggs - fix for issue #186
+ *******************************************************************************/
+
+/**
+ * @file
+ * \brief Socket related functions
+ *
+ * Some other related functions are in the SocketBuffer module
+ */
+
+
+#include "Socket.h"
+#include "Log.h"
+#include "SocketBuffer.h"
+#include "Messages.h"
+#include "StackTrace.h"
+#if defined(OPENSSL)
+#include "SSLSocket.h"
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <ctype.h>
+
+#include "Heap.h"
+
+int Socket_setnonblocking(int sock);
+int Socket_error(char* aString, int sock);
+int Socket_addSocket(int newSd);
+int isReady(int socket, fd_set* read_set, fd_set* write_set);
+int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes);
+int Socket_close_only(int socket);
+int Socket_continueWrite(int socket);
+int Socket_continueWrites(fd_set* pwset);
+char* Socket_getaddrname(struct sockaddr* sa, int sock);
+
+#if defined(WIN32) || defined(WIN64)
+#define iov_len len
+#define iov_base buf
+#endif
+
+/**
+ * Structure to hold all socket data for the module.
+ *
+ * The functions in this file use it for the lists of client sockets
+ * (clientsds), sockets with a connect in progress (connect_pending) and
+ * sockets with a partially completed write (write_pending), for the
+ * descriptor sets handed to select (rset, rset_saved, pending_wset), for
+ * the select upper bound (maxfdp1) and for the scan position within
+ * clientsds (cur_clientsds).
+ */
+Sockets s;
+static fd_set wset; /* write set filled by the second select in Socket_getReadySocket and handed to isReady */
+
+/**
+ * Set a socket non-blocking, OS independently.
+ *
+ * On Windows this issues ioctl (mapped to ioctlsocket by Socket.h) with
+ * FIONBIO; elsewhere it adds O_NONBLOCK to the file status flags the
+ * descriptor already has.
+ * @param sock the socket to set non-blocking
+ * @return the return code of the underlying ioctl/fcntl call
+ */
+int Socket_setnonblocking(int sock)
+{
+       int rc;
+#if defined(WIN32) || defined(WIN64)
+       u_long flag = 1L;
+
+       FUNC_ENTRY;
+       rc = ioctl(sock, FIONBIO, &flag);
+#else
+       int flags;
+
+       FUNC_ENTRY;
+       /* fcntl reports failure with -1; only in that case fall back to an empty
+        * flag word, so that flags already set on the descriptor are preserved */
+       if ((flags = fcntl(sock, F_GETFL, 0)) == -1)
+               flags = 0;
+       rc = fcntl(sock, F_SETFL, flags | O_NONBLOCK);
+#endif
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Gets the specific error corresponding to SOCKET_ERROR.
+ *
+ * The error code (errno, or WSAGetLastError() on Windows) is captured into a
+ * local before anything else is called, so that tracing, logging or strerror
+ * cannot overwrite it before it is returned.  Codes that only mean "try again"
+ * (EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK) are not logged, and neither are
+ * ENOTCONN/ECONNRESET when aString is "shutdown".
+ * @param aString the function that was being used when the error occurred
+ * @param sock the socket on which the error occurred
+ * @return the specific TCP error code
+ */
+int Socket_error(char* aString, int sock)
+{
+#if defined(WIN32) || defined(WIN64)
+       int err = WSAGetLastError();
+#else
+       int err = errno;
+#endif
+
+       FUNC_ENTRY;
+       if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
+       {
+               if (strcmp(aString, "shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
+                       Log(TRACE_MINIMUM, -1, "Socket error %s(%d) in %s for socket %d", strerror(err), err, aString, sock);
+       }
+       FUNC_EXIT_RC(err);
+       return err;
+}
+
+
+/**
+ * Initialize the socket module: platform start-up, the socket buffer module,
+ * the module's bookkeeping lists and its select descriptor sets.
+ */
+void Socket_outInitialize(void)
+{
+#if defined(WIN32) || defined(WIN64)
+       WSADATA wsd;
+       WORD    winsockVer = 0x0202;
+#endif
+
+       FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+       WSAStartup(winsockVer, &wsd);
+#else
+       /* ignore SIGPIPE for the whole process */
+       signal(SIGPIPE, SIG_IGN);
+#endif
+
+       SocketBuffer_initialize();
+
+       /* bookkeeping lists: client sockets, pending connects, pending writes */
+       s.clientsds = ListInitialize();
+       s.connect_pending = ListInitialize();
+       s.write_pending = ListInitialize();
+       s.cur_clientsds = NULL;
+       s.maxfdp1 = 0;
+
+       /* start with empty descriptor sets; rset_saved begins as a copy of the empty rset */
+       FD_ZERO(&(s.rset));
+       FD_ZERO(&(s.pending_wset));
+       memcpy((void*)&(s.rset_saved), (void*)&(s.rset), sizeof(s.rset_saved));
+       FUNC_EXIT;
+}
+
+
+/**
+ * Terminate the socket module.
+ *
+ * Frees the connect_pending, write_pending and clientsds lists, terminates
+ * the socket buffer module and, on Windows, calls WSACleanup.
+ */
+void Socket_outTerminate(void)
+{
+       FUNC_ENTRY;
+       ListFree(s.connect_pending);
+       ListFree(s.write_pending);
+       ListFree(s.clientsds);
+       SocketBuffer_terminate();
+#if defined(WIN32) || defined(WIN64)
+       WSACleanup();
+#endif
+       FUNC_EXIT;
+}
+
+
+/**
+ * Add a socket to the list of socket to check with select
+ * @param newSd the new socket to add
+ */
+int Socket_addSocket(int newSd)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       if (ListFindItem(s.clientsds, &newSd, intcompare) == NULL) /* make sure 
we don't add the same socket twice */
+       {
+               if (s.clientsds->count >= FD_SETSIZE)
+               {
+                       Log(LOG_ERROR, -1, "addSocket: exceeded FD_SETSIZE %d", 
FD_SETSIZE);
+                       rc = SOCKET_ERROR;
+               }
+               else
+               {
+                       int* pnewSd = (int*)malloc(sizeof(newSd));
+                       *pnewSd = newSd;
+                       ListAppend(s.clientsds, pnewSd, sizeof(newSd));
+                       FD_SET(newSd, &(s.rset_saved));
+                       s.maxfdp1 = max(s.maxfdp1, newSd + 1);
+                       rc = Socket_setnonblocking(newSd);
+                       if (rc == SOCKET_ERROR)
+                               Log(LOG_ERROR, -1, "addSocket: setnonblocking");
+               }
+       }
+       else
+               Log(LOG_ERROR, -1, "addSocket: socket %d already in the list", 
newSd);
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Decide whether a socket should be reported as ready.
+ *
+ * A socket found in the connect_pending list whose descriptor is set in
+ * write_set is taken off that list and reported ready.  Any other socket is
+ * ready only when it is set in both read_set and write_set and has no pending
+ * writes: work is not accepted from a client unless it can accept work back,
+ * which acts as a simple form of flow control.
+ * @param socket the socket to check
+ * @param read_set the socket read set (see select doc)
+ * @param write_set the socket write set (see select doc)
+ * @return boolean - is the socket ready to go?
+ */
+int isReady(int socket, fd_set* read_set, fd_set* write_set)
+{
+       int ready = 1;
+       int writeable = FD_ISSET(socket, write_set) != 0;
+
+       FUNC_ENTRY;
+       if (ListFindItem(s.connect_pending, &socket, intcompare) != NULL && writeable)
+               ListRemoveItem(s.connect_pending, &socket, intcompare);
+       else if (!writeable || !FD_ISSET(socket, read_set))
+               ready = 0;
+       else
+               ready = Socket_noPendingWrites(socket);
+       FUNC_EXIT_RC(ready);
+       return ready;
+}
+
+
+/**
+ *  Returns the next socket ready for communications as indicated by select.
+ *
+ *  Sockets left over from the previous select are handed out first, one per
+ *  call, continuing from s.cur_clientsds.  Only when that scan is exhausted
+ *  is select called again: once for readability (together with the sockets in
+ *  s.pending_wset for writing, whose outstanding writes are then continued)
+ *  and once, with a zero timeout, for writeability of every socket in
+ *  s.rset_saved.
+ *  @param more_work flag to indicate more work is waiting, and thus a timeout value of 0 should
+ *  be used for the select
+ *  @param tp the timeout to be used for the select, unless overridden; if NULL
+ *  and more_work is not set, one second is used
+ *  @return the socket next ready, or 0 if none is ready; if either select call
+ *  fails, the SOCKET_ERROR value it returned
+ */
+int Socket_getReadySocket(int more_work, struct timeval *tp)
+{
+       int rc = 0;
+       static struct timeval zero = {0L, 0L}; /* 0 seconds */
+       static struct timeval one = {1L, 0L}; /* 1 second */
+       struct timeval timeout = one;
+
+       FUNC_ENTRY;
+       if (s.clientsds->count == 0) /* no sockets registered: nothing to select on */
+               goto exit;
+
+       if (more_work)
+               timeout = zero;
+       else if (tp)
+               timeout = *tp;
+
+       while (s.cur_clientsds != NULL) /* first finish scanning the results of the previous select */
+       {
+               if (isReady(*((int*)(s.cur_clientsds->content)), &(s.rset), 
&wset))
+                       break;
+               ListNextElement(s.clientsds, &s.cur_clientsds);
+       }
+
+       if (s.cur_clientsds == NULL) /* previous results exhausted: run select again */
+       {
+               int rc1;
+               fd_set pwset;
+
+               memcpy((void*)&(s.rset), (void*)&(s.rset_saved), 
sizeof(s.rset));
+               memcpy((void*)&(pwset), (void*)&(s.pending_wset), 
sizeof(pwset));
+               if ((rc = select(s.maxfdp1, &(s.rset), &pwset, NULL, &timeout)) 
== SOCKET_ERROR)
+               {
+                       Socket_error("read select", 0);
+                       goto exit;
+               }
+               Log(TRACE_MAX, -1, "Return code %d from read select", rc);
+
+               if (Socket_continueWrites(&pwset) == SOCKET_ERROR)
+               {
+                       rc = 0;
+                       goto exit;
+               }
+
+               memcpy((void*)&wset, (void*)&(s.rset_saved), sizeof(wset)); /* test every registered socket for writeability */
+               if ((rc1 = select(s.maxfdp1, NULL, &(wset), NULL, &zero)) == 
SOCKET_ERROR)
+               {
+                       Socket_error("write select", 0);
+                       rc = rc1;
+                       goto exit;
+               }
+               Log(TRACE_MAX, -1, "Return code %d from write select", rc1);
+
+               if (rc == 0 && rc1 == 0)
+                       goto exit; /* no work to do */
+
+               s.cur_clientsds = s.clientsds->first; /* restart the scan from the head of the client list */
+               while (s.cur_clientsds != NULL)
+               {
+                       int cursock = *((int*)(s.cur_clientsds->content));
+                       if (isReady(cursock, &(s.rset), &wset))
+                               break;
+                       ListNextElement(s.clientsds, &s.cur_clientsds);
+               }
+       }
+
+       if (s.cur_clientsds == NULL)
+               rc = 0;
+       else
+       {
+               rc = *((int*)(s.cur_clientsds->content));
+               ListNextElement(s.clientsds, &s.cur_clientsds); /* the next call resumes after the socket being returned */
+       }
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+} /* end getReadySocket */
+
+
+/**
+ *  Reads one byte from a socket.
+ *
+ *  The socket buffer module is asked first; the socket itself is only read
+ *  when that module answers SOCKETBUFFER_INTERRUPTED.
+ *  @param socket the socket to read from
+ *  @param c the character read, returned
+ *  @return completion code: TCPSOCKET_COMPLETE when a byte was read from the
+ *  socket, TCPSOCKET_INTERRUPTED when the read would block, SOCKET_ERROR on
+ *  any other failure or when the peer has closed the connection; otherwise
+ *  whatever SocketBuffer_getQueuedChar returned
+ */
+int Socket_getch(int socket, char* c)
+{
+       int rc;
+
+       FUNC_ENTRY;
+       rc = SocketBuffer_getQueuedChar(socket, c);
+       if (rc == SOCKETBUFFER_INTERRUPTED)
+       {
+               rc = recv(socket, c, (size_t)1, 0);
+               if (rc == SOCKET_ERROR)
+               {
+                       int err = Socket_error("recv - getch", socket);
+
+                       if (err == EWOULDBLOCK || err == EAGAIN)
+                       {
+                               SocketBuffer_interrupted(socket, 0);
+                               rc = TCPSOCKET_INTERRUPTED;
+                       }
+               }
+               else if (rc == 0)
+               {
+                       /* recv returns 0 when the peer has performed an orderly shutdown */
+                       rc = SOCKET_ERROR;
+               }
+               else if (rc == 1)
+               {
+                       SocketBuffer_queueChar(socket, *c);
+                       rc = TCPSOCKET_COMPLETE;
+               }
+       }
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ *  Attempts to read a number of bytes from a socket, non-blocking. If a previous read did not
+ *  finish, then retrieve that data.
+ *  @param socket the socket to read from
+ *  @param bytes the number of bytes to read; 0 only completes the socket's
+ *  buffer and returns the result of SocketBuffer_complete without reading
+ *  @param actual_len the actual number of bytes read so far, returned (not
+ *  written when bytes is 0)
+ *  @return the buffer holding the data read so far, or NULL if the read failed
+ *  with anything other than EAGAIN/EWOULDBLOCK, the peer closed the socket, or
+ *  no buffer was available.  The data is complete only when *actual_len equals bytes.
+ */
+char *Socket_getdata(int socket, size_t bytes, size_t* actual_len)
+{
+       int rc;
+       char* buf;
+
+       FUNC_ENTRY;
+       if (bytes == 0)
+       {
+               buf = SocketBuffer_complete(socket);
+               goto exit;
+       }
+
+       buf = SocketBuffer_getQueuedData(socket, bytes, actual_len);
+       if (buf == NULL) /* no buffer to read into: do not do pointer arithmetic on NULL */
+               goto exit;
+
+       if ((rc = recv(socket, buf + (*actual_len), (int)(bytes - (*actual_len)), 0)) == SOCKET_ERROR)
+       {
+               rc = Socket_error("recv - getdata", socket);
+               if (rc != EAGAIN && rc != EWOULDBLOCK)
+               {
+                       buf = NULL;
+                       goto exit;
+               }
+       }
+       else if (rc == 0) /* rc 0 means the other end closed the socket, albeit "gracefully" */
+       {
+               buf = NULL;
+               goto exit;
+       }
+       else
+               *actual_len += rc;
+
+       if (*actual_len == bytes)
+               SocketBuffer_complete(socket);
+       else /* we didn't read the whole packet */
+       {
+               SocketBuffer_interrupted(socket, *actual_len);
+               /* the format uses %d, so the size_t values are converted explicitly */
+               Log(TRACE_MAX, -1, "%d bytes expected but %d bytes now received", (int)bytes, (int)*actual_len);
+       }
+exit:
+       FUNC_EXIT;
+       return buf;
+}
+
+
+/**
+ *  Indicate whether a socket has no outbound data pending.
+ *  @param socket the socket to check
+ *  @return boolean - true == the socket is not in the write_pending list,
+ *  i.e. no write is pending for it
+ */
+int Socket_noPendingWrites(int socket)
+{
+       ListElement* pending = ListFindItem(s.write_pending, &socket, intcompare);
+
+       return pending == NULL;
+}
+
+
+/**
+ *  Attempts to write a series of iovec buffers to a socket in *one* system call so that
+ *  they are sent as one packet.
+ *  @param socket the socket to write to
+ *  @param iovecs an array of buffers to write
+ *  @param count number of buffers in iovecs
+ *  @param bytes number of bytes actually written, returned; 0 when nothing was written
+ *  @return SOCKET_ERROR on failure, TCPSOCKET_INTERRUPTED if the write would
+ *  block, otherwise the return code of the underlying write call
+ */
+int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
+{
+       int rc;
+
+       FUNC_ENTRY;
+       /* callers add *bytes to their totals whenever the result is not SOCKET_ERROR,
+        * so it must hold a defined value on every path, including a write that
+        * would block */
+       *bytes = 0L;
+#if defined(WIN32) || defined(WIN64)
+       rc = WSASend(socket, iovecs, count, (LPDWORD)bytes, 0, NULL, NULL);
+       if (rc == SOCKET_ERROR)
+       {
+               int err = Socket_error("WSASend - putdatas", socket);
+               if (err == EWOULDBLOCK || err == EAGAIN)
+                       rc = TCPSOCKET_INTERRUPTED;
+       }
+#else
+       rc = writev(socket, iovecs, count);
+       if (rc == SOCKET_ERROR)
+       {
+               int err = Socket_error("writev - putdatas", socket);
+               if (err == EWOULDBLOCK || err == EAGAIN)
+                       rc = TCPSOCKET_INTERRUPTED;
+       }
+       else
+               *bytes = rc;
+#endif
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ *  Attempts to write a series of buffers to a socket in *one* system call so that they are
+ *  sent as one packet.
+ *
+ *  If only part of the data can be written, the remainder is recorded as a
+ *  pending write for the socket and completed later by Socket_continueWrite.
+ *  @param socket the socket to write to
+ *  @param buf0 the first buffer
+ *  @param buf0len the length of data in the first buffer
+ *  @param count number of buffers in the buffers array; at most 4, because
+ *  together with buf0 they are sent through a 5-entry vector
+ *  @param buffers an array of buffers to write
+ *  @param buflens an array of corresponding buffer lengths
+ *  @param frees an array of flags, one per entry of buffers; a non-zero flag
+ *  means Socket_continueWrite frees that buffer once a pending write completes
+ *  (buf0 is always flagged)
+ *  @return completion code: TCPSOCKET_COMPLETE when everything was written,
+ *  TCPSOCKET_INTERRUPTED when a pending write was recorded, SOCKET_ERROR otherwise
+ */
+int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees)
+{
+       unsigned long bytes = 0L;
+       iobuf iovecs[5];
+       int frees1[5];
+       int rc = TCPSOCKET_INTERRUPTED, i;
+       size_t total = buf0len;
+
+       FUNC_ENTRY;
+       /* buf0 takes the first slot, so count + 1 entries must fit in the local arrays */
+       if (count < 0 || count + 1 > (int)(sizeof(iovecs) / sizeof(iovecs[0])))
+       {
+               Log(LOG_SEVERE, -1, "Trying to write %d buffers to socket %d, which is more than one call supports", count, socket);
+               rc = SOCKET_ERROR;
+               goto exit;
+       }
+
+       if (!Socket_noPendingWrites(socket))
+       {
+               Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket);
+               rc = SOCKET_ERROR;
+               goto exit;
+       }
+
+       for (i = 0; i < count; i++)
+               total += buflens[i];
+
+       iovecs[0].iov_base = buf0;
+       iovecs[0].iov_len = (ULONG)buf0len;
+       frees1[0] = 1;
+       for (i = 0; i < count; i++)
+       {
+               iovecs[i+1].iov_base = buffers[i];
+               iovecs[i+1].iov_len = (ULONG)buflens[i];
+               frees1[i+1] = frees[i];
+       }
+
+       if ((rc = Socket_writev(socket, iovecs, count+1, &bytes)) != SOCKET_ERROR)
+       {
+               if (bytes == total)
+                       rc = TCPSOCKET_COMPLETE;
+               else
+               {
+                       int* sockmem = (int*)malloc(sizeof(int));
+
+                       /* the format uses %ld and %d, so the values are converted explicitly */
+                       Log(TRACE_MIN, -1, "Partial write: %ld bytes of %d actually written on socket %d",
+                                       (long)bytes, (int)total, socket);
+                       if (sockmem == NULL)
+                       {
+                               /* the remainder cannot be tracked, so report failure instead of dereferencing NULL */
+                               Log(LOG_SEVERE, -1, "Out of memory recording the pending write for socket %d", socket);
+                               rc = SOCKET_ERROR;
+                               goto exit;
+                       }
+#if defined(OPENSSL)
+                       SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes);
+#else
+                       SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes);
+#endif
+                       *sockmem = socket;
+                       ListAppend(s.write_pending, sockmem, sizeof(int));
+                       FD_SET(socket, &(s.pending_wset));
+                       rc = TCPSOCKET_INTERRUPTED;
+               }
+       }
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ *  Mark a socket in the pending write descriptor set (s.pending_wset), so that it is checked
+ *  for writing in select.  This is used in connect processing when the TCP connect is
+ *  incomplete, as we need to check the socket for both ready to read and write states.
+ *  Only the descriptor set is changed; nothing is added to the s.write_pending list.
+ *  @param socket the socket to add
+ */
+void Socket_addPendingWrite(int socket)
+{
+       FD_SET(socket, &(s.pending_wset));
+}
+
+
+/**
+ *  Remove a socket from the pending write descriptor set (s.pending_wset), if it is
+ *  there - the counterpart of Socket_addPendingWrite.
+ *  @param socket the socket to remove
+ */
+void Socket_clearPendingWrite(int socket)
+{
+       if (!FD_ISSET(socket, &(s.pending_wset)))
+               return; /* not marked: nothing to clear */
+
+       FD_CLR(socket, &(s.pending_wset));
+}
+
+
+/**
+ *  Shut down and close a socket without removing it from the select list.
+ *
+ *  A failure of any individual call is reported through Socket_error but does
+ *  not stop the remaining calls from being made.
+ *  @param socket the socket to close
+ *  @return the return code of the final close (closesocket on Windows) call
+ */
+int Socket_close_only(int socket)
+{
+       int rc;
+
+       FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+       rc = shutdown(socket, SD_BOTH);
+       if (rc == SOCKET_ERROR)
+               Socket_error("shutdown", socket);
+
+       rc = closesocket(socket);
+       if (rc == SOCKET_ERROR)
+               Socket_error("close", socket);
+#else
+       rc = shutdown(socket, SHUT_WR);
+       if (rc == SOCKET_ERROR)
+               Socket_error("shutdown", socket);
+
+       /* zero-length receive after shutting down the write side; a failure is
+        * also reported under the name "shutdown" */
+       rc = recv(socket, NULL, (size_t)0, 0);
+       if (rc == SOCKET_ERROR)
+               Socket_error("shutdown", socket);
+
+       rc = close(socket);
+       if (rc == SOCKET_ERROR)
+               Socket_error("close", socket);
+#endif
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ *  Close a socket and remove it from the select list.
+ *
+ *  Besides closing the descriptor this clears it from the saved read set and
+ *  the pending write set, steps the scan position past it if it is the
+ *  current element, drops it from the connect_pending, write_pending and
+ *  clientsds lists, cleans up its socket buffer and, if it may have been the
+ *  highest descriptor, recomputes s.maxfdp1 from the sockets that remain.
+ *  @param socket the socket to close
+ */
+void Socket_close(int socket)
+{
+       FUNC_ENTRY;
+       Socket_close_only(socket);
+
+       /* take the descriptor out of the sets used for select */
+       FD_CLR(socket, &(s.rset_saved));
+       if (FD_ISSET(socket, &(s.pending_wset)))
+               FD_CLR(socket, &(s.pending_wset));
+
+       /* make sure the scan position does not keep pointing at the removed entry */
+       if (s.cur_clientsds != NULL)
+       {
+               int scanned = *(int*)(s.cur_clientsds->content);
+
+               if (scanned == socket)
+                       s.cur_clientsds = s.cur_clientsds->next;
+       }
+
+       ListRemoveItem(s.connect_pending, &socket, intcompare);
+       ListRemoveItem(s.write_pending, &socket, intcompare);
+       SocketBuffer_cleanup(socket);
+
+       if (!ListRemoveItem(s.clientsds, &socket, intcompare))
+               Log(LOG_ERROR, -1, "Failed to remove socket %d", socket);
+       else
+               Log(TRACE_MIN, -1, "Removed socket %d", socket);
+
+       if (socket + 1 >= s.maxfdp1)
+       {
+               /* now we have to reset s.maxfdp1: highest remaining descriptor plus one */
+               ListElement* elem = NULL;
+               int highest = 0;
+
+               while (ListNextElement(s.clientsds, &elem))
+               {
+                       int fd = *((int*)(elem->content));
+
+                       if (fd > highest)
+                               highest = fd;
+               }
+               s.maxfdp1 = highest + 1;
+               Log(TRACE_MAX, -1, "Reset max fdp1 to %d", s.maxfdp1);
+       }
+       FUNC_EXIT;
+}
+
+
+/**
+ *  Create a new socket and TCP connect to an address/port.
+ *
+ *  The address is resolved with getaddrinfo, preferring an IPv4 result.  The
+ *  new socket is added to the module's select list (which also makes it
+ *  non-blocking), so the connect may still be in progress on return; in that
+ *  case the socket is recorded in the connect_pending list.  A socket that
+ *  cannot be used is closed again and *sock is reset to -1.
+ *  @param addr the address string; a leading '[' is skipped
+ *  @param port the TCP port
+ *  @param sock returns the new socket, or -1 if none could be set up
+ *  @return completion code: 0 if the connect completed, EINPROGRESS or
+ *  EWOULDBLOCK if it is pending, otherwise a getaddrinfo or socket error code
+ */
+int Socket_new(char* addr, int port, int* sock)
+{
+       int type = SOCK_STREAM;
+       struct sockaddr_in address;
+#if defined(AF_INET6)
+       struct sockaddr_in6 address6;
+#endif
+       int rc = SOCKET_ERROR;
+#if defined(WIN32) || defined(WIN64)
+       short family = AF_INET;
+#else
+       sa_family_t family = AF_INET;
+#endif
+       struct addrinfo *result = NULL;
+       struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
+
+       FUNC_ENTRY;
+       *sock = -1;
+       /* zero both address structures so that no indeterminate padding or
+        * unused members are handed to connect */
+       memset(&address, '\0', sizeof(address));
+#if defined(AF_INET6)
+       memset(&address6, '\0', sizeof(address6));
+#endif
+
+       if (addr[0] == '[')
+         ++addr;
+
+       if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0)
+       {
+               struct addrinfo* res = result;
+
+               while (res)
+               {       /* prefer ip4 addresses */
+                       if (res->ai_family == AF_INET || res->ai_next == NULL)
+                               break;
+                       res = res->ai_next;
+               }
+
+               if (res == NULL)
+                       rc = -1;
+               else
+#if defined(AF_INET6)
+               if (res->ai_family == AF_INET6)
+               {
+                       address6.sin6_port = htons(port);
+                       address6.sin6_family = family = AF_INET6;
+                       memcpy(&address6.sin6_addr, &((struct sockaddr_in6*)(res->ai_addr))->sin6_addr, sizeof(address6.sin6_addr));
+               }
+               else
+#endif
+               if (res->ai_family == AF_INET)
+               {
+                       address.sin_port = htons(port);
+                       address.sin_family = family = AF_INET;
+                       address.sin_addr = ((struct sockaddr_in*)(res->ai_addr))->sin_addr;
+               }
+               else
+                       rc = -1;
+
+               freeaddrinfo(result);
+       }
+       else
+               Log(LOG_ERROR, -1, "getaddrinfo failed for addr %s with rc %d", addr, rc);
+
+       if (rc != 0)
+               Log(LOG_ERROR, -1, "%s is not a valid IP address", addr);
+       else
+       {
+               *sock = (int)socket(family, type, 0);
+               if (*sock == INVALID_SOCKET)
+                       rc = Socket_error("socket", *sock);
+               else
+               {
+#if defined(NOSIGPIPE)
+                       int opt = 1;
+
+                       if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
+                               Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock);
+#endif
+
+                       Log(TRACE_MIN, -1, "New socket %d for %s, port %d",     *sock, addr, port);
+                       if (Socket_addSocket(*sock) == SOCKET_ERROR)
+                       {
+                               /* the error code may be stale here; never let it read as
+                                * success or "connect pending" for a socket that is not usable */
+                               rc = Socket_error("addSocket", *sock);
+                               if (rc == 0 || rc == EINPROGRESS || rc == EWOULDBLOCK)
+                                       rc = SOCKET_ERROR;
+                       }
+                       else
+                       {
+                               /* this could complete immediately, even though we are non-blocking */
+                               if (family == AF_INET)
+                                       rc = connect(*sock, (struct sockaddr*)&address, sizeof(address));
+       #if defined(AF_INET6)
+                               else
+                                       rc = connect(*sock, (struct sockaddr*)&address6, sizeof(address6));
+       #endif
+                               if (rc == SOCKET_ERROR)
+                                       rc = Socket_error("connect", *sock);
+                               if (rc == EINPROGRESS || rc == EWOULDBLOCK)
+                               {
+                                       int* pnewSd = (int*)malloc(sizeof(int));
+
+                                       if (pnewSd == NULL)
+                                       {
+                                               /* the pending connect cannot be tracked: give the socket up */
+                                               Log(LOG_ERROR, -1, "Out of memory recording the pending connect for socket %d", *sock);
+                                               rc = SOCKET_ERROR;
+                                       }
+                                       else
+                                       {
+                                               *pnewSd = *sock;
+                                               ListAppend(s.connect_pending, pnewSd, sizeof(int));
+                                               Log(TRACE_MIN, 15, "Connect pending");
+                                       }
+                               }
+                       }
+                        /* Prevent socket leak by closing unusable sockets,
+                         * as reported in
+                         * https://github.com/eclipse/paho.mqtt.c/issues/135
+                         */
+                        if (rc != 0 && (rc != EINPROGRESS) && (rc != EWOULDBLOCK))
+                        {
+                            Socket_close(*sock); /* close socket and remove from our list of sockets */
+                            *sock = -1; /* as initialized before */
+                        }
+               }
+       }
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+static Socket_writeComplete* writecomplete = NULL; /* callback invoked by Socket_continueWrites, with the socket, after a pending write has been removed; NULL means no callback */
+
+void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete) /* stores the callback; passing NULL removes it */
+{
+       writecomplete = mywritecomplete;
+}
+
+/**
+ *  Continue an outstanding write for a particular socket.
+ *
+ *  The buffers recorded for the pending write are walked to build a vector of
+ *  the data that has not been sent yet, which is then written in one call.
+ *  Once everything is written, the buffers flagged in the pending write's
+ *  frees array are freed.
+ *  @param socket that socket
+ *  @return completion code: 1 if the pending write is now complete, 0 if data
+ *  is still outstanding, SOCKET_ERROR if the write failed; with OPENSSL defined
+ *  and an SSL pending write, the result of SSLSocket_continueWrite
+ */
+int Socket_continueWrite(int socket)
+{
+       int rc = 0;
+       pending_writes* pw;
+       unsigned long curbuflen = 0L, /* cumulative total of buffer lengths */
+               bytes = 0L; /* added to pw->bytes below, so it must never be read uninitialised */
+       int curbuf = -1, i;
+       iobuf iovecs1[5];
+
+       FUNC_ENTRY;
+       pw = SocketBuffer_getWrite(socket);
+
+#if defined(OPENSSL)
+       if (pw->ssl)
+       {
+               rc = SSLSocket_continueWrite(pw);
+               goto exit;
+       }
+#endif
+
+       for (i = 0; i < pw->count; ++i)
+       {
+               if (pw->bytes <= curbuflen)
+               { /* everything written so far lies before the buffer we are
+                               currently looking at: add the whole buffer */
+                       iovecs1[++curbuf].iov_len = pw->iovecs[i].iov_len;
+                       iovecs1[curbuf].iov_base = pw->iovecs[i].iov_base;
+               }
+               else if (pw->bytes < curbuflen + pw->iovecs[i].iov_len)
+               { /* the previously written length ends in the middle of the
+                               buffer we are currently looking at: add the unsent tail of it */
+                       size_t offset = pw->bytes - curbuflen;
+                       iovecs1[++curbuf].iov_len = pw->iovecs[i].iov_len - (ULONG)offset;
+                       iovecs1[curbuf].iov_base = (char*)pw->iovecs[i].iov_base + offset;
+                       break;
+               }
+               curbuflen += pw->iovecs[i].iov_len;
+       }
+
+       if ((rc = Socket_writev(socket, iovecs1, curbuf+1, &bytes)) != SOCKET_ERROR)
+       {
+               pw->bytes += bytes;
+               if ((rc = (pw->bytes == pw->total)))
+               {  /* topic and payload buffers are freed elsewhere, when all references to them have been removed */
+                       for (i = 0; i < pw->count; i++)
+                       {
+                               if (pw->frees[i])
+                                       free(pw->iovecs[i].iov_base);
+                       }
+                       Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
+               }
+               else
+                       Log(TRACE_MIN, -1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket);
+       }
+#if defined(OPENSSL)
+exit:
+#endif
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ *  Continue any outstanding writes for a socket set.
+ *
+ *  Walks the s.write_pending list and calls Socket_continueWrite for every
+ *  socket that is set in pwset.  Whenever that call returns non-zero, the
+ *  pending write is removed from the socket buffer module, from
+ *  s.pending_wset and from the list, and the write-complete callback (if one
+ *  is registered) is called with the socket.
+ *  NOTE(review): a SOCKET_ERROR result from Socket_continueWrite takes the same
+ *  path as a completed write, assuming SOCKET_ERROR is non-zero - confirm this
+ *  is intended.
+ *  @param pwset the set of sockets
+ *  @return always 0: rc1 is never changed after its initialisation
+ */
+int Socket_continueWrites(fd_set* pwset)
+{
+       int rc1 = 0;
+       ListElement* curpending = s.write_pending->first;
+
+       FUNC_ENTRY;
+       while (curpending)
+       {
+               int socket = *(int*)(curpending->content);
+               if (FD_ISSET(socket, pwset) && Socket_continueWrite(socket))
+               {
+                       if (!SocketBuffer_writeComplete(socket))
+                               Log(LOG_SEVERE, -1, "Failed to remove pending 
write from socket buffer list");
+                       FD_CLR(socket, &(s.pending_wset));
+                       if (!ListRemove(s.write_pending, curpending->content))
+                       {
+                               Log(LOG_SEVERE, -1, "Failed to remove pending 
write from list");
+                               ListNextElement(s.write_pending, &curpending);
+                       }
+                       curpending = s.write_pending->current; /* carry on from the list's own current element; presumably maintained by ListRemove/ListNextElement - confirm */
+
+                       if (writecomplete)
+                               (*writecomplete)(socket);
+               }
+               else
+                       ListNextElement(s.write_pending, &curpending);
+       }
+       FUNC_EXIT_RC(rc1);
+       return rc1;
+}
+
+
+/**
+ *  Convert a numeric address to character string.
+ *
+ *  The result is held in a static buffer, so it is overwritten by the next
+ *  call and the function is not reentrant.  Outside Windows the string has
+ *  the form address:port, for both IPv4 and IPv6 addresses; on Windows the
+ *  port is not appended by this function.
+ *  @param sa  socket numerical address
+ *  @param sock socket, used only when reporting an error
+ *  @return the peer information
+ */
+char* Socket_getaddrname(struct sockaddr* sa, int sock)
+{
+/**
+ * maximum length of the address string
+ */
+#define ADDRLEN (INET6_ADDRSTRLEN+1)
+/**
+ * maximum length of the port string
+ */
+#define PORTLEN 10
+       static char addr_string[ADDRLEN + PORTLEN];
+
+#if defined(WIN32) || defined(WIN64)
+       int buflen = ADDRLEN*2;
+       wchar_t buf[ADDRLEN*2];
+       if (WSAAddressToStringW(sa, sizeof(struct sockaddr_in6), NULL, buf, (LPDWORD)&buflen) == SOCKET_ERROR)
+               Socket_error("WSAAddressToString", sock);
+       else
+       {
+               wcstombs(addr_string, buf, sizeof(addr_string));
+               /* wcstombs does not terminate the result when it fills the buffer */
+               addr_string[sizeof(addr_string) - 1] = '\0';
+       }
+       /* TODO: append the port information - format: [00:00:00::]:port */
+       /* strcpy(&addr_string[strlen(addr_string)], "what?"); */
+#else
+       const void* in_addr;
+       int port;
+       size_t len;
+
+       /* pick the address and port members that belong to the address family */
+#if defined(AF_INET6)
+       if (sa->sa_family == AF_INET6)
+       {
+               struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sa;
+
+               in_addr = &sin6->sin6_addr;
+               port = ntohs(sin6->sin6_port);
+       }
+       else
+#endif
+       {
+               struct sockaddr_in *sin = (struct sockaddr_in *)sa;
+
+               in_addr = &sin->sin_addr;
+               port = ntohs(sin->sin_port);
+       }
+
+       if (inet_ntop(sa->sa_family, in_addr, addr_string, ADDRLEN) == NULL)
+       {
+               Socket_error("inet_ntop", sock);
+               addr_string[0] = '\0'; /* do not leave the previous call's text in place */
+       }
+       len = strlen(addr_string);
+       snprintf(&addr_string[len], sizeof(addr_string) - len, ":%d", port);
+#endif
+       return addr_string;
+}
+
+
+/**
+ *  Get information about the other end connected to a socket.
+ *  @param sock the socket to inquire on
+ *  @return the peer information as formatted by Socket_getaddrname, or the
+ *  string "unknown" if getpeername fails
+ */
+char* Socket_getpeer(int sock)
+{
+       struct sockaddr_in6 sa;
+       socklen_t sal = sizeof(sa);
+       char* peer = "unknown";
+
+       if (getpeername(sock, (struct sockaddr*)&sa, &sal) == SOCKET_ERROR)
+               Socket_error("getpeername", sock);
+       else
+               peer = Socket_getaddrname((struct sockaddr*)&sa, sock);
+
+       return peer;
+}
+
+
+#if defined(Socket_TEST)
+
+int main(int argc, char *argv[]) /* ad-hoc test driver, compiled only when Socket_TEST is defined */
+{
+       Socket_connect("127.0.0.1", 1883); /* NOTE(review): no Socket_connect is defined in the visible part of this file - confirm this driver still builds */
+       Socket_connect("localhost", 1883);
+       Socket_connect("loadsadsacalhost", 1883); /* presumably an unresolvable host name, to exercise the failure path - TODO confirm */
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Socket.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Socket.h 
b/thirdparty/paho.mqtt.c/src/Socket.h
new file mode 100644
index 0000000..dbf21b4
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Socket.h
@@ -0,0 +1,143 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial implementation and documentation
+ *    Ian Craggs - async client updates
+ 
*******************************************************************************/
+
+#if !defined(SOCKET_H)
+#define SOCKET_H
+
+#include <sys/types.h>
+
+/*
+ * Platform layer: bring in the socket headers for the target and smooth over
+ * the naming differences between Winsock and the BSD sockets API.
+ */
+#if defined(WIN32) || defined(WIN64)
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#define MAXHOSTNAMELEN 256
+/* Redirect the POSIX errno names used by the socket code to Winsock codes.
+   Skipped when SSLSOCKET_H is already defined (presumably the include guard
+   of SSLSocket.h - confirm). */
+#if !defined(SSLSOCKET_H)
+#undef EAGAIN
+#define EAGAIN WSAEWOULDBLOCK
+#undef EINTR
+#define EINTR WSAEINTR
+#undef EINPROGRESS
+#define EINPROGRESS WSAEINPROGRESS
+#undef EWOULDBLOCK
+#define EWOULDBLOCK WSAEWOULDBLOCK
+#undef ENOTCONN
+#define ENOTCONN WSAENOTCONN
+#undef ECONNRESET
+#define ECONNRESET WSAECONNRESET
+#undef ETIMEDOUT
+/* note: mapped to WAIT_TIMEOUT rather than to a WSA* code */
+#define ETIMEDOUT WAIT_TIMEOUT
+#endif
+#define ioctl ioctlsocket
+#define socklen_t int
+#else
+/* non-Windows: INVALID_SOCKET is an alias for SOCKET_ERROR, which is defined
+   further down in this header if nothing else has defined it */
+#define INVALID_SOCKET SOCKET_ERROR
+#include <sys/socket.h>
+/* _WRS_KERNEL selects selectLib.h in place of the usual sys/ headers
+   (presumably a VxWorks kernel build - confirm) */
+#if !defined(_WRS_KERNEL)
+#include <sys/param.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <sys/uio.h>
+#else
+#include <selectLib.h>
+#endif
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+/* ULONG is a Windows type name; give it a definition here too */
+#define ULONG size_t
+#endif
+
+/** socket operation completed successfully */
+#define TCPSOCKET_COMPLETE 0
+#if !defined(SOCKET_ERROR)
+       /** error in socket operation */
+       #define SOCKET_ERROR -1
+#endif
+/** must be the same as SOCKETBUFFER_INTERRUPTED */
+#define TCPSOCKET_INTERRUPTED -22
+/* NOTE(review): presumably the code for an unrecoverable SSL failure; it is
+   not used in the part of the sources visible here - confirm */
+#define SSL_FATAL -3
+
+#if !defined(INET6_ADDRSTRLEN)
+#define INET6_ADDRSTRLEN 46 /** only needed for gcc/cygwin on windows */
+#endif
+
+
+/* Larger of two values.  Each argument appears twice in the expansion, so do
+   not pass expressions with side effects. */
+#if !defined(max)
+#define max(A,B) ( (A) > (B) ? (A):(B))
+#endif
+
+#include "LinkedList.h"
+
+/*BE
+def FD_SET
+{
+   128 n8 "data"
+}
+
+def SOCKETS
+{
+       FD_SET "rset"
+       FD_SET "rset_saved"
+       n32 dec "maxfdp1"
+       n32 ptr INTList "clientsds"
+       n32 ptr INTItem "cur_clientsds"
+       n32 ptr INTList "connect_pending"
+       n32 ptr INTList "write_pending"
+       FD_SET "pending_wset"
+}
+BE*/
+
+
+/**
+ * Module-wide socket bookkeeping: the descriptor sets handed to select() and
+ * the lists of sockets being tracked
+ */
+typedef struct
+{
+       fd_set rset; /**< read set passed to select */
+       fd_set rset_saved; /**< saved copy of the read set */
+       int maxfdp1; /**< highest descriptor in use, plus one (see select doc) */
+       List* clientsds; /**< client socket descriptors */
+       ListElement* cur_clientsds; /**< iterator: the current client socket descriptor */
+       List* connect_pending; /**< sockets with a connect still pending */
+       List* write_pending; /**< sockets with a write still pending */
+       fd_set pending_wset; /**< write set for select, covering pending writes */
+} Sockets;
+
+
+/*
+ * Public interface of the Socket module.
+ * NOTE(review): apart from Socket_getpeer() the implementations are not
+ * visible from here; see Socket.c for the individual contracts.
+ */
+void Socket_outInitialize(void);
+void Socket_outTerminate(void);
+int Socket_getReadySocket(int more_work, struct timeval *tp);
+int Socket_getch(int socket, char* c);
+char *Socket_getdata(int socket, size_t bytes, size_t* actual_len);
+int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees);
+void Socket_close(int socket);
+int Socket_new(char* addr, int port, int* socket);
+
+int Socket_noPendingWrites(int socket);
+/** @return text describing the peer of sock, or "unknown" if getpeername() fails */
+char* Socket_getpeer(int sock);
+
+void Socket_addPendingWrite(int socket);
+void Socket_clearPendingWrite(int socket);
+
+/** Signature of the callback registered with Socket_setWriteCompleteCallback(); it is passed a socket descriptor */
+typedef void Socket_writeComplete(int socket);
+void Socket_setWriteCompleteCallback(Socket_writeComplete*);
+
+#endif /* SOCKET_H */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/SocketBuffer.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/SocketBuffer.c 
b/thirdparty/paho.mqtt.c/src/SocketBuffer.c
new file mode 100644
index 0000000..ba640f1
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/SocketBuffer.c
@@ -0,0 +1,413 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - fix for issue #244, issue #20
+ 
*******************************************************************************/
+
+/**
+ * @file
+ * \brief Socket buffering related functions
+ *
+ * Some other related functions are in the Socket module
+ */
+#include "SocketBuffer.h"
+#include "LinkedList.h"
+#include "Log.h"
+#include "Messages.h"
+#include "StackTrace.h"
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "Heap.h"
+
+/* On Windows iobuf is a WSABUF, whose members are called len and buf; these
+   aliases let the code below use the iovec member names on every platform */
+#if defined(WIN32) || defined(WIN64)
+#define iov_len len
+#define iov_base buf
+#endif
+
+/**
+ * Default input queue buffer.  Created by SocketBuffer_newDefQ(); it is moved
+ * into the queues list by SocketBuffer_interrupted() when a read is cut short.
+ */
+static socket_queue* def_queue;
+
+/**
+ * List of queued input buffers, one socket_queue per socket with a partial read
+ */
+static List* queues;
+
+/**
+ * List of queued write buffers (pending_writes entries).  Held by value and
+ * reset with ListZero() in SocketBuffer_initialize().
+ */
+static List writes;
+
+
+/* forward declarations of the helpers defined in this file */
+int socketcompare(void* a, void* b);
+void SocketBuffer_newDefQ(void);
+void SocketBuffer_freeDefQ(void);
+int pending_socketcompare(void* a, void* b);
+
+
+/**
+ * List callback: does a queued input buffer belong to a given socket?
+ * @param a the list content, a socket_queue
+ * @param b pointer to the int socket number being looked for
+ * @return non-zero if the queue's socket equals the number, otherwise 0
+ */
+int socketcompare(void* a, void* b)
+{
+       socket_queue* entry = (socket_queue*)a;
+       int wanted = *(int*)b;
+
+       return entry->socket == wanted;
+}
+
+
+/**
+ * Create a new default queue when one has just been used.
+ *
+ * Allocates a fresh socket_queue with a 1000 byte data buffer and stores it in
+ * def_queue.  All bookkeeping fields, headerlen included, start at zero so
+ * that later readers never see indeterminate values.  buflen records the real
+ * size of the buffer, or 0 if the buffer could not be allocated (in which case
+ * SocketBuffer_getQueuedData() allocates on demand).  If the queue structure
+ * itself cannot be allocated def_queue is left NULL.
+ */
+void SocketBuffer_newDefQ(void)
+{
+       def_queue = malloc(sizeof(socket_queue));
+       if (def_queue == NULL)
+               return; /* nothing to initialize */
+
+       def_queue->socket = 0;
+       def_queue->index = 0;
+       def_queue->headerlen = 0;
+       def_queue->datalen = 0;
+       def_queue->buflen = 1000;
+       def_queue->buf = malloc(def_queue->buflen);
+       if (def_queue->buf == NULL)
+               def_queue->buflen = 0; /* keep buflen truthful: there is no buffer */
+}
+
+
+/**
+ * Initialize the socketBuffer module: create the default input queue, the
+ * list of saved input queues and an empty list of pending writes
+ */
+void SocketBuffer_initialize(void)
+{
+       FUNC_ENTRY;
+       SocketBuffer_newDefQ();
+       queues = ListInitialize();
+       ListZero(&writes); /* writes is a List held by value, so it is reset in place */
+       FUNC_EXIT;
+}
+
+
+/**
+ * Free the default queue memory: first its data buffer, then the queue
+ * structure.  def_queue itself is not changed here and must not be used again
+ * until it has been reassigned.
+ */
+void SocketBuffer_freeDefQ(void)
+{
+       free(def_queue->buf);
+       free(def_queue);
+}
+
+
+/**
+ * Shut the socketBuffer module down, releasing the pending writes, every
+ * saved input queue and the default queue
+ */
+void SocketBuffer_terminate(void)
+{
+       ListElement* elem = NULL;
+       ListEmpty(&writes);
+
+       FUNC_ENTRY;
+       /* the data buffers are freed here, the queue structures by ListFree() */
+       while (ListNextElement(queues, &elem))
+       {
+               socket_queue* saved = (socket_queue*)(elem->content);
+
+               free(saved->buf);
+       }
+       ListFree(queues);
+       SocketBuffer_freeDefQ();
+       FUNC_EXIT;
+}
+
+
+/**
+ * Discard everything buffered for one socket
+ * @param socket the socket whose read and write buffers are to be dropped
+ */
+void SocketBuffer_cleanup(int socket)
+{
+       FUNC_ENTRY;
+       SocketBuffer_writeComplete(socket); /* drop any pending write entry */
+       if (ListFindItem(queues, &socket, socketcompare))
+       {
+               socket_queue* saved = (socket_queue*)(queues->current->content);
+
+               free(saved->buf);
+               ListRemove(queues, saved);
+       }
+       if (def_queue->socket == socket)
+       {
+               /* the default queue was in use for this socket: mark it unused */
+               def_queue->index = 0;
+               def_queue->socket = 0;
+               def_queue->datalen = 0;
+               def_queue->headerlen = 0;
+       }
+       FUNC_EXIT;
+}
+
+
+/**
+ * Get any queued data for a specific socket
+ *
+ * Selects the saved queue for the socket if there is one, otherwise the
+ * default queue, and makes sure its buffer can hold the requested number of
+ * bytes.  Data already received is preserved when the buffer has to grow.
+ * @param socket the socket to get queued data for
+ * @param bytes the number of bytes of data to retrieve
+ * @param actual_len the actual length returned: the amount of data already
+ * held in the buffer, 0 when the default queue is used
+ * @return the buffer, or NULL if it could not be enlarged; in that case the
+ * queue keeps its previous buffer and buflen
+ */
+char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len)
+{
+       socket_queue* queue = NULL;
+       char* data = NULL;
+
+       FUNC_ENTRY;
+       if (ListFindItem(queues, &socket, socketcompare))
+       {  /* if there is queued data for this socket, add any data read to it */
+               queue = (socket_queue*)(queues->current->content);
+               *actual_len = queue->datalen;
+       }
+       else
+       {
+               *actual_len = 0;
+               queue = def_queue;
+       }
+       if (bytes > queue->buflen)
+       {
+               void* newmem = NULL;
+
+               if (queue->datalen > 0)
+               {
+                       /* carry the data received so far over to the bigger buffer */
+                       newmem = malloc(bytes);
+                       if (newmem == NULL)
+                               goto exit;
+                       memcpy(newmem, queue->buf, queue->datalen);
+                       free(queue->buf);
+               }
+               else
+               {
+                       /* go through a temporary so the old buffer is not lost on failure */
+                       newmem = realloc(queue->buf, bytes);
+                       if (newmem == NULL)
+                               goto exit;
+               }
+               queue->buf = newmem;
+               queue->buflen = bytes;
+       }
+       data = queue->buf;
+
+exit:
+       FUNC_EXIT;
+       return data;
+}
+
+
+/**
+ * Fetch the next queued fixed header byte for a socket, if there is one
+ * @param socket the socket to get queued data for
+ * @param c receives the byte when one is available
+ * @return SOCKETBUFFER_COMPLETE if a byte was returned, SOCKET_ERROR if the
+ * saved header has been used up and its index is beyond 4, otherwise
+ * SOCKETBUFFER_INTERRUPTED (nothing was queued)
+ */
+int SocketBuffer_getQueuedChar(int socket, char* c)
+{
+       int rc = SOCKETBUFFER_INTERRUPTED;
+
+       FUNC_ENTRY;
+       if (ListFindItem(queues, &socket, socketcompare))
+       {
+               /* bytes saved from an earlier, interrupted read are served first */
+               socket_queue* saved = (socket_queue*)(queues->current->content);
+
+               if (saved->index < saved->headerlen)
+               {
+                       *c = saved->fixed_header[saved->index];
+                       saved->index++;
+                       Log(TRACE_MAX, -1, "index is now %d, headerlen %d", saved->index, saved->headerlen);
+                       rc = SOCKETBUFFER_COMPLETE;
+               }
+               else if (saved->index > 4)
+               {
+                       Log(LOG_FATAL, -1, "header is already at full length");
+                       rc = SOCKET_ERROR;
+               }
+       }
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Note that a socket read stopped part way, keeping what has arrived so far
+ * @param socket the socket whose read was interrupted
+ * @param actual_len the amount of data read before the interruption
+ */
+void SocketBuffer_interrupted(int socket, size_t actual_len)
+{
+       socket_queue* saved = NULL;
+
+       FUNC_ENTRY;
+       if (!ListFindItem(queues, &socket, socketcompare))
+       {
+               /* Nothing saved for this socket yet: the default queue becomes its
+                  saved queue and a new default queue is created.  The socket
+                  number is set here because SocketBuffer_queueChar() may not have
+                  been called to set it (issue #244).  When actual_len is 0 this
+                  may be unnecessary; that optimization has not been made. */
+               saved = def_queue;
+               saved->socket = socket;
+               ListAppend(queues, saved, sizeof(socket_queue) + saved->buflen);
+               SocketBuffer_newDefQ();
+       }
+       else
+               saved = (socket_queue*)(queues->current->content);
+       saved->index = 0;
+       saved->datalen = actual_len;
+       FUNC_EXIT;
+}
+
+
+/**
+ * A socket read has finished, so its saved queue is no longer needed
+ * @param socket the socket for which the read is now complete
+ * @return the data buffer of the default queue
+ */
+char* SocketBuffer_complete(int socket)
+{
+       FUNC_ENTRY;
+       if (ListFindItem(queues, &socket, socketcompare))
+       {
+               socket_queue* finished = (socket_queue*)(queues->current->content);
+
+               /* the saved queue replaces the current default queue */
+               SocketBuffer_freeDefQ();
+               def_queue = finished;
+               ListDetach(queues, finished);
+       }
+       /* mark the default queue as unused */
+       def_queue->index = 0;
+       def_queue->socket = 0;
+       def_queue->datalen = 0;
+       def_queue->headerlen = 0;
+       FUNC_EXIT;
+       return def_queue->buf;
+}
+
+
+/**
+ * Save one byte of a packet's fixed header so that it can be handed back by
+ * SocketBuffer_getQueuedChar() later
+ *
+ * The byte goes to the socket's saved queue if it has one, otherwise to the
+ * default queue, which is claimed for the socket if it is unused.  Nothing is
+ * stored if the default queue belongs to another socket or the header field
+ * is already full.
+ * @param socket the socket the byte was read from
+ * @param c the character to queue
+ */
+void SocketBuffer_queueChar(int socket, char c)
+{
+       socket_queue* target = def_queue;
+       int failed = 0;
+
+       FUNC_ENTRY;
+       if (ListFindItem(queues, &socket, socketcompare))
+               target = (socket_queue*)(queues->current->content);
+       else if (def_queue->socket == 0)
+       {
+               /* default queue is free: claim it for this socket */
+               def_queue->socket = socket;
+               def_queue->index = 0;
+               def_queue->datalen = 0;
+       }
+       else if (def_queue->socket != socket)
+       {
+               Log(LOG_FATAL, -1, "attempt to reuse socket queue");
+               failed = 1;
+       }
+       if (target->index > 4)
+       {
+               Log(LOG_FATAL, -1, "socket queue fixed_header field full");
+               failed = 1;
+       }
+       if (failed == 0)
+       {
+               target->fixed_header[target->index] = c;
+               target->index++;
+               target->headerlen = target->index;
+       }
+       Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", target->index, target->headerlen);
+       FUNC_EXIT;
+}
+
+
+/**
+ * A socket write was interrupted so store the remaining data
+ *
+ * A pending_writes record holding copies of the iovec descriptors and free
+ * flags is appended to the list of pending writes.  If the record cannot be
+ * allocated the failure is logged and nothing is stored.
+ * @param socket the socket for which the write was interrupted
+ * @param ssl the SSL session for the socket (only when built with OPENSSL)
+ * @param count the number of iovec buffers
+ * @param iovecs buffer array
+ * @param frees one flag per buffer, stored alongside it
+ * @param total total data length to be written
+ * @param bytes actual data length that was written
+ */
+#if defined(OPENSSL)
+void SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
+#else
+void SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
+#endif
+{
+       int i = 0;
+       pending_writes* pw = NULL;
+
+       FUNC_ENTRY;
+       /* store the buffers until the whole packet is written */
+       pw = malloc(sizeof(pending_writes));
+       if (pw == NULL)
+       {
+               Log(LOG_FATAL, -1, "out of memory storing pending write");
+               goto exit;
+       }
+       pw->socket = socket;
+#if defined(OPENSSL)
+       pw->ssl = ssl;
+#endif
+       pw->bytes = bytes;
+       pw->total = total;
+       pw->count = count;
+       /* NOTE(review): count is assumed not to exceed the 5 slots of
+          pw->iovecs and pw->frees; the callers are not visible here - confirm */
+       for (i = 0; i < count; i++)
+       {
+               pw->iovecs[i] = iovecs[i];
+               pw->frees[i] = frees[i];
+       }
+       /* the size argument covers the record itself, not just a pointer to it */
+       ListAppend(&writes, pw, sizeof(pending_writes) + total);
+exit:
+       FUNC_EXIT;
+}
+
+
+/**
+ * List callback: does a pending write belong to a given socket?
+ * @param a the list content, a pending_writes record
+ * @param b pointer to the int socket number being looked for
+ * @return non-zero if the record's socket equals the number, otherwise 0
+ */
+int pending_socketcompare(void* a, void* b)
+{
+       pending_writes* entry = (pending_writes*)a;
+       int wanted = *(int*)b;
+
+       return entry->socket == wanted;
+}
+
+
+/**
+ * Look up the pending write stored for a socket
+ * @param socket the socket to look for
+ * @return the pending write record, or NULL if the socket has none
+ */
+pending_writes* SocketBuffer_getWrite(int socket)
+{
+       ListElement* found = ListFindItem(&writes, &socket, pending_socketcompare);
+
+       if (!found)
+               return NULL;
+       return (pending_writes*)(found->content);
+}
+
+
+/**
+ * A socket write has finished, so its pending write record can be removed
+ * @param socket the socket whose write is now complete
+ * @return the result of ListRemoveItem(): boolean - was a record removed?
+ */
+int SocketBuffer_writeComplete(int socket)
+{
+       int removed = ListRemoveItem(&writes, &socket, pending_socketcompare);
+
+       return removed;
+}
+
+
+/**
+ * Repoint the topic and payload buffers of a socket's pending write, for QoS 0
+ * messages.  Only records made of exactly 4 buffers are changed.
+ * @param socket the socket whose pending write is to be updated
+ * @param topic the topic of the QoS 0 write
+ * @param payload the payload of the QoS 0 write
+ * @return the pending write record for the socket, or NULL if it has none
+ */
+pending_writes* SocketBuffer_updateWrite(int socket, char* topic, char* payload)
+{
+       ListElement* found = NULL;
+       pending_writes* pending = NULL;
+
+       FUNC_ENTRY;
+       found = ListFindItem(&writes, &socket, pending_socketcompare);
+       if (found != NULL)
+       {
+               pending = (pending_writes*)(found->content);
+               if (pending->count == 4)
+               {
+                       /* buffers 2 and 3 hold the topic and the payload */
+                       pending->iovecs[2].iov_base = topic;
+                       pending->iovecs[3].iov_base = payload;
+               }
+       }
+
+       FUNC_EXIT;
+       return pending;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/SocketBuffer.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/SocketBuffer.h 
b/thirdparty/paho.mqtt.c/src/SocketBuffer.h
new file mode 100644
index 0000000..f7702dc
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/SocketBuffer.h
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ 
*******************************************************************************/
+
+#if !defined(SOCKETBUFFER_H)
+#define SOCKETBUFFER_H
+
+#if defined(WIN32) || defined(WIN64)
+#include <winsock2.h>
+#else
+#include <sys/socket.h>
+#endif
+
+#if defined(OPENSSL)
+#include <openssl/ssl.h>
+#endif
+
+/* iobuf: the platform's scatter/gather buffer descriptor - WSABUF on Windows,
+   struct iovec elsewhere */
+#if defined(WIN32) || defined(WIN64)
+       typedef WSABUF iobuf;
+#else
+       typedef struct iovec iobuf;
+#endif
+
+/**
+ * Input saved for one socket while a read is incomplete
+ */
+typedef struct
+{
+       int socket;             /**< socket the queue belongs to, 0 when unused */
+       unsigned int index;     /**< position of the next byte in fixed_header */
+       size_t headerlen;       /**< number of bytes stored in fixed_header */
+       char fixed_header[5];   /**< header plus up to 4 length bytes */
+       size_t buflen;          /**< total length of the buffer */
+       size_t datalen;         /**< current length of data in buf */
+       char* buf;              /**< the data buffer */
+} socket_queue;
+
+/**
+ * A write that could not be completed in one go, kept until it can be
+ */
+typedef struct
+{
+       int socket;             /**< socket the write is for */
+       int count;              /**< number of entries used in iovecs and frees */
+       size_t total;           /**< total data length to be written */
+#if defined(OPENSSL)
+       SSL* ssl;               /**< SSL session for the socket */
+#endif
+       size_t bytes;           /**< data length written so far */
+       iobuf iovecs[5];        /**< the buffers making up the write */
+       int frees[5];           /**< one flag per buffer */
+} pending_writes;
+
+/* completion codes returned by SocketBuffer_getQueuedChar() */
+#define SOCKETBUFFER_COMPLETE 0
+#if !defined(SOCKET_ERROR)
+       #define SOCKET_ERROR -1
+#endif
+#define SOCKETBUFFER_INTERRUPTED -22 /* must be the same value as TCPSOCKET_INTERRUPTED */
+
+/* module start-up and shut-down */
+void SocketBuffer_initialize(void);
+void SocketBuffer_terminate(void);
+/* drop all read and write buffers held for a socket */
+void SocketBuffer_cleanup(int socket);
+/* return the buffer for a socket, at least bytes long; *actual_len is set to the amount of data already in it */
+char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len);
+/* fetch the next saved fixed header byte; SOCKETBUFFER_INTERRUPTED means none was queued */
+int SocketBuffer_getQueuedChar(int socket, char* c);
+/* a read was interrupted after actual_len bytes: keep the queue for the socket */
+void SocketBuffer_interrupted(int socket, size_t actual_len);
+/* a read has completed: the socket's queue becomes the default queue again */
+char* SocketBuffer_complete(int socket);
+/* save one fixed header byte read from the socket */
+void SocketBuffer_queueChar(int socket, char c);
+
+/* store the buffers of a write that was interrupted after bytes of total */
+#if defined(OPENSSL)
+void SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes);
+#else
+void SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes);
+#endif
+/* the pending write for a socket, or NULL */
+pending_writes* SocketBuffer_getWrite(int socket);
+/* remove the pending write for a socket; returns whether one was removed */
+int SocketBuffer_writeComplete(int socket);
+/* repoint the topic and payload buffers of a 4 buffer pending write */
+pending_writes* SocketBuffer_updateWrite(int socket, char* topic, char* payload);
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/StackTrace.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/StackTrace.c 
b/thirdparty/paho.mqtt.c/src/StackTrace.c
new file mode 100644
index 0000000..dd55f71
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/StackTrace.c
@@ -0,0 +1,207 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ 
*******************************************************************************/
+
+#include "StackTrace.h"
+#include "Log.h"
+#include "LinkedList.h"
+
+#include "Clients.h"
+#include "Thread.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+
+/* Windows builds use _snprintf under the snprintf name.  Unlike C99 snprintf
+   it does not always terminate the output and it returns a negative value
+   when the output is truncated, so callers must not rely on either. */
+#if defined(WIN32) || defined(WIN64)
+#define snprintf _snprintf
+#endif
+
+/*BE
+def STACKENTRY
+{
+       n32 ptr STRING open "name"
+       n32 dec "line"
+}
+
+defList(STACKENTRY)
+BE*/
+
+/* capacity limits of the statically allocated trace tables */
+#define MAX_STACK_DEPTH 50 /* frames recorded per thread */
+#define MAX_FUNCTION_NAME_LENGTH 30 /* size of the name field, terminator included */
+#define MAX_THREADS 255 /* threads that can be tracked */
+
+/* one recorded function entry */
+typedef struct
+{
+       thread_id_type threadid; /* not referenced by the code in this file */
+       char name[MAX_FUNCTION_NAME_LENGTH]; /* function name, truncated to fit */
+       int line; /* source line passed to StackTrace_entry() */
+} stackEntry;
+
+/* the recorded call stack of one thread */
+typedef struct
+{
+       thread_id_type id; /* thread the stack belongs to */
+       int maxdepth; /* greatest value current_depth has reached */
+       int current_depth; /* number of entries without a matching exit so far */
+       stackEntry callstack[MAX_STACK_DEPTH]; /* entries, oldest first */
+} threadEntry;
+
+#include "StackTrace.h"
+
+static int thread_count = 0; /* number of entries of threads[] in use */
+static threadEntry threads[MAX_THREADS]; /* one slot per thread seen so far */
+static threadEntry *cur_thread = NULL; /* slot selected by the last setStack() call */
+
+/* stack_mutex is taken by StackTrace_entry() and StackTrace_exit() around
+   their use of the tables above */
+#if defined(WIN32) || defined(WIN64)
+mutex_type stack_mutex;
+#else
+static pthread_mutex_t stack_mutex_store = PTHREAD_MUTEX_INITIALIZER;
+static mutex_type stack_mutex = &stack_mutex_store;
+#endif
+
+
+int setStack(int create);
+
+
+/**
+ * Point cur_thread at the calling thread's slot in the threads table
+ * @param create non-zero to claim a new slot if the thread has none yet
+ * @return 1 if cur_thread now refers to a slot, 0 if it is NULL
+ */
+int setStack(int create)
+{
+       thread_id_type self = Thread_getid();
+       int slot = 0;
+
+       cur_thread = NULL;
+       while (slot < MAX_THREADS && slot < thread_count)
+       {
+               if (threads[slot].id == self)
+               {
+                       cur_thread = &threads[slot];
+                       return 1;
+               }
+               ++slot;
+       }
+
+       /* not seen before: a slot is claimed only on request and if one is free */
+       if (!create || thread_count >= MAX_THREADS)
+               return 0;
+
+       cur_thread = &threads[thread_count++];
+       cur_thread->id = self;
+       cur_thread->maxdepth = 0;
+       cur_thread->current_depth = 0;
+       return 1;
+}
+
+/**
+ * Record entry into a function on the calling thread's tracked call stack
+ *
+ * The thread's slot is looked up, and created on first use, while holding
+ * stack_mutex.  If no slot is available the call does nothing.  Frames beyond
+ * MAX_STACK_DEPTH are counted but not stored, so the callstack array is never
+ * written out of bounds.
+ * @param name name of the function being entered
+ * @param line source line of the entry
+ * @param trace_level level at which to log the entry, -1 for no logging
+ */
+void StackTrace_entry(const char* name, int line, enum LOG_LEVELS trace_level)
+{
+       Thread_lock_mutex(stack_mutex);
+       if (!setStack(1))
+               goto exit;
+       if (trace_level != -1)
+               Log_stackTrace(trace_level, 9, (int)cur_thread->id, cur_thread->current_depth, name, line, NULL);
+       if (cur_thread->current_depth >= 0 && cur_thread->current_depth < MAX_STACK_DEPTH)
+       {
+               stackEntry* frame = &cur_thread->callstack[cur_thread->current_depth];
+
+               strncpy(frame->name, name, sizeof(frame->name)-1);
+               frame->name[sizeof(frame->name)-1] = '\0'; /* strncpy does not terminate a truncated copy */
+               frame->line = line;
+       }
+       /* the depth still counts frames that were not stored, so that entries
+          and exits stay balanced */
+       ++(cur_thread->current_depth);
+       if (cur_thread->current_depth > cur_thread->maxdepth)
+               cur_thread->maxdepth = cur_thread->current_depth;
+       if (cur_thread->current_depth >= MAX_STACK_DEPTH)
+               Log(LOG_FATAL, -1, "Max stack depth exceeded");
+exit:
+       Thread_unlock_mutex(stack_mutex);
+}
+
+
+/**
+ * Record exit from a function on the calling thread's tracked call stack
+ *
+ * Does nothing if the thread has no slot.  The depth is decremented and the
+ * stored frame name compared with name; a mismatch or an exit without a
+ * matching entry is logged.  The callstack array is only read at indexes that
+ * exist, and the depth is never left negative.
+ * @param name name of the function being left
+ * @param line source line of the exit
+ * @param rc pointer to the return value to log, or NULL if there is none
+ * @param trace_level level at which to log the exit, -1 for no logging
+ */
+void StackTrace_exit(const char* name, int line, void* rc, enum LOG_LEVELS trace_level)
+{
+       Thread_lock_mutex(stack_mutex);
+       if (!setStack(0))
+               goto exit;
+       if (--(cur_thread->current_depth) < 0)
+               Log(LOG_FATAL, -1, "Minimum stack depth exceeded for thread %lu", (unsigned long)cur_thread->id);
+       else if (cur_thread->current_depth < MAX_STACK_DEPTH &&
+                       strncmp(cur_thread->callstack[cur_thread->current_depth].name, name, sizeof(cur_thread->callstack[0].name)-1) != 0)
+               Log(LOG_FATAL, -1, "Stack mismatch. Entry:%s Exit:%s\n", cur_thread->callstack[cur_thread->current_depth].name, name);
+       if (trace_level != -1)
+       {
+               if (rc == NULL)
+                       Log_stackTrace(trace_level, 10, (int)cur_thread->id, cur_thread->current_depth, name, line, NULL);
+               else
+                       Log_stackTrace(trace_level, 11, (int)cur_thread->id, cur_thread->current_depth, name, line, (int*)rc);
+       }
+       /* recover from an unmatched exit so later entries index from 0 again */
+       if (cur_thread->current_depth < 0)
+               cur_thread->current_depth = 0;
+exit:
+       Thread_unlock_mutex(stack_mutex);
+}
+
+
+/**
+ * Print the recorded call stack of every tracked thread, innermost frame first
+ *
+ * Only frames that are actually stored are printed: a depth greater than
+ * MAX_STACK_DEPTH is clamped to the size of the callstack array.
+ * @param dest the stream to print to, or NULL for stdout.  A stream other
+ * than stdout or stderr is closed before returning.
+ */
+void StackTrace_printStack(FILE* dest)
+{
+       FILE* file = stdout;
+       int t = 0;
+
+       if (dest)
+               file = dest;
+       for (t = 0; t < thread_count; ++t)
+       {
+               /* named so as not to hide the file scope cur_thread */
+               threadEntry *thread = &threads[t];
+
+               if (thread->id > 0)
+               {
+                       int i = thread->current_depth - 1;
+
+                       if (i >= MAX_STACK_DEPTH)
+                               i = MAX_STACK_DEPTH - 1; /* deeper frames were never stored */
+                       fprintf(file, "=========== Start of stack trace for thread %lu ==========\n", (unsigned long)thread->id);
+                       if (i >= 0)
+                       {
+                               fprintf(file, "%s (%d)\n", thread->callstack[i].name, thread->callstack[i].line);
+                               while (--i >= 0)
+                                       fprintf(file, "   at %s (%d)\n", thread->callstack[i].name, thread->callstack[i].line);
+                       }
+                       fprintf(file, "=========== End of stack trace for thread %lu ==========\n\n", (unsigned long)thread->id);
+               }
+       }
+       if (file != stdout && file != stderr && file != NULL)
+               fclose(file);
+}
+
+
+/**
+ * Format the recorded call stack of one thread as text, innermost frame first
+ *
+ * The text is limited to a 256 byte buffer.  A frame that does not fit
+ * completely is left out, together with all frames after it, so the buffer is
+ * never overrun and is always terminated.
+ * @param threadid the thread whose stack is wanted
+ * @return a buffer obtained from malloc() which the caller must free; it holds
+ * an empty string if the thread is unknown or has no frames.  NULL if the
+ * buffer could not be allocated.
+ */
+char* StackTrace_get(thread_id_type threadid)
+{
+       int bufsize = 256;
+       char* buf = NULL;
+       int t = 0;
+
+       if ((buf = malloc(bufsize)) == NULL)
+               goto exit;
+       buf[0] = '\0';
+       for (t = 0; t < thread_count; ++t)
+       {
+               /* named so as not to hide the file scope cur_thread */
+               threadEntry *thread = &threads[t];
+
+               if (thread->id == threadid)
+               {
+                       int i = thread->current_depth - 1;
+                       int curpos = 0;
+                       const char* prefix = "";
+
+                       if (i >= MAX_STACK_DEPTH)
+                               i = MAX_STACK_DEPTH - 1; /* deeper frames were never stored */
+                       for (; i >= 0; --i)
+                       {
+                               int room = bufsize - curpos; /* always at least 1 */
+                               int written = snprintf(&buf[curpos], room, "%s%s (%d)\n",
+                                               prefix, thread->callstack[i].name, thread->callstack[i].line);
+
+                               /* negative or >= room means the frame did not fit,
+                                  whichever snprintf flavour is in use */
+                               if (written < 0 || written >= room)
+                               {
+                                       buf[curpos] = '\0';
+                                       break;
+                               }
+                               curpos += written;
+                               prefix = "   at ";
+                       }
+                       /* drop the final newline */
+                       if (curpos > 0 && buf[curpos - 1] == '\n')
+                               buf[curpos - 1] = '\0';
+                       break;
+               }
+       }
+exit:
+       return buf;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/StackTrace.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/StackTrace.h 
b/thirdparty/paho.mqtt.c/src/StackTrace.h
new file mode 100644
index 0000000..f4ebba2
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/StackTrace.h
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ 
*******************************************************************************/
+
+#ifndef STACKTRACE_H_
+#define STACKTRACE_H_
+
+#include <stdio.h>
+#include "Log.h"
+#include "Thread.h"
+
+/*
+ * Function entry and exit tracing macros.
+ *
+ * FUNC_ENTRY* and FUNC_EXIT* call StackTrace_entry() and StackTrace_exit()
+ * with the current function name and line.  The suffix selects the trace
+ * level (none = minimum, _MED, _MAX, _NOLOG = -1, i.e. no logging) and the
+ * _RC(x) forms also pass the address of the return code x.  Defining
+ * NOSTACKTRACE turns all of them into nothing.
+ */
+#if defined(NOSTACKTRACE)
+#define FUNC_ENTRY
+#define FUNC_ENTRY_NOLOG
+#define FUNC_ENTRY_MED
+#define FUNC_ENTRY_MAX
+#define FUNC_EXIT
+#define FUNC_EXIT_NOLOG
+#define FUNC_EXIT_MED
+#define FUNC_EXIT_MAX
+#define FUNC_EXIT_RC(x)
+#define FUNC_EXIT_MED_RC(x)
+#define FUNC_EXIT_MAX_RC(x)
+#else
+#if defined(WIN32) || defined(WIN64)
+#define inline __inline
+#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM)
+#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1)
+#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM)
+#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM)
+#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM)
+/* StackTrace_exit() takes four arguments: the return value pointer is NULL here */
+#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, NULL, -1)
+#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM)
+#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM)
+#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM)
+#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM)
+#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM)
+#else
+#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM)
+#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1)
+#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM)
+#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM)
+#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM)
+#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1)
+#define FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM)
+#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM)
+#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MINIMUM)
+#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM)
+#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM)
+#endif
+#endif
+
+/* record entry to / exit from the named function for the calling thread; a
+   trace level of -1 suppresses logging */
+void StackTrace_entry(const char* name, int line, enum LOG_LEVELS trace);
+void StackTrace_exit(const char* name, int line, void* return_value, enum LOG_LEVELS trace);
+
+/* print the recorded stacks of all threads to dest (stdout if NULL) */
+void StackTrace_printStack(FILE* dest);
+/* recorded stack of one thread as text in a malloc()ed buffer, or NULL */
+char* StackTrace_get(thread_id_type);
+
+#endif /* STACKTRACE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Thread.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Thread.c 
b/thirdparty/paho.mqtt.c/src/Thread.c
new file mode 100644
index 0000000..37aaa58
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Thread.c
@@ -0,0 +1,462 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial implementation
+ *    Ian Craggs, Allan Stockdill-Mander - async client updates
+ *    Ian Craggs - bug #415042 - start Linux thread as disconnected
+ *    Ian Craggs - fix for bug #420851
+ *    Ian Craggs - change MacOS semaphore implementation
+ 
*******************************************************************************/
+
+/**
+ * @file
+ * \brief Threading related functions
+ *
+ * Used to create platform independent threading functions
+ */
+
+
+#include "Thread.h"
+#if defined(THREAD_UNIT_TESTS)
+#define NOSTACKTRACE
+#endif
+#include "StackTrace.h"
+
+#undef malloc
+#undef realloc
+#undef free
+
+#if !defined(WIN32) && !defined(WIN64)
+#include <errno.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <limits.h>
+#endif
+#include <stdlib.h>
+
+#include "OsWrapper.h"
+
+/**
+ * Launch a new thread running the supplied function.
+ * On POSIX builds the thread is created in the detached state.
+ * @param fn entry point of the thread, must match the thread_fn signature
+ * @param parameter argument handed to fn, may be NULL
+ * @return the thread handle; on POSIX builds 0 when pthread_create fails,
+ * on Windows whatever CreateThread returned
+ */
+thread_type Thread_start(thread_fn fn, void* parameter)
+{
+#if defined(WIN32) || defined(WIN64)
+       thread_type handle = NULL;
+
+       FUNC_ENTRY;
+       handle = CreateThread(NULL, 0, fn, parameter, 0, NULL);
+#else
+       thread_type handle = 0;
+       pthread_attr_t attributes;
+
+       FUNC_ENTRY;
+       pthread_attr_init(&attributes);
+       pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);
+       if (pthread_create(&handle, &attributes, fn, parameter) != 0)
+               handle = 0;
+       pthread_attr_destroy(&attributes);
+#endif
+       FUNC_EXIT;
+       return handle;
+}
+
+
+/**
+ * Create a new mutex
+ * @return the new mutex, or NULL if it could not be allocated or initialized.
+ * The failure code is only reported through the exit trace.
+ */
+mutex_type Thread_create_mutex(void)
+{
+       mutex_type mutex = NULL;
+       int rc = 0;
+
+       FUNC_ENTRY;
+       #if defined(WIN32) || defined(WIN64)
+               mutex = CreateMutex(NULL, 0, NULL);
+               if (mutex == NULL)
+                       rc = GetLastError();
+       #else
+               mutex = malloc(sizeof(pthread_mutex_t));
+               if (mutex == NULL)
+                       rc = ENOMEM;
+               else if ((rc = pthread_mutex_init(mutex, NULL)) != 0)
+               {
+                       /* never hand out a mutex that failed to initialize */
+                       free(mutex);
+                       mutex = NULL;
+               }
+       #endif
+       FUNC_EXIT_RC(rc);
+       return mutex;
+}
+
+
+/**
+ * Lock a mutex which has already been created, blocking until it is owned
+ * @param mutex the mutex
+ * @return completion code, 0 is success
+ */
+int Thread_lock_mutex(mutex_type mutex)
+{
+       /* deliberately no entry/exit trace points: the stack log itself uses
+        * mutexes, so tracing here would recurse */
+#if defined(WIN32) || defined(WIN64)
+       /* a successful wait yields WAIT_OBJECT_0, which is 0 */
+       return WaitForSingleObject(mutex, INFINITE);
+#else
+       return pthread_mutex_lock(mutex);
+#endif
+}
+
+
+/**
+ * Release a mutex that the caller has previously locked
+ * @param mutex the mutex
+ * @return completion code, 0 is success
+ */
+int Thread_unlock_mutex(mutex_type mutex)
+{
+       /* deliberately no entry/exit trace points: the stack log itself uses
+        * mutexes, so tracing here would recurse */
+#if defined(WIN32) || defined(WIN64)
+       /* ReleaseMutex signals failure with a zero return value */
+       if (ReleaseMutex(mutex) != 0)
+               return 0;
+       return GetLastError();
+#else
+       return pthread_mutex_unlock(mutex);
+#endif
+}
+
+
+/**
+ * Dispose of a mutex obtained from Thread_create_mutex.
+ * The platform call's result is only passed to the exit trace.
+ * @param mutex the mutex
+ */
+void Thread_destroy_mutex(mutex_type mutex)
+{
+       int status = 0;
+
+       FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+       status = CloseHandle(mutex);
+#else
+       status = pthread_mutex_destroy(mutex);
+       free(mutex);
+#endif
+       FUNC_EXIT_RC(status);
+}
+
+
+/**
+ * Report the identifier of the calling thread
+ * @return thread id, whose type depends on the operating system
+ */
+thread_id_type Thread_getid(void)
+{
+#if defined(WIN32) || defined(WIN64)
+       thread_id_type self = GetCurrentThreadId();
+#else
+       thread_id_type self = pthread_self();
+#endif
+
+       return self;
+}
+
+
+/**
+ * Create a new semaphore with an initial count of zero (non-signaled)
+ * @return the new semaphore, or NULL if it could not be created.
+ * The failure code is only reported through the exit trace.
+ */
+sem_type Thread_create_sem(void)
+{
+       sem_type sem = NULL;
+       int rc = 0;
+
+       FUNC_ENTRY;
+       #if defined(WIN32) || defined(WIN64)
+               sem = CreateEvent(
+                       NULL,               /* default security attributes */
+                       FALSE,              /* manual-reset event? */
+                       FALSE,              /* initial state is nonsignaled */
+                       NULL                /* object name */
+                       );
+               if (sem == NULL)
+                       rc = GetLastError();
+       #elif defined(OSX)
+               sem = dispatch_semaphore_create(0L);
+               rc = (sem == NULL) ? -1 : 0;
+       #else
+               sem = malloc(sizeof(sem_t));
+               if (sem == NULL)
+                       rc = ENOMEM;
+               else if (sem_init(sem, 0, 0) != 0)
+               {
+                       /* capture errno before free() can disturb it, and never
+                        * return a semaphore that failed to initialize */
+                       rc = errno;
+                       free(sem);
+                       sem = NULL;
+               }
+       #endif
+       FUNC_EXIT_RC(rc);
+       return sem;
+}
+
+
+/**
+ * Wait for a semaphore to be posted, or timeout.
+ *
+ * Windows: a single WaitForSingleObject call; a negative timeout is treated as 0.
+ * OSX: a single dispatch_semaphore_wait call with the timeout converted to nanoseconds.
+ * Other platforms: USE_TRYWAIT is defined unconditionally just below, so the
+ * sem_trywait polling loop is always the compiled branch and the sem_timedwait
+ * branch is never built.
+ *
+ * Polling loop details:
+ *  - count is timeout / 10 and the loop pre-increments i, so at most count - 1
+ *    attempts are made; for timeouts below 20 ms no attempt is made at all and
+ *    rc stays at its initial -1.
+ *  - when every attempt fails with EAGAIN, rc ends up as EAGAIN.
+ *  - NOTE(review): when sem_trywait fails with an errno other than EAGAIN the
+ *    loop stops with rc = 0, which reads as success; confirm this is intended.
+ *
+ * NOTE(review): the disabled sem_timedwait branch adds timeout to tv_sec, i.e.
+ * treats it as seconds although the parameter is documented as milliseconds.
+ *
+ * @param sem the semaphore
+ * @param timeout the maximum time to wait, in milliseconds
+ * @return completion code
+ */
+int Thread_wait_sem(sem_type sem, int timeout)
+{
+/* sem_timedwait is the obvious call to use, but seemed not to work on the Viper,
+ * so I've used trywait in a loop instead. Ian Craggs 23/7/2010
+ */
+       int rc = -1;
+#if !defined(WIN32) && !defined(WIN64) && !defined(OSX)
+#define USE_TRYWAIT
+#if defined(USE_TRYWAIT)
+       int i = 0;
+       int interval = 10000; /* 10000 microseconds: 10 milliseconds */
+       int count = (1000 * timeout) / interval; /* how many intervals in timeout period */
+#else
+       struct timespec ts;
+#endif
+#endif
+
+       FUNC_ENTRY;
+       #if defined(WIN32) || defined(WIN64)
+               rc = WaitForSingleObject(sem, timeout < 0 ? 0 : timeout);
+  #elif defined(OSX)
+               rc = (int)dispatch_semaphore_wait(sem, 
dispatch_time(DISPATCH_TIME_NOW, (int64_t)timeout*1000000L));
+       #elif defined(USE_TRYWAIT)
+               while (++i < count && (rc = sem_trywait(sem)) != 0)
+               {
+                       if (rc == -1 && ((rc = errno) != EAGAIN))
+                       {
+                               rc = 0;
+                               break;
+                       }
+                       usleep(interval); /* microseconds - 10 milliseconds, i.e. .01 of a second */
+               }
+       #else
+               if (clock_gettime(CLOCK_REALTIME, &ts) != -1)
+               {
+                       ts.tv_sec += timeout;
+                       rc = sem_timedwait(sem, &ts);
+               }
+       #endif
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Test whether a semaphore has been posted, without blocking.
+ * On Windows and OSX the test is a wait with a zero timeout; elsewhere the
+ * semaphore's current value is read.
+ * NOTE(review): a successful zero-timeout wait presumably also consumes the
+ * signal on Windows and OSX, unlike the sem_getvalue path - confirm.
+ * @param sem the semaphore
+ * @return 0 (false) or 1 (true)
+ */
+int Thread_check_sem(sem_type sem)
+{
+#if defined(WIN32) || defined(WIN64)
+       return WaitForSingleObject(sem, 0) == WAIT_OBJECT_0;
+#elif defined(OSX)
+       return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0;
+#else
+       int value = -1;
+
+       sem_getvalue(sem, &value);
+       return (value > 0) ? 1 : 0;
+#endif
+}
+
+
+/**
+ * Signal (post) a semaphore
+ * @param sem the semaphore
+ * @return completion code: 0 unless the platform call reported a failure
+ * (GetLastError on Windows, errno for sem_post); on OSX the value returned
+ * by dispatch_semaphore_signal
+ */
+int Thread_post_sem(sem_type sem)
+{
+       int outcome = 0;
+
+       FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+       if (!SetEvent(sem))
+               outcome = GetLastError();
+#elif defined(OSX)
+       outcome = (int)dispatch_semaphore_signal(sem);
+#else
+       if (sem_post(sem) == -1)
+               outcome = errno;
+#endif
+
+       FUNC_EXIT_RC(outcome);
+       return outcome;
+}
+
+
+/**
+ * Dispose of a semaphore obtained from Thread_create_sem
+ * @param sem the semaphore
+ * @return the result of CloseHandle on Windows, of sem_destroy on the
+ * malloc-backed implementation, and always 0 on OSX
+ */
+int Thread_destroy_sem(sem_type sem)
+{
+       int status = 0;
+
+       FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+       status = CloseHandle(sem);
+#elif defined(OSX)
+       dispatch_release(sem);
+#else
+       status = sem_destroy(sem);
+       free(sem);
+#endif
+       FUNC_EXIT_RC(status);
+       return status;
+}
+
+
+#if !defined(WIN32) && !defined(WIN64)
+/**
+ * Create a new condition variable together with the mutex that guards it
+ * @return the condition variable struct, or NULL if allocation or either
+ * initialization failed. The failure code is only reported through the exit trace.
+ */
+cond_type Thread_create_cond(void)
+{
+       cond_type condvar = NULL;
+       int rc = 0;
+
+       FUNC_ENTRY;
+       condvar = malloc(sizeof(cond_type_struct));
+       if (condvar == NULL)
+               rc = ENOMEM;
+       else if ((rc = pthread_cond_init(&condvar->cond, NULL)) != 0)
+       {
+               free(condvar);
+               condvar = NULL;
+       }
+       else if ((rc = pthread_mutex_init(&condvar->mutex, NULL)) != 0)
+       {
+               /* undo the half-built object so nothing leaks */
+               pthread_cond_destroy(&condvar->cond);
+               free(condvar);
+               condvar = NULL;
+       }
+
+       FUNC_EXIT_RC(rc);
+       return condvar;
+}
+
+/**
+ * Signal a condition variable while holding its associated mutex
+ * @param condvar the condition variable struct
+ * @return completion code of pthread_cond_signal
+ */
+int Thread_signal_cond(cond_type condvar)
+{
+       int signal_rc;
+
+       pthread_mutex_lock(&condvar->mutex);
+       signal_rc = pthread_cond_signal(&condvar->cond);
+       pthread_mutex_unlock(&condvar->mutex);
+
+       return signal_rc;
+}
+
+/**
+ * Wait for a condition variable to be signalled, giving up after a timeout
+ * @param condvar the condition variable struct
+ * @param timeout the maximum time to wait, in seconds
+ * @return completion code of pthread_cond_timedwait
+ */
+int Thread_wait_cond(cond_type condvar, int timeout)
+{
+       int wait_rc = 0;
+       struct timeval now;
+       struct timespec deadline;
+
+       FUNC_ENTRY;
+       /* absolute deadline = current wall-clock time + timeout seconds */
+       gettimeofday(&now, NULL);
+       deadline.tv_nsec = now.tv_usec * 1000;
+       deadline.tv_sec = now.tv_sec + timeout;
+
+       pthread_mutex_lock(&condvar->mutex);
+       wait_rc = pthread_cond_timedwait(&condvar->cond, &condvar->mutex, &deadline);
+       pthread_mutex_unlock(&condvar->mutex);
+
+       FUNC_EXIT_RC(wait_rc);
+       return wait_rc;
+}
+
+/**
+ * Destroy a condition variable, its mutex, and free the enclosing struct
+ * @param condvar the condition variable struct
+ * @return completion code: 0 if both destroy calls succeeded, otherwise the
+ * first non-zero code (mutex first, then condition variable)
+ */
+int Thread_destroy_cond(cond_type condvar)
+{
+       int mutex_rc = pthread_mutex_destroy(&condvar->mutex);
+       int cond_rc = pthread_cond_destroy(&condvar->cond);
+
+       free(condvar);
+
+       /* do not let a successful cond destroy mask a failed mutex destroy */
+       return (mutex_rc != 0) ? mutex_rc : cond_rc;
+}
+#endif
+
+
+#if defined(THREAD_UNIT_TESTS)
+
+#include <stdio.h>
+
+/* Unit-test thread body: waits on the semaphore passed in n twice, printing
+ * the result of each wait and finally the result of Thread_check_sem.
+ * NOTE(review): Thread_wait_sem is called here with one argument although its
+ * definition takes (sem, timeout), so this test does not match the current
+ * signature - a timeout needs to be chosen. */
+thread_return_type secondary(void* n)
+{
+       int rc = 0;
+
+       /*
+       cond_type cond = n;
+
+       printf("Secondary thread about to wait\n");
+       rc = Thread_wait_cond(cond);
+       printf("Secondary thread returned from wait %d\n", rc);*/
+
+       sem_type sem = n;
+
+       printf("Secondary thread about to wait\n");
+       rc = Thread_wait_sem(sem);
+       printf("Secondary thread returned from wait %d\n", rc);
+
+       printf("Secondary thread about to wait\n");
+       rc = Thread_wait_sem(sem);
+       printf("Secondary thread returned from wait %d\n", rc);
+       printf("Secondary check sem %d\n", Thread_check_sem(sem));
+
+       return 0;
+}
+
+
+/* Unit-test driver: creates a semaphore, posts it once before starting the
+ * secondary thread, sleeps, posts it a second time, and prints the result of
+ * Thread_check_sem at each stage. */
+int main(int argc, char *argv[])
+{
+       int rc = 0;
+
+       sem_type sem = Thread_create_sem();
+
+       printf("check sem %d\n", Thread_check_sem(sem));
+
+       printf("post secondary\n");
+       rc = Thread_post_sem(sem);
+       printf("posted secondary %d\n", rc);
+
+       printf("check sem %d\n", Thread_check_sem(sem));
+
+       printf("Starting secondary thread\n");
+       Thread_start(secondary, (void*)sem);
+
+       sleep(3); /* leave time for the secondary thread to consume the first post */
+       printf("check sem %d\n", Thread_check_sem(sem));
+
+       printf("post secondary\n");
+       rc = Thread_post_sem(sem);
+       printf("posted secondary %d\n", rc);
+
+       sleep(3); /* leave time for the secondary thread's second wait to return */
+
+       printf("Main thread ending\n");
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Thread.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Thread.h 
b/thirdparty/paho.mqtt.c/src/Thread.h
new file mode 100644
index 0000000..995e221
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Thread.h
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial implementation
+ *    Ian Craggs, Allan Stockdill-Mander - async client updates
+ *    Ian Craggs - fix for bug #420851
+ *    Ian Craggs - change MacOS semaphore implementation
+ 
*******************************************************************************/
+#include "MQTTClient.h"
+
+#if !defined(THREAD_H)
+#define THREAD_H
+
+/* Platform abstraction for threading primitives: on Windows every type is a
+ * macro for a Win32 type; elsewhere the types are built on pthreads, with the
+ * semaphore backed by libdispatch on OSX and by POSIX sem_t otherwise. */
+#if defined(WIN32) || defined(WIN64)
+       #include <windows.h>
+       #define thread_type HANDLE
+       #define thread_id_type DWORD
+       #define thread_return_type DWORD
+       #define thread_fn LPTHREAD_START_ROUTINE
+       #define mutex_type HANDLE
+       #define cond_type HANDLE
+       #define sem_type HANDLE
+#else
+       #include <pthread.h>
+
+       #define thread_type pthread_t
+       #define thread_id_type pthread_t
+       #define thread_return_type void*
+       typedef thread_return_type (*thread_fn)(void*);
+       #define mutex_type pthread_mutex_t*
+       typedef struct { pthread_cond_t cond; pthread_mutex_t mutex; } 
cond_type_struct;
+       typedef cond_type_struct *cond_type;
+       #if defined(OSX)
+         #include <dispatch/dispatch.h>
+         typedef dispatch_semaphore_t sem_type;
+       #else
+         #include <semaphore.h>
+         typedef sem_t *sem_type;
+       #endif
+
+       /* the condition variable functions are only declared for non-Windows builds */
+       cond_type Thread_create_cond(void);
+       int Thread_signal_cond(cond_type);
+       int Thread_wait_cond(cond_type condvar, int timeout);
+       int Thread_destroy_cond(cond_type);
+#endif
+
+/* Thread creation: runs the function with the given argument in a new thread. */
+DLLExport thread_type Thread_start(thread_fn, void*);
+
+/* Mutex lifecycle; lock and unlock return 0 on success.
+ * Declared with (void) so the prototypes match the definitions exactly. */
+DLLExport mutex_type Thread_create_mutex(void);
+DLLExport int Thread_lock_mutex(mutex_type);
+DLLExport int Thread_unlock_mutex(mutex_type);
+void Thread_destroy_mutex(mutex_type);
+
+/* Identifier of the calling thread. */
+DLLExport thread_id_type Thread_getid(void);
+
+/* Semaphore lifecycle; the timeout of Thread_wait_sem is in milliseconds. */
+sem_type Thread_create_sem(void);
+int Thread_wait_sem(sem_type sem, int timeout);
+int Thread_check_sem(sem_type sem);
+int Thread_post_sem(sem_type sem);
+int Thread_destroy_sem(sem_type sem);
+
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Tree.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Tree.c 
b/thirdparty/paho.mqtt.c/src/Tree.c
new file mode 100644
index 0000000..13134d6
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Tree.c
@@ -0,0 +1,724 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial implementation and documentation
+ 
*******************************************************************************/
+
+/** @file
+ * \brief functions which apply to tree structures.
+ *
+ * These trees can hold data of any sort, pointed to by the content pointer of 
the
+ * Node structure.
+ * */
+
+#define NO_HEAP_TRACKING 1
+
+#include "Tree.h"
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "Heap.h"
+
+
+int isRed(Node* aNode);
+int isBlack(Node* aNode);
+int TreeWalk(Node* curnode, int depth);
+int TreeMaxDepth(Tree *aTree);
+void TreeRotate(Tree* aTree, Node* curnode, int direction, int index);
+Node* TreeBAASub(Tree* aTree, Node* curnode, int which, int index);
+void TreeBalanceAfterAdd(Tree* aTree, Node* curnode, int index);
+void* TreeAddByIndex(Tree* aTree, void* content, size_t size, int index);
+Node* TreeFindIndex1(Tree* aTree, void* key, int index, int value);
+Node* TreeFindContentIndex(Tree* aTree, void* key, int index);
+Node* TreeMinimum(Node* curnode);
+Node* TreeSuccessor(Node* curnode);
+Node* TreeNextElementIndex(Tree* aTree, Node* curnode, int index);
+Node* TreeBARSub(Tree* aTree, Node* curnode, int which, int index);
+void TreeBalanceAfterRemove(Tree* aTree, Node* curnode, int index);
+void* TreeRemoveIndex(Tree* aTree, void* content, int index);
+
+
+/**
+ * Initializes a tree structure whose storage is supplied by the caller.
+ * The structure is zeroed, heap tracking is switched on and the compare
+ * function is installed as the first, and so far only, index.
+ * @param aTree the tree structure to set up
+ * @param compare the comparison function for index 0
+ */
+void TreeInitializeNoMalloc(Tree* aTree, int(*compare)(void*, void*, int))
+{
+       memset(aTree, 0, sizeof(*aTree));
+       aTree->indexes = 1;
+       aTree->index[0].compare = compare;
+       aTree->heap_tracking = 1;
+}
+
+/**
+ * Allocates and initializes a new tree structure.
+ * @param compare the comparison function for the tree's first index
+ * @return a pointer to the new tree structure, or NULL if the allocation failed
+ */
+Tree* TreeInitialize(int(*compare)(void*, void*, int))
+{
+#if defined(UNIT_TESTS)
+       Tree* newt = malloc(sizeof(Tree));
+#else
+       Tree* newt = mymalloc(__FILE__, __LINE__, sizeof(Tree));
+#endif
+       /* initialization writes through the pointer, so skip it when allocation failed */
+       if (newt != NULL)
+               TreeInitializeNoMalloc(newt, compare);
+       return newt;
+}
+
+
+/**
+ * Registers an additional index on a tree by appending a compare function.
+ * No check is made here against the capacity of the index array.
+ * @param aTree the tree to extend
+ * @param compare the comparison function for the new index
+ */
+void TreeAddIndex(Tree* aTree, int(*compare)(void*, void*, int))
+{
+       aTree->index[aTree->indexes++].compare = compare;
+}
+
+
+/**
+ * Releases the tree structure itself (nodes are not visited here).
+ * Outside unit-test builds the heap_tracking flag selects between the
+ * tracked myfree and the plain free.
+ * @param aTree the tree structure to release
+ */
+void TreeFree(Tree* aTree)
+{
+#if defined(UNIT_TESTS)
+       free(aTree);
+#else
+       if (aTree->heap_tracking)
+               myfree(__FILE__, __LINE__, aTree);
+       else
+               free(aTree);
+#endif
+}
+
+
+/* Child slot indexes used throughout this file. */
+#define LEFT 0
+#define RIGHT 1
+/* Fully parenthesized and without a trailing semicolon, so the macro expands
+ * safely inside larger expressions as well as in plain assignments. */
+#if !defined(max)
+#define max(a, b) (((a) > (b)) ? (a) : (b))
+#endif
+
+
+
+/**
+ * Tells whether a node is coloured red. A NULL node is not red.
+ * @param aNode the node to test, may be NULL
+ * @return 1 if the node exists and is red, otherwise 0
+ */
+int isRed(Node* aNode)
+{
+       if (aNode == NULL)
+               return 0;
+       return aNode->red != 0;
+}
+
+
+/**
+ * Tells whether a node is coloured black. A NULL node counts as black.
+ * @param aNode the node to test, may be NULL
+ * @return 1 if the node is NULL or not red, otherwise 0
+ */
+int isBlack(Node* aNode)
+{
+       if (aNode == NULL)
+               return 1;
+       return aNode->red == 0;
+}
+
+
+/**
+ * Recursively measures the depth of the subtree below a node.
+ * (The original carried a disabled red/black violation check here; it had
+ * no effect and is not reproduced.)
+ * @param curnode root of the subtree, may be NULL
+ * @param depth depth of curnode's parent level
+ * @return depth unchanged for a NULL node, otherwise the larger of the
+ * depths reported by the two children
+ */
+int TreeWalk(Node* curnode, int depth)
+{
+       int left_depth;
+       int right_depth;
+
+       if (curnode == NULL)
+               return depth;
+
+       left_depth = TreeWalk(curnode->child[LEFT], depth + 1);
+       right_depth = TreeWalk(curnode->child[RIGHT], depth + 1);
+       return (left_depth > right_depth) ? left_depth : right_depth;
+}
+
+
+/**
+ * Measures the maximum depth of the tree as seen through its first index.
+ * (A disabled check that the root is not red was present in the original as
+ * a comment only.)
+ * @param aTree the tree to measure
+ * @return the value computed by TreeWalk starting from the root at depth 0
+ */
+int TreeMaxDepth(Tree *aTree)
+{
+       return TreeWalk(aTree->index[0].root, 0);
+}
+
+
+/**
+ * Rotates the subtree rooted at curnode within one index of the tree.
+ * The child of curnode on the side opposite to direction ("other") takes
+ * curnode's place and curnode becomes other's child on the direction side.
+ * @param aTree the tree; its root pointer for index is updated when curnode was the root
+ * @param curnode the node to rotate around; its child[!direction] is dereferenced
+ * unconditionally and so must not be NULL
+ * @param direction LEFT or RIGHT
+ * @param index which index of the tree is being modified
+ */
+void TreeRotate(Tree* aTree, Node* curnode, int direction, int index)
+{
+       Node* other = curnode->child[!direction];
+
+       curnode->child[!direction] = other->child[direction]; /* other's inner subtree moves under curnode */
+       if (other->child[direction] != NULL)
+               other->child[direction]->parent = curnode;
+       other->parent = curnode->parent; /* other replaces curnode below curnode's parent */
+       if (curnode->parent == NULL)
+               aTree->index[index].root = other;
+       else if (curnode == curnode->parent->child[direction])
+               curnode->parent->child[direction] = other;
+       else
+               curnode->parent->child[!direction] = other;
+       other->child[direction] = curnode; /* finally hang curnode below other */
+       curnode->parent = other;
+}
+
+
+/**
+ * One rebalancing step used by TreeBalanceAfterAdd.
+ * The caller guarantees that curnode has a parent and a grandparent.
+ * @param aTree the tree being rebalanced
+ * @param curnode the node whose red parent triggered the step
+ * @param which the side of the grandparent on which the uncle sits
+ * @param index which index of the tree is being modified
+ * @return the node from which the caller continues balancing
+ */
+Node* TreeBAASub(Tree* aTree, Node* curnode, int which, int index)
+{
+       Node* uncle = curnode->parent->parent->child[which];
+
+       if (isRed(uncle))
+       {
+               /* red uncle: recolour parent and uncle black, grandparent red, and move up */
+               curnode->parent->red = uncle->red = 0;
+               curnode = curnode->parent->parent;
+               curnode->red = 1;
+       }
+       else
+       {
+               /* black (or missing) uncle: rotate, after first straightening an inner child */
+               if (curnode == curnode->parent->child[which])
+               {
+                       curnode = curnode->parent;
+                       TreeRotate(aTree, curnode, !which, index);
+               }
+               curnode->parent->red = 0;
+               curnode->parent->parent->red = 1;
+               TreeRotate(aTree, curnode->parent->parent, which, index);
+       }
+       return curnode;
+}
+
+
+/**
+ * Restores the red/black colouring after a node has been inserted.
+ * Steps upward while the current node has a red parent and a grandparent,
+ * then forces the root of the index to black.
+ * @param aTree the tree being rebalanced
+ * @param curnode the node that was just added or updated
+ * @param index which index of the tree is being modified
+ */
+void TreeBalanceAfterAdd(Tree* aTree, Node* curnode, int index)
+{
+       while (curnode && isRed(curnode->parent) && curnode->parent->parent)
+       {
+               /* the uncle sits on the opposite side of the grandparent from the parent */
+               int uncle_side = (curnode->parent == curnode->parent->parent->child[LEFT]) ? RIGHT : LEFT;
+
+               curnode = TreeBAASub(aTree, curnode, uncle_side, index);
+       }
+       aTree->index[index].root->red = 0;
+}
+
+
+/**
+ * Add an item to one index of a tree, or replace the content of the node
+ * that compares equal to it.
+ * NOTE(review): the result of the node allocation is passed to memset
+ * without a NULL check.
+ * NOTE(review): when a matching node exists and allow_duplicates is set the
+ * process exits with -99; the braces that follow that test form an
+ * unconditional block, not an else branch - confirm this is intended.
+ * @param aTree the tree to which the item is to be added
+ * @param content the item content itself
+ * @param size the size of the element
+ * @param index which index of the tree to update; count and size totals are
+ * only maintained for index 0
+ * @return the content previously held by a matching node, or NULL when a
+ * new node was created
+ */
+void* TreeAddByIndex(Tree* aTree, void* content, size_t size, int index)
+{
+       Node* curparent = NULL;
+       Node* curnode = aTree->index[index].root;
+       Node* newel = NULL;
+       int left = 0;
+       int result = 1;
+       void* rc = NULL;
+
+       while (curnode) /* descend until an equal node is found or a leaf position is reached */
+       {
+               result = aTree->index[index].compare(curnode->content, content, 
1);
+               left = (result > 0);
+               if (result == 0)
+                       break;
+               else
+               {
+                       curparent = curnode;
+                       curnode = curnode->child[left];
+               }
+       }
+       
+       if (result == 0)
+       {
+               if (aTree->allow_duplicates)
+                       exit(-99);
+               {
+                       newel = curnode; /* reuse the existing node, handing back its old content */
+                       rc = newel->content;
+                       if (index == 0)
+                               aTree->size += (size - curnode->size);
+               }
+       }
+       else
+       {
+               #if defined(UNIT_TESTS)
+                       newel = malloc(sizeof(Node));
+               #else
+                       newel = (aTree->heap_tracking) ? mymalloc(__FILE__, 
__LINE__, sizeof(Node)) : malloc(sizeof(Node));
+               #endif
+               memset(newel, '\0', sizeof(Node));
+               if (curparent)
+                       curparent->child[left] = newel;
+               else
+                       aTree->index[index].root = newel;
+               newel->parent = curparent;
+               newel->red = 1; /* new nodes start red; balancing below fixes any violation */
+               if (index == 0)
+               {
+                       ++(aTree->count);
+                       aTree->size += size;
+               }
+       }
+       newel->content = content;
+       newel->size = size;
+       TreeBalanceAfterAdd(aTree, newel, index);
+       return rc;
+}
+
+
+/**
+ * Adds an item to every index of a tree.
+ * @param aTree the tree to which the item is to be added
+ * @param content the item content itself
+ * @param size the size of the element
+ * @return the value returned by TreeAddByIndex for the last index, or NULL
+ * when the tree has no indexes
+ */
+void* TreeAdd(Tree* aTree, void* content, size_t size)
+{
+       void* replaced = NULL;
+       int idx = 0;
+
+       while (idx < aTree->indexes)
+       {
+               replaced = TreeAddByIndex(aTree, content, size, idx);
+               ++idx;
+       }
+
+       return replaced;
+}
+
+
+/**
+ * Searches one index of a tree for a node that compares equal to key.
+ * @param aTree the tree to search
+ * @param key the value handed to the index's compare function as second argument
+ * @param index which index of the tree to search
+ * @param value passed through unchanged as the compare function's third argument
+ * @return the matching node, or NULL if there is none
+ */
+Node* TreeFindIndex1(Tree* aTree, void* key, int index, int value)
+{
+       Node* cursor = aTree->index[index].root;
+
+       while (cursor != NULL)
+       {
+               int order = aTree->index[index].compare(cursor->content, key, value);
+
+               if (order == 0)
+                       return cursor;
+               cursor = cursor->child[order > 0];
+       }
+       return NULL;
+}
+
+
+/**
+ * Searches one index of a tree, calling the compare function with 0 as its
+ * third argument.
+ * @param aTree the tree to search
+ * @param key the key to look for
+ * @param index which index of the tree to search
+ * @return the matching node, or NULL if there is none
+ */
+Node* TreeFindIndex(Tree* aTree, void* key, int index)
+{
+       Node* found = TreeFindIndex1(aTree, key, index, 0);
+
+       return found;
+}
+
+
+/**
+ * Searches one index of a tree, calling the compare function with 1 as its
+ * third argument.
+ * @param aTree the tree to search
+ * @param key the content to look for
+ * @param index which index of the tree to search
+ * @return the matching node, or NULL if there is none
+ */
+Node* TreeFindContentIndex(Tree* aTree, void* key, int index)
+{
+       Node* found = TreeFindIndex1(aTree, key, index, 1);
+
+       return found;
+}
+
+
+/**
+ * Searches the first index (index 0) of a tree for a key.
+ * @param aTree the tree to search
+ * @param key the key to look for
+ * @return the matching node, or NULL if there is none
+ */
+Node* TreeFind(Tree* aTree, void* key)
+{
+       Node* found = TreeFindIndex(aTree, key, 0);
+
+       return found;
+}
+
+
+/**
+ * Finds the left-most node of a subtree.
+ * @param curnode root of the subtree, may be NULL
+ * @return the node reached by following LEFT children to the end, or NULL
+ * when curnode is NULL
+ */
+Node* TreeMinimum(Node* curnode)
+{
+       if (curnode == NULL)
+               return NULL;
+
+       while (curnode->child[LEFT] != NULL)
+               curnode = curnode->child[LEFT];
+       return curnode;
+}
+
+
+/**
+ * Finds the in-order successor of a node.
+ * @param curnode the starting node, must not be NULL
+ * @return the left-most node of the right subtree if there is one, otherwise
+ * the nearest ancestor reached from a non-RIGHT child, or NULL when no such
+ * ancestor exists
+ */
+Node* TreeSuccessor(Node* curnode)
+{
+       Node* ancestor;
+
+       if (curnode->child[RIGHT] != NULL)
+               return TreeMinimum(curnode->child[RIGHT]);
+
+       /* climb while we keep arriving from a right child */
+       for (ancestor = curnode->parent;
+               ancestor != NULL && curnode == ancestor->child[RIGHT];
+               ancestor = ancestor->parent)
+               curnode = ancestor;
+
+       return ancestor;
+}
+
+
+/**
+ * Iterates over one index of a tree in order.
+ * @param aTree the tree to iterate
+ * @param curnode the current position, or NULL to start the iteration
+ * @param index which index of the tree to iterate
+ * @return the left-most node of the index when curnode is NULL, otherwise
+ * the successor of curnode; NULL at the end
+ */
+Node* TreeNextElementIndex(Tree* aTree, Node* curnode, int index)
+{
+       return (curnode == NULL) ? TreeMinimum(aTree->index[index].root)
+               : TreeSuccessor(curnode);
+}
+
+
+/**
+ * Iterates over the first index (index 0) of a tree in order.
+ * @param aTree the tree to iterate
+ * @param curnode the current position, or NULL to start the iteration
+ * @return the next node, or NULL at the end
+ */
+Node* TreeNextElement(Tree* aTree, Node* curnode)
+{
+       Node* next = TreeNextElementIndex(aTree, curnode, 0);
+
+       return next;
+}
+
+
+/**
+ * One rebalancing step used by TreeBalanceAfterRemove.
+ * curnode->parent is dereferenced unconditionally, so curnode must have a parent.
+ * @param aTree the tree being rebalanced
+ * @param curnode the node carrying the deficit (may be the stand-in node
+ * built by TreeRemoveNodeIndex)
+ * @param which the side of the parent on which the sibling sits
+ * @param index which index of the tree is being modified
+ * @return the node from which the caller continues balancing; the root of
+ * the index once the final rotation has been done
+ */
+Node* TreeBARSub(Tree* aTree, Node* curnode, int which, int index)
+{
+       Node* sibling = curnode->parent->child[which];
+
+       if (isRed(sibling))
+       {
+               /* red sibling: rotate so that the sibling examined below is black */
+               sibling->red = 0;
+               curnode->parent->red = 1;
+               TreeRotate(aTree, curnode->parent, !which, index);
+               sibling = curnode->parent->child[which];
+       }
+       if (!sibling)
+               curnode = curnode->parent;
+       else if (isBlack(sibling->child[!which]) && 
isBlack(sibling->child[which]))
+       {
+               /* both of the sibling's children are black: recolour and move up */
+               sibling->red = 1;
+               curnode = curnode->parent;
+       }
+       else
+       {
+               if (isBlack(sibling->child[which]))
+               {
+                       /* only the near child is red: rotate it into the far position first */
+                       sibling->child[!which]->red = 0;
+                       sibling->red = 1;
+                       TreeRotate(aTree, sibling, which, index);
+                       sibling = curnode->parent->child[which];
+               }
+               sibling->red = curnode->parent->red;
+               curnode->parent->red = 0;
+               sibling->child[which]->red = 0;
+               TreeRotate(aTree, curnode->parent, !which, index);
+               curnode = aTree->index[index].root; /* ends the caller's loop */
+       }
+       return curnode;
+}
+
+
+/**
+ * Restores the red/black colouring after a black node has been unlinked.
+ * Repeats TreeBARSub while the current node is black and is not the root,
+ * then colours the node it stopped on black.
+ * @param aTree the tree being rebalanced
+ * @param curnode the child that replaced the removed node, or a stand-in
+ * node with NULL content (built by TreeRemoveNodeIndex) when there was no child
+ * @param index which index of the tree is being modified
+ */
+void TreeBalanceAfterRemove(Tree* aTree, Node* curnode, int index)
+{
+       while (curnode != aTree->index[index].root && isBlack(curnode))
+       {
+               /* curnode->content == NULL must equal curnode == NULL:
+                * a stand-in node is compared as if it were the parent's NULL child */
+               if (((curnode->content) ? curnode : NULL) == 
curnode->parent->child[LEFT])
+                       curnode = TreeBARSub(aTree, curnode, RIGHT, index);
+               else
+                       curnode = TreeBARSub(aTree, curnode, LEFT, index);
+    }
+       curnode->red = 0;
+}
+
+
+/**
+ * Remove a node from one index of a tree.
+ * When the node has two children, its in-order successor is the node that is
+ * physically unlinked and freed, after the successor's content and size have
+ * been copied into curnode.
+ * @param aTree the tree from which the node is to be removed
+ * @param curnode the node to remove, must not be NULL
+ * @param index which index of the tree to update; count and size totals are
+ * only maintained for index 0
+ * @return the content that curnode held on entry
+ */
+void* TreeRemoveNodeIndex(Tree* aTree, Node* curnode, int index)
+{
+       Node* redundant = curnode;
+       Node* curchild = NULL;
+       size_t size = curnode->size;
+       void* content = curnode->content;
+
+       /* if the node to remove has 0 or 1 children, it can be removed without involving another node */
+       if (curnode->child[LEFT] && curnode->child[RIGHT]) /* 2 children */
+               redundant = TreeSuccessor(curnode);     /* now redundant must have at most one child */
+
+       curchild = redundant->child[(redundant->child[LEFT] != NULL) ? LEFT : 
RIGHT];
+       if (curchild) /* we could have no children at all */
+               curchild->parent = redundant->parent;
+
+       if (redundant->parent == NULL)
+               aTree->index[index].root = curchild;
+       else
+       {
+               if (redundant == redundant->parent->child[LEFT])
+                       redundant->parent->child[LEFT] = curchild;
+               else
+                       redundant->parent->child[RIGHT] = curchild;
+       }
+
+       if (redundant != curnode)
+       {
+               curnode->content = redundant->content;
+               curnode->size = redundant->size;
+       }
+
+       if (isBlack(redundant)) /* unlinking a black node needs rebalancing */
+       {
+               if (curchild == NULL)
+               {
+                       if (redundant->parent)
+                       {
+                               Node temp; /* stand-in for the missing child: zeroed, black, same parent */
+                               memset(&temp, '\0', sizeof(Node));
+                               temp.parent = (redundant) ? redundant->parent : 
NULL;
+                               temp.red = 0;
+                               TreeBalanceAfterRemove(aTree, &temp, index);
+                       }
+               }
+               else
+                       TreeBalanceAfterRemove(aTree, curchild, index);
+       }
+
+#if defined(UNIT_TESTS)
+       free(redundant);
+#else
+       (aTree->heap_tracking) ? myfree(__FILE__, __LINE__, redundant) : 
free(redundant);
+#endif
+       if (index == 0)
+       {
+               aTree->size -= size;
+               --(aTree->count);
+       }
+       return content;
+}
+
+
+/**
+ * Remove an item from one index of a tree, locating it by content
+ * @param aTree the tree from which the item is to be removed
+ * @param content the item content to look for
+ * @param index which index of the tree to update
+ * @return the content of the removed node, or NULL if no node matched
+ */
+void* TreeRemoveIndex(Tree* aTree, void* content, int index)
+{
+       Node* target = TreeFindContentIndex(aTree, content, index);
+
+       return (target != NULL) ? TreeRemoveNodeIndex(aTree, target, index) : NULL;
+}
+
+
+/**
+ * Removes an item, located by content, from every index of a tree.
+ * @param aTree the tree from which the item is to be removed
+ * @param content the item content to look for
+ * @return the value returned by TreeRemoveIndex for the last index, or NULL
+ * when the tree has no indexes
+ */
+void* TreeRemove(Tree* aTree, void* content)
+{
+       void* removed = NULL;
+       int idx = 0;
+
+       while (idx < aTree->indexes)
+       {
+               removed = TreeRemoveIndex(aTree, content, idx);
+               ++idx;
+       }
+
+       return removed;
+}
+
+
+/**
+ * Removes the item found by key in one index, then removes the same content
+ * from every other index of the tree.
+ * Each removal from another index is looked up with the value returned by
+ * the previous removal.
+ * @param aTree the tree from which the item is to be removed
+ * @param key the key to look for in the given index
+ * @param index the index in which the key lookup is made
+ * @return NULL if the key was not found, otherwise the value returned by the
+ * last removal performed
+ */
+void* TreeRemoveKeyIndex(Tree* aTree, void* key, int index)
+{
+       Node* target = TreeFindIndex(aTree, key, index);
+       void* removed = NULL;
+       int other;
+
+       if (target == NULL)
+               return NULL;
+
+       removed = TreeRemoveNodeIndex(aTree, target, index);
+       for (other = 0; other < aTree->indexes; ++other)
+       {
+               if (other == index)
+                       continue;
+               removed = TreeRemoveIndex(aTree, removed, other);
+       }
+       return removed;
+}
+
+
+/**
+ * Removes the item found by key in the first index (index 0), and with it
+ * the same content from all other indexes.
+ * @param aTree the tree from which the item is to be removed
+ * @param key the key to look for
+ * @return the value returned by TreeRemoveKeyIndex for index 0
+ */
+void* TreeRemoveKey(Tree* aTree, void* key)
+{
+       void* removed = TreeRemoveKeyIndex(aTree, key, 0);
+
+       return removed;
+}
+
+
+/**
+ * Compare function for trees whose items are ints.
+ * @param a pointer to the first int
+ * @param b pointer to the second int
+ * @param content not used by this comparison
+ * @return -1 if *a is greater than *b, 0 if they are equal, 1 otherwise
+ */
+int TreeIntCompare(void* a, void* b, int content)
+{
+       const int lhs = *(int*)a;
+       const int rhs = *(int*)b;
+
+       if (lhs > rhs)
+               return -1;
+       if (lhs < rhs)
+               return 1;
+       return 0;
+}
+
+
+/**
+ * Compare function for trees keyed on the pointer values themselves.
+ * @param a the first pointer
+ * @param b the second pointer
+ * @param content not used by this comparison
+ * @return -1 if a is greater than b, 0 if they are equal, 1 otherwise
+ */
+int TreePtrCompare(void* a, void* b, int content)
+{
+       if (a > b)
+               return -1;
+       if (a == b)
+               return 0;
+       return 1;
+}
+
+
+/**
+ * Compare function for trees whose items are NUL-terminated strings.
+ * @param a the first string
+ * @param b the second string
+ * @param content not used by this comparison
+ * @return the result of strcmp(a, b)
+ */
+int TreeStringCompare(void* a, void* b, int content)
+{
+       char* first = (char*)a;
+       char* second = (char*)b;
+
+       return strcmp(first, second);
+}
+
+
+#if defined(UNIT_TESTS)
+
+/* Unit-test helper: walks the first index in iteration order, treating each
+ * node's content as an int, and reports every adjacent pair in which the
+ * later value is smaller than the earlier one.
+ * Returns 99 if any such pair was found, otherwise 0. */
+int check(Tree *t)
+{
+       int rc = 0;
+       Node* prev = TreeNextElement(t, NULL);
+
+       while (prev != NULL)
+       {
+               Node* next = TreeNextElement(t, prev);
+
+               if (next != NULL && (*(int*)(next->content) < *(int*)(prev->content)))
+               {
+                       printf("out of order %d < %d\n", *(int*)(next->content), *(int*)(prev->content));
+                       rc = 99;
+               }
+               prev = next;
+       }
+       return rc;
+}
+
+
+/* Unit-test helper: walks the first index in iteration order, printing any
+ * adjacent pair that is out of order and any node, other than the first one
+ * visited, whose int content equals lookfor.
+ * Always returns 0. */
+int traverse(Tree *t, int lookfor)
+{
+       int rc = 0;
+       Node* prev;
+
+       printf("Traversing\n");
+       prev = TreeNextElement(t, NULL);
+       while (prev != NULL)
+       {
+               Node* next = TreeNextElement(t, prev);
+
+               if (next != NULL)
+               {
+                       if (*(int*)(next->content) < *(int*)(prev->content))
+                               printf("out of order %d < %d\n", *(int*)(next->content), *(int*)(prev->content));
+                       if (lookfor == *(int*)(next->content))
+                               printf("missing item %d actually found\n", lookfor);
+               }
+               prev = next;
+       }
+       printf("End traverse %d\n", rc);
+       return rc;
+}
+
+
+/**
+ * Stress test: add 'limit' random integers to a tree, look each one up, then
+ * remove them all in reverse insertion order, running check() along the way.
+ * @param limit the number of random integers to add
+ * @return 0 on success, 88 if check() reports an ordering error, or -2 if a
+ * value recorded as added cannot be found or removed
+ */
+int test(int limit)
+{
+       int i, *ip;
+       Tree* t = TreeInitialize(TreeIntCompare);
+       int rc = 0;
+
+       printf("Tree initialized\n");
+
+       srand(time(NULL));
+
+       /* warm-up: add a single element, then remove it again by value */
+       ip = malloc(sizeof(int));
+       *ip = 2;
+       TreeAdd(t, (void*)ip, sizeof(int));
+
+       check(t);
+
+       i = 2;
+       void* result = TreeRemove(t, (void*)&i);
+       if (result)
+               free(result);
+
+       /* actual[i] is the value added at step i, or -1 for a duplicate */
+       int actual[limit];
+       for (i = 0; i < limit; i++)
+       {
+               void* replaced = NULL;
+
+               ip = malloc(sizeof(int));
+               *ip = rand();
+               replaced = TreeAdd(t, (void*)ip, sizeof(int));
+               if (replaced) /* duplicate */
+               {
+                       free(replaced);
+                       actual[i] = -1;
+               }
+               else
+                       actual[i] = *ip;
+               printf("Tree element added %d\n",  *ip);
+               /* verify ordering every 1000 insertions; the condition used to
+                * be "1 % 1000 == 0", which is never true */
+               if (i % 1000 == 0)
+               {
+                       rc = check(t);
+                       printf("%d elements, check result %d\n", i+1, rc);
+                       if (rc != 0)
+                               return 88;
+               }
+       }
+
+       check(t);
+
+       /* every value recorded as added must be found */
+       for (i = 0; i < limit; i++)
+       {
+               int parm = actual[i];
+
+               if (parm == -1)
+                       continue;
+
+               Node* found = TreeFind(t, (void*)&parm);
+               if (found)
+                       printf("Tree find %d %d\n", parm, *(int*)(found->content));
+               else
+               {
+                       printf("%d not found\n", parm);
+                       traverse(t, parm);
+                       return -2;
+               }
+       }
+
+       check(t);
+
+       /* remove everything again, newest first */
+       for (i = limit -1; i >= 0; i--)
+       {
+               int parm = actual[i];
+               void *found;
+
+               if (parm == -1) /* skip duplicate */
+                       continue;
+
+               found = TreeRemove(t, (void*)&parm);
+               if (found)
+               {
+                       printf("%d Tree remove %d %d\n", i, parm, *(int*)(found));
+                       free(found);
+               }
+               else
+               {
+                       int count = 0;
+                       int j; /* separate index so the outer loop variable is not clobbered */
+
+                       printf("%d %d not found\n", i, parm);
+                       traverse(t, parm);
+                       for (j = 0; j < limit; j++)
+                               if (actual[j] == parm)
+                                       ++count;
+                       printf("%d occurs %d times\n", parm, count);
+                       return -2;
+               }
+               if (i % 1000 == 0)
+               {
+                       rc = check(t);
+                       printf("%d elements, check result %d\n", i+1, rc);
+                       if (rc != 0)
+                               return 88;
+               }
+       }
+       printf("finished\n");
+       return 0;
+}
+
+/**
+ * Unit test entry point: run test() repeatedly until one run reports a
+ * failure, then pass that failure on as the process exit status.
+ * @param argc not used
+ * @param argv not used
+ * @return the non-zero value returned by the failing test() run
+ */
+int main(int argc, char *argv[])
+{
+       int rc = 0;
+
+       /* loops for as long as test() keeps succeeding */
+       while (rc == 0)
+               rc = test(999999);
+
+       return rc;
+}
+
+#endif
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/Tree.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/Tree.h 
b/thirdparty/paho.mqtt.c/src/Tree.h
new file mode 100644
index 0000000..bbbd014
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/Tree.h
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial implementation and documentation
+ *******************************************************************************/
+
+
+#if !defined(TREE_H)
+#define TREE_H
+
+#include <stdlib.h> /* for size_t definition */
+
+/*BE
+defm defTree(T) // macro to define a tree
+
+def T concat Node
+{
+       n32 ptr T concat Node "parent"
+       n32 ptr T concat Node "left"
+       n32 ptr T concat Node "right"
+       n32 ptr T id2str(T)
+       n32 suppress "size"
+}
+
+
+def T concat Tree
+{
+       struct
+       {
+               n32 ptr T concat Node suppress "root"
+               n32 ptr DATA suppress "compare"
+       } 
+       struct
+       {
+               n32 ptr T concat Node suppress "root"
+               n32 ptr DATA suppress "compare"
+       } 
+       n32 dec "count"
+       n32 dec suppress "size"
+}
+
+endm
+
+defTree(INT)
+defTree(STRING)
+defTree(TMP)
+
+BE*/
+
+/**
+ * Structure to hold all data for one tree node
+ */
+typedef struct NodeStruct
+{
+       struct NodeStruct *parent,   /**< pointer to parent tree node, in case we need it */
+                                         *child[2]; /**< pointers to child tree nodes 0 = left, 1 = right */
+       void* content;                           /**< pointer to element content */
+       size_t size;                                     /**< size of content */
+       unsigned int red : 1; /**< one-bit flag; presumably the node colour for red-black balancing - TODO confirm */
+} Node;
+
+
+/**
+ * Structure to hold all data for one tree, with up to two indexes
+ */
+typedef struct
+{
+       struct
+       {
+               Node *root;     /**< root node pointer */
+               int (*compare)(void*, void*, int); /**< comparison function */
+       } index[2]; /**< one root and one comparison function per index */
+       int indexes,  /**< no of indexes into tree */
+               count;    /**< no of items */
+       size_t size;  /**< heap storage used */
+       unsigned int heap_tracking : 1; /**< switch on heap tracking for this tree? */
+       unsigned int allow_duplicates : 1; /**< switch to allow duplicate entries */
+} Tree;
+
+
+Tree* TreeInitialize(int(*compare)(void*, void*, int));
+void TreeInitializeNoMalloc(Tree* aTree, int(*compare)(void*, void*, int));
+void TreeAddIndex(Tree* aTree, int(*compare)(void*, void*, int));
+
+void* TreeAdd(Tree* aTree, void* content, size_t size);
+
+void* TreeRemove(Tree* aTree, void* content); /* removes content from every index of the tree */
+
+void* TreeRemoveKey(Tree* aTree, void* key); /* same as TreeRemoveKeyIndex() with index 0 */
+void* TreeRemoveKeyIndex(Tree* aTree, void* key, int index); /* looks key up in the given index, then removes from all indexes */
+
+void* TreeRemoveNodeIndex(Tree* aTree, Node* aNode, int index);
+
+void TreeFree(Tree* aTree);
+
+Node* TreeFind(Tree* aTree, void* key);
+Node* TreeFindIndex(Tree* aTree, void* key, int index);
+
+Node* TreeNextElement(Tree* aTree, Node* curnode);
+
+int TreeIntCompare(void* a, void* b, int); /* a, b point to ints: -1 if *a > *b, 0 if equal, 1 if *a < *b */
+int TreePtrCompare(void* a, void* b, int); /* compares the pointer values: -1 if a > b, 0 if equal, 1 if a < b */
+int TreeStringCompare(void* a, void* b, int); /* a, b are C strings: returns strcmp(a, b) */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/VersionInfo.h.in
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/VersionInfo.h.in 
b/thirdparty/paho.mqtt.c/src/VersionInfo.h.in
new file mode 100644
index 0000000..5b91bf3
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/VersionInfo.h.in
@@ -0,0 +1,7 @@
+#ifndef VERSIONINFO_H
+#define VERSIONINFO_H
+
+#define BUILD_TIMESTAMP "@BUILD_TIMESTAMP@" /* placeholder; presumably filled in by the build when this .in template is processed - TODO confirm */
+#define CLIENT_VERSION  "@CLIENT_VERSION@" /* placeholder; presumably filled in the same way - TODO confirm */
+
+#endif /* VERSIONINFO_H */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt 
b/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt
new file mode 100644
index 0000000..79ea886
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/samples/CMakeLists.txt
@@ -0,0 +1,65 @@
+#*******************************************************************************
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+#     http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#    http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+#     Rainer Poisel - initial version
+#     Ian Craggs - update sample names
+#*******************************************************************************/
+
+# Note: on OS X you should install XCode and the associated command-line tools
+
+## compilation/linkage settings: header search paths used by all samples
+INCLUDE_DIRECTORIES(
+    .
+    ${CMAKE_SOURCE_DIR}/src
+    ${CMAKE_BINARY_DIR}
+    )
+
+IF (WIN32)
+       ADD_DEFINITIONS(/DCMAKE_BUILD /D_CRT_SECURE_NO_DEPRECATE)
+ENDIF()
+
+# command-line samples: paho_c_* link paho-mqtt3a, paho_cs_* link paho-mqtt3c
+ADD_EXECUTABLE(paho_c_pub paho_c_pub.c)
+ADD_EXECUTABLE(paho_c_sub paho_c_sub.c)
+ADD_EXECUTABLE(paho_cs_pub paho_cs_pub.c)
+ADD_EXECUTABLE(paho_cs_sub paho_cs_sub.c)
+
+TARGET_LINK_LIBRARIES(paho_c_pub paho-mqtt3a)
+TARGET_LINK_LIBRARIES(paho_c_sub paho-mqtt3a)
+TARGET_LINK_LIBRARIES(paho_cs_pub paho-mqtt3c)
+TARGET_LINK_LIBRARIES(paho_cs_sub paho-mqtt3c)
+
+ADD_EXECUTABLE(MQTTAsync_subscribe MQTTAsync_subscribe.c) # MQTTAsync_* samples link paho-mqtt3a below
+ADD_EXECUTABLE(MQTTAsync_publish MQTTAsync_publish.c)
+ADD_EXECUTABLE(MQTTClient_subscribe MQTTClient_subscribe.c) # MQTTClient_* samples link paho-mqtt3c below
+ADD_EXECUTABLE(MQTTClient_publish MQTTClient_publish.c)
+ADD_EXECUTABLE(MQTTClient_publish_async MQTTClient_publish_async.c)
+
+TARGET_LINK_LIBRARIES(MQTTAsync_subscribe paho-mqtt3a)
+TARGET_LINK_LIBRARIES(MQTTAsync_publish paho-mqtt3a)
+TARGET_LINK_LIBRARIES(MQTTClient_subscribe paho-mqtt3c)
+TARGET_LINK_LIBRARIES(MQTTClient_publish paho-mqtt3c)
+TARGET_LINK_LIBRARIES(MQTTClient_publish_async paho-mqtt3c)
+
+INSTALL(TARGETS paho_c_sub # installs every sample executable defined above
+                paho_c_pub
+                paho_cs_sub
+                paho_cs_pub
+                MQTTAsync_subscribe
+                MQTTAsync_publish
+                MQTTClient_subscribe
+                MQTTClient_publish
+                MQTTClient_publish_async
+
+    RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
+    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})

Reply via email to