This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f6bbc46601d17a46aed2cea17b9aec68bfc4f629
Author: Martin Zink <[email protected]>
AuthorDate: Tue Sep 28 10:56:05 2021 +0200

    MINIFICPP-1644: ProcessSession::read ignores the size/offset of the flowfile
    
    Signed-off-by: Adam Debreceni <[email protected]>
    
    This closes #1177
---
 extensions/http-curl/client/HTTPStream.cpp         |   7 +-
 extensions/http-curl/client/HTTPStream.h           |   2 +
 extensions/rocksdb-repos/RocksDbStream.cpp         |   8 +-
 extensions/rocksdb-repos/RocksDbStream.h           |   2 +
 libminifi/include/io/AtomicEntryStream.h           |   7 +
 libminifi/include/io/BufferStream.h                |   6 +-
 libminifi/include/io/DescriptorStream.h            |   5 +-
 libminifi/include/io/FileStream.h                  |   2 +
 libminifi/include/io/Stream.h                      |   4 +
 libminifi/include/io/StreamSlice.h                 |  56 ++++++++
 libminifi/include/io/tls/SecureDescriptorStream.h  |   4 +-
 libminifi/src/core/ProcessSession.cpp              |   6 +-
 libminifi/src/io/DescriptorStream.cpp              |  13 +-
 libminifi/src/io/FileStream.cpp                    |   4 +
 libminifi/src/io/StreamSlice.cpp                   |  47 +++++++
 libminifi/src/io/tls/SecureDescriptorStream.cpp    |  13 +-
 libminifi/test/TestBase.h                          |   6 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |   6 +
 .../test/unit/ContentRepositoryDependentTests.h    | 143 +++++++++++++++++++++
 libminifi/test/unit/ProcessSessionTests.cpp        |   6 +
 libminifi/test/unit/StreamTests.cpp                |  21 +++
 21 files changed, 353 insertions(+), 15 deletions(-)

diff --git a/extensions/http-curl/client/HTTPStream.cpp 
b/extensions/http-curl/client/HTTPStream.cpp
index 639e875..178c117 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -49,10 +49,15 @@ void HttpStream::close() {
 }
 
 void HttpStream::seek(size_t /*offset*/) {
-  // seek is an unnecessary part of this implementatino
+  // seek is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
 
+// Position reporting is not provided by HttpStream: this override always throws std::logic_error.
+size_t HttpStream::tell() const  {
+  // tell is an unnecessary part of this implementation
+  throw std::logic_error{"HttpStream::tell is unimplemented"};
+}
+
 // data stream overrides
 
 size_t HttpStream::write(const uint8_t *value, size_t size) {
diff --git a/extensions/http-curl/client/HTTPStream.h 
b/extensions/http-curl/client/HTTPStream.h
index d6ce39b..947b319 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -78,6 +78,8 @@ class HttpStream : public io::BaseStream {
    */
   void seek(size_t offset) override;
 
+  size_t tell() const override;
+
   size_t size() const override {
     return written;
   }
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp 
b/extensions/rocksdb-repos/RocksDbStream.cpp
index a18ced0..170c3b1 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -47,8 +47,12 @@ RocksDbStream::RocksDbStream(std::string path, 
gsl::not_null<minifi::internal::R
 void RocksDbStream::close() {
 }
 
-void RocksDbStream::seek(size_t /*offset*/) {
-  // noop
+void RocksDbStream::seek(size_t offset) {
+  offset_ = offset;
+}
+
+size_t RocksDbStream::tell() const {
+  return offset_;
 }
 
 size_t RocksDbStream::write(const uint8_t *value, size_t size) {
diff --git a/extensions/rocksdb-repos/RocksDbStream.h 
b/extensions/rocksdb-repos/RocksDbStream.h
index 58463bc..78a5541 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -57,6 +57,8 @@ class RocksDbStream : public io::BaseStream {
    */
   void seek(size_t offset) override;
 
+  size_t tell() const override;
+
   size_t size() const override {
     return size_;
   }
diff --git a/libminifi/include/io/AtomicEntryStream.h 
b/libminifi/include/io/AtomicEntryStream.h
index 10b2eda..ada7902 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -66,6 +66,12 @@ class AtomicEntryStream : public BaseStream {
    */
   void seek(size_t offset) override;
 
+
+  /**
+   * Reports the current position of the stream.
+   * @return the value of offset_, i.e. the member that seek() assigns
+   */
+  size_t tell() const override {
+    return offset_;
+  }
+
+
   size_t size() const override {
     return length_;
   }
@@ -108,6 +114,7 @@ void AtomicEntryStream<T>::seek(size_t offset) {
   offset_ = gsl::narrow<size_t>(offset);
 }
 
+
 // data stream overrides
 template<typename T>
 size_t AtomicEntryStream<T>::write(const uint8_t *value, size_t size) {
diff --git a/libminifi/include/io/BufferStream.h 
b/libminifi/include/io/BufferStream.h
index c16e15c..3a8fc7d 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -65,7 +65,11 @@ class BufferStream : public BaseStream {
   }
 
   void seek(size_t offset) override {
-    readOffset_ += offset;
+    readOffset_ = offset;
+  }
+
+  size_t tell() const override {
+    return readOffset_;
   }
 
   void close() override { }
diff --git a/libminifi/include/io/DescriptorStream.h 
b/libminifi/include/io/DescriptorStream.h
index 695f543..a0cd3ec 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -54,6 +54,9 @@ class DescriptorStream : public io::BaseStream {
    */
   void seek(size_t offset) override;
 
+
+  size_t tell() const override;
+
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
@@ -69,7 +72,7 @@ class DescriptorStream : public io::BaseStream {
   size_t write(const uint8_t *value, size_t size) override;
 
  private:
-  std::recursive_mutex file_lock_;
+  mutable std::recursive_mutex file_lock_;
   int fd_;
 
   std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/io/FileStream.h 
b/libminifi/include/io/FileStream.h
index 3ef4db7..e0bbafb 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -65,6 +65,8 @@ class FileStream : public io::BaseStream {
    */
   void seek(size_t offset) override;
 
+  size_t tell() const override;
+
   size_t size() const override {
     return length_;
   }
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index 0428cf7..db1acea 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -42,6 +42,10 @@ class Stream {
     throw std::runtime_error("Seek is not supported");
   }
 
+  /**
+   * Default implementation for streams that do not report a position.
+   * @throws std::runtime_error always; streams that support it override this method
+   */
+  virtual size_t tell() const {
+    throw std::runtime_error("Tell is not supported");
+  }
+
   virtual int initialize() {
     return 1;
   }
diff --git a/libminifi/include/io/StreamSlice.h 
b/libminifi/include/io/StreamSlice.h
new file mode 100644
index 0000000..99c4701
--- /dev/null
+++ b/libminifi/include/io/StreamSlice.h
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <algorithm>
+#include <memory>
+
+#include "BaseStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+/**
+ * A read-only view over a sub-range of a wrapped BaseStream.
+ * It hides the original (bigger) stream and acts as if the stream starts at the
+ * configured offset and ends after the configured size.
+ */
+class StreamSlice : public BaseStream {  // TODO(MINIFICPP-1648) This should be an InputStreamCallback, because writing to Slice is not supported
+ public:
+  /**
+   * @param stream the stream to wrap; the slice stores its own shared_ptr to it
+   * @param offset position in the wrapped stream where the slice begins
+   * @param size number of bytes the slice exposes
+   * @throws std::invalid_argument if the wrapped stream is smaller than offset + size
+   */
+  StreamSlice(std::shared_ptr<io::BaseStream>& stream, size_t offset, size_t size);
+
+  // from InputStream
+  size_t size() const override { return slice_size_; }
+  size_t read(uint8_t *value, size_t len) override;
+
+  // from OutputStream
+  size_t write(const uint8_t*, size_t) override { throw std::runtime_error("write is not supported in StreamSlice"); }
+
+  // from Stream
+  void close() override { stream_->close(); }
+  int initialize() override { return stream_->initialize(); }
+
+  // seek/tell work in slice coordinates: position 0 is slice_offset_ in the wrapped stream
+  void seek(size_t offset) override;
+  size_t tell() const override;
+  const uint8_t* getBuffer() const override;
+
+ private:
+  // Stored by value, not by reference: a reference to the caller's shared_ptr would dangle
+  // as soon as the slice outlives the variable that was passed to the constructor.
+  const std::shared_ptr<io::BaseStream> stream_;
+  size_t slice_offset_;
+  size_t slice_size_;
+};
+
+}  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h 
b/libminifi/include/io/tls/SecureDescriptorStream.h
index 2953711..1d731ad 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -65,6 +65,8 @@ class SecureDescriptorStream : public io::BaseStream {
    */
   void seek(size_t offset) override;
 
+  size_t tell() const override;
+
   size_t size() const override {
     return -1;
   }
@@ -84,7 +86,7 @@ class SecureDescriptorStream : public io::BaseStream {
   size_t write(const uint8_t *value, size_t size) override;
 
  protected:
-  std::recursive_mutex file_lock_;
+  mutable std::recursive_mutex file_lock_;
 
   int fd_;
 
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 3c57b68..b31e63e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -31,6 +31,7 @@
 #include <vector>
 
 #include "core/ProcessSessionReadCallback.h"
+#include "io/StreamSlice.h"
 #include "utils/gsl.h"
 
 /* This implementation is only for native Windows systems.  */
@@ -311,9 +312,8 @@ int64_t ProcessSession::read(const 
std::shared_ptr<core::FlowFile> &flow, InputS
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile 
content for read");
     }
 
-    stream->seek(flow->getOffset());
-
-    auto ret = callback->process(stream);
+    auto flow_file_stream = std::make_shared<io::StreamSlice>(stream, 
flow->getOffset(), flow->getSize());
+    auto ret = callback->process(flow_file_stream);
     if (ret < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile 
content");
     }
diff --git a/libminifi/src/io/DescriptorStream.cpp 
b/libminifi/src/io/DescriptorStream.cpp
index 64c0840..d9336b9 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -43,9 +43,18 @@ DescriptorStream::DescriptorStream(int fd)
 void DescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
-  _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
+  _lseeki64(fd_, gsl::narrow<int64_t>(offset), SEEK_SET);
 #else
-  lseek(fd_, gsl::narrow<off_t>(offset), 0x00);
+  lseek(fd_, gsl::narrow<off_t>(offset), SEEK_SET);
+#endif
+}
+
+size_t DescriptorStream::tell() const {
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+#ifdef WIN32
+  return _lseeki64(fd_, 0, SEEK_CUR);
+#else
+  return lseek(fd_, 0, SEEK_CUR);
 #endif
 }
 
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index c5470b4..af46abf 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -113,6 +113,10 @@ void FileStream::seek(size_t offset) {
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKP_CALL_ERROR_MSG;
 }
 
+// Reports the stream position by returning the offset_ member.
+size_t FileStream::tell() const {
+  return offset_;
+}
+
 size_t FileStream::write(const uint8_t *value, size_t size) {
   if (size == 0) return 0;
   if (IsNullOrEmpty(value)) {
diff --git a/libminifi/src/io/StreamSlice.cpp b/libminifi/src/io/StreamSlice.cpp
new file mode 100644
index 0000000..88c4dc6
--- /dev/null
+++ b/libminifi/src/io/StreamSlice.cpp
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io/StreamSlice.h"
+
+namespace org::apache::nifi::minifi::io {
+
+// Wraps `stream` so that only the `size` bytes starting at `offset` are visible.
+// Throws std::invalid_argument when the requested range does not fit into the wrapped stream.
+StreamSlice::StreamSlice(std::shared_ptr<io::BaseStream>& stream, size_t offset, size_t size) : stream_(stream), slice_offset_(offset), slice_size_(size) {
+  // Validate before touching the wrapped stream, and compare without forming
+  // offset + size, which could wrap around for very large arguments.
+  const size_t stream_size = stream_->size();
+  if (slice_offset_ > stream_size || slice_size_ > stream_size - slice_offset_)
+    throw std::invalid_argument("StreamSlice is bigger than the Stream");
+  // Position the wrapped stream at the beginning of the slice.
+  stream_->seek(slice_offset_);
+}
+
+// Reads at most `len` bytes into `value`, never going past the end of the slice.
+// Returns whatever the wrapped stream's read returns for the clamped length.
+size_t StreamSlice::read(uint8_t *value, size_t len) {
+  const size_t position = tell();
+  // Guard the subtraction: if the position is already past the slice end,
+  // size() - position would wrap around and allow reading beyond the slice.
+  const size_t remaining = position < size() ? size() - position : 0;
+  return stream_->read(value, std::min(len, remaining));
+}
+
+// Moves to `offset` measured from the start of the slice.
+void StreamSlice::seek(size_t offset) {
+  // Translate the slice-relative offset into the wrapped stream's coordinates.
+  const size_t absolute_offset = slice_offset_ + offset;
+  stream_->seek(absolute_offset);
+}
+
+// Returns the current position measured from the start of the slice.
+size_t StreamSlice::tell() const {
+  const size_t absolute_position = stream_->tell();
+  return absolute_position - slice_offset_;
+}
+
+// Returns the wrapped stream's buffer pointer advanced by the slice offset.
+// NOTE(review): presumably the wrapped stream returns a valid buffer here - confirm for each stream type.
+const uint8_t* StreamSlice::getBuffer() const {
+  return stream_->getBuffer() + slice_offset_;
+}
+
+}  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp 
b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index e7a84fc..f11a89b 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -39,9 +39,18 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL 
*ssl)
 void SecureDescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
-  _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
+  _lseeki64(fd_, gsl::narrow<int64_t>(offset), SEEK_SET);
 #else
-  lseek(fd_, gsl::narrow<off_t>(offset), 0x00);
+  lseek(fd_, gsl::narrow<off_t>(offset), SEEK_SET);
+#endif
+}
+
+size_t SecureDescriptorStream::tell() const {
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+#ifdef WIN32
+  return _lseeki64(fd_, 0, SEEK_CUR);
+#else
+  return lseek(fd_, 0, SEEK_CUR);
 #endif
 }
 
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 9293d6d..5c140de 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -427,12 +427,14 @@ class TestController {
     flow_version_ = 
std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test");
   }
 
-  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> 
configuration = nullptr, const char* state_dir = nullptr) {
+  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> 
configuration = nullptr,
+                                       const char* state_dir = nullptr,
+                                       
std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>()) {
     if (configuration == nullptr) {
       configuration = std::make_shared<minifi::Configure>();
       
configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name,
 "UnorderedMapKeyValueStoreService");
+      
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
 createTempDirectory());
     }
-    std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
 
     content_repo->initialize(configuration);
 
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp 
b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index fbee900..1b6383a 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -26,6 +26,7 @@
 #include "provenance/Provenance.h"
 #include "../TestBase.h"
 #include "../unit/ProvenanceTestHelper.h"
+#include "../unit/ContentRepositoryDependentTests.h"
 
 TEST_CASE("Write Claim", "[TestDBCR1]") {
   TestController testController;
@@ -225,3 +226,8 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
 
   REQUIRE(readstr == "well hello there");
 }
+
+// Runs the shared offset/size read test against a DatabaseContentRepository.
+TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
+  const auto database_content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(database_content_repo);
+}
+
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h 
b/libminifi/test/unit/ContentRepositoryDependentTests.h
new file mode 100644
index 0000000..01c6563
--- /dev/null
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <catch.hpp>
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "../TestBase.h"
+#include "StreamPipe.h"
+
+#pragma once
+
+namespace ContentRepositoryDependentTests {
+
+// Output callback that writes the bytes of a string into the stream it is given.
+struct WriteStringToFlowFile : public minifi::OutputStreamCallback {
+  const std::vector<uint8_t> buffer_;
+
+  explicit WriteStringToFlowFile(const std::string& buffer) : buffer_(buffer.begin(), buffer.end()) {}
+
+  // Returns the number of bytes written, or -1 when the stream reports an error.
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override {
+    const size_t write_result = stream->write(buffer_, buffer_.size());
+    if (minifi::io::isError(write_result)) {
+      return -1;
+    }
+    return gsl::narrow<int64_t>(write_result);
+  }
+};
+
+// Input callback that issues one read of stream->size() bytes and keeps the result in value_.
+struct ReadUntilStreamSize : public minifi::InputStreamCallback {
+  std::string value_;
+
+  // Returns the number of bytes read, or -1 when the stream reports an error.
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override {
+    value_.clear();
+    std::vector<uint8_t> content;
+    const size_t read_result = stream->read(content, stream->size());
+    value_.assign(content.begin(), content.end());
+    if (minifi::io::isError(read_result)) {
+      return -1;
+    }
+    return gsl::narrow<int64_t>(read_result);
+  }
+};
+
+// Input callback that keeps reading in chunks of up to 1024 bytes until a read returns 0,
+// accumulating everything it read in value_.
+struct ReadUntilItCan : public minifi::InputStreamCallback {
+  std::string value_;
+
+  // Returns the total number of bytes read, or -1 as soon as the stream reports an error.
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override {
+    value_.clear();
+    std::vector<uint8_t> buffer;
+    size_t bytes_read = 0;
+    while (true) {
+      const size_t read_result = stream->read(buffer, 1024);
+      if (minifi::io::isError(read_result))
+        return -1;
+      if (read_result == 0)
+        // Checked conversion, consistent with the other callbacks in this header.
+        return gsl::narrow<int64_t>(bytes_read);
+      bytes_read += read_result;
+      value_.append(buffer.begin(), buffer.end());
+    }
+  }
+};
+
+// Minimal processor for the tests in this header: it inherits core::Processor's
+// constructors and adds no members of its own.
+class DummyProcessor : public core::Processor {
+  using core::Processor::Processor;
+};
+
+// Presumably makes the class creatable by name ("DummyProcessor", as used by Fixture) - confirm against core/Resource.h.
+REGISTER_RESOURCE(DummyProcessor, "A processor that does nothing.");
+
+/**
+ * Test fixture: builds a TestPlan on the given content repository, adds a DummyProcessor,
+ * runs it once and opens a ProcessSession on the plan's current context.
+ */
+class Fixture {
+ public:
+  const core::Relationship Success{"success", "everything is fine"};
+  const core::Relationship Failure{"failure", "something has gone awry"};
+
+  explicit Fixture(std::shared_ptr<core::ContentRepository> content_repo) {
+    test_plan_ = test_controller_.createPlan(nullptr, nullptr, content_repo);
+    dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
+    // Run the processor, then take the plan's current context for the session.
+    test_plan_->runNextProcessor();
+    context_ = test_plan_->getCurrentContext();
+    process_session_ = std::make_unique<core::ProcessSession>(context_);
+  }
+
+  core::ProcessSession &processSession() { return *process_session_; }
+
+  // Creates a flow file holding `content`, routes it to Success and commits the session.
+  void commitFlowFile(const std::string& content) {
+    WriteStringToFlowFile write_callback(content);
+    const auto flow_file = process_session_->create();
+    process_session_->write(flow_file, &write_callback);
+    process_session_->transfer(flow_file, Success);
+    process_session_->commit();
+  }
+
+ private:
+  TestController test_controller_;
+
+  std::shared_ptr<TestPlan> test_plan_;
+  std::shared_ptr<core::Processor> dummy_processor_;
+  std::shared_ptr<core::ProcessContext> context_;
+  std::unique_ptr<core::ProcessSession> process_session_;
+};
+
+/**
+ * Commits a "foobar" flow file, clones its first and second halves, and checks that
+ * ProcessSession::read yields "foobar", "foo" and "bar" respectively for both read callbacks.
+ *
+ * Declared inline: the function is defined in a header that more than one test source
+ * includes, so a non-inline definition would be a multiple-definition hazard.
+ *
+ * @param content_repo the content repository implementation to run the test against
+ */
+inline void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> content_repo) {
+  Fixture fixture(content_repo);
+  core::ProcessSession& process_session = fixture.processSession();
+  fixture.commitFlowFile("foobar");
+  const auto original_ff = process_session.get();
+  REQUIRE(original_ff);
+  auto clone_first_half = process_session.clone(original_ff, 0, 3);
+  auto clone_second_half = process_session.clone(original_ff, 3, 3);
+  REQUIRE(clone_first_half != nullptr);
+  REQUIRE(clone_second_half != nullptr);
+  ReadUntilStreamSize read_until_stream_size_callback;
+  ReadUntilItCan read_until_it_can_callback;
+  // The whole flow file.
+  process_session.read(original_ff, &read_until_stream_size_callback);
+  process_session.read(original_ff, &read_until_it_can_callback);
+  CHECK(original_ff->getSize() == 6);
+  CHECK(read_until_stream_size_callback.value_ == "foobar");
+  CHECK(read_until_it_can_callback.value_ == "foobar");
+  // The clone covering offset 0, size 3.
+  process_session.read(clone_first_half, &read_until_stream_size_callback);
+  process_session.read(clone_first_half, &read_until_it_can_callback);
+  CHECK(clone_first_half->getSize() == 3);
+  CHECK(read_until_stream_size_callback.value_ == "foo");
+  CHECK(read_until_it_can_callback.value_ == "foo");
+  // The clone covering offset 3, size 3.
+  process_session.read(clone_second_half, &read_until_stream_size_callback);
+  process_session.read(clone_second_half, &read_until_it_can_callback);
+  CHECK(clone_second_half->getSize() == 3);
+  CHECK(read_until_stream_size_callback.value_ == "bar");
+  CHECK(read_until_it_can_callback.value_ == "bar");
+}
+}  // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp 
b/libminifi/test/unit/ProcessSessionTests.cpp
index 2a40519..5629d43 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -21,6 +21,7 @@
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
 #include "../TestBase.h"
+#include "ContentRepositoryDependentTests.h"
 
 namespace {
 
@@ -94,3 +95,8 @@ TEST_CASE("ProcessSession::rollback penalizes affected 
flowfiles", "[rollback]")
   next_flow_file_to_be_processed = process_session.get();
   REQUIRE(next_flow_file_to_be_processed == flow_file_3);
 }
+
+// Runs the shared offset/size read test against the volatile and the file system repositories.
+TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
+  {
+    const auto volatile_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(volatile_repo);
+  }
+  {
+    const auto file_system_repo = std::make_shared<core::repository::FileSystemRepository>();
+    ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(file_system_repo);
+  }
+}
diff --git a/libminifi/test/unit/StreamTests.cpp 
b/libminifi/test/unit/StreamTests.cpp
index 99bd92f..ce7bc2f 100644
--- a/libminifi/test/unit/StreamTests.cpp
+++ b/libminifi/test/unit/StreamTests.cpp
@@ -25,6 +25,7 @@
 #include <utility>
 #include "../TestBase.h"
 #include "io/BaseStream.h"
+#include "io/StreamSlice.h"
 
 TEST_CASE("TestReadData", "[testread]") {
   auto base = std::make_shared<minifi::io::BufferStream>();
@@ -77,3 +78,23 @@ TEST_CASE("TestWrite1", "[testwrite]") {
   REQUIRE(8 == 
base->read(reinterpret_cast<uint8_t*>(const_cast<char*>(bytes.data())), 8));
   REQUIRE(bytes == "\x01\x02\x03\x04\x05\x06\x07\x08");
 }
+
+// A slice that would reach past the end of the 8-byte base stream must be rejected on construction.
+TEST_CASE("InvalidStreamSliceTest", "[teststreamslice]") {
+  std::shared_ptr<minifi::io::BaseStream> base = std::make_shared<minifi::io::BufferStream>();
+  // Named cast instead of a C-style cast, matching the other tests in this file.
+  base->write(reinterpret_cast<const uint8_t*>("\x01\x02\x03\x04\x05\x06\x07\x08"), 8);
+  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(base, 0, 9), "StreamSlice is bigger than the Stream");
+  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(base, 7, 3), "StreamSlice is bigger than the Stream");
+}
+
+// A slice at offset 2 with size 4 exposes bytes 3..6 of the base stream, and reading
+// again after seek(0) returns the same bytes even when more than the slice size is requested.
+TEST_CASE("StreamSliceTest1", "[teststreamslice]") {
+  std::shared_ptr<minifi::io::BaseStream> base = std::make_shared<minifi::io::BufferStream>();
+  // Named cast instead of a C-style cast, matching the other tests in this file.
+  base->write(reinterpret_cast<const uint8_t*>("\x01\x02\x03\x04\x05\x06\x07\x08"), 8);
+  std::shared_ptr<minifi::io::BaseStream> stream_slice = std::make_shared<minifi::io::StreamSlice>(base, 2, 4);
+  std::vector<uint8_t> buffer;
+  REQUIRE(stream_slice->read(buffer, stream_slice->size()) == 4);
+  stream_slice->seek(0);
+  std::vector<uint8_t> buffer2;
+  REQUIRE(stream_slice->read(buffer2, 1000) == 4);
+  REQUIRE(buffer == buffer2);
+  REQUIRE(buffer == std::vector<uint8_t>({3, 4, 5, 6}));
+}

Reply via email to