http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.c 
b/thirdparty/paho.mqtt.c/src/MQTTPacket.c
new file mode 100644
index 0000000..c21a432
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.c
@@ -0,0 +1,755 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ 
*******************************************************************************/
+
+/**
+ * @file
+ * \brief functions to deal with reading and writing of MQTT packets from and 
to sockets
+ *
+ * Some other related functions are in the MQTTPacketOut module
+ */
+
+#include "MQTTPacket.h"
+#include "Log.h"
+#if !defined(NO_PERSISTENCE)
+       #include "MQTTPersistence.h"
+#endif
+#include "Messages.h"
+#include "StackTrace.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "Heap.h"
+
+#if !defined(min)
+#define min(A,B) ( (A) < (B) ? (A):(B))
+#endif
+
+/**
+ * List of the predefined MQTT v3 packet names.
+ */
+static const char *packet_names[] =
+{
+       "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", 
"PUBREL",
+       "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
+       "PINGREQ", "PINGRESP", "DISCONNECT"
+};
+
+const char** MQTTClient_packet_names = packet_names;
+
+
+/**
+ * Converts an MQTT packet code into its name
+ * @param ptype packet code
+ * @return the corresponding string, or "UNKNOWN"
+ */
+const char* MQTTPacket_name(int ptype)
+{
+       return (ptype >= 0 && ptype <= DISCONNECT) ? packet_names[ptype] : 
"UNKNOWN";
+}
+
+/**
+ * Array of functions to build packets, indexed according to packet code
+ */
+pf new_packets[] =
+{
+       NULL,   /**< reserved */
+       NULL,   /**< MQTTPacket_connect*/
+       MQTTPacket_connack, /**< CONNACK */
+       MQTTPacket_publish,     /**< PUBLISH */
+       MQTTPacket_ack, /**< PUBACK */
+       MQTTPacket_ack, /**< PUBREC */
+       MQTTPacket_ack, /**< PUBREL */
+       MQTTPacket_ack, /**< PUBCOMP */
+       NULL, /**< MQTTPacket_subscribe*/
+       MQTTPacket_suback, /**< SUBACK */
+       NULL, /**< MQTTPacket_unsubscribe*/
+       MQTTPacket_ack, /**< UNSUBACK */
+       MQTTPacket_header_only, /**< PINGREQ */
+       MQTTPacket_header_only, /**< PINGRESP */
+       MQTTPacket_header_only  /**< DISCONNECT */
+};
+
+
+static char* readUTFlen(char** pptr, char* enddata, int* len);
+static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles 
*net);
+
+/**
+ * Reads one MQTT packet from a socket.
+ * @param socket a socket from which to read an MQTT packet
+ * @param error pointer to the error code which is completed if no packet is 
returned
+ * @return the packet structure or NULL if there was an error
+ */
+void* MQTTPacket_Factory(networkHandles* net, int* error)
+{
+       char* data = NULL;
+       static Header header;
+       size_t remaining_length;
+       int ptype;
+       void* pack = NULL;
+       size_t actual_len = 0;
+
+       FUNC_ENTRY;
+       *error = SOCKET_ERROR;  /* indicate whether an error occurred, or not */
+
+       /* read the packet data from the socket */
+#if defined(OPENSSL)
+       *error = (net->ssl) ? SSLSocket_getch(net->ssl, net->socket, 
&header.byte) : Socket_getch(net->socket, &header.byte); 
+#else
+       *error = Socket_getch(net->socket, &header.byte);
+#endif
+       if (*error != TCPSOCKET_COMPLETE)   /* first byte is the header byte */
+               goto exit; /* packet not read, *error indicates whether 
SOCKET_ERROR occurred */
+
+       /* now read the remaining length, so we know how much more to read */
+       if ((*error = MQTTPacket_decode(net, &remaining_length)) != 
TCPSOCKET_COMPLETE)
+               goto exit; /* packet not read, *error indicates whether 
SOCKET_ERROR occurred */
+
+       /* now read the rest, the variable header and payload */
+#if defined(OPENSSL)
+       data = (net->ssl) ? SSLSocket_getdata(net->ssl, net->socket, 
remaining_length, &actual_len) : 
+                                               Socket_getdata(net->socket, 
remaining_length, &actual_len);
+#else
+       data = Socket_getdata(net->socket, remaining_length, &actual_len);
+#endif
+       if (data == NULL)
+       {
+               *error = SOCKET_ERROR;
+               goto exit; /* socket error */
+       }
+
+       if (actual_len != remaining_length)
+               *error = TCPSOCKET_INTERRUPTED;
+       else
+       {
+               ptype = header.bits.type;
+               if (ptype < CONNECT || ptype > DISCONNECT || new_packets[ptype] 
== NULL)
+                       Log(TRACE_MIN, 2, NULL, ptype);
+               else
+               {
+                       if ((pack = (*new_packets[ptype])(header.byte, data, 
remaining_length)) == NULL)
+                               *error = BAD_MQTT_PACKET;
+#if !defined(NO_PERSISTENCE)
+                       else if (header.bits.type == PUBLISH && header.bits.qos 
== 2)
+                       {
+                               int buf0len;
+                               char *buf = malloc(10);
+                               buf[0] = header.byte;
+                               buf0len = 1 + MQTTPacket_encode(&buf[1], 
remaining_length);
+                               *error = MQTTPersistence_put(net->socket, buf, 
buf0len, 1,
+                                       &data, &remaining_length, 
header.bits.type, ((Publish *)pack)->msgId, 1);
+                               free(buf);
+                       }
+#endif
+               }
+       }
+       if (pack)
+               time(&(net->lastReceived));
+exit:
+       FUNC_EXIT_RC(*error);
+       return pack;
+}
+
+
+/**
+ * Sends an MQTT packet in one system call write
+ * @param socket the socket to which to write the data
+ * @param header the one-byte MQTT header
+ * @param buffer the rest of the buffer to write (not including remaining 
length)
+ * @param buflen the length of the data in buffer to be written
+ * @return the completion code (TCPSOCKET_COMPLETE etc)
+ */
+int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t 
buflen, int freeData)
+{
+       int rc;
+       size_t buf0len;
+       char *buf;
+
+       FUNC_ENTRY;
+       buf = malloc(10);
+       buf[0] = header.byte;
+       buf0len = 1 + MQTTPacket_encode(&buf[1], buflen);
+#if !defined(NO_PERSISTENCE)
+       if (header.bits.type == PUBREL)
+       {
+               char* ptraux = buffer;
+               int msgId = readInt(&ptraux);
+               rc = MQTTPersistence_put(net->socket, buf, buf0len, 1, &buffer, 
&buflen,
+                       header.bits.type, msgId, 0);
+       }
+#endif
+
+#if defined(OPENSSL)
+       if (net->ssl)
+               rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, 1, 
&buffer, &buflen, &freeData);
+       else
+#endif
+               rc = Socket_putdatas(net->socket, buf, buf0len, 1, &buffer, 
&buflen, &freeData);
+               
+       if (rc == TCPSOCKET_COMPLETE)
+               time(&(net->lastSent));
+       
+       if (rc != TCPSOCKET_INTERRUPTED)
+         free(buf);
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Sends an MQTT packet from multiple buffers in one system call write
+ * @param socket the socket to which to write the data
+ * @param header the one-byte MQTT header
+ * @param count the number of buffers
+ * @param buffers the rest of the buffers to write (not including remaining 
length)
+ * @param buflens the lengths of the data in the array of buffers to be written
+ * @return the completion code (TCPSOCKET_COMPLETE etc)
+ */
+int MQTTPacket_sends(networkHandles* net, Header header, int count, char** 
buffers, size_t* buflens, int* frees)
+{
+       int i, rc;
+       size_t buf0len, total = 0;
+       char *buf;
+
+       FUNC_ENTRY;
+       buf = malloc(10);
+       buf[0] = header.byte;
+       for (i = 0; i < count; i++)
+               total += buflens[i];
+       buf0len = 1 + MQTTPacket_encode(&buf[1], total);
+#if !defined(NO_PERSISTENCE)
+       if (header.bits.type == PUBLISH && header.bits.qos != 0)
+       {   /* persist PUBLISH QoS1 and Qo2 */
+               char *ptraux = buffers[2];
+               int msgId = readInt(&ptraux);
+               rc = MQTTPersistence_put(net->socket, buf, buf0len, count, 
buffers, buflens,
+                       header.bits.type, msgId, 0);
+       }
+#endif
+#if defined(OPENSSL)
+       if (net->ssl)
+               rc = SSLSocket_putdatas(net->ssl, net->socket, buf, buf0len, 
count, buffers, buflens, frees);
+       else
+#endif
+               rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, 
buflens, frees);
+               
+       if (rc == TCPSOCKET_COMPLETE)
+               time(&(net->lastSent));
+       
+       if (rc != TCPSOCKET_INTERRUPTED)
+         free(buf);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Encodes the message length according to the MQTT algorithm
+ * @param buf the buffer into which the encoded data is written
+ * @param length the length to be encoded
+ * @return the number of bytes written to buffer
+ */
+int MQTTPacket_encode(char* buf, size_t length)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       do
+       {
+               char d = length % 128;
+               length /= 128;
+               /* if there are more digits to encode, set the top bit of this 
digit */
+               if (length > 0)
+                       d |= 0x80;
+               buf[rc++] = d;
+       } while (length > 0);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Decodes the message length according to the MQTT algorithm
+ * @param socket the socket from which to read the bytes
+ * @param value the decoded length returned
+ * @return the number of bytes read from the socket
+ */
+int MQTTPacket_decode(networkHandles* net, size_t* value)
+{
+       int rc = SOCKET_ERROR;
+       char c;
+       int multiplier = 1;
+       int len = 0;
+#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
+
+       FUNC_ENTRY;
+       *value = 0;
+       do
+       {
+               if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+               {
+                       rc = SOCKET_ERROR;      /* bad data */
+                       goto exit;
+               }
+#if defined(OPENSSL)
+               rc = (net->ssl) ? SSLSocket_getch(net->ssl, net->socket, &c) : 
Socket_getch(net->socket, &c);
+#else
+               rc = Socket_getch(net->socket, &c);
+#endif
+               if (rc != TCPSOCKET_COMPLETE)
+                               goto exit;
+               *value += (c & 127) * multiplier;
+               multiplier *= 128;
+       } while ((c & 128) != 0);
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Calculates an integer from two bytes read from the input buffer
+ * @param pptr pointer to the input buffer - incremented by the number of 
bytes used & returned
+ * @return the integer value calculated
+ */
+int readInt(char** pptr)
+{
+       char* ptr = *pptr;
+       int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
+       *pptr += 2;
+       return len;
+}
+
+
+/**
+ * Reads a "UTF" string from the input buffer.  UTF as in the MQTT v3 spec 
which really means
+ * a length delimited string.  So it reads the two byte length then the data 
according to
+ * that length.  The end of the buffer is provided too, so we can prevent 
buffer overruns caused
+ * by an incorrect length.
+ * @param pptr pointer to the input buffer - incremented by the number of 
bytes used & returned
+ * @param enddata pointer to the end of the buffer not to be read beyond
+ * @param len returns the calculcated value of the length bytes read
+ * @return an allocated C string holding the characters read, or NULL if the 
length read would
+ * have caused an overrun.
+ *
+ */
+static char* readUTFlen(char** pptr, char* enddata, int* len)
+{
+       char* string = NULL;
+
+       FUNC_ENTRY;
+       if (enddata - (*pptr) > 1) /* enough length to read the integer? */
+       {
+               *len = readInt(pptr);
+               if (&(*pptr)[*len] <= enddata)
+               {
+                       string = malloc(*len+1);
+                       memcpy(string, *pptr, *len);
+                       string[*len] = '\0';
+                       *pptr += *len;
+               }
+       }
+       FUNC_EXIT;
+       return string;
+}
+
+
+/**
+ * Reads a "UTF" string from the input buffer.  UTF as in the MQTT v3 spec 
which really means
+ * a length delimited string.  So it reads the two byte length then the data 
according to
+ * that length.  The end of the buffer is provided too, so we can prevent 
buffer overruns caused
+ * by an incorrect length.
+ * @param pptr pointer to the input buffer - incremented by the number of 
bytes used & returned
+ * @param enddata pointer to the end of the buffer not to be read beyond
+ * @return an allocated C string holding the characters read, or NULL if the 
length read would
+ * have caused an overrun.
+ */
+char* readUTF(char** pptr, char* enddata)
+{
+       int len;
+       return readUTFlen(pptr, enddata, &len);
+}
+
+
+/**
+ * Reads one character from the input buffer.
+ * @param pptr pointer to the input buffer - incremented by the number of 
bytes used & returned
+ * @return the character read
+ */
+unsigned char readChar(char** pptr)
+{
+       unsigned char c = **pptr;
+       (*pptr)++;
+       return c;
+}
+
+
+/**
+ * Writes one character to an output buffer.
+ * @param pptr pointer to the output buffer - incremented by the number of 
bytes used & returned
+ * @param c the character to write
+ */
+void writeChar(char** pptr, char c)
+{
+       **pptr = c;
+       (*pptr)++;
+}
+
+
+/**
+ * Writes an integer as 2 bytes to an output buffer.
+ * @param pptr pointer to the output buffer - incremented by the number of 
bytes used & returned
+ * @param anInt the integer to write
+ */
+void writeInt(char** pptr, int anInt)
+{
+       **pptr = (char)(anInt / 256);
+       (*pptr)++;
+       **pptr = (char)(anInt % 256);
+       (*pptr)++;
+}
+
+
+/**
+ * Writes a "UTF" string to an output buffer.  Converts C string to 
length-delimited.
+ * @param pptr pointer to the output buffer - incremented by the number of 
bytes used & returned
+ * @param string the C string to write
+ */
+void writeUTF(char** pptr, const char* string)
+{
+       size_t len = strlen(string);
+       writeInt(pptr, (int)len);
+       memcpy(*pptr, string, len);
+       *pptr += len;
+}
+
+
+/**
+ * Writes length delimited data to an output buffer
+ * @param pptr pointer to the output buffer - incremented by the number of 
bytes used & returned
+ * @param data the data to write
+ * @param datalen the length of the data to write
+ */
+void writeData(char** pptr, const void* data, int datalen)
+{
+       writeInt(pptr, datalen);
+       memcpy(*pptr, data, datalen);
+       *pptr += datalen;
+}
+
+
+/**
+ * Function used in the new packets table to create packets which have only a 
header.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure
+ */
+void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen)
+{
+       static unsigned char header = 0;
+       header = aHeader;
+       return &header;
+}
+
+
+/**
+ * Send an MQTT disconnect packet down a socket.
+ * @param socket the open socket to send the data to
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID)
+{
+       Header header;
+       int rc = 0;
+
+       FUNC_ENTRY;
+       header.byte = 0;
+       header.bits.type = DISCONNECT;
+       rc = MQTTPacket_send(net, header, NULL, 0, 0);
+       Log(LOG_PROTOCOL, 28, NULL, net->socket, clientID, rc);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create publish packets.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure
+ */
+void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen)
+{
+       Publish* pack = malloc(sizeof(Publish));
+       char* curdata = data;
+       char* enddata = &data[datalen];
+
+       FUNC_ENTRY;
+       pack->header.byte = aHeader;
+       if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == 
NULL) /* Topic name on which to publish */
+       {
+               free(pack);
+               pack = NULL;
+               goto exit;
+       }
+       if (pack->header.bits.qos > 0)  /* Msgid only exists for QoS 1 or 2 */
+               pack->msgId = readInt(&curdata);
+       else
+               pack->msgId = 0;
+       pack->payload = curdata;
+       pack->payloadlen = (int)(datalen-(curdata-data));
+exit:
+       FUNC_EXIT;
+       return pack;
+}
+
+
+/**
+ * Free allocated storage for a publish packet.
+ * @param pack pointer to the publish packet structure
+ */
+void MQTTPacket_freePublish(Publish* pack)
+{
+       FUNC_ENTRY;
+       if (pack->topic != NULL)
+               free(pack->topic);
+       free(pack);
+       FUNC_EXIT;
+}
+
+
+/**
+ * Send an MQTT acknowledgement packet down a socket.
+ * @param type the MQTT packet type e.g. SUBACK
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param net the network handle to send the data to
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles 
*net)
+{
+       Header header;
+       int rc;
+       char *buf = malloc(2);
+       char *ptr = buf;
+
+       FUNC_ENTRY;
+       header.byte = 0;
+       header.bits.type = type;
+       header.bits.dup = dup;
+       if (type == PUBREL)
+           header.bits.qos = 1;
+       writeInt(&ptr, msgid);
+       if ((rc = MQTTPacket_send(net, header, buf, 2, 1)) != 
TCPSOCKET_INTERRUPTED)
+               free(buf);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Send an MQTT PUBACK packet down a socket.
+ * @param msgid the MQTT message id to use
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* 
clientID)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       rc =  MQTTPacket_send_ack(PUBACK, msgid, 0, net);
+       Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Free allocated storage for a suback packet.
+ * @param pack pointer to the suback packet structure
+ */
+void MQTTPacket_freeSuback(Suback* pack)
+{
+       FUNC_ENTRY;
+       if (pack->qoss != NULL)
+               ListFree(pack->qoss);
+       free(pack);
+       FUNC_EXIT;
+}
+
+
+/**
+ * Send an MQTT PUBREC packet down a socket.
+ * @param msgid the MQTT message id to use
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* 
clientID)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       rc =  MQTTPacket_send_ack(PUBREC, msgid, 0, net);
+       Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Send an MQTT PUBREL packet down a socket.
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const 
char* clientID)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       rc = MQTTPacket_send_ack(PUBREL, msgid, dup, net);
+       Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Send an MQTT PUBCOMP packet down a socket.
+ * @param msgid the MQTT message id to use
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* 
clientID)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       rc = MQTTPacket_send_ack(PUBCOMP, msgid, 0, net);
+       Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create acknowledgement packets.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure
+ */
+void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen)
+{
+       Ack* pack = malloc(sizeof(Ack));
+       char* curdata = data;
+
+       FUNC_ENTRY;
+       pack->header.byte = aHeader;
+       pack->msgId = readInt(&curdata);
+       FUNC_EXIT;
+       return pack;
+}
+
+
+/**
+ * Send an MQTT PUBLISH packet down a socket.
+ * @param pack a structure from which to get some values to use, e.g topic, 
payload
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param qos the value to use for the MQTT QoS setting
+ * @param retained boolean - whether to set the MQTT retained flag
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, 
networkHandles* net, const char* clientID)
+{
+       Header header;
+       char *topiclen;
+       int rc = -1;
+
+       FUNC_ENTRY;
+       topiclen = malloc(2);
+
+       header.bits.type = PUBLISH;
+       header.bits.dup = dup;
+       header.bits.qos = qos;
+       header.bits.retain = retained;
+       if (qos > 0)
+       {
+               char *buf = malloc(2);
+               char *ptr = buf;
+               char* bufs[4] = {topiclen, pack->topic, buf, pack->payload};
+               size_t lens[4] = {2, strlen(pack->topic), 2, pack->payloadlen};
+               int frees[4] = {1, 0, 1, 0};
+
+               writeInt(&ptr, pack->msgId);
+               ptr = topiclen;
+               writeInt(&ptr, (int)lens[1]);
+               rc = MQTTPacket_sends(net, header, 4, bufs, lens, frees);
+               if (rc != TCPSOCKET_INTERRUPTED)
+                       free(buf);
+       }
+       else
+       {
+               char* ptr = topiclen;
+               char* bufs[3] = {topiclen, pack->topic, pack->payload};
+               size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
+               int frees[3] = {1, 0, 0};
+
+               writeInt(&ptr, (int)lens[1]);
+               rc = MQTTPacket_sends(net, header, 3, bufs, lens, frees);
+       }
+       if (rc != TCPSOCKET_INTERRUPTED)
+               free(topiclen);
+       if (qos == 0)
+               Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, 
rc);
+       else
+               Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, 
qos, retained, rc,
+                               min(20, pack->payloadlen), pack->payload);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Free allocated storage for a various packet tyoes
+ * @param pack pointer to the suback packet structure
+ */
+void MQTTPacket_free_packet(MQTTPacket* pack)
+{
+       FUNC_ENTRY;
+       if (pack->header.bits.type == PUBLISH)
+               MQTTPacket_freePublish((Publish*)pack);
+       /*else if (pack->header.type == SUBSCRIBE)
+               MQTTPacket_freeSubscribe((Subscribe*)pack, 1);
+       else if (pack->header.type == UNSUBSCRIBE)
+               MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/
+       else
+               free(pack);
+       FUNC_EXIT;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacket.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacket.h 
b/thirdparty/paho.mqtt.c/src/MQTTPacket.h
new file mode 100644
index 0000000..8bad955
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacket.h
@@ -0,0 +1,262 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ *    Ian Craggs - big endian Linux reversed definition
+ 
*******************************************************************************/
+
+#if !defined(MQTTPACKET_H)
+#define MQTTPACKET_H
+
+#include "Socket.h"
+#if defined(OPENSSL)
+#include "SSLSocket.h"
+#endif
+#include "LinkedList.h"
+#include "Clients.h"
+
+/*BE
+include "Socket"
+include "LinkedList"
+include "Clients"
+BE*/
+
+typedef unsigned int bool;
+typedef void* (*pf)(unsigned char, char*, size_t);
+
+#define BAD_MQTT_PACKET -4
+
+enum msgTypes
+{
+       CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
+       PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
+       PINGREQ, PINGRESP, DISCONNECT
+};
+
+#if defined(__linux__)
+#include <endian.h>
+#if __BYTE_ORDER == __BIG_ENDIAN
+       #define REVERSED 1
+#endif
+#endif
+
+/**
+ * Bitfields for the MQTT header byte.
+ */
+typedef union
+{
+       /*unsigned*/ char byte; /**< the whole byte */
+#if defined(REVERSED)
+       struct
+       {
+               unsigned int type : 4;  /**< message type nibble */
+               bool dup : 1;                   /**< DUP flag bit */
+               unsigned int qos : 2;   /**< QoS value, 0, 1 or 2 */
+               bool retain : 1;                /**< retained flag bit */
+       } bits;
+#else
+       struct
+       {
+               bool retain : 1;                /**< retained flag bit */
+               unsigned int qos : 2;   /**< QoS value, 0, 1 or 2 */
+               bool dup : 1;                   /**< DUP flag bit */
+               unsigned int type : 4;  /**< message type nibble */
+       } bits;
+#endif
+} Header;
+
+
+/**
+ * Data for a connect packet.
+ */
+typedef struct
+{
+       Header header;  /**< MQTT header byte */
+       union
+       {
+               unsigned char all;      /**< all connect flags */
+#if defined(REVERSED)
+               struct
+               {
+                       bool username : 1;                      /**< 3.1 user 
name */
+                       bool password : 1;                      /**< 3.1 
password */
+                       bool willRetain : 1;            /**< will retain 
setting */
+                       unsigned int willQoS : 2;       /**< will QoS value */
+                       bool will : 1;                  /**< will flag */
+                       bool cleanstart : 1;    /**< cleansession flag */
+                       int : 1;        /**< unused */
+               } bits;
+#else
+               struct
+               {
+                       int : 1;        /**< unused */
+                       bool cleanstart : 1;    /**< cleansession flag */
+                       bool will : 1;                  /**< will flag */
+                       unsigned int willQoS : 2;       /**< will QoS value */
+                       bool willRetain : 1;            /**< will retain 
setting */
+                       bool password : 1;                      /**< 3.1 
password */
+                       bool username : 1;                      /**< 3.1 user 
name */
+               } bits;
+#endif
+       } flags;        /**< connect flags byte */
+
+       char *Protocol, /**< MQTT protocol name */
+               *clientID,      /**< string client id */
+        *willTopic,    /**< will topic */
+        *willMsg;      /**< will payload */
+
+       int keepAliveTimer;             /**< keepalive timeout value in seconds 
*/
+       unsigned char version;  /**< MQTT version number */
+} Connect;
+
+
+/**
+ * Data for a connack packet.
+ */
+typedef struct
+{
+       Header header; /**< MQTT header byte */
+       union
+       {
+               unsigned char all;      /**< all connack flags */
+#if defined(REVERSED)
+               struct
+               {
+                       unsigned int reserved : 7;      /**< message type 
nibble */
+                       bool sessionPresent : 1;    /**< was a session found on 
the server? */
+               } bits;
+#else
+               struct
+               {
+                       bool sessionPresent : 1;    /**< was a session found on 
the server? */
+                       unsigned int reserved : 7;      /**< message type 
nibble */
+               } bits;
+#endif
+       } flags;         /**< connack flags byte */
+       char rc; /**< connack return code */
+} Connack;
+
+
+/**
+ * Data for a packet with header only.
+ */
+typedef struct
+{
+       Header header;  /**< MQTT header byte */
+} MQTTPacket;
+
+
+/**
+ * Data for a subscribe packet.
+ */
+typedef struct
+{
+       Header header;  /**< MQTT header byte */
+       int msgId;              /**< MQTT message id */
+       List* topics;   /**< list of topic strings */
+       List* qoss;             /**< list of corresponding QoSs */
+       int noTopics;   /**< topic and qos count */
+} Subscribe;
+
+
+/**
+ * Data for a suback packet.
+ */
+typedef struct
+{
+       Header header;  /**< MQTT header byte */
+       int msgId;              /**< MQTT message id */
+       List* qoss;             /**< list of granted QoSs */
+} Suback;
+
+
+/**
+ * Data for an unsubscribe packet.
+ */
+typedef struct
+{
+       Header header;  /**< MQTT header byte */
+       int msgId;              /**< MQTT message id */
+       List* topics;   /**< list of topic strings */
+       int noTopics;   /**< topic count */
+} Unsubscribe;
+
+
+/**
+ * Data for a publish packet.
+ */
+typedef struct
+{
+       Header header;  /**< MQTT header byte */
+       char* topic;    /**< topic string */
+       int topiclen;
+       int msgId;              /**< MQTT message id */
+       char* payload;  /**< binary payload, length delimited */
+       int payloadlen; /**< payload length */
+} Publish;
+
+
+/**
+ * Data for one of the ack packets.
+ */
+typedef struct
+{
+       Header header;  /**< MQTT header byte */
+       int msgId;              /**< MQTT message id */
+} Ack;
+
+typedef Ack Puback;
+typedef Ack Pubrec;
+typedef Ack Pubrel;
+typedef Ack Pubcomp;
+typedef Ack Unsuback;
+
+int MQTTPacket_encode(char* buf, size_t length);
+int MQTTPacket_decode(networkHandles* net, size_t* value);
+int readInt(char** pptr);
+char* readUTF(char** pptr, char* enddata);
+unsigned char readChar(char** pptr);
+void writeChar(char** pptr, char c);
+void writeInt(char** pptr, int anInt);
+void writeUTF(char** pptr, const char* string);
+void writeData(char** pptr, const void* data, int datalen);
+
+const char* MQTTPacket_name(int ptype);
+
+void* MQTTPacket_Factory(networkHandles* net, int* error);
+int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t 
buflen, int free);
+int MQTTPacket_sends(networkHandles* net, Header header, int count, char** 
buffers, size_t* buflens, int* frees);
+
+void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t 
datalen);
+int MQTTPacket_send_disconnect(networkHandles* net, const char* clientID);
+
+void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen);
+void MQTTPacket_freePublish(Publish* pack);
+int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, 
networkHandles* net, const char* clientID);
+int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* 
clientID);
+void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen);
+
+void MQTTPacket_freeSuback(Suback* pack);
+int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* 
clientID);
+int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const 
char* clientID);
+int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* 
clientID);
+
+void MQTTPacket_free_packet(MQTTPacket* pack);
+
+#if !defined(NO_BRIDGE)
+       #include "MQTTPacketOut.h"
+#endif
+
+#endif /* MQTTPACKET_H */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c 
b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
new file mode 100644
index 0000000..b924085
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.c
@@ -0,0 +1,269 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2017 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ *    Rong Xiang, Ian Craggs - C++ compatibility
+ *    Ian Craggs - binary password and will payload
+ 
*******************************************************************************/
+
+/**
+ * @file
+ * \brief functions to deal with reading and writing of MQTT packets from and 
to sockets
+ *
+ * Some other related functions are in the MQTTPacket module
+ */
+
+
+#include "MQTTPacketOut.h"
+#include "Log.h"
+#include "StackTrace.h"
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "Heap.h"
+
+
+/**
+ * Send an MQTT CONNECT packet down a socket.
+ * @param client a structure from which to get all the required values
+ * @param MQTTVersion the MQTT version to connect with
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
+{
+       char *buf, *ptr;
+       Connect packet;
+       int rc = -1, len;
+
+       FUNC_ENTRY;
+       packet.header.byte = 0;
+       packet.header.bits.type = CONNECT;
+
+       len = ((MQTTVersion == 3) ? 12 : 10) + (int)strlen(client->clientID)+2;
+       if (client->will)
+               len += (int)strlen(client->will->topic)+2 + 
client->will->payloadlen+2;
+       if (client->username)
+               len += (int)strlen(client->username)+2;
+       if (client->password)
+               len += client->passwordlen+2;
+
+       ptr = buf = malloc(len);
+       if (MQTTVersion == 3)
+       {
+               writeUTF(&ptr, "MQIsdp");
+               writeChar(&ptr, (char)3);
+       }
+       else if (MQTTVersion == 4)
+       {
+               writeUTF(&ptr, "MQTT");
+               writeChar(&ptr, (char)4);
+       }
+       else
+               goto exit;
+
+       packet.flags.all = 0;
+       packet.flags.bits.cleanstart = client->cleansession;
+       packet.flags.bits.will = (client->will) ? 1 : 0;
+       if (packet.flags.bits.will)
+       {
+               packet.flags.bits.willQoS = client->will->qos;
+               packet.flags.bits.willRetain = client->will->retained;
+       }
+
+       if (client->username)
+               packet.flags.bits.username = 1;
+       if (client->password)
+               packet.flags.bits.password = 1;
+
+       writeChar(&ptr, packet.flags.all);
+       writeInt(&ptr, client->keepAliveInterval);
+       writeUTF(&ptr, client->clientID);
+       if (client->will)
+       {
+               writeUTF(&ptr, client->will->topic);
+               writeData(&ptr, client->will->payload, 
client->will->payloadlen);
+       }
+       if (client->username)
+               writeUTF(&ptr, client->username);
+       if (client->password)
+               writeData(&ptr, client->password, client->passwordlen);
+
+       rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1);
+       Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID, 
client->cleansession, rc);
+exit:
+       if (rc != TCPSOCKET_INTERRUPTED)
+               free(buf);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create connack packets.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure
+ */
+void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen)
+{
+       Connack* pack = malloc(sizeof(Connack));
+       char* curdata = data;
+
+       FUNC_ENTRY;
+       pack->header.byte = aHeader;
+       pack->flags.all = readChar(&curdata);
+       pack->rc = readChar(&curdata);
+       FUNC_EXIT;
+       return pack;
+}
+
+
+/**
+ * Send an MQTT PINGREQ packet down a socket.
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
+{
+       Header header;
+       int rc = 0;
+       size_t buflen = 0;
+
+       FUNC_ENTRY;
+       header.byte = 0;
+       header.bits.type = PINGREQ;
+       rc = MQTTPacket_send(net, header, NULL, buflen,0);
+       Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Send an MQTT subscribe packet down a socket.
+ * @param topics list of topics
+ * @param qoss list of corresponding QoSs
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, 
networkHandles* net, const char* clientID)
+{
+       Header header;
+       char *data, *ptr;
+       int rc = -1;
+       ListElement *elem = NULL, *qosElem = NULL;
+       int datalen;
+
+       FUNC_ENTRY;
+       header.bits.type = SUBSCRIBE;
+       header.bits.dup = dup;
+       header.bits.qos = 1;
+       header.bits.retain = 0;
+
+       datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
+       while (ListNextElement(topics, &elem))
+               datalen += (int)strlen((char*)(elem->content));
+       ptr = data = malloc(datalen);
+
+       writeInt(&ptr, msgid);
+       elem = NULL;
+       while (ListNextElement(topics, &elem))
+       {
+               ListNextElement(qoss, &qosElem);
+               writeUTF(&ptr, (char*)(elem->content));
+               writeChar(&ptr, *(int*)(qosElem->content));
+       }
+       rc = MQTTPacket_send(net, header, data, datalen, 1);
+       Log(LOG_PROTOCOL, 22, NULL, net->socket, clientID, msgid, rc);
+       if (rc != TCPSOCKET_INTERRUPTED)
+               free(data);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Function used in the new packets table to create suback packets.
+ * @param aHeader the MQTT header byte
+ * @param data the rest of the packet
+ * @param datalen the length of the rest of the packet
+ * @return pointer to the packet structure
+ */
+void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen)
+{
+       Suback* pack = malloc(sizeof(Suback));
+       char* curdata = data;
+
+       FUNC_ENTRY;
+       pack->header.byte = aHeader;
+       pack->msgId = readInt(&curdata);
+       pack->qoss = ListInitialize();
+       while ((size_t)(curdata - data) < datalen)
+       {
+               int* newint;
+               newint = malloc(sizeof(int));
+               *newint = (int)readChar(&curdata);
+               ListAppend(pack->qoss, newint, sizeof(int));
+       }
+       FUNC_EXIT;
+       return pack;
+}
+
+
+/**
+ * Send an MQTT unsubscribe packet down a socket.
+ * @param topics list of topics
+ * @param msgid the MQTT message id to use
+ * @param dup boolean - whether to set the MQTT DUP flag
+ * @param socket the open socket to send the data to
+ * @param clientID the string client identifier, only used for tracing
+ * @return the completion code (e.g. TCPSOCKET_COMPLETE)
+ */
+int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, 
networkHandles* net, const char* clientID)
+{
+       Header header;
+       char *data, *ptr;
+       int rc = -1;
+       ListElement *elem = NULL;
+       int datalen;
+
+       FUNC_ENTRY;
+       header.bits.type = UNSUBSCRIBE;
+       header.bits.dup = dup;
+       header.bits.qos = 1;
+       header.bits.retain = 0;
+
+       datalen = 2 + topics->count * 2; /* utf length == 2 */
+       while (ListNextElement(topics, &elem))
+               datalen += (int)strlen((char*)(elem->content));
+       ptr = data = malloc(datalen);
+
+       writeInt(&ptr, msgid);
+       elem = NULL;
+       while (ListNextElement(topics, &elem))
+               writeUTF(&ptr, (char*)(elem->content));
+       rc = MQTTPacket_send(net, header, data, datalen, 1);
+       Log(LOG_PROTOCOL, 25, NULL, net->socket, clientID, msgid, rc);
+       if (rc != TCPSOCKET_INTERRUPTED)
+               free(data);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h 
b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
new file mode 100644
index 0000000..700db77
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPacketOut.h
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs, Allan Stockdill-Mander - SSL updates
+ *    Ian Craggs - MQTT 3.1.1 support
+ 
*******************************************************************************/
+
+#if !defined(MQTTPACKETOUT_H)
+#define MQTTPACKETOUT_H
+
+#include "MQTTPacket.h"
+
+int MQTTPacket_send_connect(Clients* client, int MQTTVersion);
+void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen);
+
+int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID);
+
+int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, 
networkHandles* net, const char* clientID);
+void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen);
+
+int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, 
networkHandles* net, const char* clientID);
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.c 
b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
new file mode 100644
index 0000000..24efb6d
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.c
@@ -0,0 +1,654 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - async client updates
+ *    Ian Craggs - fix for bug 432903 - queue persistence
+ 
*******************************************************************************/
+
+/**
+ * @file
+ * \brief Functions that apply to persistence operations.
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include "MQTTPersistence.h"
+#include "MQTTPersistenceDefault.h"
+#include "MQTTProtocolClient.h"
+#include "Heap.h"
+
+
+static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, 
size_t buflen);
+static void MQTTPersistence_insertInSeqOrder(List* list, 
MQTTPersistence_qEntry* qEntry, size_t size);
+
+/**
+ * Creates a ::MQTTClient_persistence structure representing a persistence 
implementation.
+ * @param persistence the ::MQTTClient_persistence structure.
+ * @param type the type of the persistence implementation. See 
::MQTTClient_create.
+ * @param pcontext the context for this persistence implementation. See 
::MQTTClient_create.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+#include "StackTrace.h"
+
+int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, 
void* pcontext)
+{
+       int rc = 0;
+       MQTTClient_persistence* per = NULL;
+
+       FUNC_ENTRY;
+#if !defined(NO_PERSISTENCE)
+       switch (type)
+       {
+               case MQTTCLIENT_PERSISTENCE_NONE :
+                       per = NULL;
+                       break;
+               case MQTTCLIENT_PERSISTENCE_DEFAULT :
+                       per = malloc(sizeof(MQTTClient_persistence));
+                       if ( per != NULL )
+                       {
+                               if ( pcontext != NULL )
+                               {
+                                       per->context = malloc(strlen(pcontext) 
+ 1);
+                                       strcpy(per->context, pcontext);
+                               }
+                               else
+                                       per->context = ".";  /* working 
directory */
+                               /* file system functions */
+                               per->popen        = pstopen;
+                               per->pclose       = pstclose;
+                               per->pput         = pstput;
+                               per->pget         = pstget;
+                               per->premove      = pstremove;
+                               per->pkeys        = pstkeys;
+                               per->pclear       = pstclear;
+                               per->pcontainskey = pstcontainskey;
+                       }
+                       else
+                               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                       break;
+               case MQTTCLIENT_PERSISTENCE_USER :
+                       per = (MQTTClient_persistence *)pcontext;
+                       if ( per == NULL || (per != NULL && (per->context == 
NULL || per->pclear == NULL ||
+                               per->pclose == NULL || per->pcontainskey == 
NULL || per->pget == NULL || per->pkeys == NULL ||
+                               per->popen == NULL || per->pput == NULL || 
per->premove == NULL)) )
+                               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                       break;
+               default:
+                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                       break;
+       }
+#endif
+
+       *persistence = per;
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Open persistent store and restore any persisted messages.
+ * @param client the client as ::Clients.
+ * @param serverURI the URI of the remote end.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_initialize(Clients *c, const char *serverURI)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       if ( c->persistence != NULL )
+       {
+               rc = c->persistence->popen(&(c->phandle), c->clientID, 
serverURI, c->persistence->context);
+               if ( rc == 0 )
+                       rc = MQTTPersistence_restore(c);
+       }
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Close persistent store.
+ * @param client the client as ::Clients.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_close(Clients *c)
+{
+       int rc =0;
+
+       FUNC_ENTRY;
+       if (c->persistence != NULL)
+       {
+               rc = c->persistence->pclose(c->phandle);
+               c->phandle = NULL;
+#if !defined(NO_PERSISTENCE)
+               if ( c->persistence->popen == pstopen )
+                       free(c->persistence);
+#endif
+               c->persistence = NULL;
+       }
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+/**
+ * Clears the persistent store.
+ * @param client the client as ::Clients.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_clear(Clients *c)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       if (c->persistence != NULL)
+               rc = c->persistence->pclear(c->phandle);
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Restores the persisted records to the outbound and inbound message queues 
of the
+ * client.
+ * @param client the client as ::Clients.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_restore(Clients *c)
+{
+       int rc = 0;
+       char **msgkeys = NULL,
+                *buffer = NULL;
+       int nkeys, buflen;
+       int i = 0;
+       int msgs_sent = 0;
+       int msgs_rcvd = 0;
+
+       FUNC_ENTRY;
+       if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, 
&nkeys)) == 0)
+       {
+               while (rc == 0 && i < nkeys)
+               {
+                       if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, 
strlen(PERSISTENCE_COMMAND_KEY)) == 0)
+                       {
+                               ;
+                       }
+                       else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, 
strlen(PERSISTENCE_QUEUE_KEY)) == 0)
+                       {
+                               ;
+                       }
+                       else if ((rc = c->persistence->pget(c->phandle, 
msgkeys[i], &buffer, &buflen)) == 0)
+                       {
+                               MQTTPacket* pack = 
MQTTPersistence_restorePacket(buffer, buflen);
+                               if ( pack != NULL )
+                               {
+                                       if ( 
strstr(msgkeys[i],PERSISTENCE_PUBLISH_RECEIVED) != NULL )
+                                       {
+                                               Publish* publish = 
(Publish*)pack;
+                                               Messages* msg = NULL;
+                                               msg = 
MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, 
publish->header.bits.retain);
+                                               msg->nextMessageType = PUBREL;
+                                               /* order does not matter for 
persisted received messages */
+                                               ListAppend(c->inboundMsgs, msg, 
msg->len);
+                                               publish->topic = NULL;
+                                               MQTTPacket_freePublish(publish);
+                                               msgs_rcvd++;
+                                       }
+                                       else if ( 
strstr(msgkeys[i],PERSISTENCE_PUBLISH_SENT) != NULL )
+                                       {
+                                               Publish* publish = 
(Publish*)pack;
+                                               Messages* msg = NULL;
+                                               char *key = 
malloc(MESSAGE_FILENAME_LENGTH + 1);
+                                               sprintf(key, "%s%d", 
PERSISTENCE_PUBREL, publish->msgId);
+                                               msg = 
MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, 
publish->header.bits.retain);
+                                               if ( 
c->persistence->pcontainskey(c->phandle, key) == 0 )
+                                                       /* PUBLISH Qo2 and 
PUBREL sent */
+                                                       msg->nextMessageType = 
PUBCOMP;
+                                               /* else: PUBLISH QoS1, or 
PUBLISH QoS2 and PUBREL not sent */
+                                               /* retry at the first 
opportunity */
+                                               msg->lastTouch = 0;
+                                               
MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
+                                               publish->topic = NULL;
+                                               MQTTPacket_freePublish(publish);
+                                               free(key);
+                                               msgs_sent++;
+                                       }
+                                       else if ( 
strstr(msgkeys[i],PERSISTENCE_PUBREL) != NULL )
+                                       {
+                                               /* orphaned PUBRELs ? */
+                                               Pubrel* pubrel = (Pubrel*)pack;
+                                               char *key = 
malloc(MESSAGE_FILENAME_LENGTH + 1);
+                                               sprintf(key, "%s%d", 
PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
+                                               if ( 
c->persistence->pcontainskey(c->phandle, key) != 0 )
+                                                       rc = 
c->persistence->premove(c->phandle, msgkeys[i]);
+                                               free(pubrel);
+                                               free(key);
+                                       }
+                               }
+                               else  /* pack == NULL -> bad persisted record */
+                                       rc = 
c->persistence->premove(c->phandle, msgkeys[i]);
+                       }
+                       if (buffer)
+                       {
+                               free(buffer);
+                               buffer = NULL;
+                       }
+                       if (msgkeys[i])
+                               free(msgkeys[i]);
+                       i++;
+               }
+               if (msgkeys)
+                       free(msgkeys);
+       }
+       Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages 
restored for client %s\n", 
+               msgs_sent, msgs_rcvd, c->clientID);
+       MQTTPersistence_wrapMsgID(c);
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Returns a MQTT packet restored from persisted data.
+ * @param buffer the persisted data.
+ * @param buflen the number of bytes of the data buffer.
+ */
+void* MQTTPersistence_restorePacket(char* buffer, size_t buflen)
+{
+       void* pack = NULL;
+       Header header;
+       int fixed_header_length = 1, ptype, remaining_length = 0;
+       char c;
+       int multiplier = 1;
+       extern pf new_packets[];
+
+       FUNC_ENTRY;
+       header.byte = buffer[0];
+       /* decode the message length according to the MQTT algorithm */
+       do
+       {
+               c = *(++buffer);
+               remaining_length += (c & 127) * multiplier;
+               multiplier *= 128;
+               fixed_header_length++;
+       } while ((c & 128) != 0);
+
+       if ( (fixed_header_length + remaining_length) == buflen )
+       {
+               ptype = header.bits.type;
+               if (ptype >= CONNECT && ptype <= DISCONNECT && 
new_packets[ptype] != NULL)
+                       pack = (*new_packets[ptype])(header.byte, ++buffer, 
remaining_length);
+       }
+
+       FUNC_EXIT;
+       return pack;
+}
+
+
+/**
+ * Inserts the specified message into the list, maintaining message ID order.
+ * @param list the list to insert the message into.
+ * @param content the message to add.
+ * @param size size of the message.
+ */
+void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
+{
+       ListElement* index = NULL;
+       ListElement* current = NULL;
+
+       FUNC_ENTRY;
+       while(ListNextElement(list, &current) != NULL && index == NULL)
+       {
+               if ( ((Messages*)content)->msgid < 
((Messages*)current->content)->msgid )
+                       index = current;
+       }
+
+       ListInsert(list, content, size, index);
+       FUNC_EXIT;
+}
+
+
+/**
+ * Adds a record to the persistent store. This function must not be called for 
QoS0
+ * messages.
+ * @param socket the socket of the client.
+ * @param buf0 fixed header.
+ * @param buf0len length of the fixed header.
+ * @param count number of buffers representing the variable header and/or the 
payload.
+ * @param buffers the buffers representing the variable header and/or the 
payload.
+ * @param buflens length of the buffers representing the variable header 
and/or the payload.
+ * @param msgId the message ID.
+ * @param scr 0 indicates message in the sending direction; 1 indicates 
message in the
+ * receiving direction.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count,
+                                                                char** 
buffers, size_t* buflens, int htype, int msgId, int scr )
+{
+       int rc = 0;
+       extern ClientStates* bstate;
+       int nbufs, i;
+       int* lens = NULL;
+       char** bufs = NULL;
+       char *key;
+       Clients* client = NULL;
+
+       FUNC_ENTRY;
+       client = (Clients*)(ListFindItem(bstate->clients, &socket, 
clientSocketCompare)->content);
+       if (client->persistence != NULL)
+       {
+               key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+               nbufs = 1 + count;
+               lens = (int *)malloc(nbufs * sizeof(int));
+               bufs = (char **)malloc(nbufs * sizeof(char *));
+               lens[0] = (int)buf0len;
+               bufs[0] = buf0;
+               for (i = 0; i < count; i++)
+               {
+                       lens[i+1] = (int)buflens[i];
+                       bufs[i+1] = buffers[i];
+               }
+
+               /* key */
+               if ( scr == 0 )
+               {  /* sending */
+                       if (htype == PUBLISH)   /* PUBLISH QoS1 and QoS2*/
+                               sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, 
msgId);
+                       if (htype == PUBREL)  /* PUBREL */
+                               sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId);
+               }
+               if ( scr == 1 )  /* receiving PUBLISH QoS2 */
+                       sprintf(key, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, 
msgId);
+
+               rc = client->persistence->pput(client->phandle, key, nbufs, 
bufs, lens);
+
+               free(key);
+               free(lens);
+               free(bufs);
+       }
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Deletes a record from the persistent store.
+ * @param client the client as ::Clients.
+ * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, 
#PERSISTENCE_PUBREL
+ * or #PERSISTENCE_PUBLISH_RECEIVED.
+ * @param qos the qos field of the message.
+ * @param msgId the message ID.
+ * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
+ */
+int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+       if (c->persistence != NULL)
+       {
+               char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+               if ( (strcmp(type,PERSISTENCE_PUBLISH_SENT) == 0) && qos == 2 )
+               {
+                       sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId) ;
+                       rc = c->persistence->premove(c->phandle, key);
+                       sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId) ;
+                       rc = c->persistence->premove(c->phandle, key);
+               }
+               else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
+               {    /* or PERSISTENCE_PUBLISH_RECEIVED */
+                       sprintf(key, "%s%d", type, msgId) ;
+                       rc = c->persistence->premove(c->phandle, key);
+               }
+               free(key);
+       }
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/**
+ * Checks whether the message IDs wrapped by looking for the largest gap 
between two consecutive
+ * message IDs in the outboundMsgs queue.
+ * @param client the client as ::Clients.
+ */
+void MQTTPersistence_wrapMsgID(Clients *client)
+{
+       ListElement* wrapel = NULL;
+       ListElement* current = NULL;
+
+       FUNC_ENTRY;
+       if ( client->outboundMsgs->count > 0 )
+       {
+               int firstMsgID = 
((Messages*)client->outboundMsgs->first->content)->msgid;
+               int lastMsgID = 
((Messages*)client->outboundMsgs->last->content)->msgid;
+               int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
+               current = ListNextElement(client->outboundMsgs, &current);
+
+               while(ListNextElement(client->outboundMsgs, &current) != NULL)
+               {
+                       int curMsgID = ((Messages*)current->content)->msgid;
+                       int curPrevMsgID = 
((Messages*)current->prev->content)->msgid;
+                       int curgap = curMsgID - curPrevMsgID;
+                       if ( curgap > gap )
+                       {
+                               gap = curgap;
+                               wrapel = current;
+                       }
+               }
+       }
+
+       if ( wrapel != NULL )
+       {
+               /* put wrapel at the beginning of the queue */
+               client->outboundMsgs->first->prev = client->outboundMsgs->last;
+               client->outboundMsgs->last->next = client->outboundMsgs->first;
+               client->outboundMsgs->first = wrapel;
+               client->outboundMsgs->last = wrapel->prev;
+               client->outboundMsgs->first->prev = NULL;
+               client->outboundMsgs->last->next = NULL;
+       }
+       FUNC_EXIT;
+}
+
+
+#if !defined(NO_PERSISTENCE)
+int MQTTPersistence_unpersistQueueEntry(Clients* client, 
MQTTPersistence_qEntry* qe)
+{
+       int rc = 0;
+       char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
+       
+       FUNC_ENTRY;
+       sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
+       if ((rc = client->persistence->premove(client->phandle, key)) != 0)
+               Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", 
rc);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+int MQTTPersistence_persistQueueEntry(Clients* aclient, 
MQTTPersistence_qEntry* qe)
+{
+       int rc = 0;
+       int nbufs = 8;
+       int bufindex = 0;
+       char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
+       int* lens = NULL;
+       void** bufs = NULL;
+               
+       FUNC_ENTRY;
+       lens = (int*)malloc(nbufs * sizeof(int));
+       bufs = malloc(nbufs * sizeof(char *));
+                                               
+       bufs[bufindex] = &qe->msg->payloadlen;
+       lens[bufindex++] = sizeof(qe->msg->payloadlen);
+                               
+       bufs[bufindex] = qe->msg->payload;
+       lens[bufindex++] = qe->msg->payloadlen;
+               
+       bufs[bufindex] = &qe->msg->qos;
+       lens[bufindex++] = sizeof(qe->msg->qos);
+               
+       bufs[bufindex] = &qe->msg->retained;
+       lens[bufindex++] = sizeof(qe->msg->retained);
+               
+       bufs[bufindex] = &qe->msg->dup;
+       lens[bufindex++] = sizeof(qe->msg->dup);
+                               
+       bufs[bufindex] = &qe->msg->msgid;
+       lens[bufindex++] = sizeof(qe->msg->msgid);
+                                               
+       bufs[bufindex] = qe->topicName;
+       lens[bufindex++] = (int)strlen(qe->topicName) + 1;
+                               
+       bufs[bufindex] = &qe->topicLen;
+       lens[bufindex++] = sizeof(qe->topicLen);                        
+               
+       sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno);   
+       qe->seqno = aclient->qentry_seqno;
+
+       if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, 
(char**)bufs, lens)) != 0)
+               Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
+
+       free(lens);
+       free(bufs);
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, 
size_t buflen)
+{
+       MQTTPersistence_qEntry* qe = NULL;
+       char* ptr = buffer;
+       int data_size;
+       
+       FUNC_ENTRY;
+       qe = malloc(sizeof(MQTTPersistence_qEntry));
+       memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
+       
+       qe->msg = malloc(sizeof(MQTTPersistence_message));
+       memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
+       
+       qe->msg->payloadlen = *(int*)ptr;
+       ptr += sizeof(int);
+       
+       data_size = qe->msg->payloadlen;
+       qe->msg->payload = malloc(data_size);
+       memcpy(qe->msg->payload, ptr, data_size);
+       ptr += data_size;
+       
+       qe->msg->qos = *(int*)ptr;
+       ptr += sizeof(int);
+       
+       qe->msg->retained = *(int*)ptr;
+       ptr += sizeof(int);
+       
+       qe->msg->dup = *(int*)ptr;
+       ptr += sizeof(int);
+       
+       qe->msg->msgid = *(int*)ptr;
+       ptr += sizeof(int);
+       
+       data_size = (int)strlen(ptr) + 1;       
+       qe->topicName = malloc(data_size);
+       strcpy(qe->topicName, ptr);
+       ptr += data_size;
+       
+       qe->topicLen = *(int*)ptr;
+       ptr += sizeof(int);
+
+       FUNC_EXIT;
+       return qe;
+}
+
+
+static void MQTTPersistence_insertInSeqOrder(List* list, 
MQTTPersistence_qEntry* qEntry, size_t size)
+{
+       ListElement* index = NULL;
+       ListElement* current = NULL;
+
+       FUNC_ENTRY;
+       while (ListNextElement(list, &current) != NULL && index == NULL)
+       {
+               if (qEntry->seqno < 
((MQTTPersistence_qEntry*)current->content)->seqno)
+                       index = current;
+       }
+       ListInsert(list, qEntry, size, index);
+       FUNC_EXIT;
+}
+
+
+/**
+ * Restores a queue of messages from persistence to memory
+ * @param c the client as ::Clients - the client object to restore the 
messages to
+ * @return return code, 0 if successful
+ */
+int MQTTPersistence_restoreMessageQueue(Clients* c)
+{
+       int rc = 0;
+       char **msgkeys;
+       int nkeys;
+       int i = 0;
+       int entries_restored = 0;
+
+       FUNC_ENTRY;
+       if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, 
&nkeys)) == 0)
+       {
+               while (rc == 0 && i < nkeys)
+               {
+                       char *buffer = NULL;
+                       int buflen;
+                                       
+                       if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, 
strlen(PERSISTENCE_QUEUE_KEY)) != 0)
+                       {
+                               ;
+                       }
+                       else if ((rc = c->persistence->pget(c->phandle, 
msgkeys[i], &buffer, &buflen)) == 0)
+                       {
+                               MQTTPersistence_qEntry* qe = 
MQTTPersistence_restoreQueueEntry(buffer, buflen);
+                               
+                               if (qe)
+                               {       
+                                       qe->seqno = atoi(msgkeys[i]+2);
+                                       
MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, 
sizeof(MQTTPersistence_qEntry));
+                                       free(buffer);
+                                       c->qentry_seqno = max(c->qentry_seqno, 
qe->seqno);
+                                       entries_restored++;
+                               }
+                       }
+                       if (msgkeys[i])
+                       {
+                               free(msgkeys[i]);
+                       }
+                       i++;
+               }
+               if (msgkeys != NULL)
+                       free(msgkeys);
+       }
+       Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", 
entries_restored, c->clientID);
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistence.h 
b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
new file mode 100644
index 0000000..9a938ba
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistence.h
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - async client updates
+ *    Ian Craggs - fix for bug 432903 - queue persistence
+ 
*******************************************************************************/
+
+#if defined(__cplusplus)
+ extern "C" {
+#endif
+
+#include "Clients.h"
+
+/** Stem of the key for a sent PUBLISH QoS1 or QoS2 */
+#define PERSISTENCE_PUBLISH_SENT "s-"
+/** Stem of the key for a sent PUBREL */
+#define PERSISTENCE_PUBREL "sc-"
+/** Stem of the key for a received PUBLISH QoS2 */
+#define PERSISTENCE_PUBLISH_RECEIVED "r-"
+/** Stem of the key for an async client command */
+#define PERSISTENCE_COMMAND_KEY "c-"
+/** Stem of the key for an async client message queue */
+#define PERSISTENCE_QUEUE_KEY "q-"
+#define PERSISTENCE_MAX_KEY_LENGTH 8
+
+int MQTTPersistence_create(MQTTClient_persistence** per, int type, void* 
pcontext);
+int MQTTPersistence_initialize(Clients* c, const char* serverURI);
+int MQTTPersistence_close(Clients* c);
+int MQTTPersistence_clear(Clients* c);
+int MQTTPersistence_restore(Clients* c);
+void* MQTTPersistence_restorePacket(char* buffer, size_t buflen);
+void MQTTPersistence_insertInOrder(List* list, void* content, size_t size);
+int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count, 
+                                                                char** 
buffers, size_t* buflens, int htype, int msgId, int scr);
+int MQTTPersistence_remove(Clients* c, char* type, int qos, int msgId);
+void MQTTPersistence_wrapMsgID(Clients *c);
+
+typedef struct
+{
+       char struct_id[4];
+       int struct_version;
+       int payloadlen;
+       void* payload;
+       int qos;
+       int retained;
+       int dup;
+       int msgid;
+} MQTTPersistence_message;
+
+typedef struct
+{
+       MQTTPersistence_message* msg;
+       char* topicName;
+       int topicLen;
+       unsigned int seqno; /* only used on restore */
+} MQTTPersistence_qEntry;
+
+int MQTTPersistence_unpersistQueueEntry(Clients* client, 
MQTTPersistence_qEntry* qe);
+int MQTTPersistence_persistQueueEntry(Clients* aclient, 
MQTTPersistence_qEntry* qe);
+int MQTTPersistence_restoreMessageQueue(Clients* c);
+#ifdef __cplusplus
+     }
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c 
b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
new file mode 100644
index 0000000..35c1f53
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.c
@@ -0,0 +1,841 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2016 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - async client updates
+ *    Ian Craggs - fix for bug 484496
+ 
*******************************************************************************/
+
+/**
+ * @file
+ * \brief A file system based persistence implementation.
+ *
+ * A directory is specified when the MQTT client is created. When the 
persistence is then
+ * opened (see ::Persistence_open), a sub-directory is made beneath the base 
for this
+ * particular client ID and connection key. This allows one persistence base 
directory to
+ * be shared by multiple clients.
+ *
+ */
+
+#if !defined(NO_PERSISTENCE)
+
+#include "OsWrapper.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#if defined(WIN32) || defined(WIN64)
+       #include <direct.h>
+       /* Windows doesn't have strtok_r, so remap it to strtok */
+       #define strtok_r( A, B, C ) strtok( A, B )
+       int keysWin32(char *, char ***, int *);
+       int clearWin32(char *);
+       int containskeyWin32(char *, char *);
+#else
+       #include <sys/stat.h>
+       #include <dirent.h>
+       #include <unistd.h>
+       int keysUnix(char *, char ***, int *);
+       int clearUnix(char *);
+       int containskeyUnix(char *, char *);
+#endif
+
+#include "MQTTClientPersistence.h"
+#include "MQTTPersistenceDefault.h"
+#include "StackTrace.h"
+#include "Heap.h"
+
+/** Create persistence directory for the client: context/clientID-serverURI.
+ *  See ::Persistence_open
+ */
+
+int pstopen(void **handle, const char* clientID, const char* serverURI, void* 
context)
+{
+       int rc = 0;
+       char *dataDir = context;
+       char *clientDir;
+       char *pToken = NULL;
+       char *save_ptr = NULL;
+       char *pCrtDirName = NULL;
+       char *pTokDirName = NULL;
+       char *perserverURI = NULL, *ptraux;
+
+       FUNC_ENTRY;
+       /* Note that serverURI=address:port, but ":" not allowed in Windows 
directories */
+       perserverURI = malloc(strlen(serverURI) + 1);
+       strcpy(perserverURI, serverURI);
+       while ((ptraux = strstr(perserverURI, ":")) != NULL)
+               *ptraux = '-' ;
+
+       /* consider '/'  +  '-'  +  '\0' */
+       clientDir = malloc(strlen(dataDir) + strlen(clientID) + 
strlen(perserverURI) + 3);
+       sprintf(clientDir, "%s/%s-%s", dataDir, clientID, perserverURI);
+
+
+       /* create clientDir directory */
+
+       /* pCrtDirName - holds the directory name we are currently trying to 
create.           */
+       /*               This gets built up level by level until the full path 
name is created.*/
+       /* pTokDirName - holds the directory name that gets used by strtok.     
    */
+       pCrtDirName = (char*)malloc( strlen(clientDir) + 1 );
+       pTokDirName = (char*)malloc( strlen(clientDir) + 1 );
+       strcpy( pTokDirName, clientDir );
+
+       pToken = strtok_r( pTokDirName, "\\/", &save_ptr );
+
+       strcpy( pCrtDirName, pToken );
+       rc = pstmkdir( pCrtDirName );
+       pToken = strtok_r( NULL, "\\/", &save_ptr );
+       while ( (pToken != NULL) && (rc == 0) )
+       {
+               /* Append the next directory level and try to create it */
+               strcat( pCrtDirName, "/" );
+               strcat( pCrtDirName, pToken );
+               rc = pstmkdir( pCrtDirName );
+               pToken = strtok_r( NULL, "\\/", &save_ptr );
+       }
+
+       *handle = clientDir;
+
+       free(pTokDirName);
+       free(pCrtDirName);
+       free(perserverURI);
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+/** Function to create a directory.
+ * Returns 0 on success or if the directory already exists.
+ */
+int pstmkdir( char *pPathname )
+{
+       int rc = 0;
+
+       FUNC_ENTRY;
+#if defined(WIN32) || defined(WIN64)
+       if ( _mkdir( pPathname ) != 0 )
+       {
+#else
+       /* Create a directory with read, write and execute access for the owner 
and read access for the group */
+#if !defined(_WRS_KERNEL)
+       if ( mkdir( pPathname, S_IRWXU | S_IRGRP ) != 0 )
+#else
+       if ( mkdir( pPathname ) != 0 )
+#endif /* !defined(_WRS_KERNEL) */
+       {
+#endif
+               if ( errno != EEXIST )
+                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+       }
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+
+/** Write wire message to the client persistence directory.
+ *  See ::Persistence_put
+ */
+int pstput(void* handle, char* key, int bufcount, char* buffers[], int 
buflens[])
+{
+       int rc = 0;
+       char *clientDir = handle;
+       char *file;
+       FILE *fp;
+       size_t bytesWritten = 0,
+              bytesTotal = 0;
+       int i;
+
+       FUNC_ENTRY;
+       if (clientDir == NULL)
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+       /* consider '/' + '\0' */
+       file = malloc(strlen(clientDir) + strlen(key) + 
strlen(MESSAGE_FILENAME_EXTENSION) + 2 );
+       sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+       fp = fopen(file, "wb");
+       if ( fp != NULL )
+       {
+               for(i=0; i<bufcount; i++)
+               {
+                       bytesTotal += buflens[i];
+                       bytesWritten += fwrite(buffers[i], sizeof(char), 
buflens[i], fp );
+               }
+               fclose(fp);
+               fp = NULL;
+       } else
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+       if (bytesWritten != bytesTotal)
+       {
+               pstremove(handle, key);
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+       }
+
+       free(file);
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+};
+
+
+/** Retrieve a wire message from the client persistence directory.
+ *  See ::Persistence_get
+ */
+int pstget(void* handle, char* key, char** buffer, int* buflen)
+{
+       int rc = 0;
+       FILE *fp;
+       char *clientDir = handle;
+       char *file;
+       char *buf;
+       unsigned long fileLen = 0;
+       unsigned long bytesRead = 0;
+
+       FUNC_ENTRY;
+       if (clientDir == NULL)
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+       /* consider '/' + '\0' */
+       file = malloc(strlen(clientDir) + strlen(key) + 
strlen(MESSAGE_FILENAME_EXTENSION) + 2);
+       sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+       fp = fopen(file, "rb");
+       if ( fp != NULL )
+       {
+               fseek(fp, 0, SEEK_END);
+               fileLen = ftell(fp);
+               fseek(fp, 0, SEEK_SET);
+               buf=(char *)malloc(fileLen);
+               bytesRead = (int)fread(buf, sizeof(char), fileLen, fp);
+               *buffer = buf;
+               *buflen = bytesRead;
+               if ( bytesRead != fileLen )
+                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               fclose(fp);
+               fp = NULL;
+       } else
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+       free(file);
+       /* the caller must free buf */
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+
+/** Delete a persisted message from the client persistence directory.
+ *  See ::Persistence_remove
+ */
+int pstremove(void* handle, char* key)
+{
+       int rc = 0;
+       char *clientDir = handle;
+       char *file;
+
+       FUNC_ENTRY;
+       if (clientDir == NULL)
+       {
+               return rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+       /* consider '/' + '\0' */
+       file = malloc(strlen(clientDir) + strlen(key) + 
strlen(MESSAGE_FILENAME_EXTENSION) + 2);
+       sprintf(file, "%s/%s%s", clientDir, key, MESSAGE_FILENAME_EXTENSION);
+
+#if defined(WIN32) || defined(WIN64)
+       if ( _unlink(file) != 0 )
+       {
+#else
+       if ( unlink(file) != 0 )
+       {
+#endif
+               if ( errno != ENOENT )
+                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+       }
+
+       free(file);
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/** Delete client persistence directory (if empty).
+ *  See ::Persistence_close
+ */
+int pstclose(void* handle)
+{
+       int rc = 0;
+       char *clientDir = handle;
+
+       FUNC_ENTRY;
+       if (clientDir == NULL)
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+#if defined(WIN32) || defined(WIN64)
+       if ( _rmdir(clientDir) != 0 )
+       {
+#else
+       if ( rmdir(clientDir) != 0 )
+       {
+#endif
+               if ( errno != ENOENT && errno != ENOTEMPTY )
+                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+       }
+
+       free(clientDir);
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+/** Returns whether if a wire message is persisted in the client persistence 
directory.
+ * See ::Persistence_containskey
+ */
+int pstcontainskey(void *handle, char *key)
+{
+       int rc = 0;
+       char *clientDir = handle;
+
+       FUNC_ENTRY;
+       if (clientDir == NULL)
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+#if defined(WIN32) || defined(WIN64)
+       rc = containskeyWin32(clientDir, key);
+#else
+       rc = containskeyUnix(clientDir, key);
+#endif
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+int containskeyWin32(char *dirname, char *key)
+{
+       int notFound = MQTTCLIENT_PERSISTENCE_ERROR;
+       int fFinished = 0;
+       char *filekey, *ptraux;
+       char dir[MAX_PATH+1];
+       WIN32_FIND_DATAA FileData;
+       HANDLE hDir;
+
+       FUNC_ENTRY;
+       sprintf(dir, "%s/*", dirname);
+
+       hDir = FindFirstFileA(dir, &FileData);
+       if (hDir != INVALID_HANDLE_VALUE)
+       {
+               while (!fFinished)
+               {
+                       if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+                       {
+                               filekey = malloc(strlen(FileData.cFileName) + 
1);
+                               strcpy(filekey, FileData.cFileName);
+                               ptraux = strstr(filekey, 
MESSAGE_FILENAME_EXTENSION);
+                               if ( ptraux != NULL )
+                                       *ptraux = '\0' ;
+                               if(strcmp(filekey, key) == 0)
+                               {
+                                       notFound = 0;
+                                       fFinished = 1;
+                               }
+                               free(filekey);
+                       }
+                       if (!FindNextFileA(hDir, &FileData))
+                       {
+                               if (GetLastError() == ERROR_NO_MORE_FILES)
+                                       fFinished = 1;
+                       }
+               }
+               FindClose(hDir);
+       }
+
+       FUNC_EXIT_RC(notFound);
+       return notFound;
+}
+#else
+int containskeyUnix(char *dirname, char *key)
+{
+       int notFound = MQTTCLIENT_PERSISTENCE_ERROR;
+       char *filekey, *ptraux;
+       DIR *dp;
+       struct dirent *dir_entry;
+       struct stat stat_info;
+
+       FUNC_ENTRY;
+       if((dp = opendir(dirname)) != NULL)
+       {
+               while((dir_entry = readdir(dp)) != NULL && notFound)
+               {
+                       char* filename = malloc(strlen(dirname) + 
strlen(dir_entry->d_name) + 2);
+                       sprintf(filename, "%s/%s", dirname, dir_entry->d_name);
+                       lstat(filename, &stat_info);
+                       free(filename);
+                       if(S_ISREG(stat_info.st_mode))
+                       {
+                               filekey = malloc(strlen(dir_entry->d_name) + 1);
+                               strcpy(filekey, dir_entry->d_name);
+                               ptraux = strstr(filekey, 
MESSAGE_FILENAME_EXTENSION);
+                               if ( ptraux != NULL )
+                                       *ptraux = '\0' ;
+                               if(strcmp(filekey, key) == 0)
+                                       notFound = 0;
+                               free(filekey);
+                       }
+               }
+               closedir(dp);
+       }
+
+       FUNC_EXIT_RC(notFound);
+       return notFound;
+}
+#endif
+
+
+/** Delete all the persisted message in the client persistence directory.
+ * See ::Persistence_clear
+ */
+int pstclear(void *handle)
+{
+       int rc = 0;
+       char *clientDir = handle;
+
+       FUNC_ENTRY;
+       if (clientDir == NULL)
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+#if defined(WIN32) || defined(WIN64)
+       rc = clearWin32(clientDir);
+#else
+       rc = clearUnix(clientDir);
+#endif
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+int clearWin32(char *dirname)
+{
+       int rc = 0;
+       int fFinished = 0;
+       char *file;
+       char dir[MAX_PATH+1];
+       WIN32_FIND_DATAA FileData;
+       HANDLE hDir;
+
+       FUNC_ENTRY;
+       sprintf(dir, "%s/*", dirname);
+
+       hDir = FindFirstFileA(dir, &FileData);
+       if (hDir != INVALID_HANDLE_VALUE)
+       {
+               while (!fFinished)
+               {
+                       if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+                       {
+                               file = malloc(strlen(dirname) + 
strlen(FileData.cFileName) + 2);
+                               sprintf(file, "%s/%s", dirname, 
FileData.cFileName);
+                               rc = remove(file);
+                               free(file);
+                               if ( rc != 0 )
+                               {
+                                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                                       break;
+                               }
+                       }
+                       if (!FindNextFileA(hDir, &FileData))
+                       {
+                               if (GetLastError() == ERROR_NO_MORE_FILES)
+                                       fFinished = 1;
+                       }
+               }
+               FindClose(hDir);
+       } else
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+#else
+int clearUnix(char *dirname)
+{
+       int rc = 0;
+       DIR *dp;
+       struct dirent *dir_entry;
+       struct stat stat_info;
+
+       FUNC_ENTRY;
+       if((dp = opendir(dirname)) != NULL)
+       {
+               while((dir_entry = readdir(dp)) != NULL && rc == 0)
+               {
+                       lstat(dir_entry->d_name, &stat_info);
+                       if(S_ISREG(stat_info.st_mode))
+                       {
+                               if ( remove(dir_entry->d_name) != 0 )
+                                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                       }
+               }
+               closedir(dp);
+       } else
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+#endif
+
+
+/** Returns the keys (file names w/o the extension) in the client persistence 
directory.
+ *  See ::Persistence_keys
+ */
+int pstkeys(void *handle, char ***keys, int *nkeys)
+{
+       int rc = 0;
+       char *clientDir = handle;
+
+       FUNC_ENTRY;
+       if (clientDir == NULL)
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+#if defined(WIN32) || defined(WIN64)
+       rc = keysWin32(clientDir, keys, nkeys);
+#else
+       rc = keysUnix(clientDir, keys, nkeys);
+#endif
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+
+
+#if defined(WIN32) || defined(WIN64)
+int keysWin32(char *dirname, char ***keys, int *nkeys)
+{
+       int rc = 0;
+       char **fkeys = NULL;
+       int nfkeys = 0;
+       char dir[MAX_PATH+1];
+       WIN32_FIND_DATAA FileData;
+       HANDLE hDir;
+       int fFinished = 0;
+       char *ptraux;
+       int i;
+
+       FUNC_ENTRY;
+       sprintf(dir, "%s/*", dirname);
+
+       /* get number of keys */
+       hDir = FindFirstFileA(dir, &FileData);
+       if (hDir != INVALID_HANDLE_VALUE)
+       {
+               while (!fFinished)
+               {
+                       if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+                               nfkeys++;
+                       if (!FindNextFileA(hDir, &FileData))
+                       {
+                               if (GetLastError() == ERROR_NO_MORE_FILES)
+                                       fFinished = 1;
+                       }
+               }
+               FindClose(hDir);
+       } else
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+       if (nfkeys != 0 )
+               fkeys = (char **)malloc(nfkeys * sizeof(char *));
+
+       /* copy the keys */
+       hDir = FindFirstFileA(dir, &FileData);
+       if (hDir != INVALID_HANDLE_VALUE)
+       {
+               fFinished = 0;
+               i = 0;
+               while (!fFinished)
+               {
+                       if (FileData.dwFileAttributes & FILE_ATTRIBUTE_ARCHIVE)
+                       {
+                               fkeys[i] = malloc(strlen(FileData.cFileName) + 
1);
+                               strcpy(fkeys[i], FileData.cFileName);
+                               ptraux = strstr(fkeys[i], 
MESSAGE_FILENAME_EXTENSION);
+                               if ( ptraux != NULL )
+                                       *ptraux = '\0' ;
+                               i++;
+                       }
+                       if (!FindNextFileA(hDir, &FileData))
+                       {
+                               if (GetLastError() == ERROR_NO_MORE_FILES)
+                                       fFinished = 1;
+                       }
+               }
+               FindClose(hDir);
+       } else
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+       *nkeys = nfkeys;
+       *keys = fkeys;
+       /* the caller must free keys */
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+#else
+int keysUnix(char *dirname, char ***keys, int *nkeys)
+{
+       int rc = 0;
+       char **fkeys = NULL;
+       int nfkeys = 0;
+       char *ptraux;
+       int i;
+       DIR *dp;
+       struct dirent *dir_entry;
+       struct stat stat_info;
+
+       FUNC_ENTRY;
+       /* get number of keys */
+       if((dp = opendir(dirname)) != NULL)
+       {
+               while((dir_entry = readdir(dp)) != NULL)
+               {
+                       char* temp = 
malloc(strlen(dirname)+strlen(dir_entry->d_name)+2);
+
+                       sprintf(temp, "%s/%s", dirname, dir_entry->d_name);
+                       if (lstat(temp, &stat_info) == 0 && 
S_ISREG(stat_info.st_mode))
+                               nfkeys++;
+                       free(temp);
+               }
+               closedir(dp);
+       } else
+       {
+               rc = MQTTCLIENT_PERSISTENCE_ERROR;
+               goto exit;
+       }
+
+       if (nfkeys != 0)
+       {
+               fkeys = (char **)malloc(nfkeys * sizeof(char *));
+
+               /* copy the keys */
+               if((dp = opendir(dirname)) != NULL)
+               {
+                       i = 0;
+                       while((dir_entry = readdir(dp)) != NULL)
+                       {
+                               char* temp = 
malloc(strlen(dirname)+strlen(dir_entry->d_name)+2);
+       
+                               sprintf(temp, "%s/%s", dirname, 
dir_entry->d_name);
+                               if (lstat(temp, &stat_info) == 0 && 
S_ISREG(stat_info.st_mode))
+                               {
+                                       fkeys[i] = 
malloc(strlen(dir_entry->d_name) + 1);
+                                       strcpy(fkeys[i], dir_entry->d_name);
+                                       ptraux = strstr(fkeys[i], 
MESSAGE_FILENAME_EXTENSION);
+                                       if ( ptraux != NULL )
+                                               *ptraux = '\0' ;
+                                       i++;
+                               }
+                               free(temp);
+                       }
+                       closedir(dp);
+               } else
+               {
+                       rc = MQTTCLIENT_PERSISTENCE_ERROR;
+                       goto exit;
+               }
+       }
+
+       *nkeys = nfkeys;
+       *keys = fkeys;
+       /* the caller must free keys */
+
+exit:
+       FUNC_EXIT_RC(rc);
+       return rc;
+}
+#endif
+
+
+
+#if defined(UNIT_TESTS)
+int main (int argc, char *argv[])
+{
+#define MSTEM "m-"
+#define NMSGS 10
+#define NBUFS 4
+#define NDEL 2
+#define RC !rc ? "(Success)" : "(Failed) "
+
+       int rc;
+       char *handle;
+       char *perdir = ".";
+       const char *clientID = "TheUTClient";
+       const char *serverURI = "127.0.0.1:1883";
+
+       char *stem = MSTEM;
+       int msgId, i;
+       int nm[NDEL] = {5 , 8};  /* msgIds to get and remove */
+
+       char *key;
+       char **keys;
+       int nkeys;
+       char *buffer, *buff;
+       int buflen;
+
+       int nbufs = NBUFS;
+       char *bufs[NBUFS] = {"m0", "mm1", "mmm2" , "mmmm3"};  /* message 
content */
+       int buflens[NBUFS];
+       for(i=0;i<nbufs;i++)
+               buflens[i]=strlen(bufs[i]);
+
+       /* open */
+       /* printf("Persistence directory : %s\n", perdir); */
+       rc = pstopen((void**)&handle, clientID, serverURI, perdir);
+       printf("%s Persistence directory for client %s : %s\n", RC, clientID, 
handle);
+
+       /* put */
+       for(msgId=0;msgId<NMSGS;msgId++)
+       {
+               key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+               sprintf(key, "%s%d", stem, msgId);
+               rc = pstput(handle, key, nbufs, bufs, buflens);
+               printf("%s Adding message %s\n", RC, key);
+               free(key);
+       }
+
+       /* keys ,ie, list keys added */
+       rc = pstkeys(handle, &keys, &nkeys);
+       printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+       for(i=0;i<nkeys;i++)
+               printf("%13s\n", keys[i]);
+
+       if (keys !=NULL)
+               free(keys);
+
+       /* containskey */
+       for(i=0;i<NDEL;i++)
+       {
+               key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+               sprintf(key, "%s%d", stem, nm[i]);
+               rc = pstcontainskey(handle, key);
+               printf("%s Message %s is persisted ?\n", RC, key);
+               free(key);
+       }
+
+       /* get && remove*/
+       for(i=0;i<NDEL;i++)
+       {
+               key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+               sprintf(key, "%s%d", stem, nm[i]);
+               rc = pstget(handle, key, &buffer, &buflen);
+               buff = malloc(buflen+1);
+               memcpy(buff, buffer, buflen);
+               buff[buflen] = '\0';
+               printf("%s Retrieving message %s : %s\n", RC, key, buff);
+               rc = pstremove(handle, key);
+               printf("%s Removing message %s\n", RC, key);
+               free(key);
+               free(buff);
+               free(buffer);
+       }
+
+       /* containskey */
+       for(i=0;i<NDEL;i++)
+       {
+               key = malloc(MESSAGE_FILENAME_LENGTH + 1);
+               sprintf(key, "%s%d", stem, nm[i]);
+               rc = pstcontainskey(handle, key);
+               printf("%s Message %s is persisted ?\n", RC, key);
+               free(key);
+       }
+
+       /* keys ,ie, list keys added */
+       rc = pstkeys(handle, &keys, &nkeys);
+       printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+       for(i=0;i<nkeys;i++)
+               printf("%13s\n", keys[i]);
+
+       if (keys != NULL)
+               free(keys);
+
+
+       /* close -> it will fail, since client persistence directory is not 
empty */
+       rc = pstclose(&handle);
+       printf("%s Closing client persistence directory for client %s\n", RC, 
clientID);
+
+       /* clear */
+       rc = pstclear(handle);
+       printf("%s Deleting all persisted messages in %s\n", RC, handle);
+
+       /* keys ,ie, list keys added */
+       rc = pstkeys(handle, &keys, &nkeys);
+       printf("%s Found %d messages persisted in %s\n", RC, nkeys, handle);
+       for(i=0;i<nkeys;i++)
+               printf("%13s\n", keys[i]);
+
+       if ( keys != NULL )
+               free(keys);
+
+       /* close */
+       rc = pstclose(&handle);
+       printf("%s Closing client persistence directory for client %s\n", RC, 
clientID);
+}
+#endif
+
+
+#endif /* NO_PERSISTENCE */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h 
b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
new file mode 100644
index 0000000..27fedd6
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTPersistenceDefault.h
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ 
*******************************************************************************/
+
+/** 8.3 filesystem */
+#define MESSAGE_FILENAME_LENGTH 8    
+/** Extension of the filename */
+#define MESSAGE_FILENAME_EXTENSION ".msg"
+
+/* prototypes of the functions for the default file system persistence */
+int pstopen(void** handle, const char* clientID, const char* serverURI, void* 
context); 
+int pstclose(void* handle); 
+int pstput(void* handle, char* key, int bufcount, char* buffers[], int 
buflens[]); 
+int pstget(void* handle, char* key, char** buffer, int* buflen); 
+int pstremove(void* handle, char* key); 
+int pstkeys(void* handle, char*** keys, int* nkeys); 
+int pstclear(void* handle); 
+int pstcontainskey(void* handle, char* key);
+
+int pstmkdir(char *pPathname);
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
----------------------------------------------------------------------
diff --git a/thirdparty/paho.mqtt.c/src/MQTTProtocol.h 
b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
new file mode 100644
index 0000000..7478103
--- /dev/null
+++ b/thirdparty/paho.mqtt.c/src/MQTTProtocol.h
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2014 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution. 
+ *
+ * The Eclipse Public License is available at 
+ *    http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at 
+ *   http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ *    Ian Craggs - initial API and implementation and/or initial documentation
+ *    Ian Craggs - MQTT 3.1.1 updates
+ 
*******************************************************************************/
+
+#if !defined(MQTTPROTOCOL_H)
+#define MQTTPROTOCOL_H
+
+#include "LinkedList.h"
+#include "MQTTPacket.h"
+#include "Clients.h"
+
+#define MAX_MSG_ID 65535
+#define MAX_CLIENTID_LEN 65535
+
+typedef struct
+{
+       int socket;
+       Publications* p;
+} pending_write;
+
+
+typedef struct
+{
+       List publications;
+       unsigned int msgs_received;
+       unsigned int msgs_sent;
+       List pending_writes; /* for qos 0 writes not complete */
+} MQTTProtocol;
+
+
+#include "MQTTProtocolOut.h"
+
+#endif

Reply via email to