This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 42aee28  MINIFICPP-822 - Nanofi raw S2S implementation shouldn't 
depend on any C++ code.
42aee28 is described below

commit 42aee28c29bb9e0c448ca3272ea80cfe6477d00e
Author: Arpad Boda <[email protected]>
AuthorDate: Thu Apr 25 15:11:17 2019 +0200

    MINIFICPP-822 - Nanofi raw S2S implementation shouldn't depend on any C++ 
code.
    
    Fix return value of DescriptorStream::readData in case buffer is not fully 
filled
    
    Address review comments
    
    This closes #550.
    
    Signed-off-by: Marc Parisi <[email protected]>
---
 cmake/BuildTests.cmake                             |   4 +-
 libminifi/opsys/posix/io/ClientSocket.h            |  10 +-
 libminifi/src/io/DescriptorStream.cpp              |   5 +-
 libminifi/src/io/posix/ClientSocket.cpp            |   4 +-
 libminifi/test/CPPLINT.cfg                         |   1 +
 libminifi/test/RandomServerSocket.cpp              |  58 ++++
 .../test/RandomServerSocket.h                      |  43 ++-
 nanofi/include/core/cstructs.h                     |  13 +-
 nanofi/include/core/cxxstructs.h                   |   4 -
 nanofi/include/sitetosite/CPeer.h                  |  11 +-
 nanofi/include/sitetosite/CRawSocketProtocol.h     |   2 +-
 nanofi/include/sitetosite/CSiteToSite.h            |  13 +-
 nanofi/src/core/cstream.c                          | 236 ++++++++++++++++
 nanofi/src/core/cstream.cpp                        | 164 ------------
 nanofi/src/sitetosite/CPeer.c                      |   7 +-
 nanofi/src/sitetosite/CRawSocketProtocol.c         |   6 +-
 nanofi/tests/CSite2SiteTests.cpp                   | 297 +++++++++++----------
 17 files changed, 504 insertions(+), 374 deletions(-)

diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 591516d..668564e 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -97,7 +97,7 @@ SET(SPD_LIB spd_lib)
 add_library(${SPD_LIB} STATIC ${SPD_SOURCES})
 
 SET(TEST_BASE_LIB test_base)
-add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp")
+add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp" 
"${TEST_DIR}/RandomServerSocket.cpp")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE 
"${CMAKE_SOURCE_DIR}/thirdparty/catch")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE 
"${CMAKE_SOURCE_DIR}/libminifi/include/")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE 
"${CMAKE_SOURCE_DIR}/thirdparty/cron")
@@ -139,7 +139,7 @@ FOREACH(testfile ${NANOFI_UNIT_TESTS})
     target_include_directories(${testfilename} BEFORE PRIVATE 
"${CMAKE_SOURCE_DIR}/extensions/standard-processors/processors/")
     target_include_directories(${testfilename} BEFORE PRIVATE 
"${CMAKE_SOURCE_DIR}/libminifi/test")
     appendIncludes("${testfilename}")
-    target_link_libraries(${testfilename} ${CMAKE_THREAD_LIBS_INIT} 
${CATCH_MAIN_LIB} nanofi)
+    target_link_libraries(${testfilename} ${CMAKE_THREAD_LIBS_INIT} 
${CATCH_MAIN_LIB} ${TEST_BASE_LIB}  nanofi)
     
     if (APPLE)
        # minifi-standard-processors
diff --git a/libminifi/opsys/posix/io/ClientSocket.h 
b/libminifi/opsys/posix/io/ClientSocket.h
index 4ed8b99..739f7f9 100644
--- a/libminifi/opsys/posix/io/ClientSocket.h
+++ b/libminifi/opsys/posix/io/ClientSocket.h
@@ -108,7 +108,9 @@ class Socket : public BaseStream {
    * Return the port for this socket
    * @returns port
    */
-  uint16_t getPort();
+  uint16_t getPort() const {
+    return port_;
+  }
 
   // data stream extensions
   /**
@@ -276,6 +278,12 @@ class Socket : public BaseStream {
 
 
   bool nonBlocking_;
+
+ protected:
+  void setPort(uint16_t port) {
+    port_ = port;
+  }
+
  private:
   std::shared_ptr<logging::Logger> logger_;
   static std::string init_hostname() {
diff --git a/libminifi/src/io/DescriptorStream.cpp 
b/libminifi/src/io/DescriptorStream.cpp
index 38e1c85..6431ef5 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -84,11 +84,10 @@ int DescriptorStream::readData(uint8_t *buf, int buflen) {
   if (!IsNullOrEmpty(buf)) {
     auto size_read = ::read(fd_, buf, buflen);
 
-    if (size_read != buflen) {
+    if (size_read < 0) {
       return -1;
-    } else {
-      return buflen;
     }
+    return  size_read;
 
   } else {
     return -1;
diff --git a/libminifi/src/io/posix/ClientSocket.cpp 
b/libminifi/src/io/posix/ClientSocket.cpp
index 113f914..7fa4d22 100644
--- a/libminifi/src/io/posix/ClientSocket.cpp
+++ b/libminifi/src/io/posix/ClientSocket.cpp
@@ -42,7 +42,7 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const 
std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
+Socket::Socket(const std::shared_ptr<SocketContext>& /*context*/, const 
std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
     : requested_hostname_(hostname),
       port_(port),
       addr_info_(0),
@@ -59,7 +59,7 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context, 
const std::string
   FD_ZERO(&read_fds_);
 }
 
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const 
std::string &hostname, const uint16_t port)
+Socket::Socket(const std::shared_ptr<SocketContext>& context, const 
std::string &hostname, const uint16_t port)
     : Socket(context, hostname, port, 0) {
 }
 
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
index 7df4c76..9f461f1 100644
--- a/libminifi/test/CPPLINT.cfg
+++ b/libminifi/test/CPPLINT.cfg
@@ -2,3 +2,4 @@ set noparent
 filter=-build/include_order,-build/include_alpha
 exclude_files=Server.cpp
 exclude_files=TestBase.cpp
+exclude_files=RandomServerSocket.cpp
diff --git a/libminifi/test/RandomServerSocket.cpp 
b/libminifi/test/RandomServerSocket.cpp
new file mode 100644
index 0000000..2ff01f7
--- /dev/null
+++ b/libminifi/test/RandomServerSocket.cpp
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "./RandomServerSocket.h"
+
+#include <sstream>
+#include <random>
+#include <string>
+#include <memory>
+
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+  RandomServerSocket::RandomServerSocket(const std::string& host, uint16_t 
offset, uint16_t range, uint16_t retries) :
+      
ServerSocket::ServerSocket(std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()),
 host, 0, 1) {
+    std::random_device rd;
+    std::mt19937 gen(rd());
+    std::uniform_int_distribution<uint16_t> dis(offset, offset + range);
+    auto logger = logging::LoggerFactory<RandomServerSocket>::getLogger();
+    for (uint16_t i = 0; i < retries; ++i) {
+      setPort(dis(gen));
+      if (initialize() == 0) {
+        logger->log_info("Created socket listens on generated port: %hu", 
getPort());
+        return;
+      }
+    }
+    std::stringstream error;
+    error << "Couldn't bind to a port between " << offset << " and " << 
offset+range << " in " << retries << " try!";
+    logger->log_error(error.str().c_str());
+    throw error.str();
+  }
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/nanofi/include/core/cxxstructs.h 
b/libminifi/test/RandomServerSocket.h
similarity index 53%
copy from nanofi/include/core/cxxstructs.h
copy to libminifi/test/RandomServerSocket.h
index b0b90eb..1cf7a57 100644
--- a/nanofi/include/core/cxxstructs.h
+++ b/libminifi/test/RandomServerSocket.h
@@ -16,35 +16,26 @@
  * limitations under the License.
  */
 
-#ifndef NIFI_MINIFI_CPP_CXXSTRUCTS_H
-#define NIFI_MINIFI_CPP_CXXSTRUCTS_H
+#ifndef NIFI_MINIFI_CPP_RANDOMSERVERSOCKET_H
+#define NIFI_MINIFI_CPP_RANDOMSERVERSOCKET_H
 
-#include "cstructs.h"
-#include "cxx/Plan.h"
-#include "io/DataStream.h"
+#include "io/ServerSocket.h"
 
-struct flow : public ExecutionPlan {
-  using ExecutionPlan::ExecutionPlan;
-};
-
-struct standalone_processor : public core::Processor {
-  using core::Processor::Processor;
-};
-
-struct processor : public core::Processor {
-  using core::Processor::Processor;
-};
-
-struct processor_session : public core::ProcessSession {
-  using core::ProcessSession::ProcessSession;
-};
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 
-struct processor_context : public core::ProcessContext {
-  using core::ProcessContext::ProcessContext;
+class RandomServerSocket : public ServerSocket {
+ public:
+  RandomServerSocket(const std::string& host = "localhost", uint16_t offset = 
30000, uint16_t range = 10000, uint16_t retries = 100);
 };
 
-struct cstream {
-  org::apache::nifi::minifi::io::BaseStream * impl;
-};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 
-#endif //NIFI_MINIFI_CPP_CXXSTRUCTS_H
+#endif //NIFI_MINIFI_CPP_RANDOMSERVERSOCKET_H
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index 7d258f7..d775747 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -22,6 +22,11 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#ifdef _WIN32
+#define NOMINMAX
+#include <winsock2.h>
+#endif
+
 #ifdef _MSC_VER
 #define DEPRECATED __declspec(deprecated)
 #elif defined(__GNUC__) | defined(__clang__)
@@ -136,6 +141,12 @@ typedef struct file_buffer {
   uint64_t file_len;
 } file_buffer;
 
-typedef struct cstream cstream;
+#ifndef _WIN32
+typedef int SOCKET;
+#endif
+
+typedef struct cstream {
+  SOCKET socket_;
+} cstream;
 
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/cxxstructs.h b/nanofi/include/core/cxxstructs.h
index b0b90eb..29f7f5a 100644
--- a/nanofi/include/core/cxxstructs.h
+++ b/nanofi/include/core/cxxstructs.h
@@ -43,8 +43,4 @@ struct processor_context : public core::ProcessContext {
   using core::ProcessContext::ProcessContext;
 };
 
-struct cstream {
-  org::apache::nifi::minifi::io::BaseStream * impl;
-};
-
 #endif //NIFI_MINIFI_CPP_CXXSTRUCTS_H
diff --git a/nanofi/include/sitetosite/CPeer.h 
b/nanofi/include/sitetosite/CPeer.h
index 47bdb63..11e4ece 100644
--- a/nanofi/include/sitetosite/CPeer.h
+++ b/nanofi/include/sitetosite/CPeer.h
@@ -99,25 +99,20 @@ static void setPort(struct SiteToSiteCPeer * peer, uint16_t 
port) {
   }
 }
 
-static void initPeer(struct SiteToSiteCPeer * peer, cstream * injected_socket, 
const char * host, uint16_t port, const char * ifc) {
-  peer->_stream = injected_socket;
-  //peer->local_network_interface_= std::move(io::NetworkInterface(ifc, 
nullptr));
+static void initPeer(struct SiteToSiteCPeer * peer, const char * host, 
uint16_t port, const char * ifc) {
+  peer->_stream = NULL;
   peer->_host = NULL;
   peer->_url = NULL;
   peer->_port = 0;
   setHostName(peer, host);
   setPort(peer, port);
-
-  if(peer->_stream == NULL) {
-    peer->_owns_resource = True;
-  }
 }
 
 static void freePeer(struct SiteToSiteCPeer * peer) {
   closePeer(peer);
   setHostName(peer, NULL);
 
-  if(peer->_owns_resource == True && peer->_stream != NULL) {
+  if(peer->_stream != NULL) {
     free_socket(peer->_stream);
     peer->_stream = NULL;
   }
diff --git a/nanofi/include/sitetosite/CRawSocketProtocol.h 
b/nanofi/include/sitetosite/CRawSocketProtocol.h
index f99b8bd..977b482 100644
--- a/nanofi/include/sitetosite/CRawSocketProtocol.h
+++ b/nanofi/include/sitetosite/CRawSocketProtocol.h
@@ -194,7 +194,7 @@ static void initRawClient(struct CRawSiteToSiteClient 
*client, struct SiteToSite
 
 static struct CRawSiteToSiteClient* createClient(const char * host, uint16_t 
port, const char * nifi_port) {
   struct SiteToSiteCPeer * peer = (struct SiteToSiteCPeer 
*)malloc(sizeof(struct SiteToSiteCPeer));
-  initPeer(peer, NULL, host, port, "");
+  initPeer(peer, host, port, "");
   struct CRawSiteToSiteClient* client = (struct 
CRawSiteToSiteClient*)malloc(sizeof(struct CRawSiteToSiteClient));
   initRawClient(client, peer);
   client->_owns_resource = True;
diff --git a/nanofi/include/sitetosite/CSiteToSite.h 
b/nanofi/include/sitetosite/CSiteToSite.h
index 8728cef..0569f04 100644
--- a/nanofi/include/sitetosite/CSiteToSite.h
+++ b/nanofi/include/sitetosite/CSiteToSite.h
@@ -335,28 +335,22 @@ static int readData(CTransaction * transaction, uint8_t 
*buf, int buflen) {
   return ret;
 }
 
-static int is_little_endian() {
-  static unsigned int x = 1;
-  static char *c = (char*) &x;
-  return  (*c == 1) ? 1 : 0;
-}
-
 static int write_uint64t(CTransaction * transaction, uint64_t base_value) {
-  const uint64_t value = is_little_endian() == 1 ? htonll_r(base_value) : 
base_value;
+  const uint64_t value = htonll_r(base_value);
   const uint8_t * buf = (uint8_t*)(&value);
 
   return writeData(transaction, buf, sizeof(uint64_t));
 }
 
 static int write_uint32t(CTransaction * transaction, uint32_t base_value) {
-  const uint32_t value = is_little_endian() == 1 ? htonl(base_value) : 
base_value;
+  const uint32_t value = htonl(base_value);
   const uint8_t * buf = (uint8_t*)(&value);
 
   return writeData(transaction, buf, sizeof(uint32_t));
 }
 
 static int write_uint16t(CTransaction * transaction, uint16_t base_value) {
-  const uint16_t value = is_little_endian() == 1 ? htons(base_value) : 
base_value;
+  const uint16_t value = htons(base_value);
   const uint8_t *buf = (uint8_t *) (&value);
 
   return writeData(transaction, buf, sizeof(uint16_t));
@@ -391,7 +385,6 @@ static int write_UTF_len(CTransaction * transaction, const 
char * str, size_t le
 }
 
 static int write_UTF(CTransaction * transaction, const char * str, enum Bool 
widen) {
-  //return transaction->_stream->writeUTF(str, widen);
   return write_UTF_len(transaction, str, strlen(str), widen);
 }
 
diff --git a/nanofi/src/core/cstream.c b/nanofi/src/core/cstream.c
new file mode 100644
index 0000000..3919728
--- /dev/null
+++ b/nanofi/src/core/cstream.c
@@ -0,0 +1,236 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#define NOMINMAX
+#include <winsock2.h>
+#else
+#include <sys/socket.h>        // socket
+#include <arpa/inet.h> // inet_addr
+#include <netdb.h> // hostent
+#include <unistd.h> // close
+#endif
+
+#include <errno.h>
+#include <string.h>
+
+#include "api/nanofi.h"
+#include "core/cstructs.h"
+#include "core/cstream.h"
+#include "core/log.h"
+
+int write_uint32_t(uint32_t value, cstream * stream) {
+  value = htonl(value);
+  return write_buffer((uint8_t*)(&value), sizeof(uint32_t), stream);
+}
+
+int write_uint16_t(uint16_t value, cstream * stream) {
+  value = htons(value);
+  return write_buffer((uint8_t*)(&value), sizeof(uint16_t), stream);
+}
+
+int write_buffer(const uint8_t *value, int len, cstream * stream) {
+  int ret = 0, bytes = 0;
+
+  while (bytes < len) {
+    ret = send(stream->socket_, value + bytes, len - bytes, 0);
+    // check for errors
+    if (ret <= 0) {
+      if (ret < 0 && errno == EINTR) {
+        continue;
+      }
+      logc(err, "Could not send to %d, error: %s", stream->socket_, 
strerror(errno));
+      close_stream(stream);
+      return ret;
+    }
+    bytes += ret;
+  }
+
+  if (bytes)
+    logc(trace, "Sent data size %d over socket %d", bytes, stream->socket_);
+  return bytes;
+}
+
+int read_buffer(uint8_t *buf, int len, cstream * stream) {
+  int32_t total_read = 0;
+  while (len) {
+    int bytes_read = recv(stream->socket_, buf, len, 0);
+    if (bytes_read <= 0) {
+      if (bytes_read == 0) {
+        logc(debug, "Other side hung up on %d",stream->socket_);
+      } else {
+        if (errno == EINTR) {
+          continue;
+        }
+        logc(err, "Could not recv on %d, error: %s", stream->socket_, 
strerror(errno));
+      }
+      return -1;
+    }
+    len -= bytes_read;
+    buf += bytes_read;
+    total_read += bytes_read;
+  }
+  if(total_read)
+    logc(trace, "Received data size %d over socket %d", total_read, 
stream->socket_);
+  return total_read;
+}
+
+int writeUTF(const char * cstr, uint64_t len, enum Bool widen, cstream * 
stream) {
+  if (len > 65535) {
+    return -1;
+  }
+
+  int ret;
+  if (!widen) {
+    uint16_t shortlen = len;
+    ret = write_uint16_t(shortlen, stream);
+  } else {
+    ret = write_uint32_t(len, stream);
+  }
+
+  if(len == 0 || ret < 0) {
+    return ret;
+  }
+
+  const uint8_t *underlyingPtr = (const uint8_t *)cstr;
+
+  if (!widen) {
+    uint16_t short_length = len;
+    ret = write_buffer(underlyingPtr, short_length, stream);
+  } else {
+    ret = write_buffer(underlyingPtr, len, stream);
+  }
+  return ret;
+}
+
+int read_uint8_t(uint8_t *value, cstream * stream) {
+  uint8_t val;
+  int ret = read_buffer(&val, sizeof(uint8_t), stream);
+  if(ret == sizeof(uint8_t)) {
+    *value = val;
+  }
+  return ret;
+}
+int read_uint16_t(uint16_t *value, cstream * stream) {
+  uint16_t val;
+  int ret = read_buffer((uint8_t*)&val, sizeof(uint16_t), stream);
+  if(ret == sizeof(uint16_t)) {
+    *value = ntohs(val);
+  }
+  return ret;
+}
+int read_uint32_t(uint32_t *value, cstream * stream) {
+  uint32_t val;
+  int ret = read_buffer((uint8_t*)&val, sizeof(uint32_t), stream);
+  if(ret == sizeof(uint32_t)) {
+    *value = ntohl(val);
+  }
+  return ret;
+}
+
+int readUTFLen(uint32_t * utflen, cstream * stream) {
+  int ret = 1;
+  uint16_t shortLength = 0;
+  ret = read_uint16_t(&shortLength, stream);
+  if (ret > 0) {
+    *utflen = shortLength;
+  }
+  return ret;
+}
+
+int readUTF(char * buf, uint64_t buflen, cstream * stream) {
+  //return stream->impl->readData((uint8_t*)buf, buflen);
+  return read_buffer((uint8_t*)buf, buflen, stream);
+}
+
+void close_stream(cstream * stream) {
+  if(stream != NULL && stream->socket_ != -1) {
+#ifdef _WIN32
+    shutdown(stream->socket_, SD_BOTH);
+    closesocket(stream->socket_);
+    WSACleanup();
+#else
+    shutdown(stream->socket_, SHUT_RDWR);
+    close(stream->socket_);
+#endif
+    stream->socket_ = -1;
+  }
+}
+
+cstream * create_socket(const char * host, uint16_t portnum) {
+  logc(trace, "Creating socket to connect to: %s:%d", host, portnum);
+
+#ifdef _WIN32
+  WSADATA wsa;
+  if (WSAStartup(MAKEWORD(2,2),&wsa) != 0)
+  {
+    logc(err, "%s", "WSAStartup failed");
+    return NULL;
+  }
+#endif
+
+  struct addrinfo *result, *rp;
+  struct addrinfo hints;
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
+  hints.ai_socktype = SOCK_STREAM; /* Stream socket */
+  hints.ai_flags = 0;
+  hints.ai_protocol = 0;          /* Any protocol */
+
+  char portstr[6];
+  snprintf(portstr, 6, "%d", portnum);
+
+  if (getaddrinfo(host, portstr, &hints, &result) != 0) {
+    logc(err, "%s%s", "Failed to resolve hostname: ", host);
+    return NULL;
+  }
+
+  SOCKET sock;
+
+  for (rp = result; rp != NULL; rp = rp->ai_next) {
+    sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+    if (sock == -1) {
+      continue;
+    }
+
+    if (connect(sock, rp->ai_addr, rp->ai_addrlen) != -1) {
+      break;
+    }
+
+    close(sock);
+  }
+
+  freeaddrinfo(result);
+
+  if (rp == NULL) {
+    logc(err, "Failed to connect to %s:%u", host, portnum);
+    return NULL;
+  }
+
+  cstream *stream = (cstream *) malloc(sizeof(cstream));
+  stream->socket_ = sock;
+  logc(debug, "%s", "Socket successfully connected");
+  return stream;
+}
+
+void free_socket(cstream * stream) {
+  if(stream != NULL) {
+    close_stream(stream);
+    free(stream);
+  }
+}
diff --git a/nanofi/src/core/cstream.cpp b/nanofi/src/core/cstream.cpp
deleted file mode 100644
index a463437..0000000
--- a/nanofi/src/core/cstream.cpp
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "api/nanofi.h"
-#include "core/cstructs.h"
-#include "core/cxxstructs.h"
-#include "core/cstream.h"
-#include "io/BaseStream.h"
-#include "io/DataStream.h"
-#include "io/ClientSocket.h"
-#include "cxx/Instance.h"
-
-int write_uint64_t(uint64_t value, cstream * stream) {
-  return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_uint32_t(uint32_t value, cstream * stream) {
-  return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_uint16_t(uint16_t value, cstream * stream) {
-  return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_uint8_t(uint8_t value, cstream * stream) {
-  return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_char(char value, cstream * stream) {
-  return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_buffer(const uint8_t *value, int len, cstream * stream) {
-  int ret_val = stream->impl->Serializable::write(const_cast<uint8_t 
*>(value), len, stream->impl);
-  return ret_val;
-}
-
-int writeUTF(const char * cstr, uint64_t len, Bool widen, cstream * stream) {
-  std::string str(cstr, len);
-  return stream->impl->Serializable::writeUTF(str, stream->impl, widen == 
True);
-}
-
-int read_char(char *value, cstream * stream) {
-  char val;
-  int ret = stream->impl->Serializable::read(val, stream->impl);
-  if(ret == sizeof(char)) {
-    *value = val;
-  }
-  return ret;
-}
-int read_uint8_t(uint8_t *value, cstream * stream) {
-  uint8_t val;
-  int ret = stream->impl->Serializable::read(val, stream->impl);
-  if(ret == sizeof(uint8_t)) {
-    *value = val;
-  }
-  return ret;
-}
-int read_uint16_t(uint16_t *value, cstream * stream) {
-  uint16_t val;
-  int ret = stream->impl->Serializable::read(val, stream->impl);
-  if(ret == sizeof(uint16_t)) {
-    *value = val;
-  }
-  return ret;
-}
-int read_uint32_t(uint32_t *value, cstream * stream) {
-  uint32_t val;
-  int ret = stream->impl->Serializable::read(val, stream->impl);
-  if(ret == sizeof(uint32_t)) {
-    *value = val;
-  }
-  return ret;
-}
-int read_uint64_t(uint64_t *value, cstream * stream) {
-  uint64_t val;
-  int ret = stream->impl->Serializable::read(val, stream->impl);
-  if(ret == sizeof(uint64_t)) {
-    *value = val;
-  }
-  return ret;
-}
-
-int read_buffer(uint8_t *value, int len, cstream * stream) {
-  return stream->impl->Serializable::read(value, len, stream->impl);
-}
-
-int readUTFLen(uint32_t * utflen, cstream * stream) {
-  int ret = 1;
-  uint16_t shortLength = 0;
-  ret = read_uint16_t(&shortLength, stream);
-  if (ret > 0) {
-    *utflen = shortLength;
-  }
-  return ret;
-}
-
-int readUTF(char * buf, uint64_t buflen, cstream * stream) {
-  return stream->impl->readData((uint8_t*)buf, buflen);
-}
-
-void close_stream(cstream * stream) {
-  if(stream != NULL && stream->impl != NULL) {
-    stream->impl->closeStream();
-  }
-}
-
-int open_stream(cstream * stream) {
-  if(stream != NULL && stream->impl != NULL) {
-    return stream->impl->initialize();
-  }
-  return -1;
-}
-
-cstream * create_socket(const char * host, uint16_t portnum) {
-  nifi_port nport;
-
-  char random_port[6] = "65443";
-
-  nport.port_id = random_port;
-
-  nifi_instance *instance = create_instance(host, &nport);
-
-  auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
-
-  auto stream_factory_ = 
minifi::io::StreamFactory::getInstance(minifi_instance_ref->getConfiguration());
-
-  cstream * stream = (cstream*)malloc(sizeof(cstream));
-
-  auto socket = stream_factory_->createSocket(host, portnum);
-
-  free_instance(instance);
-
-  if(socket) {
-    if(socket->initialize() == 0) {
-      stream->impl = socket.release();
-      return stream;
-    }
-  }
-  return NULL;
-}
-
-void free_socket(cstream * stream) {
-  if(stream != NULL) {
-    if(stream->impl != NULL) {
-      auto socket = static_cast<minifi::io::Socket*>(stream->impl);
-      if(socket) {
-        socket->closeStream();
-        delete socket; //This is ugly, but only sockets get deleted this way
-      }
-    }
-    free(stream);
-  }
-}
diff --git a/nanofi/src/sitetosite/CPeer.c b/nanofi/src/sitetosite/CPeer.c
index fb1f417..70ecede 100644
--- a/nanofi/src/sitetosite/CPeer.c
+++ b/nanofi/src/sitetosite/CPeer.c
@@ -28,7 +28,7 @@ int openPeer(struct SiteToSiteCPeer * peer) {
   }
 
   //In case there was no socket injected, let's create it
-  if(peer->_stream == NULL && peer->_owns_resource == True) {
+  if(peer->_stream == NULL) {
     peer->_stream = create_socket(peer->_host, peer->_port);
     if(peer->_stream == NULL) {
       logc(err, "%s", "failed to open socket");
@@ -44,11 +44,6 @@ int openPeer(struct SiteToSiteCPeer * peer) {
 
   // TODO: support provided interface
 
-  if(open_stream(peer->_stream) != 0) {
-    logc(err, "%s", "failed to open stream");
-    return -1;
-  }
-
   uint16_t data_size = sizeof MAGIC_BYTES;
 
   if(write_buffer((uint8_t*)MAGIC_BYTES, data_size, peer->_stream) != 
data_size) {
diff --git a/nanofi/src/sitetosite/CRawSocketProtocol.c 
b/nanofi/src/sitetosite/CRawSocketProtocol.c
index e89a36c..73741f2 100644
--- a/nanofi/src/sitetosite/CRawSocketProtocol.c
+++ b/nanofi/src/sitetosite/CRawSocketProtocol.c
@@ -187,6 +187,7 @@ int handShake(struct CRawSiteToSiteClient * client) {
   ret = readResponse(client, &code);
 
   if (ret <= 0) {
+    logc(err, "failed to receive response code from server");
     return -1;
   }
 
@@ -722,7 +723,8 @@ int confirm(struct CRawSiteToSiteClient * client, const 
char * transactionID) {
 
       if(content_size > 0 && ff->crp != NULL) {
         content_buf = (uint8_t*)malloc(content_size*sizeof(uint8_t));
-        if(get_content(ff, content_buf, content_size) <= 0) {
+        len = get_content(ff, content_buf, content_size);
+        if(len <= 0) {
           return -2;
         }
         ret = write_uint64t(transaction, len);
@@ -733,7 +735,7 @@ int confirm(struct CRawSiteToSiteClient * client, const 
char * transactionID) {
         writeData(transaction, content_buf, len);
       }
 
-    } else if (strlen(packet->payload_) > 0) {
+    } else if (packet->payload_ != NULL && strlen(packet->payload_) > 0) {
       len = strlen(packet->payload_);
 
       ret = write_uint64t(transaction, len);
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index 4b37de2..cc2902f 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -18,31 +18,59 @@
 
 #include <stdlib.h>
 
-#include <uuid/uuid.h>
 #include <algorithm>
+#include <chrono>
 #include <string>
 #include <memory>
 #include <utility>
 #include <map>
+#include <thread>
+
 #include "io/BaseStream.h"
 #include "TestBase.h"
 #include "unit/SiteToSiteHelper.h"
 #include "sitetosite/CPeer.h"
 #include "sitetosite/CRawSocketProtocol.h"
 #include "sitetosite/CSiteToSite.h"
-#include <algorithm>
-#include <core/cxxstructs.h>
+#include "sitetosite/RawSocketProtocol.h"
+#include "uuid/uuid.h"
+#include "core/cstructs.h"
+#include "RandomServerSocket.h"
+#include "core/log.h"
 
 #define FMT_DEFAULT fmt_lower
 
-TEST_CASE("TestSetPortId", "[S2S1]") {
-  auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new 
org::apache::nifi::minifi::io::BaseStream());
-
-  cstream cstrm;
-  cstrm.impl = stream_ptr.get();
+const char * ATTR_NAME = "some_key";
+const char * ATTR_VALUE = "some value";
+const char * PAYLOAD = "Test MiNiFi payload";
+const char * PAYLOAD_CRC = "2006463717"; // Depends on both payload and 
attributes, do NOT change manually!
+std::string CODEC_NAME = "StandardFlowFileCodec";
+
+struct S2SReceivedData {
+  bool request_type_ok = false;
+  std::string magic_string;
+  uint32_t attr_num = 0;
+  std::map<std::string, std::string> attributes;
+  std::vector<uint8_t> payload;
+};
+
+// This struct is used to sync between simulated clients and servers
+struct TransferState {
+  std::atomic<bool> handshake_data_processed{false};
+  std::atomic<bool> transer_completed{false};
+  std::atomic<bool> data_processed{false};
+
+};
+
+void wait_until(std::atomic<bool>& b) {
+  while(!b) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(0)); //Just yield
+  }
+}
 
+TEST_CASE("TestSetPortId", "[S2S1]") {
   SiteToSiteCPeer peer;
-  initPeer(&peer, &cstrm, "fake_host", 65433, "");
+  initPeer(&peer, "fake_host", 65433, "");
   CRawSiteToSiteClient * protocol = 
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
 
   initRawClient(protocol, &peer);
@@ -60,164 +88,145 @@ TEST_CASE("TestSetPortId", "[S2S1]") {
   free(protocol);
 }
 
-TEST_CASE("TestSetPortIdUppercase", "[S2S2]") {
-  auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new 
org::apache::nifi::minifi::io::BaseStream());
-
-  cstream cstrm;
-  cstrm.impl = stream_ptr.get();
-
-  SiteToSiteCPeer peer;
-  initPeer(&peer, &cstrm, "fake_host", 65433, "");
-  CRawSiteToSiteClient * protocol = 
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
-
-  initRawClient(protocol, &peer);
-
-  std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
-
-  //setPortId(protocol, uuid_str.c_str());
-
-  //REQUIRE(uuid_str != getPortId(protocol));
-
-  std::transform(uuid_str.begin(), uuid_str.end(), uuid_str.begin(), 
::tolower);
-
-  //REQUIRE(uuid_str == std::string(getPortId(protocol)));
-
-  tearDown(protocol);
-
-  freePeer(&peer);
-
-  free(protocol);
+void send_response_code(minifi::io::BaseStream* stream, uint8_t resp) {
+  std::array<uint8_t, 3> resp_codes = {'R', 'C', resp};
+  for(uint8_t r : resp_codes) {
+    stream->write(&r, 1);
+  }
 }
 
-void sunny_path_bootstrap(SiteToSiteResponder *collector) {
-  char a = 0x14;  // RESOURCE_OK
-  std::string resp_code;
-  resp_code.insert(resp_code.begin(), a);
-  collector->push_response(resp_code);
-
-  // Handshake respond code
-  resp_code = "R";
-  collector->push_response(resp_code);
-  resp_code = "C";
-  collector->push_response(resp_code);
-  char b = 0x1;
-  resp_code = b;
-  collector->push_response(resp_code);
-
-  // Codec Negotiation
-  resp_code = a;
-  collector->push_response(resp_code);
+void accept_transfer(minifi::io::BaseStream* stream, const std::string& 
crcstr, TransferState& transfer_state, S2SReceivedData& s2s_data) {
+  // In the long term it would be nice to calculate the CRC of the received data 
here
+  send_response_code(stream, 12); // confirmed
+  stream->writeUTF(crcstr);
+  send_response_code(stream, 13); // transaction finished
+
+  wait_until(transfer_state.transer_completed);
+
+  std::string requesttype;
+  stream->readUTF(requesttype);
+
+  if(requesttype == "SEND_FLOWFILES") {
+    s2s_data.request_type_ok = true;
+    stream->read(s2s_data.attr_num);
+    std::string key, value;
+    for(int i = 0; i < s2s_data.attr_num; ++i) {
+      stream->readUTF(key, true);
+      stream->readUTF(value, true);
+      s2s_data.attributes[key] = value;
+    }
+    uint64_t content_size=0;
+    stream->read(content_size);
+    s2s_data.payload.resize(content_size);
+    stream->readData(s2s_data.payload, content_size);
+  } else {
+    s2s_data.request_type_ok = false;
+  }
+  transfer_state.data_processed = true;
 }
 
-TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
-
-  SiteToSiteResponder *collector = new SiteToSiteResponder();
-  sunny_path_bootstrap(collector);
+void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& 
transfer_state, S2SReceivedData& s2s_data) {
+  //Verify the magic string
+  char c_array[4];
+  stream->readData((uint8_t*)c_array, 4);
+  s2s_data.magic_string = std::string(c_array, 4);
+  uint8_t success = 0x14;
+  stream->write(&success, 1);
+  send_response_code(stream, 0x1);
+  stream->write(&success, 1);
+
+  //just consume handshake data
+  bool found_codec = false;
+  int read_len = 0;
+  while(!found_codec) {
+    uint8_t handshake_data[1000];
+    int actual_len = stream->readData(handshake_data+read_len, 1000-read_len);
+    if(actual_len <= 0) {
+      continue;
+    }
+    read_len += actual_len;
+    std::string incoming_data(reinterpret_cast<const char *>(handshake_data), 
read_len);
+    auto it = std::search(incoming_data.begin(), incoming_data.end(), 
CODEC_NAME.begin(), CODEC_NAME.end());
+    if(it != incoming_data.end()){
+      size_t idx = std::distance(incoming_data.begin(), it);
+      // The actual version follows the string as a uint32_t; that should be the 
end of the buffer
+      found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len;
+    }
+  }
+
+  transfer_state.handshake_data_processed = true;
+}
 
-  auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new 
org::apache::nifi::minifi::io::BaseStream(collector));
+void different_version_bootstrap(minifi::io::BaseStream* stream, 
TransferState& transfer_state, S2SReceivedData& s2s_data) {
+  uint8_t resp_code = 0x15;
+  stream->write(&resp_code, 1);
 
-  cstream cstrm;
-  cstrm.impl = stream_ptr.get();
+  uint32_t version = 4;
+  stream->write(version);
 
-  SiteToSiteCPeer peer;
-  initPeer(&peer, &cstrm, "fake_host", 65433, "");
+  sunny_path_bootstrap(stream, transfer_state, s2s_data);
+}
 
-  CRawSiteToSiteClient * protocol = 
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
+TEST_CASE("TestSiteToBootStrap", "[S2S3]") {
 
-  initRawClient(protocol, &peer);
+  std::array<std::function<void(minifi::io::BaseStream*, TransferState&, 
S2SReceivedData&)>, 2> bootstrap_functions =
+      {sunny_path_bootstrap, different_version_bootstrap};
 
-  std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+  for(const auto& bootstrap_func : bootstrap_functions) {
+    TransferState transfer_state;
+    S2SReceivedData received_data;
+    std::unique_ptr<minifi::io::ServerSocket> sckt(new 
minifi::io::RandomServerSocket("localhost"));
+    uint16_t port = sckt->getPort();
 
-  setPortId(protocol, uuid_str.c_str());
+    sckt->registerCallback([]() -> bool { return true; },  [&bootstrap_func, 
&transfer_state, &received_data](minifi::io::BaseStream* stream)
+      {bootstrap_func(stream, transfer_state, received_data); 
accept_transfer(stream, PAYLOAD_CRC, transfer_state, received_data); } );
 
-  REQUIRE(0 == bootstrap(protocol));
-
-  REQUIRE(collector->get_next_client_response() == "NiFi");
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "SocketFlowFileProtocol");
-  collector->get_next_client_response();
-  collector->get_next_client_response();
-  collector->get_next_client_response();
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "nifi://fake_host:65433");
-  collector->get_next_client_response();
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "GZIP");
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "false");
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "PORT_IDENTIFIER");
-  collector->get_next_client_response();
-
-  std::string temp_val = collector->get_next_client_response();
-  std::transform(temp_val.begin(), temp_val.end(), temp_val.begin(), 
::tolower);
-
-  REQUIRE(temp_val == "c56a4180-65aa-42ec-a945-5fd21dec0538");
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == 
"REQUEST_EXPIRATION_MILLIS");
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "30000");
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "StandardFlowFileCodec");
-  collector->get_next_client_response();  // codec version
-
-  // start to send the stuff
-  // Create the transaction
-  const char * transactionID;
-  const char * payload = "Test MiNiFi payload";
-  CTransaction* transaction;
-  transaction = createTransaction(protocol, SEND);
-  REQUIRE(transaction != NULL);
-  transactionID = getUUIDStr(transaction);
-  collector->get_next_client_response();
-  REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES");
-  attribute_set as;
-  as.size = 0;
-  as.attributes = NULL;
-  CDataPacket packet;
-  initPacket(&packet, transaction, &as, payload);
-  REQUIRE(sendPacket(protocol, transactionID, &packet, nullptr) == 0);
-  collector->get_next_client_response();
-  collector->get_next_client_response();
-
-  std::string rx_payload = collector->get_next_client_response();
-  REQUIRE(payload == rx_payload);
+    bool c_handshake_ok = false;
+    bool c_transfer_ok = false;
 
-  freePeer(&peer);
+    auto c_client_thread = [&transfer_state, &c_handshake_ok, &c_transfer_ok, 
port]() {
+      SiteToSiteCPeer cpeer;
+      initPeer(&cpeer, "localhost", port, "");
 
-  free(protocol);
-}
+      CRawSiteToSiteClient cprotocol;
 
-TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
-  SiteToSiteResponder *collector = new SiteToSiteResponder();
+      initRawClient(&cprotocol, &cpeer);
 
-  char a = 0xFF;
-  std::string resp_code;
-  resp_code.insert(resp_code.begin(), a);
-  collector->push_response(resp_code);
-  collector->push_response(resp_code);
+      std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
 
-  auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new 
org::apache::nifi::minifi::io::BaseStream(collector));
+      c_handshake_ok = bootstrap(&cprotocol) == 0;
 
-  cstream cstrm;
-  cstrm.impl = stream_ptr.get();
+      const char * payload = PAYLOAD;
 
-  SiteToSiteCPeer peer;
-  initPeer(&peer, &cstrm, "fake_host", 65433, "");
+      attribute attribute1;
 
-  CRawSiteToSiteClient * protocol = 
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
+      attribute1.key = ATTR_NAME;
+      const char * attr_value = ATTR_VALUE;
+      attribute1.value = (void *)attr_value;
+      attribute1.value_size = strlen(attr_value);
 
-  initRawClient(protocol, &peer);
+      attribute_set as;
+      as.size = 1;
+      as.attributes = &attribute1;
 
-  std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+      wait_until(transfer_state.handshake_data_processed);
+      c_transfer_ok = (transmitPayload(&cprotocol, payload, &as) == 0);
+      transfer_state.transer_completed = true;
 
-  setPortId(protocol, uuid_str.c_str());
+      destroyClient(&cprotocol);
+    };
 
-  REQUIRE(-1 == bootstrap(protocol));
+    std::thread c_thread(c_client_thread);
+    c_thread.join();
+    wait_until(transfer_state.data_processed);
 
-  freePeer(&peer);
+    REQUIRE(c_handshake_ok == true);
+    REQUIRE(c_transfer_ok == true);
 
-  free(protocol);
-}
+    REQUIRE(received_data.magic_string == "NiFi");
+    REQUIRE(received_data.request_type_ok);
+    REQUIRE(received_data.attr_num == 1);
+    REQUIRE(received_data.attributes[ATTR_NAME] == ATTR_VALUE);
+    REQUIRE(std::string(reinterpret_cast<const 
char*>(received_data.payload.data()), received_data.payload.size()) == PAYLOAD);
+  }
+}
\ No newline at end of file

Reply via email to