This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f6bbc46601d17a46aed2cea17b9aec68bfc4f629 Author: Martin Zink <[email protected]> AuthorDate: Tue Sep 28 10:56:05 2021 +0200 MINIFICPP-1644: ProcessSession::read ignores the size/offset of the flowfile Signed-off-by: Adam Debreceni <[email protected]> This closes #1177 --- extensions/http-curl/client/HTTPStream.cpp | 7 +- extensions/http-curl/client/HTTPStream.h | 2 + extensions/rocksdb-repos/RocksDbStream.cpp | 8 +- extensions/rocksdb-repos/RocksDbStream.h | 2 + libminifi/include/io/AtomicEntryStream.h | 7 + libminifi/include/io/BufferStream.h | 6 +- libminifi/include/io/DescriptorStream.h | 5 +- libminifi/include/io/FileStream.h | 2 + libminifi/include/io/Stream.h | 4 + libminifi/include/io/StreamSlice.h | 56 ++++++++ libminifi/include/io/tls/SecureDescriptorStream.h | 4 +- libminifi/src/core/ProcessSession.cpp | 6 +- libminifi/src/io/DescriptorStream.cpp | 13 +- libminifi/src/io/FileStream.cpp | 4 + libminifi/src/io/StreamSlice.cpp | 47 +++++++ libminifi/src/io/tls/SecureDescriptorStream.cpp | 13 +- libminifi/test/TestBase.h | 6 +- .../rocksdb-tests/DBContentRepositoryTests.cpp | 6 + .../test/unit/ContentRepositoryDependentTests.h | 143 +++++++++++++++++++++ libminifi/test/unit/ProcessSessionTests.cpp | 6 + libminifi/test/unit/StreamTests.cpp | 21 +++ 21 files changed, 353 insertions(+), 15 deletions(-) diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp index 639e875..178c117 100644 --- a/extensions/http-curl/client/HTTPStream.cpp +++ b/extensions/http-curl/client/HTTPStream.cpp @@ -49,10 +49,15 @@ void HttpStream::close() { } void HttpStream::seek(size_t /*offset*/) { - // seek is an unnecessary part of this implementatino + // seek is an unnecessary part of this implementation throw std::logic_error{"HttpStream::seek is unimplemented"}; } +size_t HttpStream::tell() const { + // tell is an unnecessary part of this implementation + throw std::logic_error{"HttpStream::tell is unimplemented"}; +} + // data stream overrides size_t HttpStream::write(const uint8_t *value, size_t size) { diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h index d6ce39b..947b319 100644 --- a/extensions/http-curl/client/HTTPStream.h +++ b/extensions/http-curl/client/HTTPStream.h @@ -78,6 +78,8 @@ class HttpStream : public io::BaseStream { */ void seek(size_t offset) override; + size_t tell() const override; + size_t size() const override { return written; } diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp index a18ced0..170c3b1 100644 --- a/extensions/rocksdb-repos/RocksDbStream.cpp +++ b/extensions/rocksdb-repos/RocksDbStream.cpp @@ -47,8 +47,12 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R void RocksDbStream::close() { } -void RocksDbStream::seek(size_t /*offset*/) { - // noop +void RocksDbStream::seek(size_t offset) { + offset_ = offset; +} + +size_t RocksDbStream::tell() const { + return offset_; } size_t RocksDbStream::write(const uint8_t *value, size_t size) { diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h index 58463bc..78a5541 100644 --- a/extensions/rocksdb-repos/RocksDbStream.h +++ b/extensions/rocksdb-repos/RocksDbStream.h @@ -57,6 +57,8 @@ class RocksDbStream : public io::BaseStream { */ void seek(size_t offset) override; + size_t tell() const override; + size_t size() const override { return size_; } diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h index 10b2eda..ada7902 100644 --- a/libminifi/include/io/AtomicEntryStream.h +++ b/libminifi/include/io/AtomicEntryStream.h @@ -66,6 +66,12 @@ class AtomicEntryStream : public BaseStream { */ void seek(size_t offset) override; + + size_t tell() const override { + return offset_; + } + + size_t size() const override { return length_; } @@ -108,6 +114,7 @@ void AtomicEntryStream<T>::seek(size_t offset) { offset_ = gsl::narrow<size_t>(offset); } + // data stream overrides template<typename T> size_t AtomicEntryStream<T>::write(const uint8_t *value, size_t size) { diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h index c16e15c..3a8fc7d 100644 --- a/libminifi/include/io/BufferStream.h +++ b/libminifi/include/io/BufferStream.h @@ -65,7 +65,11 @@ class BufferStream : public BaseStream { } void seek(size_t offset) override { - readOffset_ += offset; + readOffset_ = offset; + } + + size_t tell() const override { + return readOffset_; } void close() override { } diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h index 695f543..a0cd3ec 100644 --- a/libminifi/include/io/DescriptorStream.h +++ b/libminifi/include/io/DescriptorStream.h @@ -54,6 +54,9 @@ class DescriptorStream : public io::BaseStream { */ void seek(size_t offset) override; + + size_t tell() const override; + /** * Reads data and places it into buf * @param buf buffer in which we extract data @@ -69,7 +72,7 @@ class DescriptorStream : public io::BaseStream { size_t write(const uint8_t *value, size_t size) override; private: - std::recursive_mutex file_lock_; + mutable std::recursive_mutex file_lock_; int fd_; std::shared_ptr<logging::Logger> logger_; diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h index 3ef4db7..e0bbafb 100644 --- a/libminifi/include/io/FileStream.h +++ b/libminifi/include/io/FileStream.h @@ -65,6 +65,8 @@ class FileStream : public io::BaseStream { */ void seek(size_t offset) override; + size_t tell() const override; + size_t size() const override { return length_; } diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h index 0428cf7..db1acea 100644 --- a/libminifi/include/io/Stream.h +++ b/libminifi/include/io/Stream.h @@ -42,6 +42,10 @@ class Stream { throw std::runtime_error("Seek is not supported"); } + virtual size_t tell() const { + throw std::runtime_error("Tell is not supported"); + } + virtual int initialize() { return 1; } diff --git a/libminifi/include/io/StreamSlice.h b/libminifi/include/io/StreamSlice.h new file mode 100644 index 0000000..99c4701 --- /dev/null +++ b/libminifi/include/io/StreamSlice.h @@ -0,0 +1,56 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <algorithm> +#include <memory> + +#include "BaseStream.h" + +namespace org::apache::nifi::minifi::io { + +/** + * A wrapped Base Stream with configurable offset and size + * It hides the original (bigger stream) and acts like the stream starts and ends at the configured offset/size + */ +class StreamSlice : public BaseStream { // TODO(MINIFICPP-1648) This should be an InputStreamCallback, because writing to Slice is not supported + public: + StreamSlice(std::shared_ptr<io::BaseStream>& stream, size_t offset, size_t size); + + // from InputStream + size_t size() const override { return slice_size_; } + size_t read(uint8_t *value, size_t len) override; + + // from OutputStream + size_t write(const uint8_t*, size_t) override { throw std::runtime_error("write is not supported in StreamSlice"); } + + // from Stream + void close() override { stream_->close(); } + int initialize() override { return stream_->initialize(); } + + void seek(size_t offset) override; + size_t tell() const override; + const uint8_t* getBuffer() const override; + + private: + const std::shared_ptr<io::BaseStream>& stream_; + size_t slice_offset_; + size_t slice_size_; +}; + +} // namespace org::apache::nifi::minifi::io diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h index 2953711..1d731ad 100644 --- a/libminifi/include/io/tls/SecureDescriptorStream.h +++ b/libminifi/include/io/tls/SecureDescriptorStream.h @@ -65,6 +65,8 @@ class SecureDescriptorStream : public io::BaseStream { */ void seek(size_t offset) override; + size_t tell() const override; + size_t size() const override { return -1; } @@ -84,7 +86,7 @@ class SecureDescriptorStream : public io::BaseStream { size_t write(const uint8_t *value, size_t size) override; protected: - std::recursive_mutex file_lock_; + mutable std::recursive_mutex file_lock_; int fd_; diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 3c57b68..b31e63e 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -31,6 +31,7 @@ #include <vector> #include "core/ProcessSessionReadCallback.h" +#include "io/StreamSlice.h" #include "utils/gsl.h" /* This implementation is only for native Windows systems. */ @@ -311,9 +312,8 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputS throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read"); } - stream->seek(flow->getOffset()); - - auto ret = callback->process(stream); + auto flow_file_stream = std::make_shared<io::StreamSlice>(stream, flow->getOffset(), flow->getSize()); + auto ret = callback->process(flow_file_stream); if (ret < 0) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp index 64c0840..d9336b9 100644 --- a/libminifi/src/io/DescriptorStream.cpp +++ b/libminifi/src/io/DescriptorStream.cpp @@ -43,9 +43,18 @@ DescriptorStream::DescriptorStream(int fd) void DescriptorStream::seek(size_t offset) { std::lock_guard<std::recursive_mutex> lock(file_lock_); #ifdef WIN32 - _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00); + _lseeki64(fd_, gsl::narrow<int64_t>(offset), SEEK_SET); #else - lseek(fd_, gsl::narrow<off_t>(offset), 0x00); + lseek(fd_, gsl::narrow<off_t>(offset), SEEK_SET); +#endif +} + +size_t DescriptorStream::tell() const { + std::lock_guard<std::recursive_mutex> lock(file_lock_); +#ifdef WIN32 + return _lseeki64(fd_, 0, SEEK_CUR); +#else + return lseek(fd_, 0, SEEK_CUR); #endif } diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp index c5470b4..af46abf 100644 --- a/libminifi/src/io/FileStream.cpp +++ b/libminifi/src/io/FileStream.cpp @@ -113,6 +113,10 @@ void FileStream::seek(size_t offset) { logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKP_CALL_ERROR_MSG; } +size_t FileStream::tell() const { + return offset_; +} + size_t FileStream::write(const uint8_t *value, size_t size) { if (size == 0) return 0; if (IsNullOrEmpty(value)) { diff --git a/libminifi/src/io/StreamSlice.cpp b/libminifi/src/io/StreamSlice.cpp new file mode 100644 index 0000000..88c4dc6 --- /dev/null +++ b/libminifi/src/io/StreamSlice.cpp @@ -0,0 +1,47 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io/StreamSlice.h" + +namespace org::apache::nifi::minifi::io { + +StreamSlice::StreamSlice(std::shared_ptr<io::BaseStream>& stream, size_t offset, size_t size) : stream_(stream), slice_offset_(offset), slice_size_(size) { + stream_->seek(slice_offset_); + if (stream_->size() < slice_offset_ + slice_size_) + throw std::invalid_argument("StreamSlice is bigger than the Stream"); +} + +size_t StreamSlice::read(uint8_t *value, size_t len) { + const size_t max_size = std::min(len, size() - tell()); + return stream_->read(value, max_size); +} + +void StreamSlice::seek(size_t offset) { + stream_->seek(slice_offset_ + offset); +} + +size_t StreamSlice::tell() const { + return stream_->tell() - slice_offset_; +} + +const uint8_t* StreamSlice::getBuffer() const { + const uint8_t* buffer = stream_->getBuffer(); + return buffer + slice_offset_; +} + +} // namespace org::apache::nifi::minifi::io diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp index e7a84fc..f11a89b 100644 --- a/libminifi/src/io/tls/SecureDescriptorStream.cpp +++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp @@ -39,9 +39,18 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL *ssl) void SecureDescriptorStream::seek(size_t offset) { std::lock_guard<std::recursive_mutex> lock(file_lock_); #ifdef WIN32 - _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00); + _lseeki64(fd_, gsl::narrow<int64_t>(offset), SEEK_SET); #else - lseek(fd_, gsl::narrow<off_t>(offset), 0x00); + lseek(fd_, gsl::narrow<off_t>(offset), SEEK_SET); +#endif +} + +size_t SecureDescriptorStream::tell() const { + std::lock_guard<std::recursive_mutex> lock(file_lock_); +#ifdef WIN32 + return _lseeki64(fd_, 0, SEEK_CUR); +#else + return lseek(fd_, 0, SEEK_CUR); #endif } diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 9293d6d..5c140de 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -427,12 +427,14 @@ class TestController { flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test"); } - std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, const char* state_dir = nullptr) { + std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, + const char* state_dir = nullptr, + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>()) { if (configuration == nullptr) { configuration = std::make_shared<minifi::Configure>(); configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService"); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory()); } - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); content_repo->initialize(configuration); diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp index fbee900..1b6383a 100644 --- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -26,6 +26,7 @@ #include "provenance/Provenance.h" #include "../TestBase.h" #include "../unit/ProvenanceTestHelper.h" +#include "../unit/ContentRepositoryDependentTests.h" TEST_CASE("Write Claim", "[TestDBCR1]") { TestController testController; @@ -225,3 +226,8 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") { REQUIRE(readstr == "well hello there"); } + +TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") { + ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::DatabaseContentRepository>()); +} + diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h new file mode 100644 index 0000000..01c6563 --- /dev/null +++ b/libminifi/test/unit/ContentRepositoryDependentTests.h @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <memory> +#include <string> +#include <vector> + +#include <catch.hpp> +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "../TestBase.h" +#include "StreamPipe.h" + +#pragma once + +namespace ContentRepositoryDependentTests { + +struct WriteStringToFlowFile : public minifi::OutputStreamCallback { + const std::vector<uint8_t> buffer_; + + explicit WriteStringToFlowFile(const std::string& buffer) : buffer_(buffer.begin(), buffer.end()) {} + + int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override { + size_t bytes_written = stream->write(buffer_, buffer_.size()); + return minifi::io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written); + } +}; + +struct ReadUntilStreamSize : public minifi::InputStreamCallback { + std::string value_; + + int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override { + value_.clear(); + std::vector<uint8_t> buffer; + size_t bytes_read = stream->read(buffer, stream->size()); + value_.assign(buffer.begin(), buffer.end()); + return minifi::io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read); + } +}; + +struct ReadUntilItCan : public minifi::InputStreamCallback { + std::string value_; + + int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override { + value_.clear(); + std::vector<uint8_t> buffer; + size_t bytes_read = 0; + while (true) { + size_t read_result = stream->read(buffer, 1024); + if (minifi::io::isError(read_result)) + return -1; + if (read_result == 0) + return bytes_read; + bytes_read += read_result; + value_.append(buffer.begin(), buffer.end()); + } + } +}; + +class DummyProcessor : public core::Processor { + using core::Processor::Processor; +}; + +REGISTER_RESOURCE(DummyProcessor, "A processor that does nothing."); + +class Fixture { + public: + const core::Relationship Success{"success", "everything is fine"}; + const core::Relationship Failure{"failure", "something has gone awry"}; + + explicit Fixture(std::shared_ptr<core::ContentRepository> content_repo) { + test_plan_ = test_controller_.createPlan(nullptr, nullptr, content_repo); + dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); + context_ = [this] { + test_plan_->runNextProcessor(); + return test_plan_->getCurrentContext(); + }(); + process_session_ = std::make_unique<core::ProcessSession>(context_); + } + + core::ProcessSession &processSession() { return *process_session_; } + + void commitFlowFile(const std::string& content) { + const auto original_ff = process_session_->create(); + WriteStringToFlowFile callback(content); + process_session_->write(original_ff, &callback); + process_session_->transfer(original_ff, Success); + process_session_->commit(); + } + + private: + TestController test_controller_; + + std::shared_ptr<TestPlan> test_plan_; + std::shared_ptr<core::Processor> dummy_processor_; + std::shared_ptr<core::ProcessContext> context_; + std::unique_ptr<core::ProcessSession> process_session_; +}; + +void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> content_repo) { + Fixture fixture = Fixture(content_repo); + core::ProcessSession& process_session = fixture.processSession(); + fixture.commitFlowFile("foobar"); + const auto original_ff = process_session.get(); + REQUIRE(original_ff); + auto clone_first_half = process_session.clone(original_ff, 0, 3); + auto clone_second_half = process_session.clone(original_ff, 3, 3); + REQUIRE(clone_first_half != nullptr); + REQUIRE(clone_second_half != nullptr); + ReadUntilStreamSize read_until_stream_size_callback; + ReadUntilItCan read_until_it_can_callback; + process_session.read(original_ff, &read_until_stream_size_callback); + process_session.read(original_ff, &read_until_it_can_callback); + CHECK(original_ff->getSize() == 6); + CHECK(read_until_stream_size_callback.value_ == "foobar"); + CHECK(read_until_it_can_callback.value_ == "foobar"); + process_session.read(clone_first_half, &read_until_stream_size_callback); + process_session.read(clone_first_half, &read_until_it_can_callback); + CHECK(clone_first_half->getSize() == 3); + CHECK(read_until_stream_size_callback.value_ == "foo"); + CHECK(read_until_it_can_callback.value_ == "foo"); + process_session.read(clone_second_half, &read_until_stream_size_callback); + process_session.read(clone_second_half, &read_until_it_can_callback); + CHECK(clone_second_half->getSize() == 3); + CHECK(read_until_stream_size_callback.value_ == "bar"); + CHECK(read_until_it_can_callback.value_ == "bar"); +} +} // namespace ContentRepositoryDependentTests diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp index 2a40519..5629d43 100644 --- a/libminifi/test/unit/ProcessSessionTests.cpp +++ b/libminifi/test/unit/ProcessSessionTests.cpp @@ -21,6 +21,7 @@ #include "core/ProcessSession.h" #include "core/Resource.h" #include "../TestBase.h" +#include "ContentRepositoryDependentTests.h" namespace { @@ -94,3 +95,8 @@ TEST_CASE("ProcessSession::rollback penalizes affected flowfiles", "[rollback]") next_flow_file_to_be_processed = process_session.get(); REQUIRE(next_flow_file_to_be_processed == flow_file_3); } + +TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") { + ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::VolatileContentRepository>()); + ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::FileSystemRepository>()); +} diff --git a/libminifi/test/unit/StreamTests.cpp b/libminifi/test/unit/StreamTests.cpp index 99bd92f..ce7bc2f 100644 --- a/libminifi/test/unit/StreamTests.cpp +++ b/libminifi/test/unit/StreamTests.cpp @@ -25,6 +25,7 @@ #include <utility> #include "../TestBase.h" #include "io/BaseStream.h" +#include "io/StreamSlice.h" TEST_CASE("TestReadData", "[testread]") { auto base = std::make_shared<minifi::io::BufferStream>(); @@ -77,3 +78,23 @@ TEST_CASE("TestWrite1", "[testwrite]") { REQUIRE(8 == base->read(reinterpret_cast<uint8_t*>(const_cast<char*>(bytes.data())), 8)); REQUIRE(bytes == "\x01\x02\x03\x04\x05\x06\x07\x08"); } + +TEST_CASE("InvalidStreamSliceTest", "[teststreamslice]") { + std::shared_ptr<minifi::io::BaseStream> base = std::make_shared<minifi::io::BufferStream>(); + base->write((const uint8_t*)"\x01\x02\x03\x04\x05\x06\x07\x08", 8); + REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(base, 0, 9), "StreamSlice is bigger than the Stream"); + REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(base, 7, 3), "StreamSlice is bigger than the Stream"); +} + +TEST_CASE("StreamSliceTest1", "[teststreamslice]") { + std::shared_ptr<minifi::io::BaseStream> base = std::make_shared<minifi::io::BufferStream>(); + base->write((const uint8_t*)"\x01\x02\x03\x04\x05\x06\x07\x08", 8); + std::shared_ptr<minifi::io::BaseStream> stream_slice = std::make_shared<minifi::io::StreamSlice>(base, 2, 4); + std::vector<uint8_t> buffer; + REQUIRE(stream_slice->read(buffer, stream_slice->size()) == 4); + stream_slice->seek(0); + std::vector<uint8_t> buffer2; + REQUIRE(stream_slice->read(buffer2, 1000) == 4); + REQUIRE(buffer == buffer2); + REQUIRE(buffer == std::vector<uint8_t>({3, 4, 5, 6})); +}
