This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 42aee28 MINIFICPP-822 - Nanofi raw S2S implementation shouldn't
depend on any C++ code.
42aee28 is described below
commit 42aee28c29bb9e0c448ca3272ea80cfe6477d00e
Author: Arpad Boda <[email protected]>
AuthorDate: Thu Apr 25 15:11:17 2019 +0200
MINIFICPP-822 - Nanofi raw S2S implementation shouldn't depend on any C++
code.
Fix return value of DescriptorStream::readData when the buffer is not fully
filled
Address review comments
This closes #550.
Signed-off-by: Marc Parisi <[email protected]>
---
cmake/BuildTests.cmake | 4 +-
libminifi/opsys/posix/io/ClientSocket.h | 10 +-
libminifi/src/io/DescriptorStream.cpp | 5 +-
libminifi/src/io/posix/ClientSocket.cpp | 4 +-
libminifi/test/CPPLINT.cfg | 1 +
libminifi/test/RandomServerSocket.cpp | 58 ++++
.../test/RandomServerSocket.h | 43 ++-
nanofi/include/core/cstructs.h | 13 +-
nanofi/include/core/cxxstructs.h | 4 -
nanofi/include/sitetosite/CPeer.h | 11 +-
nanofi/include/sitetosite/CRawSocketProtocol.h | 2 +-
nanofi/include/sitetosite/CSiteToSite.h | 13 +-
nanofi/src/core/cstream.c | 236 ++++++++++++++++
nanofi/src/core/cstream.cpp | 164 ------------
nanofi/src/sitetosite/CPeer.c | 7 +-
nanofi/src/sitetosite/CRawSocketProtocol.c | 6 +-
nanofi/tests/CSite2SiteTests.cpp | 297 +++++++++++----------
17 files changed, 504 insertions(+), 374 deletions(-)
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 591516d..668564e 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -97,7 +97,7 @@ SET(SPD_LIB spd_lib)
add_library(${SPD_LIB} STATIC ${SPD_SOURCES})
SET(TEST_BASE_LIB test_base)
-add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp")
+add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp"
"${TEST_DIR}/RandomServerSocket.cpp")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE
"${CMAKE_SOURCE_DIR}/thirdparty/catch")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE
"${CMAKE_SOURCE_DIR}/libminifi/include/")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE
"${CMAKE_SOURCE_DIR}/thirdparty/cron")
@@ -139,7 +139,7 @@ FOREACH(testfile ${NANOFI_UNIT_TESTS})
target_include_directories(${testfilename} BEFORE PRIVATE
"${CMAKE_SOURCE_DIR}/extensions/standard-processors/processors/")
target_include_directories(${testfilename} BEFORE PRIVATE
"${CMAKE_SOURCE_DIR}/libminifi/test")
appendIncludes("${testfilename}")
- target_link_libraries(${testfilename} ${CMAKE_THREAD_LIBS_INIT}
${CATCH_MAIN_LIB} nanofi)
+ target_link_libraries(${testfilename} ${CMAKE_THREAD_LIBS_INIT}
${CATCH_MAIN_LIB} ${TEST_BASE_LIB} nanofi)
if (APPLE)
# minifi-standard-processors
diff --git a/libminifi/opsys/posix/io/ClientSocket.h
b/libminifi/opsys/posix/io/ClientSocket.h
index 4ed8b99..739f7f9 100644
--- a/libminifi/opsys/posix/io/ClientSocket.h
+++ b/libminifi/opsys/posix/io/ClientSocket.h
@@ -108,7 +108,9 @@ class Socket : public BaseStream {
* Return the port for this socket
* @returns port
*/
- uint16_t getPort();
+ uint16_t getPort() const {
+ return port_;
+ }
// data stream extensions
/**
@@ -276,6 +278,12 @@ class Socket : public BaseStream {
bool nonBlocking_;
+
+ protected:
+ void setPort(uint16_t port) {
+ port_ = port;
+ }
+
private:
std::shared_ptr<logging::Logger> logger_;
static std::string init_hostname() {
diff --git a/libminifi/src/io/DescriptorStream.cpp
b/libminifi/src/io/DescriptorStream.cpp
index 38e1c85..6431ef5 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -84,11 +84,10 @@ int DescriptorStream::readData(uint8_t *buf, int buflen) {
if (!IsNullOrEmpty(buf)) {
auto size_read = ::read(fd_, buf, buflen);
- if (size_read != buflen) {
+ if (size_read < 0) {
return -1;
- } else {
- return buflen;
}
+ return size_read;
} else {
return -1;
diff --git a/libminifi/src/io/posix/ClientSocket.cpp
b/libminifi/src/io/posix/ClientSocket.cpp
index 113f914..7fa4d22 100644
--- a/libminifi/src/io/posix/ClientSocket.cpp
+++ b/libminifi/src/io/posix/ClientSocket.cpp
@@ -42,7 +42,7 @@ namespace nifi {
namespace minifi {
namespace io {
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const
std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
+Socket::Socket(const std::shared_ptr<SocketContext>& /*context*/, const
std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
: requested_hostname_(hostname),
port_(port),
addr_info_(0),
@@ -59,7 +59,7 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context,
const std::string
FD_ZERO(&read_fds_);
}
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const
std::string &hostname, const uint16_t port)
+Socket::Socket(const std::shared_ptr<SocketContext>& context, const
std::string &hostname, const uint16_t port)
: Socket(context, hostname, port, 0) {
}
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
index 7df4c76..9f461f1 100644
--- a/libminifi/test/CPPLINT.cfg
+++ b/libminifi/test/CPPLINT.cfg
@@ -2,3 +2,4 @@ set noparent
filter=-build/include_order,-build/include_alpha
exclude_files=Server.cpp
exclude_files=TestBase.cpp
+exclude_files=RandomServerSocket.cpp
diff --git a/libminifi/test/RandomServerSocket.cpp
b/libminifi/test/RandomServerSocket.cpp
new file mode 100644
index 0000000..2ff01f7
--- /dev/null
+++ b/libminifi/test/RandomServerSocket.cpp
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "./RandomServerSocket.h"
+
+#include <sstream>
+#include <random>
+#include <string>
+#include <memory>
+
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+ RandomServerSocket::RandomServerSocket(const std::string& host, uint16_t
offset, uint16_t range, uint16_t retries) :
+
ServerSocket::ServerSocket(std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()),
host, 0, 1) {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint16_t> dis(offset, offset + range);
+ auto logger = logging::LoggerFactory<RandomServerSocket>::getLogger();
+ for (uint16_t i = 0; i < retries; ++i) {
+ setPort(dis(gen));
+ if (initialize() == 0) {
+ logger->log_info("Created socket listens on generated port: %hu",
getPort());
+ return;
+ }
+ }
+ std::stringstream error;
+ error << "Couldn't bind to a port between " << offset << " and " <<
offset+range << " in " << retries << " try!";
+ logger->log_error(error.str().c_str());
+ throw error.str();
+ }
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/nanofi/include/core/cxxstructs.h
b/libminifi/test/RandomServerSocket.h
similarity index 53%
copy from nanofi/include/core/cxxstructs.h
copy to libminifi/test/RandomServerSocket.h
index b0b90eb..1cf7a57 100644
--- a/nanofi/include/core/cxxstructs.h
+++ b/libminifi/test/RandomServerSocket.h
@@ -16,35 +16,26 @@
* limitations under the License.
*/
-#ifndef NIFI_MINIFI_CPP_CXXSTRUCTS_H
-#define NIFI_MINIFI_CPP_CXXSTRUCTS_H
+#ifndef NIFI_MINIFI_CPP_RANDOMSERVERSOCKET_H
+#define NIFI_MINIFI_CPP_RANDOMSERVERSOCKET_H
-#include "cstructs.h"
-#include "cxx/Plan.h"
-#include "io/DataStream.h"
+#include "io/ServerSocket.h"
-struct flow : public ExecutionPlan {
- using ExecutionPlan::ExecutionPlan;
-};
-
-struct standalone_processor : public core::Processor {
- using core::Processor::Processor;
-};
-
-struct processor : public core::Processor {
- using core::Processor::Processor;
-};
-
-struct processor_session : public core::ProcessSession {
- using core::ProcessSession::ProcessSession;
-};
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
-struct processor_context : public core::ProcessContext {
- using core::ProcessContext::ProcessContext;
+class RandomServerSocket : public ServerSocket {
+ public:
+ RandomServerSocket(const std::string& host = "localhost", uint16_t offset =
30000, uint16_t range = 10000, uint16_t retries = 100);
};
-struct cstream {
- org::apache::nifi::minifi::io::BaseStream * impl;
-};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
-#endif //NIFI_MINIFI_CPP_CXXSTRUCTS_H
+#endif //NIFI_MINIFI_CPP_RANDOMSERVERSOCKET_H
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index 7d258f7..d775747 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -22,6 +22,11 @@
#include <stddef.h>
#include <stdint.h>
+#ifdef _WIN32
+#define NOMINMAX
+#include <winsock2.h>
+#endif
+
#ifdef _MSC_VER
#define DEPRECATED __declspec(deprecated)
#elif defined(__GNUC__) | defined(__clang__)
@@ -136,6 +141,12 @@ typedef struct file_buffer {
uint64_t file_len;
} file_buffer;
-typedef struct cstream cstream;
+#ifndef _WIN32
+typedef int SOCKET;
+#endif
+
+typedef struct cstream {
+ SOCKET socket_;
+} cstream;
#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/core/cxxstructs.h b/nanofi/include/core/cxxstructs.h
index b0b90eb..29f7f5a 100644
--- a/nanofi/include/core/cxxstructs.h
+++ b/nanofi/include/core/cxxstructs.h
@@ -43,8 +43,4 @@ struct processor_context : public core::ProcessContext {
using core::ProcessContext::ProcessContext;
};
-struct cstream {
- org::apache::nifi::minifi::io::BaseStream * impl;
-};
-
#endif //NIFI_MINIFI_CPP_CXXSTRUCTS_H
diff --git a/nanofi/include/sitetosite/CPeer.h
b/nanofi/include/sitetosite/CPeer.h
index 47bdb63..11e4ece 100644
--- a/nanofi/include/sitetosite/CPeer.h
+++ b/nanofi/include/sitetosite/CPeer.h
@@ -99,25 +99,20 @@ static void setPort(struct SiteToSiteCPeer * peer, uint16_t
port) {
}
}
-static void initPeer(struct SiteToSiteCPeer * peer, cstream * injected_socket,
const char * host, uint16_t port, const char * ifc) {
- peer->_stream = injected_socket;
- //peer->local_network_interface_= std::move(io::NetworkInterface(ifc,
nullptr));
+static void initPeer(struct SiteToSiteCPeer * peer, const char * host,
uint16_t port, const char * ifc) {
+ peer->_stream = NULL;
peer->_host = NULL;
peer->_url = NULL;
peer->_port = 0;
setHostName(peer, host);
setPort(peer, port);
-
- if(peer->_stream == NULL) {
- peer->_owns_resource = True;
- }
}
static void freePeer(struct SiteToSiteCPeer * peer) {
closePeer(peer);
setHostName(peer, NULL);
- if(peer->_owns_resource == True && peer->_stream != NULL) {
+ if(peer->_stream != NULL) {
free_socket(peer->_stream);
peer->_stream = NULL;
}
diff --git a/nanofi/include/sitetosite/CRawSocketProtocol.h
b/nanofi/include/sitetosite/CRawSocketProtocol.h
index f99b8bd..977b482 100644
--- a/nanofi/include/sitetosite/CRawSocketProtocol.h
+++ b/nanofi/include/sitetosite/CRawSocketProtocol.h
@@ -194,7 +194,7 @@ static void initRawClient(struct CRawSiteToSiteClient
*client, struct SiteToSite
static struct CRawSiteToSiteClient* createClient(const char * host, uint16_t
port, const char * nifi_port) {
struct SiteToSiteCPeer * peer = (struct SiteToSiteCPeer
*)malloc(sizeof(struct SiteToSiteCPeer));
- initPeer(peer, NULL, host, port, "");
+ initPeer(peer, host, port, "");
struct CRawSiteToSiteClient* client = (struct
CRawSiteToSiteClient*)malloc(sizeof(struct CRawSiteToSiteClient));
initRawClient(client, peer);
client->_owns_resource = True;
diff --git a/nanofi/include/sitetosite/CSiteToSite.h
b/nanofi/include/sitetosite/CSiteToSite.h
index 8728cef..0569f04 100644
--- a/nanofi/include/sitetosite/CSiteToSite.h
+++ b/nanofi/include/sitetosite/CSiteToSite.h
@@ -335,28 +335,22 @@ static int readData(CTransaction * transaction, uint8_t
*buf, int buflen) {
return ret;
}
-static int is_little_endian() {
- static unsigned int x = 1;
- static char *c = (char*) &x;
- return (*c == 1) ? 1 : 0;
-}
-
static int write_uint64t(CTransaction * transaction, uint64_t base_value) {
- const uint64_t value = is_little_endian() == 1 ? htonll_r(base_value) :
base_value;
+ const uint64_t value = htonll_r(base_value);
const uint8_t * buf = (uint8_t*)(&value);
return writeData(transaction, buf, sizeof(uint64_t));
}
static int write_uint32t(CTransaction * transaction, uint32_t base_value) {
- const uint32_t value = is_little_endian() == 1 ? htonl(base_value) :
base_value;
+ const uint32_t value = htonl(base_value);
const uint8_t * buf = (uint8_t*)(&value);
return writeData(transaction, buf, sizeof(uint32_t));
}
static int write_uint16t(CTransaction * transaction, uint16_t base_value) {
- const uint16_t value = is_little_endian() == 1 ? htons(base_value) :
base_value;
+ const uint16_t value = htons(base_value);
const uint8_t *buf = (uint8_t *) (&value);
return writeData(transaction, buf, sizeof(uint16_t));
@@ -391,7 +385,6 @@ static int write_UTF_len(CTransaction * transaction, const
char * str, size_t le
}
static int write_UTF(CTransaction * transaction, const char * str, enum Bool
widen) {
- //return transaction->_stream->writeUTF(str, widen);
return write_UTF_len(transaction, str, strlen(str), widen);
}
diff --git a/nanofi/src/core/cstream.c b/nanofi/src/core/cstream.c
new file mode 100644
index 0000000..3919728
--- /dev/null
+++ b/nanofi/src/core/cstream.c
@@ -0,0 +1,236 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#define NOMINMAX
+#include <winsock2.h>
+#else
+#include <sys/socket.h> // socket
+#include <arpa/inet.h> // inet_addr
+#include <netdb.h> // hostent
+#include <unistd.h> // close
+#endif
+
+#include <errno.h>
+#include <string.h>
+
+#include "api/nanofi.h"
+#include "core/cstructs.h"
+#include "core/cstream.h"
+#include "core/log.h"
+
+int write_uint32_t(uint32_t value, cstream * stream) {
+ value = htonl(value);
+ return write_buffer((uint8_t*)(&value), sizeof(uint32_t), stream);
+}
+
+int write_uint16_t(uint16_t value, cstream * stream) {
+ value = htons(value);
+ return write_buffer((uint8_t*)(&value), sizeof(uint16_t), stream);
+}
+
+int write_buffer(const uint8_t *value, int len, cstream * stream) {
+ int ret = 0, bytes = 0;
+
+ while (bytes < len) {
+ ret = send(stream->socket_, value + bytes, len - bytes, 0);
+ // check for errors
+ if (ret <= 0) {
+ if (ret < 0 && errno == EINTR) {
+ continue;
+ }
+ logc(err, "Could not send to %d, error: %s", stream->socket_,
strerror(errno));
+ close_stream(stream);
+ return ret;
+ }
+ bytes += ret;
+ }
+
+ if (bytes)
+ logc(trace, "Sent data size %d over socket %d", bytes, stream->socket_);
+ return bytes;
+}
+
+int read_buffer(uint8_t *buf, int len, cstream * stream) {
+ int32_t total_read = 0;
+ while (len) {
+ int bytes_read = recv(stream->socket_, buf, len, 0);
+ if (bytes_read <= 0) {
+ if (bytes_read == 0) {
+ logc(debug, "Other side hung up on %d",stream->socket_);
+ } else {
+ if (errno == EINTR) {
+ continue;
+ }
+ logc(err, "Could not recv on %d, error: %s", stream->socket_,
strerror(errno));
+ }
+ return -1;
+ }
+ len -= bytes_read;
+ buf += bytes_read;
+ total_read += bytes_read;
+ }
+ if(total_read)
+ logc(trace, "Received data size %d over socket %d", total_read,
stream->socket_);
+ return total_read;
+}
+
+int writeUTF(const char * cstr, uint64_t len, enum Bool widen, cstream *
stream) {
+ if (len > 65535) {
+ return -1;
+ }
+
+ int ret;
+ if (!widen) {
+ uint16_t shortlen = len;
+ ret = write_uint16_t(shortlen, stream);
+ } else {
+ ret = write_uint32_t(len, stream);
+ }
+
+ if(len == 0 || ret < 0) {
+ return ret;
+ }
+
+ const uint8_t *underlyingPtr = (const uint8_t *)cstr;
+
+ if (!widen) {
+ uint16_t short_length = len;
+ ret = write_buffer(underlyingPtr, short_length, stream);
+ } else {
+ ret = write_buffer(underlyingPtr, len, stream);
+ }
+ return ret;
+}
+
+int read_uint8_t(uint8_t *value, cstream * stream) {
+ uint8_t val;
+ int ret = read_buffer(&val, sizeof(uint8_t), stream);
+ if(ret == sizeof(uint8_t)) {
+ *value = val;
+ }
+ return ret;
+}
+int read_uint16_t(uint16_t *value, cstream * stream) {
+ uint16_t val;
+ int ret = read_buffer((uint8_t*)&val, sizeof(uint16_t), stream);
+ if(ret == sizeof(uint16_t)) {
+ *value = ntohs(val);
+ }
+ return ret;
+}
+int read_uint32_t(uint32_t *value, cstream * stream) {
+ uint32_t val;
+ int ret = read_buffer((uint8_t*)&val, sizeof(uint32_t), stream);
+ if(ret == sizeof(uint32_t)) {
+ *value = ntohl(val);
+ }
+ return ret;
+}
+
+int readUTFLen(uint32_t * utflen, cstream * stream) {
+ int ret = 1;
+ uint16_t shortLength = 0;
+ ret = read_uint16_t(&shortLength, stream);
+ if (ret > 0) {
+ *utflen = shortLength;
+ }
+ return ret;
+}
+
+int readUTF(char * buf, uint64_t buflen, cstream * stream) {
+ //return stream->impl->readData((uint8_t*)buf, buflen);
+ return read_buffer((uint8_t*)buf, buflen, stream);
+}
+
+void close_stream(cstream * stream) {
+ if(stream != NULL && stream->socket_ != -1) {
+#ifdef _WIN32
+ shutdown(stream->socket_, SD_BOTH);
+ closesocket(stream->socket_);
+ WSACleanup();
+#else
+ shutdown(stream->socket_, SHUT_RDWR);
+ close(stream->socket_);
+#endif
+ stream->socket_ = -1;
+ }
+}
+
+cstream * create_socket(const char * host, uint16_t portnum) {
+ logc(trace, "Creating socket to connect to: %s:%d", host, portnum);
+
+#ifdef _WIN32
+ WSADATA wsa;
+ if (WSAStartup(MAKEWORD(2,2),&wsa) != 0)
+ {
+ logc(err, "%s", "WSAStartup failed");
+ return NULL;
+ }
+#endif
+
+ struct addrinfo *result, *rp;
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_socktype = SOCK_STREAM; /* Stream socket */
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0; /* Any protocol */
+
+ char portstr[6];
+ snprintf(portstr, 6, "%d", portnum);
+
+ if (getaddrinfo(host, portstr, &hints, &result) != 0) {
+ logc(err, "%s%s", "Failed to resolve hostname: ", host);
+ return NULL;
+ }
+
+ SOCKET sock;
+
+ for (rp = result; rp != NULL; rp = rp->ai_next) {
+ sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sock == -1) {
+ continue;
+ }
+
+ if (connect(sock, rp->ai_addr, rp->ai_addrlen) != -1) {
+ break;
+ }
+
+ close(sock);
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == NULL) {
+ logc(err, "Failed to connect to %s:%u", host, portnum);
+ return NULL;
+ }
+
+ cstream *stream = (cstream *) malloc(sizeof(cstream));
+ stream->socket_ = sock;
+ logc(debug, "%s", "Socket successfully connected");
+ return stream;
+}
+
+void free_socket(cstream * stream) {
+ if(stream != NULL) {
+ close_stream(stream);
+ free(stream);
+ }
+}
diff --git a/nanofi/src/core/cstream.cpp b/nanofi/src/core/cstream.cpp
deleted file mode 100644
index a463437..0000000
--- a/nanofi/src/core/cstream.cpp
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "api/nanofi.h"
-#include "core/cstructs.h"
-#include "core/cxxstructs.h"
-#include "core/cstream.h"
-#include "io/BaseStream.h"
-#include "io/DataStream.h"
-#include "io/ClientSocket.h"
-#include "cxx/Instance.h"
-
-int write_uint64_t(uint64_t value, cstream * stream) {
- return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_uint32_t(uint32_t value, cstream * stream) {
- return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_uint16_t(uint16_t value, cstream * stream) {
- return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_uint8_t(uint8_t value, cstream * stream) {
- return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_char(char value, cstream * stream) {
- return stream->impl->Serializable::write(value, stream->impl);
-}
-int write_buffer(const uint8_t *value, int len, cstream * stream) {
- int ret_val = stream->impl->Serializable::write(const_cast<uint8_t
*>(value), len, stream->impl);
- return ret_val;
-}
-
-int writeUTF(const char * cstr, uint64_t len, Bool widen, cstream * stream) {
- std::string str(cstr, len);
- return stream->impl->Serializable::writeUTF(str, stream->impl, widen ==
True);
-}
-
-int read_char(char *value, cstream * stream) {
- char val;
- int ret = stream->impl->Serializable::read(val, stream->impl);
- if(ret == sizeof(char)) {
- *value = val;
- }
- return ret;
-}
-int read_uint8_t(uint8_t *value, cstream * stream) {
- uint8_t val;
- int ret = stream->impl->Serializable::read(val, stream->impl);
- if(ret == sizeof(uint8_t)) {
- *value = val;
- }
- return ret;
-}
-int read_uint16_t(uint16_t *value, cstream * stream) {
- uint16_t val;
- int ret = stream->impl->Serializable::read(val, stream->impl);
- if(ret == sizeof(uint16_t)) {
- *value = val;
- }
- return ret;
-}
-int read_uint32_t(uint32_t *value, cstream * stream) {
- uint32_t val;
- int ret = stream->impl->Serializable::read(val, stream->impl);
- if(ret == sizeof(uint32_t)) {
- *value = val;
- }
- return ret;
-}
-int read_uint64_t(uint64_t *value, cstream * stream) {
- uint64_t val;
- int ret = stream->impl->Serializable::read(val, stream->impl);
- if(ret == sizeof(uint64_t)) {
- *value = val;
- }
- return ret;
-}
-
-int read_buffer(uint8_t *value, int len, cstream * stream) {
- return stream->impl->Serializable::read(value, len, stream->impl);
-}
-
-int readUTFLen(uint32_t * utflen, cstream * stream) {
- int ret = 1;
- uint16_t shortLength = 0;
- ret = read_uint16_t(&shortLength, stream);
- if (ret > 0) {
- *utflen = shortLength;
- }
- return ret;
-}
-
-int readUTF(char * buf, uint64_t buflen, cstream * stream) {
- return stream->impl->readData((uint8_t*)buf, buflen);
-}
-
-void close_stream(cstream * stream) {
- if(stream != NULL && stream->impl != NULL) {
- stream->impl->closeStream();
- }
-}
-
-int open_stream(cstream * stream) {
- if(stream != NULL && stream->impl != NULL) {
- return stream->impl->initialize();
- }
- return -1;
-}
-
-cstream * create_socket(const char * host, uint16_t portnum) {
- nifi_port nport;
-
- char random_port[6] = "65443";
-
- nport.port_id = random_port;
-
- nifi_instance *instance = create_instance(host, &nport);
-
- auto minifi_instance_ref =
static_cast<minifi::Instance*>(instance->instance_ptr);
-
- auto stream_factory_ =
minifi::io::StreamFactory::getInstance(minifi_instance_ref->getConfiguration());
-
- cstream * stream = (cstream*)malloc(sizeof(cstream));
-
- auto socket = stream_factory_->createSocket(host, portnum);
-
- free_instance(instance);
-
- if(socket) {
- if(socket->initialize() == 0) {
- stream->impl = socket.release();
- return stream;
- }
- }
- return NULL;
-}
-
-void free_socket(cstream * stream) {
- if(stream != NULL) {
- if(stream->impl != NULL) {
- auto socket = static_cast<minifi::io::Socket*>(stream->impl);
- if(socket) {
- socket->closeStream();
- delete socket; //This is ugly, but only sockets get deleted this way
- }
- }
- free(stream);
- }
-}
diff --git a/nanofi/src/sitetosite/CPeer.c b/nanofi/src/sitetosite/CPeer.c
index fb1f417..70ecede 100644
--- a/nanofi/src/sitetosite/CPeer.c
+++ b/nanofi/src/sitetosite/CPeer.c
@@ -28,7 +28,7 @@ int openPeer(struct SiteToSiteCPeer * peer) {
}
//In case there was no socket injected, let's create it
- if(peer->_stream == NULL && peer->_owns_resource == True) {
+ if(peer->_stream == NULL) {
peer->_stream = create_socket(peer->_host, peer->_port);
if(peer->_stream == NULL) {
logc(err, "%s", "failed to open socket");
@@ -44,11 +44,6 @@ int openPeer(struct SiteToSiteCPeer * peer) {
// TODO: support provided interface
- if(open_stream(peer->_stream) != 0) {
- logc(err, "%s", "failed to open stream");
- return -1;
- }
-
uint16_t data_size = sizeof MAGIC_BYTES;
if(write_buffer((uint8_t*)MAGIC_BYTES, data_size, peer->_stream) !=
data_size) {
diff --git a/nanofi/src/sitetosite/CRawSocketProtocol.c
b/nanofi/src/sitetosite/CRawSocketProtocol.c
index e89a36c..73741f2 100644
--- a/nanofi/src/sitetosite/CRawSocketProtocol.c
+++ b/nanofi/src/sitetosite/CRawSocketProtocol.c
@@ -187,6 +187,7 @@ int handShake(struct CRawSiteToSiteClient * client) {
ret = readResponse(client, &code);
if (ret <= 0) {
+ logc(err, "failed to receive response code from server");
return -1;
}
@@ -722,7 +723,8 @@ int confirm(struct CRawSiteToSiteClient * client, const
char * transactionID) {
if(content_size > 0 && ff->crp != NULL) {
content_buf = (uint8_t*)malloc(content_size*sizeof(uint8_t));
- if(get_content(ff, content_buf, content_size) <= 0) {
+ len = get_content(ff, content_buf, content_size);
+ if(len <= 0) {
return -2;
}
ret = write_uint64t(transaction, len);
@@ -733,7 +735,7 @@ int confirm(struct CRawSiteToSiteClient * client, const
char * transactionID) {
writeData(transaction, content_buf, len);
}
- } else if (strlen(packet->payload_) > 0) {
+ } else if (packet->payload_ != NULL && strlen(packet->payload_) > 0) {
len = strlen(packet->payload_);
ret = write_uint64t(transaction, len);
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index 4b37de2..cc2902f 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -18,31 +18,59 @@
#include <stdlib.h>
-#include <uuid/uuid.h>
#include <algorithm>
+#include <chrono>
#include <string>
#include <memory>
#include <utility>
#include <map>
+#include <thread>
+
#include "io/BaseStream.h"
#include "TestBase.h"
#include "unit/SiteToSiteHelper.h"
#include "sitetosite/CPeer.h"
#include "sitetosite/CRawSocketProtocol.h"
#include "sitetosite/CSiteToSite.h"
-#include <algorithm>
-#include <core/cxxstructs.h>
+#include "sitetosite/RawSocketProtocol.h"
+#include "uuid/uuid.h"
+#include "core/cstructs.h"
+#include "RandomServerSocket.h"
+#include "core/log.h"
#define FMT_DEFAULT fmt_lower
-TEST_CASE("TestSetPortId", "[S2S1]") {
- auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new
org::apache::nifi::minifi::io::BaseStream());
-
- cstream cstrm;
- cstrm.impl = stream_ptr.get();
+const char * ATTR_NAME = "some_key";
+const char * ATTR_VALUE = "some value";
+const char * PAYLOAD = "Test MiNiFi payload";
+const char * PAYLOAD_CRC = "2006463717"; // Depends on both payload and
attributes, do NOT change manually!
+std::string CODEC_NAME = "StandardFlowFileCodec";
+
+struct S2SReceivedData {
+ bool request_type_ok = false;
+ std::string magic_string;
+ uint32_t attr_num = 0;
+ std::map<std::string, std::string> attributes;
+ std::vector<uint8_t> payload;
+};
+
+// This struct is used to sync between simulated clients and servers
+struct TransferState {
+ std::atomic<bool> handshake_data_processed{false};
+ std::atomic<bool> transer_completed{false};
+ std::atomic<bool> data_processed{false};
+
+};
+
+void wait_until(std::atomic<bool>& b) {
+ while(!b) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(0)); //Just yield
+ }
+}
+TEST_CASE("TestSetPortId", "[S2S1]") {
SiteToSiteCPeer peer;
- initPeer(&peer, &cstrm, "fake_host", 65433, "");
+ initPeer(&peer, "fake_host", 65433, "");
CRawSiteToSiteClient * protocol =
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
initRawClient(protocol, &peer);
@@ -60,164 +88,145 @@ TEST_CASE("TestSetPortId", "[S2S1]") {
free(protocol);
}
-TEST_CASE("TestSetPortIdUppercase", "[S2S2]") {
- auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new
org::apache::nifi::minifi::io::BaseStream());
-
- cstream cstrm;
- cstrm.impl = stream_ptr.get();
-
- SiteToSiteCPeer peer;
- initPeer(&peer, &cstrm, "fake_host", 65433, "");
- CRawSiteToSiteClient * protocol =
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
-
- initRawClient(protocol, &peer);
-
- std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
-
- //setPortId(protocol, uuid_str.c_str());
-
- //REQUIRE(uuid_str != getPortId(protocol));
-
- std::transform(uuid_str.begin(), uuid_str.end(), uuid_str.begin(),
::tolower);
-
- //REQUIRE(uuid_str == std::string(getPortId(protocol)));
-
- tearDown(protocol);
-
- freePeer(&peer);
-
- free(protocol);
+void send_response_code(minifi::io::BaseStream* stream, uint8_t resp) {
+ std::array<uint8_t, 3> resp_codes = {'R', 'C', resp};
+ for(uint8_t r : resp_codes) {
+ stream->write(&r, 1);
+ }
}
-void sunny_path_bootstrap(SiteToSiteResponder *collector) {
- char a = 0x14; // RESOURCE_OK
- std::string resp_code;
- resp_code.insert(resp_code.begin(), a);
- collector->push_response(resp_code);
-
- // Handshake respond code
- resp_code = "R";
- collector->push_response(resp_code);
- resp_code = "C";
- collector->push_response(resp_code);
- char b = 0x1;
- resp_code = b;
- collector->push_response(resp_code);
-
- // Codec Negotiation
- resp_code = a;
- collector->push_response(resp_code);
+void accept_transfer(minifi::io::BaseStream* stream, const std::string&
crcstr, TransferState& transfer_state, S2SReceivedData& s2s_data) {
+ // In the long term it would be nice to calculate the CRC of the received data here
+ send_response_code(stream, 12); // confirmed
+ stream->writeUTF(crcstr);
+ send_response_code(stream, 13); // transaction finished
+
+ wait_until(transfer_state.transer_completed);
+
+ std::string requesttype;
+ stream->readUTF(requesttype);
+
+ if(requesttype == "SEND_FLOWFILES") {
+ s2s_data.request_type_ok = true;
+ stream->read(s2s_data.attr_num);
+ std::string key, value;
+ for(int i = 0; i < s2s_data.attr_num; ++i) {
+ stream->readUTF(key, true);
+ stream->readUTF(value, true);
+ s2s_data.attributes[key] = value;
+ }
+ uint64_t content_size=0;
+ stream->read(content_size);
+ s2s_data.payload.resize(content_size);
+ stream->readData(s2s_data.payload, content_size);
+ } else {
+ s2s_data.request_type_ok = false;
+ }
+ transfer_state.data_processed = true;
}
-TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
-
- SiteToSiteResponder *collector = new SiteToSiteResponder();
- sunny_path_bootstrap(collector);
+void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState&
transfer_state, S2SReceivedData& s2s_data) {
+ //Verify the magic string
+ char c_array[4];
+ stream->readData((uint8_t*)c_array, 4);
+ s2s_data.magic_string = std::string(c_array, 4);
+ uint8_t success = 0x14;
+ stream->write(&success, 1);
+ send_response_code(stream, 0x1);
+ stream->write(&success, 1);
+
+ //just consume handshake data
+ bool found_codec = false;
+ int read_len = 0;
+ while(!found_codec) {
+ uint8_t handshake_data[1000];
+ int actual_len = stream->readData(handshake_data+read_len, 1000-read_len);
+ if(actual_len <= 0) {
+ continue;
+ }
+ read_len += actual_len;
+ std::string incoming_data(reinterpret_cast<const char *>(handshake_data),
read_len);
+ auto it = std::search(incoming_data.begin(), incoming_data.end(),
CODEC_NAME.begin(), CODEC_NAME.end());
+ if(it != incoming_data.end()){
+ size_t idx = std::distance(incoming_data.begin(), it);
+ // The actual version follows the string as a uint32_t; that should be the end of the buffer
+ found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len;
+ }
+ }
+
+ transfer_state.handshake_data_processed = true;
+}
- auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new
org::apache::nifi::minifi::io::BaseStream(collector));
+void different_version_bootstrap(minifi::io::BaseStream* stream,
TransferState& transfer_state, S2SReceivedData& s2s_data) {
+ uint8_t resp_code = 0x15;
+ stream->write(&resp_code, 1);
- cstream cstrm;
- cstrm.impl = stream_ptr.get();
+ uint32_t version = 4;
+ stream->write(version);
- SiteToSiteCPeer peer;
- initPeer(&peer, &cstrm, "fake_host", 65433, "");
+ sunny_path_bootstrap(stream, transfer_state, s2s_data);
+}
- CRawSiteToSiteClient * protocol =
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
+TEST_CASE("TestSiteToBootStrap", "[S2S3]") {
- initRawClient(protocol, &peer);
+ std::array<std::function<void(minifi::io::BaseStream*, TransferState&,
S2SReceivedData&)>, 2> bootstrap_functions =
+ {sunny_path_bootstrap, different_version_bootstrap};
- std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+ for(const auto& bootstrap_func : bootstrap_functions) {
+ TransferState transfer_state;
+ S2SReceivedData received_data;
+ std::unique_ptr<minifi::io::ServerSocket> sckt(new
minifi::io::RandomServerSocket("localhost"));
+ uint16_t port = sckt->getPort();
- setPortId(protocol, uuid_str.c_str());
+ sckt->registerCallback([]() -> bool { return true; }, [&bootstrap_func,
&transfer_state, &received_data](minifi::io::BaseStream* stream)
+ {bootstrap_func(stream, transfer_state, received_data);
accept_transfer(stream, PAYLOAD_CRC, transfer_state, received_data); } );
- REQUIRE(0 == bootstrap(protocol));
-
- REQUIRE(collector->get_next_client_response() == "NiFi");
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "SocketFlowFileProtocol");
- collector->get_next_client_response();
- collector->get_next_client_response();
- collector->get_next_client_response();
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "nifi://fake_host:65433");
- collector->get_next_client_response();
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "GZIP");
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "false");
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "PORT_IDENTIFIER");
- collector->get_next_client_response();
-
- std::string temp_val = collector->get_next_client_response();
- std::transform(temp_val.begin(), temp_val.end(), temp_val.begin(),
::tolower);
-
- REQUIRE(temp_val == "c56a4180-65aa-42ec-a945-5fd21dec0538");
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() ==
"REQUEST_EXPIRATION_MILLIS");
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "30000");
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "StandardFlowFileCodec");
- collector->get_next_client_response(); // codec version
-
- // start to send the stuff
- // Create the transaction
- const char * transactionID;
- const char * payload = "Test MiNiFi payload";
- CTransaction* transaction;
- transaction = createTransaction(protocol, SEND);
- REQUIRE(transaction != NULL);
- transactionID = getUUIDStr(transaction);
- collector->get_next_client_response();
- REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES");
- attribute_set as;
- as.size = 0;
- as.attributes = NULL;
- CDataPacket packet;
- initPacket(&packet, transaction, &as, payload);
- REQUIRE(sendPacket(protocol, transactionID, &packet, nullptr) == 0);
- collector->get_next_client_response();
- collector->get_next_client_response();
-
- std::string rx_payload = collector->get_next_client_response();
- REQUIRE(payload == rx_payload);
+ bool c_handshake_ok = false;
+ bool c_transfer_ok = false;
- freePeer(&peer);
+ auto c_client_thread = [&transfer_state, &c_handshake_ok, &c_transfer_ok,
port]() {
+ SiteToSiteCPeer cpeer;
+ initPeer(&cpeer, "localhost", port, "");
- free(protocol);
-}
+ CRawSiteToSiteClient cprotocol;
-TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
- SiteToSiteResponder *collector = new SiteToSiteResponder();
+ initRawClient(&cprotocol, &cpeer);
- char a = 0xFF;
- std::string resp_code;
- resp_code.insert(resp_code.begin(), a);
- collector->push_response(resp_code);
- collector->push_response(resp_code);
+ std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
- auto stream_ptr = std::unique_ptr<minifi::io::BaseStream>(new
org::apache::nifi::minifi::io::BaseStream(collector));
+ c_handshake_ok = bootstrap(&cprotocol) == 0;
- cstream cstrm;
- cstrm.impl = stream_ptr.get();
+ const char * payload = PAYLOAD;
- SiteToSiteCPeer peer;
- initPeer(&peer, &cstrm, "fake_host", 65433, "");
+ attribute attribute1;
- CRawSiteToSiteClient * protocol =
(CRawSiteToSiteClient*)malloc(sizeof(CRawSiteToSiteClient));
+ attribute1.key = ATTR_NAME;
+ const char * attr_value = ATTR_VALUE;
+ attribute1.value = (void *)attr_value;
+ attribute1.value_size = strlen(attr_value);
- initRawClient(protocol, &peer);
+ attribute_set as;
+ as.size = 1;
+ as.attributes = &attribute1;
- std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+ wait_until(transfer_state.handshake_data_processed);
+ c_transfer_ok = (transmitPayload(&cprotocol, payload, &as) == 0);
+ transfer_state.transer_completed = true;
- setPortId(protocol, uuid_str.c_str());
+ destroyClient(&cprotocol);
+ };
- REQUIRE(-1 == bootstrap(protocol));
+ std::thread c_thread(c_client_thread);
+ c_thread.join();
+ wait_until(transfer_state.data_processed);
- freePeer(&peer);
+ REQUIRE(c_handshake_ok == true);
+ REQUIRE(c_transfer_ok == true);
- free(protocol);
-}
+ REQUIRE(received_data.magic_string == "NiFi");
+ REQUIRE(received_data.request_type_ok);
+ REQUIRE(received_data.attr_num == 1);
+ REQUIRE(received_data.attributes[ATTR_NAME] == ATTR_VALUE);
+ REQUIRE(std::string(reinterpret_cast<const
char*>(received_data.payload.data()), received_data.payload.size()) == PAYLOAD);
+ }
+}
\ No newline at end of file