Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 4d20f840b -> 0c58dcf6d
MINIFICPP-665: Add reference checks for self MINIFICPP-665: Fix tests after rebase This closes #435. Approved by arpadboda on GH. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0c58dcf6 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0c58dcf6 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0c58dcf6 Branch: refs/heads/master Commit: 0c58dcf6d35650af3f2c41d8392223499478f2b5 Parents: 4d20f84 Author: Marc Parisi <[email protected]> Authored: Sat Nov 10 21:06:08 2018 -0500 Committer: Marc Parisi <[email protected]> Committed: Wed Nov 14 12:33:35 2018 -0500 ---------------------------------------------------------------------- libminifi/include/io/BaseStream.h | 27 +++++++----- libminifi/src/io/BaseStream.cpp | 46 +++++++++++++------- libminifi/test/TestBase.h | 15 +++++-- libminifi/test/unit/StreamTests.cpp | 75 ++++++++++++++++++++++++++++++++ nanofi/tests/CAPITests.cpp | 12 +++-- 5 files changed, 141 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0c58dcf6/libminifi/include/io/BaseStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h index fa6fe1b..8311a30 100644 --- a/libminifi/include/io/BaseStream.h +++ b/libminifi/include/io/BaseStream.h @@ -23,6 +23,7 @@ #include "EndianCheck.h" #include "DataStream.h" #include "Serializable.h" +#include "core/expect.h" namespace org { namespace apache { @@ -31,7 +32,10 @@ namespace minifi { namespace io { /** - * Base Stream. Not intended to be thread safe as it is not intended to be shared + * Base Stream is the base of a composable stream architecture. + * Intended to be the base of layered streams ala DatInputStreams in Java. + * + * ** Not intended to be thread safe as it is not intended to be shared** * * Extensions may be thread safe and thus shareable, but that is up to the implementation. */ @@ -61,7 +65,7 @@ class BaseStream : public DataStream, public Serializable { virtual int writeData(uint8_t *value, int size); virtual void seek(uint64_t offset) { - if (composable_stream_ != this) { + if (LIKELY(composable_stream_ != this)) { composable_stream_->seek(offset); } else { DataStream::seek(offset); @@ -172,15 +176,14 @@ class BaseStream : public DataStream, public Serializable { virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE); virtual const uint64_t getSize() const { - if (composable_stream_ == this){ - return buffer.size(); - } - else{ - return composable_stream_->getSize(); - } - + if (LIKELY(composable_stream_ == this)) { + return buffer.size(); + } else { + return composable_stream_->getSize(); } + } + /** * read UTF from stream * @param str reference string @@ -189,8 +192,12 @@ class BaseStream : public DataStream, public Serializable { **/ virtual int readUTF(std::string &str, bool widen = false); protected: + /** + * Changed to private to facilitate easier management of composable_stream_ and make it immutable + */ DataStream *composable_stream_; -}; +} +; } /* namespace io */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0c58dcf6/libminifi/src/io/BaseStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp index a750586..bcd9111 100644 --- a/libminifi/src/io/BaseStream.cpp +++ b/libminifi/src/io/BaseStream.cpp @@ -19,6 +19,7 @@ #include <vector> #include <string> #include "io/Serializable.h" +#include "core/expect.h" namespace org { namespace apache { @@ -33,11 +34,11 @@ namespace io { * @return resulting write size **/ int BaseStream::write(uint32_t base_value, bool is_little_endian) { - return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); + return Serializable::write(base_value, composable_stream_, is_little_endian); } int BaseStream::writeData(uint8_t *value, int size) { - if (composable_stream_ == this) { + if (LIKELY(composable_stream_ == this)) { return DataStream::writeData(value, size); } else { return composable_stream_->writeData(value, size); @@ -52,7 +53,7 @@ int BaseStream::writeData(uint8_t *value, int size) { * @return resulting write size **/ int BaseStream::write(uint16_t base_value, bool is_little_endian) { - return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); + return Serializable::write(base_value, composable_stream_, is_little_endian); } /** @@ -63,7 +64,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) { * @return resulting write size **/ int BaseStream::write(uint8_t *value, int len) { - return Serializable::write(value, len, reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::write(value, len, composable_stream_); } /** @@ -74,7 +75,7 @@ int BaseStream::write(uint8_t *value, int len) { * @return resulting write size **/ int BaseStream::write(uint64_t base_value, bool is_little_endian) { - return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); + return Serializable::write(base_value, composable_stream_, is_little_endian); } /** @@ -84,7 +85,7 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) { **/ int BaseStream::write(bool value) { uint8_t v = value; - return Serializable::write(v, reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::write(v, composable_stream_); } /** @@ -93,7 +94,7 @@ int BaseStream::write(bool value) { * @return resulting write size **/ int BaseStream::writeUTF(std::string str, bool widen) { - return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen); + return Serializable::writeUTF(str, composable_stream_, widen); } /** @@ -103,7 +104,7 @@ int BaseStream::writeUTF(std::string str, bool widen) { * @return resulting read size **/ int BaseStream::read(uint8_t &value) { - return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(value, composable_stream_); } /** @@ -113,7 +114,10 @@ int BaseStream::read(uint8_t &value) { * @return resulting read size **/ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { - return Serializable::read(base_value, reinterpret_cast<DataStream*>(composable_stream_)); + if (LIKELY(composable_stream_ == this)) + return DataStream::read(base_value, is_little_endian); + else + return Serializable::read(base_value, composable_stream_); } /** @@ -123,7 +127,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(char &value) { - return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(value, composable_stream_); } /** @@ -134,7 +138,7 @@ int BaseStream::read(char &value) { * @return resulting read size **/ int BaseStream::read(uint8_t *value, int len) { - return Serializable::read(value, len, reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(value, len, composable_stream_); } /** @@ -143,7 +147,7 @@ int BaseStream::read(uint8_t *value, int len) { * @param buflen */ int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) { - return Serializable::read(&buf[0], buflen, reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(&buf[0], buflen, composable_stream_); } /** * Reads data and places it into buf @@ -151,7 +155,11 @@ int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) { * @param buflen */ int BaseStream::readData(uint8_t *buf, int buflen) { - return Serializable::read(buf, buflen, reinterpret_cast<DataStream*>(composable_stream_)); + if (LIKELY(composable_stream_ == this)) { + return DataStream::readData(buf, buflen); + } else { + return Serializable::read(buf, buflen, composable_stream_); + } } /** @@ -161,7 +169,10 @@ int BaseStream::readData(uint8_t *buf, int buflen) { * @return resulting read size **/ int BaseStream::read(uint32_t &value, bool is_little_endian) { - return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); + if (LIKELY(composable_stream_ == this)) + return DataStream::read(value, is_little_endian); + else + return Serializable::read(value, composable_stream_, is_little_endian); } /** @@ -171,7 +182,10 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(uint64_t &value, bool is_little_endian) { - return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); + if (LIKELY(composable_stream_ == this)) + return DataStream::read(value, is_little_endian); + else + return Serializable::read(value, composable_stream_, is_little_endian); } /** @@ -181,7 +195,7 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::readUTF(std::string &str, bool widen) { - return Serializable::readUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen); + return Serializable::readUTF(str, composable_stream_, widen); } } /* namespace io */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0c58dcf6/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 4792011..42e7798 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -256,7 +256,7 @@ class TestController { for (auto dir : directories) { DIR *created_dir; struct dirent *dir_entry; - created_dir = opendir(dir); + created_dir = opendir(dir.c_str()); if (created_dir != NULL) { while ((dir_entry = readdir(created_dir)) != NULL) { if (dir_entry->d_name[0] != '.') { @@ -270,13 +270,21 @@ class TestController { closedir(created_dir); } - rmdir(dir); + rmdir(dir.c_str()); } } + /** + * format will be changed by mkdtemp, so don't rely on a shared variable. + */ char *createTempDirectory(char *format) { char *dir = mkdtemp(format); + if (NULL == dir){ + perror("mkdtemp failed: "); + } directories.push_back(dir); + // TODO: return const char or don't return char* at all and use the format passed in as mkdtemp + // but I'm inclined to keep as-is for the time being. return dir; } @@ -285,10 +293,9 @@ class TestController { std::shared_ptr<minifi::state::response::FlowVersion> flow_version_; std::mutex test_mutex; - //std::map<std::string,> LogTestController &log; - std::vector<char*> directories; + std::vector<std::string> directories; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0c58dcf6/libminifi/test/unit/StreamTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/StreamTests.cpp b/libminifi/test/unit/StreamTests.cpp new file mode 100644 index 0000000..c01e68a --- /dev/null +++ b/libminifi/test/unit/StreamTests.cpp @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <thread> +#include <random> +#include <chrono> +#include <vector> +#include <string> +#include <memory> +#include <utility> +#include "../TestBase.h" +#include "io/BaseStream.h" + +TEST_CASE("TestReadData", "[testread]") { + auto base = std::make_shared<minifi::io::BaseStream>(); + uint64_t b = 8; + base->write(b); + uint64_t c = 0; + base->readData(reinterpret_cast<uint8_t*>(&c), 8); + if (minifi::io::EndiannessCheck::IS_LITTLE) + REQUIRE(c == 576460752303423488); + else + REQUIRE(c == 8); +} + +TEST_CASE("TestRead8", "[testread]") { + auto base = std::make_shared<minifi::io::BaseStream>(); + uint64_t b = 8; + base->write(b); + uint64_t c = 0; + base->read(c); + REQUIRE(c == 8); +} + +TEST_CASE("TestRead2", "[testread]") { + auto base = std::make_shared<minifi::io::BaseStream>(); + uint16_t b = 8; + base->write(b); + uint16_t c = 0; + base->read(c); + REQUIRE(c == 8); +} + +TEST_CASE("TestRead1", "[testread]") { + auto base = std::make_shared<minifi::io::BaseStream>(); + uint8_t b = 8; + base->write(&b, 1); + uint8_t c = 0; + base->read(c); + REQUIRE(c == 8); +} + +TEST_CASE("TestRead4", "[testread]") { + auto base = std::make_shared<minifi::io::BaseStream>(); + uint32_t b = 8; + base->write(b); + uint32_t c = 0; + base->read(c); + REQUIRE(c == 8); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0c58dcf6/nanofi/tests/CAPITests.cpp ---------------------------------------------------------------------- diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp index 54eae0e..1c94b87 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -31,8 +31,6 @@ #include <thread> #include "api/nanofi.h" -char src_format[] = "/tmp/gt.XXXXXX"; -char put_format[] = "/tmp/pt.XXXXXX"; std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; std::string test_file_name = "tstFile.ext"; @@ -110,6 +108,8 @@ TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") { TEST_CASE("get file and put file", "[getAndPutFile]") { TestController testController; + char src_format[] = "/tmp/gt.XXXXXX"; + char put_format[] = "/tmp/pt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); const char *putfiledir = testController.createTempDirectory(put_format); auto instance = create_instance_obj(); @@ -157,6 +157,7 @@ TEST_CASE("get file and put file", "[getAndPutFile]") { TEST_CASE("Test manipulation of attributes", "[testAttributes]") { TestController testController; + char src_format[] = "/tmp/gt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); create_testfile_for_getfile(sourcedir); @@ -231,7 +232,7 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { TEST_CASE("Test error handling callback", "[errorHandling]") { TestController testController; - + char src_format[] = "/tmp/gt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); auto instance = create_instance_obj(); @@ -277,6 +278,7 @@ TEST_CASE("Test error handling callback", "[errorHandling]") { TEST_CASE("Test standalone processors", "[testStandalone]") { TestController testController; + char src_format[] = "/tmp/gt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); create_testfile_for_getfile(sourcedir); @@ -323,6 +325,8 @@ TEST_CASE("Test standalone processors", "[testStandalone]") { TEST_CASE("Test interaction of flow and standlone processors", "[testStandaloneWithFlow]") { TestController testController; + char src_format[] = "/tmp/gt.XXXXXX"; + char put_format[] = "/tmp/pt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); const char *putfiledir = testController.createTempDirectory(put_format); @@ -366,7 +370,7 @@ TEST_CASE("Test standalone processors with file input", "[testStandaloneWithFile TestController testController; enable_logging(); - + char src_format[] = "/tmp/gt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); std::string path = create_testfile_for_getfile(sourcedir);
