This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 5a3f5464cb9c6c95be79c270d3a793984621d190
Author: Marton Szasz <[email protected]>
AuthorDate: Wed May 26 15:49:50 2021 +0200

    MINIFICPP-1507 convert OutputStream::write to size_t
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    
    This closes #1083
---
 controller/Controller.h                            |  14 +-
 extensions/aws/s3/S3Wrapper.cpp                    |  20 +-
 extensions/bustache/ApplyTemplate.cpp              |   3 +-
 extensions/civetweb/processors/ListenHTTP.cpp      |  23 ++-
 extensions/http-curl/client/HTTPStream.cpp         |  33 ++--
 extensions/http-curl/client/HTTPStream.h           |  13 +-
 extensions/http-curl/tests/HTTPHandlers.h          |   2 +-
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       |   3 +-
 extensions/jni/jvm/JniReferenceObjects.h           |   4 +-
 extensions/libarchive/CompressContent.h            |  19 +-
 extensions/libarchive/MergeContent.h               |  67 +++----
 extensions/libarchive/UnfocusArchiveEntry.cpp      |   6 +-
 extensions/librdkafka/ConsumeKafka.cpp             |   9 +-
 extensions/mqtt/processors/ConsumeMQTT.h           |  24 ++-
 extensions/opc/include/fetchopc.h                  |   4 +-
 extensions/opencv/CaptureRTSPFrame.h               |  15 +-
 extensions/opencv/FrameIO.h                        |   7 +-
 .../SourceInitiatedSubscriptionListener.cpp        |   3 +-
 .../rocksdb-repos/DatabaseContentRepository.cpp    |   8 +-
 extensions/rocksdb-repos/RocksDbStream.cpp         |  46 ++---
 extensions/rocksdb-repos/RocksDbStream.h           |   2 +-
 extensions/script/lua/LuaBaseStream.cpp            |   2 +-
 extensions/script/python/PyBaseStream.cpp          |   5 +-
 extensions/sensors/SensorBase.h                    |  23 +--
 extensions/sftp/client/SFTPClient.cpp              |  10 +-
 .../processors/ExecuteProcess.h                    |  10 +-
 .../processors/GenerateFlowFile.cpp                |   2 +-
 .../processors/GenerateFlowFile.h                  |  22 +--
 extensions/standard-processors/processors/GetTCP.h |   3 +-
 .../standard-processors/processors/ListenSyslog.h  |  15 +-
 .../standard-processors/processors/TailFile.cpp    |  15 +-
 .../tests/integration/SecureSocketGetTCPTest.cpp   |  20 +-
 .../TLSServerSocketSupportedProtocolsTest.cpp      |  16 +-
 .../standard-processors/tests/unit/GetTCPTests.cpp |   6 +-
 extensions/usb-camera/CMakeLists.txt               |   2 +-
 extensions/usb-camera/GetUSBCamera.cpp             |  10 +-
 .../CollectorInitiatedSubscription.cpp             |   9 +-
 .../windows-event-log/ConsumeWindowsEventLog.cpp   |   3 +-
 libminifi/include/io/AtomicEntryStream.h           |  26 ++-
 libminifi/include/io/BufferStream.h                |   4 +-
 libminifi/include/io/CRCStream.h                   |   6 +-
 libminifi/include/io/ClientSocket.h                |   2 +-
 libminifi/include/io/DescriptorStream.h            |   2 +-
 libminifi/include/io/FileStream.h                  |   2 +-
 libminifi/include/io/OutputStream.h                |  30 ++-
 libminifi/include/io/Stream.h                      |  10 +-
 libminifi/include/io/StreamPipe.h                  |   6 +-
 libminifi/include/io/ZlibStream.h                  |   4 +-
 libminifi/include/io/tls/SecureDescriptorStream.h  |   2 +-
 libminifi/include/io/tls/TLSServerSocket.h         |   8 +-
 libminifi/include/io/tls/TLSSocket.h               |   4 +-
 .../include/serialization/FlowFileV3Serializer.h   |   4 +-
 libminifi/include/sitetosite/Peer.h                |   2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    |   2 +-
 libminifi/include/utils/GeneralUtils.h             |   2 +
 libminifi/src/FlowFileRecord.cpp                   |  99 +++++-----
 libminifi/src/c2/ControllerSocketProtocol.cpp      |  10 +-
 libminifi/src/core/ContentSession.cpp              |   8 +-
 libminifi/src/core/ProcessSession.cpp              |   6 +-
 libminifi/src/io/BufferStream.cpp                  |   3 +-
 libminifi/src/io/ClientSocket.cpp                  |  19 +-
 libminifi/src/io/DescriptorStream.cpp              |  22 +--
 libminifi/src/io/FileStream.cpp                    |  49 +++--
 libminifi/src/io/OutputStream.cpp                  |  24 +--
 libminifi/src/io/ZlibStream.cpp                    |  26 ++-
 libminifi/src/io/tls/SecureDescriptorStream.cpp    |  34 ++--
 libminifi/src/io/tls/TLSServerSocket.cpp           |  79 ++++----
 libminifi/src/io/tls/TLSSocket.cpp                 |  22 +--
 libminifi/src/provenance/Provenance.cpp            | 206 ++++++++++++---------
 .../src/serialization/FlowFileV3Serializer.cpp     | 103 +++++------
 libminifi/src/sitetosite/Peer.cpp                  |   7 +-
 libminifi/src/sitetosite/RawSocketProtocol.cpp     |  21 ++-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      |  88 +++++----
 libminifi/src/utils/ByteArrayCallback.cpp          |   2 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    |   4 +-
 .../test/persistence-tests/PersistenceTests.cpp    |   2 +-
 .../test/rocksdb-tests/ContentSessionTests.cpp     |   5 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |   2 +-
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |  10 +-
 libminifi/test/sql-tests/SQLTestPlan.h             |   2 +-
 libminifi/test/unit/CRCTests.cpp                   |  14 +-
 libminifi/test/unit/FileStreamTests.cpp            |  10 +-
 libminifi/test/unit/FlowFileSerializationTests.cpp |   4 +-
 libminifi/test/unit/SiteToSiteHelper.h             |   4 +-
 libminifi/test/unit/SocketTests.cpp                |  59 +++---
 libminifi/test/unit/ZlibStreamTests.cpp            |  31 ++--
 nanofi/src/api/nanofi.cpp                          |   6 +-
 87 files changed, 793 insertions(+), 802 deletions(-)

diff --git a/controller/Controller.h b/controller/Controller.h
index 40fb046..99c475f 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -36,7 +36,7 @@ bool sendSingleCommand(std::unique_ptr<minifi::io::Socket> 
socket, uint8_t op, c
   minifi::io::BufferStream stream;
   stream.write(&op, 1);
   stream.write(value);
-  return 
static_cast<size_t>(socket->write(const_cast<uint8_t*>(stream.getBuffer()), 
gsl::narrow<int>(stream.size()))) == stream.size();
+  return socket->write(stream.getBuffer(), stream.size()) == stream.size();
 }
 
 /**
@@ -77,7 +77,7 @@ int updateFlow(std::unique_ptr<minifi::io::Socket> socket, 
std::ostream &out, st
   stream.write(&op, 1);
   stream.write("flow");
   stream.write(file);
-  if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), 
gsl::narrow<int>(stream.size())) < 0) {
+  if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) {
     return -1;
   }
   // read the response
@@ -107,7 +107,7 @@ int getFullConnections(std::unique_ptr<minifi::io::Socket> 
socket, std::ostream
   minifi::io::BufferStream stream;
   stream.write(&op, 1);
   stream.write("getfull");
-  if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), 
gsl::narrow<int>(stream.size())) < 0) {
+  if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) {
     return -1;
   }
   // read the response
@@ -133,7 +133,7 @@ int getJstacks(std::unique_ptr<minifi::io::Socket> socket, 
std::ostream &out) {
   minifi::io::BufferStream stream;
   stream.write(&op, 1);
   stream.write("jstack");
-  if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), 
gsl::narrow<int>(stream.size())) < 0) {
+  if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) {
     return -1;
   }
   // read the response
@@ -171,7 +171,7 @@ int getConnectionSize(std::unique_ptr<minifi::io::Socket> 
socket, std::ostream &
   stream.write(&op, 1);
   stream.write("queue");
   stream.write(connection);
-  if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), 
gsl::narrow<int>(stream.size())) < 0) {
+  if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) {
     return -1;
   }
   // read the response
@@ -191,7 +191,7 @@ int listComponents(std::unique_ptr<minifi::io::Socket> 
socket, std::ostream &out
   uint8_t op = minifi::c2::Operation::DESCRIBE;
   stream.write(&op, 1);
   stream.write("components");
-  if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), 
gsl::narrow<int>(stream.size())) < 0) {
+  if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) {
     return -1;
   }
   uint16_t responses = 0;
@@ -215,7 +215,7 @@ int listConnections(std::unique_ptr<minifi::io::Socket> 
socket, std::ostream &ou
   uint8_t op = minifi::c2::Operation::DESCRIBE;
   stream.write(&op, 1);
   stream.write("connections");
-  if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), 
gsl::narrow<int>(stream.size())) < 0) {
+  if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) {
     return -1;
   }
   uint16_t responses = 0;
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index a7c375a..d2d55f1 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -24,11 +24,12 @@
 #include <utility>
 #include <vector>
 
+#include "S3ClientRequestSender.h"
 #include "utils/GeneralUtils.h"
 #include "utils/StringUtils.h"
 #include "utils/file/FileUtils.h"
 #include "utils/RegexUtils.h"
-#include "S3ClientRequestSender.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -124,19 +125,20 @@ bool S3Wrapper::deleteObject(const 
DeleteObjectRequestParameters& params) {
 
 int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t 
data_size, io::BaseStream& output) {
   std::vector<uint8_t> buffer(4096);
-  int64_t write_size = 0;
-  while (write_size < data_size) {
-    auto next_write_size = (std::min)(data_size - write_size, 
static_cast<int64_t>(4096));
-    if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size)) 
{
+  size_t write_size = 0;
+  if (data_size < 0) return 0;
+  while (write_size < gsl::narrow<uint64_t>(data_size)) {
+    const auto next_write_size = (std::min)(gsl::narrow<size_t>(data_size) - 
write_size, size_t{4096});
+    if (!source.read(reinterpret_cast<char*>(buffer.data()), 
gsl::narrow<std::streamsize>(next_write_size))) {
       return -1;
     }
-    auto ret = output.write(buffer.data(), next_write_size);
-    if (ret < 0) {
-      return ret;
+    const auto ret = output.write(buffer.data(), next_write_size);
+    if (io::isError(ret)) {
+      return -1;
     }
     write_size += next_write_size;
   }
-  return write_size;
+  return gsl::narrow<int64_t>(write_size);
 }
 
 minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const 
GetObjectRequestParameters& get_object_params, io::BaseStream& out_body) {
diff --git a/extensions/bustache/ApplyTemplate.cpp 
b/extensions/bustache/ApplyTemplate.cpp
index d4cdde5..84b0338 100644
--- a/extensions/bustache/ApplyTemplate.cpp
+++ b/extensions/bustache/ApplyTemplate.cpp
@@ -84,8 +84,7 @@ int64_t ApplyTemplate::WriteCallback::process(const 
std::shared_ptr<io::BaseStre
 
   // TODO(calebj) write ostream reciever for format() to prevent excessive 
copying
   std::string ostring = to_string(format(data));
-  stream->write(reinterpret_cast<uint8_t *>(const_cast<char 
*>(ostring.c_str())),
-                    ostring.length());
+  stream->write(reinterpret_cast<const uint8_t *>(ostring.c_str()), 
ostring.length());
 
   return ostring.length();
 }
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp 
b/extensions/civetweb/processors/ListenHTTP.cpp
index e544e82..09c52e8 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -476,29 +476,27 @@ void ListenHTTP::Handler::writeBody(mg_connection *conn, 
const mg_request_info *
 
 std::unique_ptr<io::BufferStream> 
ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const 
struct mg_request_info *req_info) {
   auto content_buffer = utils::make_unique<io::BufferStream>();
-  int64_t rlen;
-  int64_t nlen = 0;
+  size_t nlen = 0;
   int64_t tlen = req_info->content_length;
   uint8_t buf[16384];
 
   // if we have no content length we should call mg_read until
   // there is no data left from the stream to be HTTP/1.1 compliant
-  while (tlen == -1 || nlen < tlen) {
-    rlen = tlen == -1 ? sizeof(buf) : tlen - nlen;
-
-    if (rlen > (int64_t) sizeof(buf)) {
-      rlen = (int64_t) sizeof(buf);
+  while (tlen == -1 || (tlen > 0 && nlen < gsl::narrow<size_t>(tlen))) {
+    auto rlen = tlen == -1 ? sizeof(buf) : gsl::narrow<size_t>(tlen) - nlen;
+    if (rlen > sizeof(buf)) {
+      rlen = sizeof(buf);
     }
 
     // Read a buffer of data from client
-    rlen = mg_read(conn, &buf[0], (size_t) rlen);
-
-    if (rlen <= 0) {
+    const auto mg_read_return = mg_read(conn, &buf[0], rlen);
+    if (mg_read_return <= 0) {
       break;
     }
+    rlen = gsl::narrow<size_t>(mg_read_return);
 
     // Transfer buffer data to the output stream
-    content_buffer->write(&buf[0], gsl::narrow<int>(rlen));
+    content_buffer->write(&buf[0], rlen);
 
     nlen += rlen;
   }
@@ -511,7 +509,8 @@ 
ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream> reque
 }
 
 int64_t ListenHTTP::WriteCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
-  return stream->write(const_cast<uint8_t*>(request_content_->getBuffer()), 
gsl::narrow<int>(request_content_->size()));
+  const auto write_ret = stream->write(request_content_->getBuffer(), 
request_content_->size());
+  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
 }
 
 bool ListenHTTP::isSecure() const {
diff --git a/extensions/http-curl/client/HTTPStream.cpp 
b/extensions/http-curl/client/HTTPStream.cpp
index 9366f91..639e875 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -19,6 +19,7 @@
 #include "HTTPStream.h"
 
 #include <fstream>
+#include <utility>
 #include <memory>
 
 #include "HTTPCallback.h"
@@ -32,7 +33,7 @@ namespace minifi {
 namespace io {
 
 HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client)
-    : http_client_(client),
+    : http_client_(std::move(client)),
       written(0),
       // given the nature of the stream we don't want to slow libCURL, we will 
produce
       // a warning instead allowing us to adjust it server side or through the 
local configuration.
@@ -54,27 +55,23 @@ void HttpStream::seek(size_t /*offset*/) {
 
 // data stream overrides
 
-int HttpStream::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
-  if (size == 0) {
-    return 0;
+size_t HttpStream::write(const uint8_t *value, size_t size) {
+  if (size == 0) return 0;
+  if (IsNullOrEmpty(value)) {
+    return STREAM_ERROR;
   }
-  if (!IsNullOrEmpty(value)) {
+  if (!started_) {
+    std::lock_guard<std::mutex> lock(mutex_);
     if (!started_) {
-      std::lock_guard<std::mutex> lock(mutex_);
-      if (!started_) {
-        callback_.ptr = &http_callback_;
-        callback_.pos = 0;
-        http_client_->setUploadCallback(&callback_);
-        http_client_future_ = std::async(std::launch::async, submit_client, 
http_client_);
-        started_ = true;
-      }
+      callback_.ptr = &http_callback_;
+      callback_.pos = 0;
+      http_client_->setUploadCallback(&callback_);
+      http_client_future_ = std::async(std::launch::async, submit_client, 
http_client_);
+      started_ = true;
     }
-    http_callback_.process(value, size);
-    return size;
-  } else {
-    return -1;
   }
+  http_callback_.process(value, size);
+  return size;
 }
 
 size_t HttpStream::read(uint8_t *buf, size_t buflen) {
diff --git a/extensions/http-curl/client/HTTPStream.h 
b/extensions/http-curl/client/HTTPStream.h
index b23d515..d6ce39b 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -97,7 +97,7 @@ class HttpStream : public io::BaseStream {
    * @param value value to write
    * @param size size of value
    */
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
   static bool submit_client(std::shared_ptr<utils::HTTPClient> client) {
     if (client == nullptr)
@@ -132,17 +132,6 @@ class HttpStream : public io::BaseStream {
   }
 
  protected:
-  /**
-   * Populates the vector using the provided type name.
-   * @param buf output buffer
-   * @param t incoming object
-   * @returns number of bytes read.
-   */
-  template<typename T>
-  int readBuffer(std::vector<uint8_t>&, const T&);
-
-  void reset();
-
   std::vector<uint8_t> array;
 
   std::shared_ptr<utils::HTTPClient> http_client_;
diff --git a/extensions/http-curl/tests/HTTPHandlers.h 
b/extensions/http-curl/tests/HTTPHandlers.h
index 890acb7..e797b37 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -303,7 +303,7 @@ class FlowFileResponder : public ServerAwareHandler {
         }
         uint64_t length = flow->data.size();
         stream.write(length);
-        stream.write(flow->data.data(), gsl::narrow<int>(length));
+        stream.write(flow->data.data(), gsl::narrow<size_t>(length));
       }
     } else {
       mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp 
b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 9c03e64..a77bf00 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -152,7 +152,8 @@ class CallBack : public minifi::OutputStreamCallback {
   virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& 
stream) {
     // leaving the typo for posterity sake
     std::string st = "we're gnna write some test stuff";
-    return 
stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), 
gsl::narrow<int>(st.length()));
+    const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(st.c_str()), st.length());
+    return minifi::io::isError(write_ret) ? -1 : 
gsl::narrow<int64_t>(write_ret);
   }
 };
 
diff --git a/extensions/jni/jvm/JniReferenceObjects.h 
b/extensions/jni/jvm/JniReferenceObjects.h
index 5f994bb..b7ef344 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -29,6 +29,7 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/WeakReference.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -101,7 +102,8 @@ class JniByteOutStream : public 
minifi::OutputStreamCallback {
 
   virtual ~JniByteOutStream() = default;
   virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& 
stream) {
-    return stream->write((uint8_t*) bytes_, length_);
+    const auto write_ret = stream->write((uint8_t*)bytes_, length_);
+    return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
   }
  private:
   jbyte *bytes_;
diff --git a/extensions/libarchive/CompressContent.h 
b/extensions/libarchive/CompressContent.h
index 9e06aa6..8865dfe 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -180,11 +180,10 @@ public:
     int status_;
 
     static la_ssize_t archive_write(struct archive* /*arch*/, void *context, 
const void *buff, size_t size) {
-      WriteCallback *callback = (WriteCallback *) context;
-      la_ssize_t ret = 
callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), 
gsl::narrow<int>(size));
-      if (ret > 0)
-        callback->size_ += (int64_t) ret;
-      return ret;
+      auto* const callback = static_cast<WriteCallback*>(context);
+      const auto ret = callback->stream_->write(reinterpret_cast<const 
uint8_t*>(buff), size);
+      if (!io::isError(ret)) callback->size_ += gsl::narrow<int64_t>(ret);
+      return io::isError(ret) ? -1 : gsl::narrow<la_ssize_t>(ret);
     }
 
     static la_ssize_t archive_read(struct archive* archive, void *context, 
const void **buff) {
@@ -337,8 +336,8 @@ public:
           if (read_result == 0)
             break;
           size_ += read_result;
-          const auto write_result = 
stream_->write(reinterpret_cast<uint8_t*>(buffer), 
gsl::narrow<int>(read_result));
-          if (write_result < 0) {
+          const auto write_result = 
stream_->write(reinterpret_cast<uint8_t*>(buffer), 
gsl::narrow<size_t>(read_result));
+          if (io::isError(write_result)) {
             archive_read_log_error_cleanup(arch);
             return -1;
           }
@@ -377,8 +376,8 @@ public:
 
         int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) 
override {
           std::vector<uint8_t> buffer(16 * 1024U);
-          int64_t read_size = 0;
-          while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
+          size_t read_size = 0;
+          while (read_size < writer_.flow_->getSize()) {
             const auto ret = inputStream->read(buffer.data(), buffer.size());
             if (io::isError(ret)) {
               return -1;
@@ -393,7 +392,7 @@ public:
             }
           }
           outputStream_->close();
-          return read_size;
+          return gsl::narrow<int64_t>(read_size);
         }
 
         GzipWriteCallback& writer_;
diff --git a/extensions/libarchive/MergeContent.h 
b/extensions/libarchive/MergeContent.h
index ac83728..d14b924 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -64,7 +64,7 @@ class BinaryConcatenationMerge : public MergeBin {
   BinaryConcatenationMerge(const std::string& header, const std::string& 
footer, const std::string& demarcator);
 
   void merge(core::ProcessContext *context, core::ProcessSession *session,
-      std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& 
serializer, const std::shared_ptr<core::FlowFile> &flowFile);
+      std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& 
serializer, const std::shared_ptr<core::FlowFile> &flowFile) override;
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
    public:
@@ -77,35 +77,35 @@ class BinaryConcatenationMerge : public MergeBin {
     std::string &demarcator_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
     FlowFileSerializer& serializer_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      int64_t ret = 0;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      size_t write_size_sum = 0;
       if (!header_.empty()) {
-        int64_t len = 
stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(header_.data())), 
gsl::narrow<int>(header_.size()));
-        if (len < 0)
-          return len;
-        ret += len;
+        const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(header_.data()), header_.size());
+        if (io::isError(write_ret))
+          return -1;
+        write_size_sum += write_ret;
       }
       bool isFirst = true;
-      for (auto flow : flows_) {
+      for (const auto& flow : flows_) {
         if (!isFirst && !demarcator_.empty()) {
-          int64_t len = 
stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(demarcator_.data())),
 gsl::narrow<int>(demarcator_.size()));
-          if (len < 0)
-            return len;
-          ret += len;
+          const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(demarcator_.data()), demarcator_.size());
+          if (io::isError(write_ret))
+            return -1;
+          write_size_sum += write_ret;
         }
         int len = serializer_.serialize(flow, stream);
         if (len < 0)
           return len;
-        ret += len;
+        write_size_sum += gsl::narrow<size_t>(len);
         isFirst = false;
       }
       if (!footer_.empty()) {
-        int64_t len = 
stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(footer_.data())), 
gsl::narrow<int>(footer_.size()));
-        if (len < 0)
-          return len;
-        ret += len;
+        const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(footer_.data()), footer_.size());
+        if (io::isError(write_ret))
+          return -1;
+        write_size_sum += write_ret;
       }
-      return ret;
+      return gsl::narrow<int64_t>(write_size_sum);
     }
   };
 
@@ -122,25 +122,26 @@ class ArchiveMerge {
   class ArchiveWriter : public io::OutputStream {
    public:
     ArchiveWriter(struct archive *arch, struct archive_entry *entry) : 
arch_(arch), entry_(entry) {}
-    int write(const uint8_t* data, int size) override {
+    size_t write(const uint8_t* data, size_t size) override {
       if (!header_emitted_) {
         if (archive_write_header(arch_, entry_) != ARCHIVE_OK) {
-          return -1;
+          return io::STREAM_ERROR;
         }
         header_emitted_ = true;
       }
-      int totalWrote = 0;
-      int remaining = size;
+      size_t totalWrote = 0;
+      size_t remaining = size;
       while (remaining > 0) {
         const auto ret = archive_write_data(arch_, data + totalWrote, 
remaining);
         if (ret < 0) {
-          return ret;
+          return io::STREAM_ERROR;
         }
-        if (ret == 0) {
+        const auto zret = gsl::narrow<size_t>(ret);
+        if (zret == 0) {
           break;
         }
-        totalWrote += ret;
-        remaining -= ret;
+        totalWrote += zret;
+        remaining -= zret;
       }
       return totalWrote;
     }
@@ -161,7 +162,7 @@ class ArchiveMerge {
       size_ = 0;
       stream_ = nullptr;
     }
-    ~WriteCallback() = default;
+    ~WriteCallback() override = default;
 
     std::string merge_type_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
@@ -174,10 +175,10 @@ class ArchiveMerge {
       WriteCallback *callback = (WriteCallback *) context;
       uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<void*>(buff));
       la_ssize_t totalWrote = 0;
-      int remaining = gsl::narrow<int>(size);
+      size_t remaining = size;
       while (remaining > 0) {
-        la_ssize_t ret = callback->stream_->write(data + totalWrote, 
remaining);
-        if (ret < 0) {
+        const auto ret = callback->stream_->write(data + totalWrote, 
remaining);
+        if (io::isError(ret)) {
           // libarchive expects us to return -1 on error
           return -1;
         }
@@ -185,13 +186,13 @@ class ArchiveMerge {
           break;
         }
         callback->size_ += ret;
-        totalWrote += ret;
+        totalWrote += static_cast<la_ssize_t>(ret);
         remaining -= ret;
       }
       return totalWrote;
     }
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       struct archive *arch;
 
       arch = archive_write_new();
@@ -305,7 +306,7 @@ class MergeContent : public processors::BinFiles {
     attributeStrategy_ = merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON;
   }
   // Destructor
-  virtual ~MergeContent() = default;
+  ~MergeContent() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "MergeContent";
   // Supported Properties
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp 
b/extensions/libarchive/UnfocusArchiveEntry.cpp
index e1fb1f9..de54bc2 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -153,9 +153,9 @@ typedef struct {
 } UnfocusArchiveEntryWriteData;
 
 la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void 
*d, const void *buffer, size_t length) {
-  auto data = static_cast<UnfocusArchiveEntryWriteData *>(d);
-  const uint8_t *ui_buffer = static_cast<const uint8_t*>(buffer);
-  return data->stream->write(const_cast<uint8_t*>(ui_buffer), 
gsl::narrow<int>(length));
+  auto* const data = static_cast<UnfocusArchiveEntryWriteData *>(d);
+  const auto write_ret = data->stream->write(static_cast<const 
uint8_t*>(buffer), length);
+  return io::isError(write_ret) ? -1 : gsl::narrow<la_ssize_t>(write_ret);
 }
 
 int64_t UnfocusArchiveEntry::WriteCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/librdkafka/ConsumeKafka.cpp 
b/extensions/librdkafka/ConsumeKafka.cpp
index 290466c..38f7f99 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -556,11 +556,10 @@ void ConsumeKafka::onTrigger(core::ProcessContext* /* 
context */, core::ProcessS
 }
 
 int64_t ConsumeKafka::WriteCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
-  int64_t ret = 0;
-  if (data_) {
-    ret = stream->write(data_, gsl::narrow<int>(dataSize_));
-  }
-  return ret;
+  if (!data_) return 0;
+  const auto write_ret = stream->write(data_, dataSize_);
+  if (io::isError(write_ret)) return -1;
+  return gsl::narrow<int64_t>(write_ret);
 }
 
 }  // namespace processors
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h 
b/extensions/mqtt/processors/ConsumeMQTT.h
index b3884fb..10208c6 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -32,6 +32,7 @@
 #include "concurrentqueue.h"
 #include "MQTTClient.h"
 #include "AbstractMQTTProcessor.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -57,7 +58,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
     maxSegSize_ = ULLONG_MAX;
   }
   // Destructor
-  virtual ~ConsumeMQTT() {
+  ~ConsumeMQTT() override {
     MQTTClient_message *message;
     while (queue_.try_dequeue(message)) {
       MQTTClient_freeMessage(&message);
@@ -74,18 +75,23 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
   // Nest Callback Class for write stream
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(MQTTClient_message *message)
+    explicit WriteCallback(MQTTClient_message *message)
         : message_(message) {
-      status_ = 0;
     }
     MQTTClient_message *message_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      int64_t len = 
stream->write(reinterpret_cast<uint8_t*>(message_->payload), 
message_->payloadlen);
-      if (len < 0)
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      if (message_->payloadlen < 0) {
         status_ = -1;
-      return len;
+        return -1;
+      }
+      const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message_->payload), 
gsl::narrow<size_t>(message_->payloadlen));
+      if (io::isError(len)) {
+        status_ = -1;
+        return -1;
+      }
+      return gsl::narrow<int64_t>(len);
     }
-    int status_;
+    int status_ = 0;
   };
 
  public:
@@ -99,7 +105,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor 
{
   // OnTrigger method, implemented by NiFi ConsumeMQTT
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi ConsumeMQTT
-  void initialize(void) override;
+  void initialize() override;
   bool enqueueReceiveMQTTMsg(MQTTClient_message *message) override;
 
  protected:
diff --git a/extensions/opc/include/fetchopc.h 
b/extensions/opc/include/fetchopc.h
index 3a1d978..f4102c2 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -38,6 +38,7 @@
 #include "controllers/SSLContextService.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -83,7 +84,8 @@ protected:
       : data_(data) {
     }
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      return 
stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), 
data_.size());
+      const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(data_.c_str()), data_.size());
+      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
     }
   };
   std::string nodeID_;
diff --git a/extensions/opencv/CaptureRTSPFrame.h 
b/extensions/opencv/CaptureRTSPFrame.h
index 8d16a1e..0262478 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -19,13 +19,13 @@
 #define NIFI_MINIFI_CPP_CAPTURERTSPFRAME_H
 
 #include <atomic>
-
-#include <core/Resource.h>
-#include <core/Processor.h>
-#include <opencv2/opencv.hpp>
-
 #include <iomanip>
 #include <ctime>
+#include <opencv2/opencv.hpp>
+
+#include "core/Resource.h"
+#include "core/Processor.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -69,10 +69,9 @@ class CaptureRTSPFrame : public core::Processor {
     ~CaptureRTSPFrameWriteCallback() override = default;
 
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      int64_t ret = 0;
       imencode(image_encoding_, image_mat_, image_buf_);
-      ret = stream->write(image_buf_.data(), image_buf_.size());
-      return ret;
+      const auto ret = stream->write(image_buf_.data(), image_buf_.size());
+      return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
     }
 
    private:
diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
index 73155a3..fa8d192 100644
--- a/extensions/opencv/FrameIO.h
+++ b/extensions/opencv/FrameIO.h
@@ -18,6 +18,8 @@
 #ifndef NIFI_MINIFI_CPP_FRAMEIO_H
 #define NIFI_MINIFI_CPP_FRAMEIO_H
 
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -33,10 +35,9 @@ class FrameWriteCallback : public OutputStreamCallback {
     ~FrameWriteCallback() override = default;
 
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      int64_t ret = 0;
       imencode(image_encoding_, image_mat_, image_buf_);
-      ret = stream->write(image_buf_.data(), image_buf_.size());
-      return ret;
+      const auto ret = stream->write(image_buf_.data(), image_buf_.size());
+      return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
     }
 
   private:
diff --git 
a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp 
b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
index 39a1eff..ee93d01 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -606,7 +606,8 @@ 
SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char*
 }
 
 int64_t 
SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
-  return stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_));
+  const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(text_), 
strlen(text_));
+  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
 }
 
 int 
SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNodeH 
node, void* data) {
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 54e42d4..b7e5ef9 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -87,8 +87,8 @@ void DatabaseContentRepository::Session::commit() {
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying 
resource for write: " + resource.first->getContentFullPath());
     }
-    const int size = gsl::narrow<int>(resource.second->size());
-    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), 
size) != size) {
+    const auto size = resource.second->size();
+    if (outStream->write(resource.second->getBuffer(), size) != size) {
       throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + 
resource.first->getContentFullPath());
     }
   }
@@ -97,8 +97,8 @@ void DatabaseContentRepository::Session::commit() {
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying 
resource for append: " + resource.first->getContentFullPath());
     }
-    const int size = gsl::narrow<int>(resource.second->size());
-    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), 
size) != size) {
+    const auto size = resource.second->size();
+    if (outStream->write(resource.second->getBuffer(), size) != size) {
       throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + 
resource.first->getContentFullPath());
     }
   }
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp 
b/extensions/rocksdb-repos/RocksDbStream.cpp
index 735692b..4146a87 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -51,36 +51,28 @@ void RocksDbStream::seek(size_t /*offset*/) {
   // noop
 }
 
-int RocksDbStream::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
-  if (!write_enable_) {
-    return -1;
+size_t RocksDbStream::write(const uint8_t *value, size_t size) {
+  if (!write_enable_) return STREAM_ERROR;
+  if (size == 0) return 0;
+  if (IsNullOrEmpty(value)) return STREAM_ERROR;
+  auto opendb = db_->open();
+  if (!opendb) {
+    return STREAM_ERROR;
   }
-  if (size == 0) {
-    return 0;
+  rocksdb::Slice slice_value((const char*)value, size);
+  rocksdb::Status status;
+  size_ += size;
+  if (batch_ != nullptr) {
+    status = batch_->Merge(path_, slice_value);
+  } else {
+    rocksdb::WriteOptions opts;
+    opts.sync = true;
+    status = opendb->Merge(opts, path_, slice_value);
   }
-  if (!IsNullOrEmpty(value)) {
-    auto opendb = db_->open();
-    if (!opendb) {
-      return -1;
-    }
-    rocksdb::Slice slice_value((const char *) value, size);
-    rocksdb::Status status;
-    size_ += size;
-    if (batch_ != nullptr) {
-      status = batch_->Merge(path_, slice_value);
-    } else {
-      rocksdb::WriteOptions opts;
-      opts.sync = true;
-      status = opendb->Merge(opts, path_, slice_value);
-    }
-    if (status.ok()) {
-      return size;
-    } else {
-      return -1;
-    }
+  if (status.ok()) {
+    return size;
   } else {
-    return -1;
+    return STREAM_ERROR;
   }
 }
 
diff --git a/extensions/rocksdb-repos/RocksDbStream.h 
b/extensions/rocksdb-repos/RocksDbStream.h
index cf4946f..e526d8c 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -77,7 +77,7 @@ class RocksDbStream : public io::BaseStream {
    * @param value value to write
    * @param size size of value
    */
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
  protected:
   std::string path_;
diff --git a/extensions/script/lua/LuaBaseStream.cpp 
b/extensions/script/lua/LuaBaseStream.cpp
index 3545c77..5e9d423 100644
--- a/extensions/script/lua/LuaBaseStream.cpp
+++ b/extensions/script/lua/LuaBaseStream.cpp
@@ -60,7 +60,7 @@ std::string LuaBaseStream::read(size_t len) {
 }
 
 size_t LuaBaseStream::write(std::string buf) {
-  return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t 
*>(const_cast<char *>(buf.data())), static_cast<int>(buf.length())));
+  return stream_->write(reinterpret_cast<const uint8_t*>(buf.data()), 
buf.length());
 }
 
 } /* namespace lua */
diff --git a/extensions/script/python/PyBaseStream.cpp 
b/extensions/script/python/PyBaseStream.cpp
index 5e583f7..60259ed 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyBaseStream.cpp
@@ -54,9 +54,8 @@ py::bytes PyBaseStream::read(size_t len) {
 }
 
 size_t PyBaseStream::write(const py::bytes& buf) {
-  const auto &&buf_str = buf.operator std::string();
-  return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t 
*>(const_cast<char *>(buf_str.data())),
-                                                
static_cast<int>(buf_str.length())));
+  auto buf_str = buf.operator std::string();
+  return stream_->write(reinterpret_cast<const uint8_t*>(buf_str.data()), 
buf_str.length());
 }
 
 } /* namespace python */
diff --git a/extensions/sensors/SensorBase.h b/extensions/sensors/SensorBase.h
index 0bdeb52..9b064b4 100644
--- a/extensions/sensors/SensorBase.h
+++ b/extensions/sensors/SensorBase.h
@@ -48,13 +48,13 @@ class SensorBase : public core::Processor {
   /*!
    * Create a new processor
    */
-  SensorBase(const std::string& name, const utils::Identifier& uuid = {})
+  explicit SensorBase(const std::string& name, const utils::Identifier& uuid = 
{})
       : Processor(name, uuid),
         imu(nullptr),
         logger_(logging::LoggerFactory<SensorBase>::getLogger()) {
   }
   // Destructor
-  virtual ~SensorBase();
+  ~SensorBase() override;
   // Processor Name
   static core::Relationship Success;
   // Supported Properties
@@ -65,17 +65,14 @@ class SensorBase : public core::Processor {
 
   class WriteCallback : public OutputStreamCallback {
      public:
-      WriteCallback(std::string data)
-          : _data(const_cast<char*>(data.data())),
-            _dataSize(data.size()) {
-      }
-      char *_data;
-      uint64_t _dataSize;
-      int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-        int64_t ret = 0;
-        if (_data && _dataSize > 0)
-          ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
-        return ret;
+      explicit WriteCallback(std::string data)
+          : data_{std::move(data)}
+      {}
+      std::string data_;
+      int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+        if (data_.empty()) return 0;
+        const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(data_.data()), data_.size());
+        return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
       }
     };
  protected:
diff --git a/extensions/sftp/client/SFTPClient.cpp 
b/extensions/sftp/client/SFTPClient.cpp
index 7af066d..cfa47ba 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -513,16 +513,16 @@ bool SFTPClient::getFile(const std::string& path, 
io::BaseStream& output, int64_
       break;
     }
     logger_->log_trace("Read %d bytes from remote file \"%s\"", read_ret, 
path.c_str());
-    total_read += read_ret;
-    int remaining = read_ret;
+    total_read += gsl::narrow<uint64_t>(read_ret);
+    auto remaining = read_ret;
     while (remaining > 0) {
-      int write_ret = output.write(buf.data() + (read_ret - remaining), 
remaining);
-      if (write_ret < 0) {
+      const auto write_ret = output.write(buf.data() + (read_ret - remaining), 
gsl::narrow<size_t>(remaining));
+      if (io::isError(write_ret)) {
         last_error_.setLibssh2Error(LIBSSH2_FX_OK);
         logger_->log_error("Failed to write output");
         return false;
       }
-      remaining -= write_ret;
+      remaining -= gsl::narrow<decltype(remaining)>(write_ret);
     }
   } while (true);
 
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h 
b/extensions/standard-processors/processors/ExecuteProcess.h
index 51d72fd..c9e5b1b 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -42,6 +42,7 @@
 #include "core/Resource.h"
 #include "FlowFileRecord.h"
 #include "io/BaseStream.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -92,11 +93,10 @@ class ExecuteProcess : public core::Processor {
     char *_data;
     uint64_t _dataSize;
     // void process(std::ofstream *stream) {
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      int64_t ret = 0;
-      if (_data && _dataSize > 0)
-        ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
-      return ret;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      if (!_data || _dataSize <= 0) return 0;
+      const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(_data), 
_dataSize);
+      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
     }
   };
 
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp 
b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index bf199b4..58cc738 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -148,7 +148,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext* 
/*context*/, core::Proces
       if (fileSize_ > 0) {
         generateData(data, textData_);
       }
-      GenerateFlowFile::WriteCallback callback(std::move(data));
+      GenerateFlowFile::WriteCallback callback(data);
       session->write(flowFile, &callback);
     } else {
       GenerateFlowFile::WriteCallback callback(data_);
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h 
b/extensions/standard-processors/processors/GenerateFlowFile.h
index 3c346d3..37b2bbb 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.h
+++ b/extensions/standard-processors/processors/GenerateFlowFile.h
@@ -52,7 +52,7 @@ class GenerateFlowFile : public core::Processor {
     textData_ = false;
   }
   // Destructor
-  virtual ~GenerateFlowFile() = default;
+  ~GenerateFlowFile() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "GenerateFlowFile";
   // Supported Properties
@@ -67,16 +67,14 @@ class GenerateFlowFile : public core::Processor {
   // Nest Callback Class for write stream
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(std::vector<char> && data) : data_(std::move(data)) { // 
NOLINT
-    }
-    WriteCallback(const std::vector<char>& data) : data_(data) { // NOLINT
-    }
-    std::vector<char> data_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      int64_t ret = 0;
-      if (data_.size() > 0)
-        ret = stream->write(reinterpret_cast<uint8_t*>(&data_[0]), 
gsl::narrow<int>(data_.size()));
-      return ret;
+    explicit WriteCallback(const std::vector<char>& data)
+        :data_(&data)
+    { }
+    gsl::not_null<const std::vector<char>*> data_;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      if (data_->empty()) return 0;
+      const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(data_->data()), data_->size());
+      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
     }
   };
 
@@ -85,7 +83,7 @@ class GenerateFlowFile : public core::Processor {
   // OnTrigger method, implemented by NiFi GenerateFlowFile
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) 
override;
   // Initialize, over write by NiFi GenerateFlowFile
-  void initialize(void) override;
+  void initialize() override;
 
  protected:
   std::vector<char> data_;
diff --git a/extensions/standard-processors/processors/GetTCP.h 
b/extensions/standard-processors/processors/GetTCP.h
index c2a5b49..7595763 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -98,7 +98,8 @@ class DataHandlerCallback : public OutputStreamCallback {
   ~DataHandlerCallback() override = default;
 
   int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return stream->write(message_, gsl::narrow<int>(size_));
+    const auto write_ret = stream->write(message_, size_);
+    return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
   }
 
  private:
diff --git a/extensions/standard-processors/processors/ListenSyslog.h 
b/extensions/standard-processors/processors/ListenSyslog.h
index 271f739..b986730 100644
--- a/extensions/standard-processors/processors/ListenSyslog.h
+++ b/extensions/standard-processors/processors/ListenSyslog.h
@@ -50,6 +50,7 @@
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
 #include "FlowFileRecord.h"
+#include "utils/gsl.h"
 
 #ifndef WIN32
 
@@ -89,15 +90,14 @@ class ListenSyslog : public core::Processor {
     _serverSocket = 0;
     _maxFds = 0;
     FD_ZERO(&_readfds);
-    _thread = NULL;
+    _thread = nullptr;
     _resetServerSocket = false;
     _serverTheadRunning = false;
   }
   // Destructor
-  virtual ~ListenSyslog() {
+  ~ListenSyslog() override {
     _serverTheadRunning = false;
-    if (this->_thread)
-      delete this->_thread;
+    delete this->_thread;
     // need to reset the socket
     std::vector<int>::iterator it;
     for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
@@ -135,10 +135,9 @@ class ListenSyslog : public core::Processor {
     uint8_t *_data;
     uint64_t _dataSize;
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      int64_t ret = 0;
-      if (_data && _dataSize > 0)
-        ret = stream->write(_data, _dataSize);
-      return ret;
+      if (!_data || _dataSize <= 0) return 0;
+      const auto write_ret = stream->write(_data, _dataSize);
+      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
     }
   };
 
diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index 26ea193..6d4bf78 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -214,7 +214,7 @@ class FileReaderCallback : public OutputStreamCallback {
 
     while (hasMoreToRead() && !found_delimiter) {
       if (begin_ == end_) {
-        input_stream_.read(reinterpret_cast<char *>(buffer_.data()), 
buffer_.size());
+        input_stream_.read(reinterpret_cast<char *>(buffer_.data()), 
gsl::narrow<std::streamsize>(buffer_.size()));
 
         const auto num_bytes_read = input_stream_.gcount();
         logger_->log_trace("Read %jd bytes of input", 
std::intmax_t{num_bytes_read});
@@ -226,15 +226,10 @@ class FileReaderCallback : public OutputStreamCallback {
       char *delimiter_pos = std::find(begin_, end_, input_delimiter_);
       found_delimiter = (delimiter_pos != end_);
 
-      ptrdiff_t zlen{std::distance(begin_, delimiter_pos)};
-      if (found_delimiter) {
-        zlen += 1;
-      }
-      const int len = gsl::narrow<int>(zlen);
-
-      crc_stream.write(reinterpret_cast<uint8_t*>(begin_), len);
-      num_bytes_written += len;
-      begin_ += len;
+      const auto zlen = gsl::narrow<size_t>(std::distance(begin_, 
delimiter_pos)) + (found_delimiter ? 1 : 0);
+      crc_stream.write(reinterpret_cast<uint8_t*>(begin_), zlen);
+      num_bytes_written += zlen;
+      begin_ += zlen;
     }
 
     if (found_delimiter) {
diff --git 
a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp 
b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
index da29f4b..4f221d9 100644
--- 
a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
+++ 
b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
@@ -92,7 +92,7 @@ class SecureSocketTest : public IntegrationBase {
     std::shared_ptr<minifi::processors::GetTCP> inv = 
std::dynamic_pointer_cast<minifi::processors::GetTCP>(proc);
 
     assert(inv != nullptr);
-    std::string url = "";
+    std::string url;
     configuration->set("nifi.remote.input.secure", "true");
     std::string path = key_dir + "cn.crt.pem";
     configuration->set("nifi.security.client.certificate", path);
@@ -117,23 +117,15 @@ class SecureSocketTest : public IntegrationBase {
     assert(0 == server_socket_->initialize());
 
     isRunning_ = true;
-    check = [this]() -> bool {
-      return isRunning_;
+    auto handler = [](std::vector<uint8_t> *b) {
+      *b = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', 0, 0, 0, 0, 
0, 0, 0, 0, 0};
+      assert(b->size() == 20);
+      return b->size();
     };
-    handler = [](std::vector<uint8_t> *b, int *size) {
-      std::cout << "oh write!" << std::endl;
-      b->reserve(20);
-      memset(b->data(), 0x00, 20);
-      memcpy(b->data(), "hello world", 11);
-      *size = 20;
-      return *size;
-    };
-    server_socket_->registerCallback(check, handler, 
std::chrono::milliseconds(50));
+    server_socket_->registerCallback([this] { return isRunning_.load(); }, 
std::move(handler), std::chrono::milliseconds(50));
   }
 
  protected:
-  std::function<bool()> check;
-  std::function<int(std::vector<uint8_t>*b, int *size)> handler;
   bool isSecure;
   std::atomic<bool> isRunning_;
   std::string dir;
diff --git 
a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
 
b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
index 9278b9d..55800b0 100644
--- 
a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
+++ 
b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
@@ -248,17 +248,13 @@ class TLSServerSocketSupportedProtocolsTest {
       assert(0 == server_socket_->initialize());
 
       is_running_ = true;
-      auto check = [this]() -> bool {
-        return is_running_;
+      auto handler = [](std::vector<uint8_t> *bytes_written) {
+        const char contents[] = "hello world";
+        *bytes_written = {std::begin(contents), std::end(contents)};
+        assert(12 == bytes_written->size());
+        return bytes_written->size();
       };
-      auto handler = [](std::vector<uint8_t> *bytes_written, int *size) {
-        std::string contents = "hello world";
-        *bytes_written = {contents.begin(), contents.end()};
-        bytes_written->push_back(0);
-        *size = bytes_written->size();
-        return *size;
-      };
-      server_socket_->registerCallback(check, handler, 
std::chrono::milliseconds(50));
+      server_socket_->registerCallback([this]{ return is_running_.load(); }, 
std::move(handler), std::chrono::milliseconds(50));
     }
 
     void verifyTLSServerSocketExclusiveCompatibilityWithTLSv1_2() {
diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp 
b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
index 91ca60e..859ecbf 100644
--- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
@@ -110,7 +110,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
   std::shared_ptr<core::ProcessSessionFactory> factory = 
std::make_shared<core::ProcessSessionFactory>(context);
   processor->onSchedule(context, factory);
   processor->onTrigger(context, session);
-  server.write(buffer, gsl::narrow<int>(buffer.size()));
+  server.write(buffer, buffer.size());
   std::this_thread::sleep_for(std::chrono::seconds(2));
 
   logAttribute->initialize();
@@ -225,7 +225,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
   std::shared_ptr<core::ProcessSessionFactory> factory = 
std::make_shared<core::ProcessSessionFactory>(context);
   processor->onSchedule(context, factory);
   processor->onTrigger(context, session);
-  server.write(buffer, gsl::narrow<int>(buffer.size()));
+  server.write(buffer, buffer.size());
   std::this_thread::sleep_for(std::chrono::seconds(2));
 
   logAttribute->initialize();
@@ -349,7 +349,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
   std::shared_ptr<core::ProcessSessionFactory> factory = 
std::make_shared<core::ProcessSessionFactory>(context);
   processor->onSchedule(context, factory);
   processor->onTrigger(context, session);
-  server.write(buffer, gsl::narrow<int>(buffer.size()));
+  server.write(buffer, buffer.size());
   std::this_thread::sleep_for(std::chrono::seconds(2));
 
   logAttribute->initialize();
diff --git a/extensions/usb-camera/CMakeLists.txt 
b/extensions/usb-camera/CMakeLists.txt
index 14c1c08..87e8c02 100644
--- a/extensions/usb-camera/CMakeLists.txt
+++ b/extensions/usb-camera/CMakeLists.txt
@@ -23,7 +23,7 @@ find_package(PNG)
 if(PNG_FOUND)
     set(PNG_LINK_FLAGS ${PNG_LIBRARIES})
 else()
-    pkg_check_modules(PNG QUIET libpng)
+    pkg_check_modules(PNG libpng)
     if(PNG_FOUND)
         set(PNG_INCLUDE_DIR ${PNG_INCLUDE_DIRS})
         set(PNG_LINK_FLAGS ${PNG_LDFLAGS})
diff --git a/extensions/usb-camera/GetUSBCamera.cpp 
b/extensions/usb-camera/GetUSBCamera.cpp
index 074341f..4d6d021 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -17,6 +17,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "GetUSBCamera.h"
+
 #include <png.h>
 
 #include <utility>
@@ -25,7 +27,7 @@
 #include <set>
 #include <algorithm>
 
-#include "GetUSBCamera.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -460,7 +462,8 @@ int64_t GetUSBCamera::PNGWriteCallback::process(const 
std::shared_ptr<io::BaseSt
     throw;
   }
 
-  return stream->write(png_output_buf_.data(), png_output_buf_.size());
+  const auto write_ret = stream->write(png_output_buf_.data(), 
png_output_buf_.size());
+  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
 }
 
 GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame)
@@ -470,7 +473,8 @@ 
GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame)
 
 int64_t GetUSBCamera::RawWriteCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
   logger_->log_info("Writing %d bytes of raw capture data", 
frame_->data_bytes);
-  return stream->write(reinterpret_cast<uint8_t *>(frame_->data), 
frame_->data_bytes);
+  const auto write_ret = 
stream->write(reinterpret_cast<uint8_t*>(frame_->data), frame_->data_bytes);
+  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
 }
 
 } /* namespace processors */
diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp 
b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
index b1d7201..4c74552 100644
--- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
+++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
@@ -625,16 +625,15 @@ void CollectorInitiatedSubscription::unsubscribe() {
 int CollectorInitiatedSubscription::processQueue(const 
std::shared_ptr<core::ProcessSession> &session) {
   struct WriteCallback: public OutputStreamCallback {
     explicit WriteCallback(const std::string& str)
-      : str_(str) {
-      status_ = 0;
+      : str_(&str) {
     }
 
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      return stream->write(reinterpret_cast<uint8_t*>(&str_[0]), 
gsl::narrow<int>(str_.size()));
+      const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(str_->data()), str_->size());
+      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
     }
 
-    std::string str_;
-    int status_;
+    gsl::not_null<const std::string*> str_;
   };
 
   int flowFileCount = 0;
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp 
b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 3746826..04a90a0 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -695,7 +695,8 @@ void 
ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
     }
 
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      return 
stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.c_str())), 
gsl::narrow<int>(str_.size()));
+      const auto write_ret = stream->write(reinterpret_cast<const 
uint8_t*>(str_.c_str()), str_.size());
+      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
     }
 
     const std::string& str_;
diff --git a/libminifi/include/io/AtomicEntryStream.h 
b/libminifi/include/io/AtomicEntryStream.h
index 6456ff2..10b2eda 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -82,7 +82,7 @@ class AtomicEntryStream : public BaseStream {
    * @param value value to write
    * @param size size of value
    */
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
  private:
   size_t length_;
@@ -110,22 +110,18 @@ void AtomicEntryStream<T>::seek(size_t offset) {
 
 // data stream overrides
 template<typename T>
-int AtomicEntryStream<T>::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
-  if (size == 0) {
-    return 0;
-  }
-  if (nullptr != value && !invalid_stream_) {
-    std::lock_guard<std::recursive_mutex> lock(entry_lock_);
-    if (entry_->insert(key_, const_cast<uint8_t*>(value), size)) {
-      offset_ += size;
-      if (offset_ > length_) {
-        length_ = offset_;
-      }
-      return size;
+size_t AtomicEntryStream<T>::write(const uint8_t *value, size_t size) {
+  if (size == 0) return 0;
+  if (!value || invalid_stream_) return STREAM_ERROR;
+  std::lock_guard<std::recursive_mutex> lock(entry_lock_);
+  if (entry_->insert(key_, const_cast<uint8_t*>(value), size)) {
+    offset_ += size;
+    if (offset_ > length_) {
+      length_ = offset_;
     }
+    return size;
   }
-  return -1;
+  return STREAM_ERROR;
 }
 
 template<typename T>
diff --git a/libminifi/include/io/BufferStream.h 
b/libminifi/include/io/BufferStream.h
index 7ab2b9e..2290a3f 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -40,13 +40,13 @@ class BufferStream : public BaseStream {
   }
 
   explicit BufferStream(const std::string& data) {
-    write(reinterpret_cast<const uint8_t*>(data.c_str()), 
gsl::narrow<int>(data.length()));
+    write(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
   }
 
   using BaseStream::read;
   using BaseStream::write;
 
-  int write(const uint8_t* data, int len) final;
+  size_t write(const uint8_t* data, size_t len) final;
 
   size_t read(uint8_t* buffer, size_t len) override;
 
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index b6ae751..70949a4 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -104,9 +104,9 @@ class OutputCRCStream : public virtual 
CRCStreamBase<StreamType>, public OutputS
  public:
   using OutputStream::write;
 
-  int write(const uint8_t *value, int size) override {
-    int ret = child_stream_->write(value, size);
-    if (ret > 0) {
+  size_t write(const uint8_t *value, size_t size) override {
+    const auto ret = child_stream_->write(value, size);
+    if (ret > 0 && !io::isError(ret)) {
       crc_ = crc32(crc_, value, ret);
     }
     return ret;
diff --git a/libminifi/include/io/ClientSocket.h 
b/libminifi/include/io/ClientSocket.h
index db71cba..e5f72b3 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -151,7 +151,7 @@ class Socket : public BaseStream {
   using BaseStream::write;
   using BaseStream::read;
 
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
   /**
    * Reads data and places it into buf
diff --git a/libminifi/include/io/DescriptorStream.h 
b/libminifi/include/io/DescriptorStream.h
index ee5aec7..695f543 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -66,7 +66,7 @@ class DescriptorStream : public io::BaseStream {
    * @param value value to write
    * @param size size of value
    */
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
  private:
   std::recursive_mutex file_lock_;
diff --git a/libminifi/include/io/FileStream.h 
b/libminifi/include/io/FileStream.h
index ad7f0dc..3ef4db7 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -84,7 +84,7 @@ class FileStream : public io::BaseStream {
    * @param value value to write
    * @param size size of value
    */
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
  private:
   void seekToEndOfFile(const char* caller_error_msg);
diff --git a/libminifi/include/io/OutputStream.h 
b/libminifi/include/io/OutputStream.h
index f14a0b3..b02c0ca 100644
--- a/libminifi/include/io/OutputStream.h
+++ b/libminifi/include/io/OutputStream.h
@@ -45,40 +45,52 @@ class OutputStream : public virtual Stream {
    * @param len length of value
    * @return resulting write size
    **/
-  virtual int write(const uint8_t *value, int len) = 0;
+  virtual size_t write(const uint8_t *value, size_t len) = 0;
 
-  int write(const std::vector<uint8_t>& buffer, int len);
+  /**
+   * write: resolve nullptr ambiguity
+   * call: write(nullptr, 0)
+   * candidate1: write(const uint8_t*, size_t): conversion from std::nullptr_t 
to const uint8_t* and from int to size_t
+   * candidate2: write(const char*, bool): conversion from std::nullptr_t to 
const char* and from int to bool
+   * @param len
+   * @return
+   */
+  size_t write(std::nullptr_t, size_t len) {
+    return write(static_cast<const uint8_t*>(nullptr), len);
+  }
+
+  size_t write(const std::vector<uint8_t>& buffer, size_t len);
 
   /**
    * write bool to stream
    * @param value non encoded value
    * @return resulting write size
    **/
-  int write(bool value);
+  size_t write(bool value);
 
   /**
    * write Identifier to stream
    * @param value non encoded value
    * @return resulting write size
    **/
-  int write(const utils::Identifier& value);
+  size_t write(const utils::Identifier& value);
 
   /**
    * write string to stream
    * @param str string to write
    * @return resulting write size
    **/
-  int write(const std::string& str, bool widen = false);
+  size_t write(const std::string& str, bool widen = false);
 
   /**
    * write string to stream
    * @param str string to write
    * @return resulting write size
    **/
-  int write(const char* str, bool widen = false);
+  size_t write(const char* str, bool widen = false);
 
   template<size_t N>
-  int write(const utils::SmallString<N>& str, bool widen = false) {
+  size_t write(const utils::SmallString<N>& str, bool widen = false) {
     return write(str.c_str(), widen);
   }
 
@@ -88,7 +100,7 @@ class OutputStream : public virtual Stream {
   * @return resulting write size
   **/
   template<typename Integral, typename = 
std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, 
bool>::value>>
-  int write(Integral value) {
+  size_t write(Integral value) {
     uint8_t buffer[sizeof(Integral)]{};
 
     for (std::size_t byteIdx = 0; byteIdx < sizeof(Integral); ++byteIdx) {
@@ -99,7 +111,7 @@ class OutputStream : public virtual Stream {
   }
 
  private:
-  int write_str(const char* str, uint32_t len, bool widen);
+  size_t write_str(const char* str, uint32_t len, bool widen);
 };
 
 }  // namespace io
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index e694c2e..0428cf7 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -26,13 +26,9 @@ namespace io {
 
 constexpr size_t STREAM_ERROR = static_cast<size_t>(-1);
 
-inline bool isError(const size_t read_return) noexcept {
-  return read_return == STREAM_ERROR  // general error
-      || read_return == static_cast<size_t>(-2);  // Socket EAGAIN, to be 
refactored to eliminate this error condition
-}
-
-inline bool isError(const int write_return) noexcept {
-  return write_return == -1;
+inline bool isError(const size_t read_write_return) noexcept {
+  return read_write_return == STREAM_ERROR  // general error
+      || read_write_return == static_cast<size_t>(-2);  // read: Socket 
EAGAIN, to be refactored to eliminate this error condition
 }
 
 /**
diff --git a/libminifi/include/io/StreamPipe.h 
b/libminifi/include/io/StreamPipe.h
index abac015..b92b993 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -55,14 +55,14 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& 
src, const std::shar
     auto remaining = readRet;
     int transferred = 0;
     while (remaining > 0) {
-      int writeRet = dst->write(buffer + transferred, remaining);
+      const auto writeRet = dst->write(buffer + transferred, remaining);
       // TODO(adebreceni):
       //   write might return 0, e.g. in case of a congested server
       //   what should we return then?
       //     - the number of bytes read or
       //     - the number of bytes wrote
-      if (writeRet < 0) {
-        return writeRet;
+      if (io::isError(writeRet)) {
+        return -1;
       }
       transferred += writeRet;
       remaining -= writeRet;
diff --git a/libminifi/include/io/ZlibStream.h 
b/libminifi/include/io/ZlibStream.h
index ab4d419..b146413 100644
--- a/libminifi/include/io/ZlibStream.h
+++ b/libminifi/include/io/ZlibStream.h
@@ -76,7 +76,7 @@ class ZlibCompressStream : public ZlibBaseStream {
 
   ~ZlibCompressStream() override;
 
-  int write(const uint8_t* value, int size) override;
+  size_t write(const uint8_t* value, size_t size) override;
 
   void close() override;
 
@@ -95,7 +95,7 @@ class ZlibDecompressStream : public ZlibBaseStream {
 
   ~ZlibDecompressStream() override;
 
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
  private:
   std::shared_ptr<logging::Logger> 
logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()};
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h 
b/libminifi/include/io/tls/SecureDescriptorStream.h
index 5f285c1..2953711 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -81,7 +81,7 @@ class SecureDescriptorStream : public io::BaseStream {
    * @param value value to write
    * @param size size of value
    */
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
  protected:
   std::recursive_mutex file_lock_;
diff --git a/libminifi/include/io/tls/TLSServerSocket.h 
b/libminifi/include/io/tls/TLSServerSocket.h
index 13eaf0c..53eb3b1 100644
--- a/libminifi/include/io/tls/TLSServerSocket.h
+++ b/libminifi/include/io/tls/TLSServerSocket.h
@@ -54,9 +54,9 @@ class TLSServerSocket : public BaseServerSocket, public 
TLSSocket {
    * Registers a call back and starts the read for the server socket.
    */
   void registerCallback(
-    std::function<bool()> accept_function,
-    std::function<int(std::vector<uint8_t>*, int *)> handler,
-    std::chrono::milliseconds timeout = std::chrono::milliseconds(3000));
+      std::function<bool()> accept_function,
+      std::function<size_t(std::vector<uint8_t>*)> handler,
+      std::chrono::milliseconds timeout = std::chrono::milliseconds(3000));
 
   /**
    * Initializes the socket
@@ -65,8 +65,6 @@ class TLSServerSocket : public BaseServerSocket, public 
TLSSocket {
   void registerCallback(std::function<bool()> accept_function, 
std::function<void(io::BaseStream *)> handler) override;
 
  private:
-  std::function<void(std::function<bool()> accept_function, 
std::function<int(std::vector<uint8_t>*, int *)> handler, 
std::chrono::milliseconds timeout)> fx;
-
   void close_fd(int fd);
 
   std::atomic<bool> running_;
diff --git a/libminifi/include/io/tls/TLSSocket.h 
b/libminifi/include/io/tls/TLSSocket.h
index 24df1d9..271de46 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -151,12 +151,12 @@ class TLSSocket : public Socket {
    * @param buflen buffer to write
    *
    */
-  int write(const uint8_t *value, int size) override;
+  size_t write(const uint8_t *value, size_t size) override;
 
   void close() override;
 
  protected:
-  int writeData(const uint8_t *value, unsigned int size, int fd);
+  size_t writeData(const uint8_t *value, size_t size, int fd);
 
   SSL *get_ssl(int fd) {
     if (UNLIKELY(listeners_ > 0)) {
diff --git a/libminifi/include/serialization/FlowFileV3Serializer.h 
b/libminifi/include/serialization/FlowFileV3Serializer.h
index a6eb2cf..06a284c 100644
--- a/libminifi/include/serialization/FlowFileV3Serializer.h
+++ b/libminifi/include/serialization/FlowFileV3Serializer.h
@@ -34,9 +34,9 @@ class FlowFileV3Serializer : public FlowFileSerializer {
 
   static constexpr uint16_t MAX_2_BYTE_VALUE = 
(std::numeric_limits<uint16_t>::max)();
 
-  static int writeLength(std::size_t length, const 
std::shared_ptr<io::OutputStream>& out);
+  static size_t writeLength(std::size_t length, const 
std::shared_ptr<io::OutputStream>& out);
 
-  static int writeString(const std::string& str, const 
std::shared_ptr<io::OutputStream>& out);
+  static size_t writeString(const std::string& str, const 
std::shared_ptr<io::OutputStream>& out);
 
  public:
   using FlowFileSerializer::FlowFileSerializer;
diff --git a/libminifi/include/sitetosite/Peer.h 
b/libminifi/include/sitetosite/Peer.h
index d2138ea..cea5e88 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -295,7 +295,7 @@ class SiteToSitePeer : public 
org::apache::nifi::minifi::io::BaseStream {
   using BaseStream::write;
   using BaseStream::read;
 
-  int write(const uint8_t* data, int len) override {
+  size_t write(const uint8_t* data, size_t len) override {
     return stream_->write(data, len);
   }
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h 
b/libminifi/include/sitetosite/SiteToSiteClient.h
index d142bfa..cffda6c 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -308,7 +308,7 @@ class ReadCallback : public InputStreamCallback {
       if (readSize == 0) break;
       if (io::isError(readSize)) return -1;
       const auto ret = _packet->transaction_->getStream().write(buffer, 
readSize);
-      if (ret < 0 || gsl::narrow<size_t>(ret) != readSize) {
+      if (io::isError(ret) || gsl::narrow<size_t>(ret) != readSize) {
         logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow 
Size " << readSize << " Failed " << ret;
         return -1;
       }
diff --git a/libminifi/include/utils/GeneralUtils.h 
b/libminifi/include/utils/GeneralUtils.h
index bd70412..6a766c9 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -19,6 +19,7 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
 #define LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
 
+#include <algorithm>
 #include <memory>
 #include <type_traits>
 #include <utility>
@@ -174,6 +175,7 @@ auto invoke(F&& f, Args&&... args) 
MINIFICPP_UTIL_DEDUCED(detail::invoke_impl(st
 using std::invoke
 #endif /* < C++17 */
 
+
 namespace detail {
 struct dereference_t {
   template<typename T>
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 89096a9..db080de 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -70,69 +70,82 @@ std::shared_ptr<FlowFileRecord> 
FlowFileRecord::DeSerialize(const std::string& k
 }
 
 bool FlowFileRecord::Serialize(io::OutputStream &outStream) {
-  int ret;
-
-  ret = outStream.write(event_time_);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(event_time_);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(entry_date_);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(entry_date_);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(lineage_start_date_);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(lineage_start_date_);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(uuid_);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(uuid_);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
   utils::Identifier containerId;
   if (connection_) {
     containerId = connection_->getUUID();
   }
-  ret = outStream.write(containerId);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(containerId);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
   // write flow attributes
-  uint32_t numAttributes = gsl::narrow<uint32_t>(attributes_.size());
-  ret = outStream.write(numAttributes);
-  if (ret != 4) {
-    return false;
+  {
+    const auto numAttributes = gsl::narrow<uint32_t>(attributes_.size());
+    const auto ret = outStream.write(numAttributes);
+    if (ret != 4) {
+      return false;
+    }
   }
 
   for (auto& itAttribute : attributes_) {
-    ret = outStream.write(itAttribute.first, true);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.write(itAttribute.first, true);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
-    ret = outStream.write(itAttribute.second, true);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.write(itAttribute.second, true);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
   }
 
-  ret = outStream.write(getContentFullPath());
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(getContentFullPath());
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
-  ret = outStream.write(size_);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(size_);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(offset_);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(offset_);
+    if (ret != 8) {
+      return false;
+    }
   }
-
   return true;
 }
 
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp 
b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 049a3d0..63e4b0e 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -177,7 +177,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
             io::BufferStream resp;
             resp.write(&head, 1);
             resp.write(response.str());
-            stream->write(const_cast<uint8_t*>(resp.getBuffer()), 
gsl::narrow<int>(resp.size()));
+            stream->write(resp.getBuffer(), resp.size());
           } else if (what == "components") {
             io::BufferStream resp;
             resp.write(&head, 1);
@@ -187,7 +187,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
               resp.write(component->getComponentName());
               resp.write(component->isRunning() ? "true" : "false");
             }
-            stream->write(const_cast<uint8_t*>(resp.getBuffer()), 
gsl::narrow<int>(resp.size()));
+            stream->write(resp.getBuffer(), resp.size());
           } else if (what == "jstack") {
             io::BufferStream resp;
             resp.write(&head, 1);
@@ -203,7 +203,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
                 resp.write(line);
               }
             }
-            stream->write(const_cast<uint8_t*>(resp.getBuffer()), 
gsl::narrow<int>(resp.size()));
+            stream->write(resp.getBuffer(), resp.size());
           } else if (what == "connections") {
             io::BufferStream resp;
             resp.write(&head, 1);
@@ -212,7 +212,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
             for (const auto &connection : queue_full_) {
               resp.write(connection.first, false);
             }
-            stream->write(const_cast<uint8_t*>(resp.getBuffer()), 
gsl::narrow<int>(resp.size()));
+            stream->write(resp.getBuffer(), resp.size());
           } else if (what == "getfull") {
             std::vector<std::string> full_connections;
             {
@@ -230,7 +230,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
             for (const auto& conn : full_connections) {
               resp.write(conn);
             }
-            stream->write(const_cast<uint8_t*>(resp.getBuffer()), 
gsl::narrow<int>(resp.size()));
+            stream->write(resp.getBuffer(), resp.size());
           }
         }
         break;
diff --git a/libminifi/src/core/ContentSession.cpp 
b/libminifi/src/core/ContentSession.cpp
index 14221ca..74ffcff 100644
--- a/libminifi/src/core/ContentSession.cpp
+++ b/libminifi/src/core/ContentSession.cpp
@@ -72,8 +72,8 @@ void ContentSession::commit() {
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying 
resource for write: " + resource.first->getContentFullPath());
     }
-    const int size = gsl::narrow<int>(resource.second->size());
-    const int bytes_written = 
outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size);
+    const auto size = resource.second->size();
+    const auto bytes_written = outStream->write(resource.second->getBuffer(), 
size);
     if (bytes_written != size) {
       throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + 
resource.first->getContentFullPath());
     }
@@ -83,8 +83,8 @@ void ContentSession::commit() {
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying 
resource for append: " + resource.first->getContentFullPath());
     }
-    const int size = gsl::narrow<int>(resource.second->size());
-    const int bytes_written = 
outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size);
+    const auto size = resource.second->size();
+    const auto bytes_written = outStream->write(resource.second->getBuffer(), 
size);
     if (bytes_written != size) {
       throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + 
resource.first->getContentFullPath());
     }
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 5e3454d..887798e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -407,12 +407,12 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr<FlowFile>
       while (input.good()) {
         input.read(reinterpret_cast<char*>(charBuffer.data()), size);
         if (input) {
-          if (stream->write(charBuffer.data(), gsl::narrow<int>(size)) < 0) {
+          if (io::isError(stream->write(charBuffer.data(), size))) {
             invalidWrite = true;
             break;
           }
         } else {
-          if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), 
gsl::narrow<int>(input.gcount())) < 0) {
+          if 
(io::isError(stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), 
gsl::narrow<size_t>(input.gcount())))) {
             invalidWrite = true;
             break;
           }
@@ -489,7 +489,7 @@ void ProcessSession::import(const std::string& source, 
std::vector<std::shared_p
       while (true) {
         startTime = utils::timeutils::getTimeMillis();
         uint8_t* delimiterPos = std::find(begin, end, 
static_cast<uint8_t>(inputDelimiter));
-        const auto len = gsl::narrow<int>(delimiterPos - begin);
+        const auto len = gsl::narrow<size_t>(delimiterPos - begin);
 
         logging::LOG_TRACE(logger_) << "Read input of " << read << " length is 
" << len << " is at end?" << (delimiterPos == end);
         /*
diff --git a/libminifi/src/io/BufferStream.cpp 
b/libminifi/src/io/BufferStream.cpp
index bd0a9d2..7e878d2 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -28,8 +28,7 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-int BufferStream::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
+size_t BufferStream::write(const uint8_t *value, size_t size) {
   size_t originalSize = buffer_.size();
   buffer_.resize(originalSize + size);
   std::memcpy(buffer_.data() + originalSize, value, size);
diff --git a/libminifi/src/io/ClientSocket.cpp 
b/libminifi/src/io/ClientSocket.cpp
index 57c6915..d26a01b 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -501,25 +501,22 @@ std::string Socket::getHostname() const {
 
 // data stream overrides
 
-int Socket::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
-
-  int ret = 0, bytes = 0;
-
+size_t Socket::write(const uint8_t *value, size_t size) {
+  size_t bytes = 0;
   int fd = select_descriptor(1000);
-  if (fd < 0) { return -1; }
+  if (fd < 0) { return STREAM_ERROR; }
   while (bytes < size) {
-    ret = send(fd, reinterpret_cast<const char*>(value) + bytes, size - bytes, 
0);
+    const auto send_ret = send(fd, reinterpret_cast<const char*>(value) + 
bytes, size - bytes, 0);
     // check for errors
-    if (ret <= 0) {
+    if (send_ret <= 0) {
       utils::file::FileUtils::close(fd);
       logger_->log_error("Could not send to %d, error: %s", fd, 
get_last_socket_error_message());
-      return ret;
+      return STREAM_ERROR;
     }
-    bytes += ret;
+    bytes += gsl::narrow<size_t>(send_ret);
   }
 
-  if (ret)
+  if (bytes > 0)
     logger_->log_trace("Send data size %d over socket %d", size, fd);
   total_written_ += bytes;
   return bytes;
diff --git a/libminifi/src/io/DescriptorStream.cpp 
b/libminifi/src/io/DescriptorStream.cpp
index a3a4ee8..64c0840 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -49,24 +49,18 @@ void DescriptorStream::seek(size_t offset) {
 #endif
 }
 
-int DescriptorStream::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
-  if (size == 0) {
-    return 0;
-  }
-  if (!IsNullOrEmpty(value)) {
-    std::lock_guard<std::recursive_mutex> lock(file_lock_);
+size_t DescriptorStream::write(const uint8_t *value, size_t size) {
+  if (size == 0) return 0;
+  if (IsNullOrEmpty(value)) return STREAM_ERROR;
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
-    if (_write(fd_, value, size) != size) {
+  if (static_cast<size_t>(_write(fd_, value, size)) != size) {
 #else
-    if (::write(fd_, value, size) != size) {
+  if (static_cast<size_t>(::write(fd_, value, size)) != size) {
 #endif
-      return -1;
-    } else {
-      return size;
-    }
+    return STREAM_ERROR;
   } else {
-    return -1;
+    return size;
   }
 }
 
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 90fb0a2..c5470b4 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -113,35 +113,30 @@ void FileStream::seek(size_t offset) {
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKP_CALL_ERROR_MSG;
 }
 
-int FileStream::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
-  if (size == 0) {
-    return 0;
-  }
-  if (!IsNullOrEmpty(value)) {
-    std::lock_guard<std::mutex> lock(file_lock_);
-    if (file_stream_ == nullptr || !file_stream_->is_open()) {
-      logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << 
INVALID_FILE_STREAM_ERROR_MSG;
-      return -1;
-    }
-    if (file_stream_->write(reinterpret_cast<const char*>(value), size)) {
-      offset_ += size;
-      if (offset_ > length_) {
-        length_ = offset_;
-      }
-      if (!file_stream_->flush()) {
-        logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << FLUSH_CALL_ERROR_MSG;
-        return -1;
-      }
-      return size;
-    } else {
-      logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << WRITE_CALL_ERROR_MSG;
-      return -1;
-    }
-  } else {
+size_t FileStream::write(const uint8_t *value, size_t size) {
+  if (size == 0) return 0;
+  if (IsNullOrEmpty(value)) {
     logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << EMPTY_MESSAGE_ERROR_MSG;
-    return -1;
+    return STREAM_ERROR;
+  }
+  std::lock_guard<std::mutex> lock(file_lock_);
+  if (file_stream_ == nullptr || !file_stream_->is_open()) {
+    logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << 
INVALID_FILE_STREAM_ERROR_MSG;
+    return STREAM_ERROR;
+  }
+  if (!file_stream_->write(reinterpret_cast<const char*>(value), 
gsl::narrow<std::streamsize>(size))) {
+    logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << WRITE_CALL_ERROR_MSG;
+    return STREAM_ERROR;
+  }
+  offset_ += size;
+  if (offset_ > length_) {
+    length_ = offset_;
+  }
+  if (!file_stream_->flush()) {
+    logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << FLUSH_CALL_ERROR_MSG;
+    return STREAM_ERROR;
   }
+  return size;
 }
 
 size_t FileStream::read(uint8_t *buf, size_t buflen) {
diff --git a/libminifi/src/io/OutputStream.cpp 
b/libminifi/src/io/OutputStream.cpp
index 355cea7..c19fff6 100644
--- a/libminifi/src/io/OutputStream.cpp
+++ b/libminifi/src/io/OutputStream.cpp
@@ -30,43 +30,43 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-int OutputStream::write(const std::vector<uint8_t>& buffer, int len) {
-  if (buffer.size() < gsl::narrow<size_t>(len)) {
-    return -1;
+size_t OutputStream::write(const std::vector<uint8_t>& buffer, size_t len) {
+  if (buffer.size() < len) {
+    return STREAM_ERROR;
   }
   return write(buffer.data(), len);
 }
 
-int OutputStream::write(bool value) {
+size_t OutputStream::write(bool value) {
   uint8_t temp = value;
   return write(&temp, 1);
 }
 
-int OutputStream::write(const utils::Identifier &value) {
+size_t OutputStream::write(const utils::Identifier &value) {
   return write(value.to_string());
 }
 
-int OutputStream::write(const std::string& str, bool widen) {
+size_t OutputStream::write(const std::string& str, bool widen) {
   return write_str(str.c_str(), gsl::narrow<uint32_t>(str.length()), widen);
 }
 
-int OutputStream::write(const char* str, bool widen) {
+size_t OutputStream::write(const char* str, bool widen) {
   return write_str(str, gsl::narrow<uint32_t>(std::strlen(str)), widen);
 }
 
-int OutputStream::write_str(const char* str, uint32_t len, bool widen) {
-  int ret = 0;
+size_t OutputStream::write_str(const char* str, uint32_t len, bool widen) {
+  size_t ret = 0;
   if (!widen) {
-    uint16_t shortLen = len;
+    const auto shortLen = gsl::narrow_cast<uint16_t>(len);
     if (len != shortLen) {
-      return -1;
+      return STREAM_ERROR;
     }
     ret = write(shortLen);
   } else {
     ret = write(len);
   }
 
-  if (ret <= 0) {
+  if (ret == 0 || isError(ret)) {
     return ret;
   }
 
diff --git a/libminifi/src/io/ZlibStream.cpp b/libminifi/src/io/ZlibStream.cpp
index aaf66b9..711a31d 100644
--- a/libminifi/src/io/ZlibStream.cpp
+++ b/libminifi/src/io/ZlibStream.cpp
@@ -61,15 +61,14 @@ ZlibCompressStream::~ZlibCompressStream() {
   }
 }
 
-int ZlibCompressStream::write(const uint8_t* value, int size) {
-  gsl_Expects(size >= 0);
+size_t ZlibCompressStream::write(const uint8_t* value, size_t size) {
   if (state_ != ZlibStreamState::INITIALIZED) {
     logger_->log_error("writeData called in invalid ZlibCompressStream state, 
state is %hhu", state_);
-    return -1;
+    return STREAM_ERROR;
   }
 
   strm_.next_in = const_cast<uint8_t*>(value);
-  strm_.avail_in = size;
+  strm_.avail_in = gsl::narrow<uInt>(size);
 
   /*
    * deflate consumes all input data it can (i.e. if it has enough output 
buffer it never leaves input data unconsumed)
@@ -92,14 +91,14 @@ int ZlibCompressStream::write(const uint8_t* value, int 
size) {
     if (ret == Z_STREAM_ERROR) {
       logger_->log_error("deflate failed, error code: %d", ret);
       state_ = ZlibStreamState::ERRORED;
-      return -1;
+      return STREAM_ERROR;
     }
-    int output_size = gsl::narrow<int>(outputBuffer_.size() - strm_.avail_out);
+    const auto output_size = outputBuffer_.size() - strm_.avail_out;
     logger_->log_trace("deflate produced %d B of output data", output_size);
     if (output_->write(outputBuffer_.data(), output_size) != output_size) {
       logger_->log_error("Failed to write to underlying stream");
       state_ = ZlibStreamState::ERRORED;
-      return -1;
+      return STREAM_ERROR;
     }
   } while (strm_.avail_out == 0);
 
@@ -131,15 +130,14 @@ ZlibDecompressStream::~ZlibDecompressStream() {
   }
 }
 
-int ZlibDecompressStream::write(const uint8_t* value, int size) {
-  gsl_Expects(size >= 0);
+size_t ZlibDecompressStream::write(const uint8_t* value, size_t size) {
   if (state_ != ZlibStreamState::INITIALIZED) {
     logger_->log_error("writeData called in invalid ZlibDecompressStream 
state, state is %hhu", state_);
-    return -1;
+    return STREAM_ERROR;
   }
 
   strm_.next_in = const_cast<uint8_t*>(value);
-  strm_.avail_in = size;
+  strm_.avail_in = gsl::narrow<uInt>(size);
 
   /*
    * inflate works similarly to deflate in that it will not leave input data 
unconsumed, and we have to watch avail_out,
@@ -160,14 +158,14 @@ int ZlibDecompressStream::write(const uint8_t* value, int 
size) {
         ret == Z_MEM_ERROR) {
       logger_->log_error("inflate failed, error code: %d", ret);
       state_ = ZlibStreamState::ERRORED;
-      return -1;
+      return STREAM_ERROR;
     }
-    int output_size = gsl::narrow<int>(outputBuffer_.size() - strm_.avail_out);
+    const auto output_size = outputBuffer_.size() - strm_.avail_out;
     logger_->log_trace("deflate produced %d B of output data", output_size);
     if (output_->write(outputBuffer_.data(), output_size) != output_size) {
       logger_->log_error("Failed to write to underlying stream");
       state_ = ZlibStreamState::ERRORED;
-      return -1;
+      return STREAM_ERROR;
     }
   } while (strm_.avail_out == 0);
 
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp 
b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index ea93dea..e7a84fc 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -47,30 +47,24 @@ void SecureDescriptorStream::seek(size_t offset) {
 
 // data stream overrides
 
-int SecureDescriptorStream::write(const uint8_t *value, int size) {
-  gsl_Expects(size >= 0);
+size_t SecureDescriptorStream::write(const uint8_t *value, size_t size) {
   if (size == 0) {
     return 0;
   }
-  if (!IsNullOrEmpty(value)) {
-    std::lock_guard<std::recursive_mutex> lock(file_lock_);
-    int bytes = 0;
-     int sent = 0;
-     while (bytes < size) {
-       sent = SSL_write(ssl_, value + bytes, size - bytes);
-       // check for errors
-       if (sent < 0) {
-         int ret = 0;
-         ret = SSL_get_error(ssl_, sent);
-         logger_->log_error("WriteData socket %d send failed %s %d", fd_, 
strerror(errno), ret);
-         return sent;
-       }
-       bytes += sent;
-     }
-     return size;
-  } else {
-    return -1;
+  if (IsNullOrEmpty(value)) return STREAM_ERROR;
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+  size_t bytes = 0;
+  while (bytes < size) {
+    const auto write_status = SSL_write(ssl_, value + bytes, 
gsl::narrow_cast<int>(size - bytes));
+    // check for errors
+    if (write_status < 0) {
+      const auto ret = SSL_get_error(ssl_, write_status);
+      logger_->log_error("WriteData socket %d send failed %s %d", fd_, 
strerror(errno), ret);
+      return STREAM_ERROR;
+    }
+    bytes += gsl::narrow<size_t>(write_status);
   }
+  return size;
 }
 
 size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) {
diff --git a/libminifi/src/io/tls/TLSServerSocket.cpp 
b/libminifi/src/io/tls/TLSServerSocket.cpp
index ed9cdc5..ae9d692 100644
--- a/libminifi/src/io/tls/TLSServerSocket.cpp
+++ b/libminifi/src/io/tls/TLSServerSocket.cpp
@@ -33,6 +33,7 @@
 #include <cstdio>
 #include <memory>
 #include <chrono>
+#include <limits>
 #include <thread>
 #include <utility>
 #include <vector>
@@ -44,6 +45,8 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "io/tls/SecureDescriptorStream.h"
 #include "io/validation.h"
+#include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -87,47 +90,55 @@ void 
TLSServerSocket::registerCallback(std::function<bool()> accept_function, st
  * Initializes the socket
  * @return result of the creation operation.
  */
-void TLSServerSocket::registerCallback(std::function<bool()> accept_function, 
std::function<int(std::vector<uint8_t>*, int *)> handler, 
std::chrono::milliseconds timeout) {
-  fx = [this](std::function<bool()> accept_function, 
std::function<int(std::vector<uint8_t>*, int *)> handler, 
std::chrono::milliseconds timeout) {
-    int ret = 0;
-    std::vector<int> fds;
-    int size;
-    while (accept_function()) {
-      int fd = select_descriptor(gsl::narrow<uint16_t>(timeout.count()));
-      if (fd > 0) {
-        int fd_remove = 0;
-        std::vector<uint8_t> data;
-        if ( handler(&data, &size) > 0 ) {
-          ret = writeData(data.data(), size, fd);
-          if (ret < 0) {
-            close_ssl(fd_remove);
-          } else {
-            fds.push_back(fd);
-          }
-        }
-      } else {
-        int fd_remove = 0;
-        for (auto &&fd : fds) {
+void TLSServerSocket::registerCallback(std::function<bool()> accept_function, 
std::function<size_t(std::vector<uint8_t>*)> handler, std::chrono::milliseconds 
timeout) {
+  struct Fx {
+    void operator()() const {
+      std::vector<int> fds;
+      size_t size;
+      while (accept_function()) {
+        int fd = 
server_socket_->select_descriptor(gsl::narrow<uint16_t>(timeout.count()));
+        if (fd > 0) {
+          int fd_remove = 0;
           std::vector<uint8_t> data;
-          if ( handler(&data, &size) > 0 ) {
-            ret = writeData(data.data(), size, fd);
-            if (ret < 0) {
-              fd_remove = fd;
-              break;
+          size = handler(&data);
+          if (size > 0 && !io::isError(size)) {
+            const auto ret = server_socket_->writeData(data.data(), size, fd);
+            if (io::isError(ret)) {
+              server_socket_->close_ssl(fd_remove);
+            } else {
+              fds.push_back(fd);
             }
           }
-        }
-        if (fd_remove > 0) {
-          close_ssl(fd_remove);
-          fds.erase(std::remove(fds.begin(), fds.end(), fd_remove), fds.end());
+        } else {
+          int fd_remove = 0;
+          for (auto &&fd : fds) {
+            std::vector<uint8_t> data;
+            size = handler(&data);
+            if (size > 0 && !io::isError(size)) {
+              const auto ret = server_socket_->writeData(data.data(), size, 
fd);
+              if (io::isError(ret)) {
+                fd_remove = fd;
+                break;
+              }
+            }
+          }
+          if (fd_remove > 0) {
+            server_socket_->close_ssl(fd_remove);
+            fds.erase(std::remove(fds.begin(), fds.end(), fd_remove), 
fds.end());
+          }
         }
       }
+      for (auto &&fd : fds) {
+        server_socket_->close_ssl(fd);
+      }
     }
-    for (auto &&fd : fds) {
-      close_ssl(fd);
-    }
+
+    TLSServerSocket* server_socket_;
+    std::function<bool()> accept_function;
+    std::function<size_t(std::vector<uint8_t>*)> handler;
+    std::chrono::milliseconds timeout;
   };
-  server_read_thread_ = std::thread(fx, accept_function, handler, timeout);
+  server_read_thread_ = std::thread(Fx{this, std::move(accept_function), 
std::move(handler), timeout});
 }
 
 void TLSServerSocket::close_fd(int fd) {
diff --git a/libminifi/src/io/tls/TLSSocket.cpp 
b/libminifi/src/io/tls/TLSSocket.cpp
index 93422fa..1cb6268 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -387,34 +387,32 @@ size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool 
/*retrieve_all_bytes*/)
   return total_read;
 }
 
-int TLSSocket::writeData(const uint8_t *value, unsigned int size, int fd) {
-  unsigned int bytes = 0;
-  int sent = 0;
+size_t TLSSocket::writeData(const uint8_t *value, size_t size, int fd) {
+  size_t bytes = 0;
   auto fd_ssl = get_ssl(fd);
   if (IsNullOrEmpty(fd_ssl)) {
-    return -1;
+    return STREAM_ERROR;
   }
   while (bytes < size) {
-    sent = SSL_write(fd_ssl, value + bytes, gsl::narrow<int>(size - bytes));
+    const auto sent = SSL_write(fd_ssl, value + bytes, gsl::narrow<int>(size - 
bytes));
     // check for errors
     if (sent < 0) {
       int ret = 0;
       ret = SSL_get_error(fd_ssl, sent);
       logger_->log_trace("WriteData socket %d send failed %s %d", fd, 
strerror(errno), ret);
-
-      return sent;
+      return STREAM_ERROR;
     }
     logger_->log_trace("WriteData socket %d send succeed %d", fd, sent);
-    bytes += sent;
+    bytes += gsl::narrow<size_t>(sent);
   }
-  return gsl::narrow<int>(size);
+  return size;
 }
 
-int TLSSocket::write(const uint8_t *value, int size) {
-  int fd = select_descriptor(1000);
+size_t TLSSocket::write(const uint8_t *value, size_t size) {
+  const int fd = select_descriptor(1000);
   if (fd < 0) {
     close();
-    return -1;
+    return STREAM_ERROR;
   }
   return writeData(value, size, fd);
 }
diff --git a/libminifi/src/provenance/Provenance.cpp 
b/libminifi/src/provenance/Provenance.cpp
index f3b0715..c3f6caf 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -88,134 +88,158 @@ bool ProvenanceEventRecord::DeSerialize(const 
std::shared_ptr<core::Serializable
 }
 
 bool 
ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStream& 
outStream) {
-  int ret;
-
-  ret = outStream.write(this->uuid_);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(this->uuid_);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
-  uint32_t eventType = this->_eventType;
-  ret = outStream.write(eventType);
-  if (ret != 4) {
-    return false;
+  {
+    uint32_t eventType = this->_eventType;
+    const auto ret = outStream.write(eventType);
+    if (ret != 4) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_eventTime);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_eventTime);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_entryDate);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_entryDate);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_eventDuration);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_eventDuration);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_lineageStartDate);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_lineageStartDate);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_componentId);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_componentId);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_componentType);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_componentType);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->flow_uuid_);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(this->flow_uuid_);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_details);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_details);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
   // write flow attributes
-  uint32_t numAttributes = gsl::narrow<uint32_t>(this->_attributes.size());
-  ret = outStream.write(numAttributes);
-  if (ret != 4) {
-    return false;
+  {
+    const auto numAttributes = gsl::narrow<uint32_t>(this->_attributes.size());
+    const auto ret = outStream.write(numAttributes);
+    if (ret != 4) {
+      return false;
+    }
   }
-
   for (const auto& itAttribute : _attributes) {
-    ret = outStream.write(itAttribute.first);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.write(itAttribute.first);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
-    ret = outStream.write(itAttribute.second);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.write(itAttribute.second);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
   }
-
-  ret = outStream.write(this->_contentFullPath);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_contentFullPath);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_size);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_size);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_offset);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_offset);
+    if (ret != 8) {
+      return false;
+    }
   }
-
-  ret = outStream.write(this->_sourceQueueIdentifier);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.write(this->_sourceQueueIdentifier);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
   if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == 
ProvenanceEventRecord::CLONE || this->_eventType == 
ProvenanceEventRecord::JOIN) {
     // write UUIDs
-    uint32_t parent_uuids_count = 
gsl::narrow<uint32_t>(this->_parentUuids.size());
-    ret = outStream.write(parent_uuids_count);
-    if (ret != 4) {
-      return false;
+    {
+      const auto parent_uuids_count = 
gsl::narrow<uint32_t>(this->_parentUuids.size());
+      const auto ret = outStream.write(parent_uuids_count);
+      if (ret != 4) {
+        return false;
+      }
     }
     for (const auto& parentUUID : _parentUuids) {
-      ret = outStream.write(parentUUID);
-      if (ret <= 0) {
+      const auto ret = outStream.write(parentUUID);
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
-    uint32_t children_uuids_count = 
gsl::narrow<uint32_t>(this->_childrenUuids.size());
-    ret = outStream.write(children_uuids_count);
-    if (ret != 4) {
-      return false;
+    {
+      const auto children_uuids_count = 
gsl::narrow<uint32_t>(this->_childrenUuids.size());
+      const auto ret = outStream.write(children_uuids_count);
+      if (ret != 4) {
+        return false;
+      }
     }
     for (const auto& childUUID : _childrenUuids) {
-      ret = outStream.write(childUUID);
-      if (ret <= 0) {
+      const auto ret = outStream.write(childUUID);
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || 
this->_eventType == ProvenanceEventRecord::FETCH) {
-    ret = outStream.write(this->_transitUri);
-    if (ret <= 0) {
+    const auto ret = outStream.write(this->_transitUri);
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
-    ret = outStream.write(this->_transitUri);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.write(this->_transitUri);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
-    ret = outStream.write(this->_sourceSystemFlowFileIdentifier);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.write(this->_sourceSystemFlowFileIdentifier);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
   }
 
diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp 
b/libminifi/src/serialization/FlowFileV3Serializer.cpp
index 1fa30a7..2407f87 100644
--- a/libminifi/src/serialization/FlowFileV3Serializer.cpp
+++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp
@@ -27,85 +27,78 @@ namespace minifi {
 
 constexpr uint8_t FlowFileV3Serializer::MAGIC_HEADER[];
 
-int FlowFileV3Serializer::writeLength(std::size_t length, const 
std::shared_ptr<io::OutputStream>& out) {
+size_t FlowFileV3Serializer::writeLength(std::size_t length, const 
std::shared_ptr<io::OutputStream>& out) {
   if (length < MAX_2_BYTE_VALUE) {
     return out->write(static_cast<uint16_t>(length));
   }
-  int sum = 0;
-  int ret;
-  ret = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE));
-  if (ret < 0) {
-    return ret;
+  size_t sum = 0;
+  {
+    const auto ret = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE));
+    if (io::isError(ret)) return ret;
+    sum += ret;
   }
-  sum += ret;
-  ret = out->write(static_cast<uint32_t>(length));
-  if (ret < 0) {
-    return ret;
+  {
+    const auto ret = out->write(static_cast<uint32_t>(length));
+    if (io::isError(ret)) return ret;
+    sum += ret;
   }
-  sum += ret;
   return sum;
 }
 
-int FlowFileV3Serializer::writeString(const std::string &str, const 
std::shared_ptr<io::OutputStream> &out) {
-  int sum = 0;
-  int ret;
-  ret = writeLength(str.length(), out);
-  if (ret < 0) {
-    return ret;
-  }
-  sum += ret;
-  ret = out->write(const_cast<uint8_t*>(reinterpret_cast<const 
uint8_t*>(str.data())), gsl::narrow<int>(str.length()));
-  if (ret < 0) {
-    return ret;
+size_t FlowFileV3Serializer::writeString(const std::string &str, const 
std::shared_ptr<io::OutputStream> &out) {
+  size_t sum = 0;
+  {
+    const auto ret = writeLength(str.length(), out);
+    if (io::isError(ret)) return ret;
+    sum += ret;
   }
-  if (gsl::narrow<size_t>(ret) != str.length()) {
-    return -1;
+  {
+    const auto ret = out->write(reinterpret_cast<const uint8_t*>(str.data()), 
str.length());
+    if (io::isError(ret)) return ret;
+    if (ret != str.length()) return io::STREAM_ERROR;
+    sum += ret;
   }
-  sum += ret;
   return sum;
 }
 
 int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& 
flowFile, const std::shared_ptr<io::OutputStream>& out) {
-  int sum = 0;
-  int ret;
-  ret = out->write(const_cast<uint8_t*>(MAGIC_HEADER), sizeof(MAGIC_HEADER));
-  if (ret < 0) {
-    return ret;
-  }
-  if (ret != sizeof(MAGIC_HEADER)) {
-    return -1;
+  size_t sum = 0;
+  {
+    const auto ret = out->write(MAGIC_HEADER, sizeof(MAGIC_HEADER));
+    if (io::isError(ret)) return -1;
+    if (ret != sizeof(MAGIC_HEADER)) return -1;
+    sum += ret;
   }
-  sum += ret;
   const auto& attributes = flowFile->getAttributes();
-  ret = writeLength(attributes.size(), out);
-  if (ret < 0) {
-    return ret;
+  {
+    const auto ret = writeLength(attributes.size(), out);
+    if (io::isError(ret)) return -1;
+    sum += ret;
   }
-  sum += ret;
   for (const auto& attrIt : attributes) {
-    ret = writeString(attrIt.first, out);
-    if (ret < 0) {
-      return ret;
+    {
+      const auto ret = writeString(attrIt.first, out);
+      if (io::isError(ret)) return -1;
+      sum += ret;
     }
-    sum += ret;
-    ret = writeString(attrIt.second, out);
-    if (ret < 0) {
-      return ret;
+    {
+      const auto ret = writeString(attrIt.second, out);
+      if (io::isError(ret)) return -1;
+      sum += ret;
     }
-    sum += ret;
   }
-  ret = out->write(static_cast<uint64_t>(flowFile->getSize()));
-  if (ret < 0) {
-    return ret;
+  {
+    const auto ret = out->write(static_cast<uint64_t>(flowFile->getSize()));
+    if (io::isError(ret)) return -1;
+    sum += ret;
   }
-  sum += ret;
   InputStreamPipe pipe(out);
-  ret = reader_(flowFile, &pipe);
-  if (ret < 0) {
-    return ret;
+  {
+    const auto ret = reader_(flowFile, &pipe);
+    if (ret < 0) return -1;
+    sum += gsl::narrow<size_t>(ret);
   }
-  sum += ret;
-  return sum;
+  return gsl::narrow<int>(sum);
 }
 
 } /* namespace minifi */
diff --git a/libminifi/src/sitetosite/Peer.cpp 
b/libminifi/src/sitetosite/Peer.cpp
index 3ed6b70..5a5be8a 100644
--- a/libminifi/src/sitetosite/Peer.cpp
+++ b/libminifi/src/sitetosite/Peer.cpp
@@ -45,7 +45,7 @@ bool SiteToSitePeer::Open() {
    * previously by the socket preference.
    */
   if (!this->local_network_interface_.getInterface().empty()) {
-    auto socket = static_cast<io::Socket*>(stream_.get());
+    auto* socket = dynamic_cast<io::Socket*>(stream_.get());
     if (nullptr != socket) {
       
socket->setInterface(io::NetworkInterface(local_network_interface_.getInterface(),
 nullptr));
     }
@@ -54,9 +54,8 @@ bool SiteToSitePeer::Open() {
   if (stream_->initialize() < 0)
     return false;
 
-  int data_size = gsl::narrow<int>(sizeof MAGIC_BYTES);
-
-  if (stream_->write(reinterpret_cast<uint8_t 
*>(const_cast<char*>(MAGIC_BYTES)), data_size) != data_size) {
+  const auto data_size = sizeof MAGIC_BYTES;
+  if (stream_->write(reinterpret_cast<const uint8_t *>(MAGIC_BYTES), 
data_size) != data_size) {
     return false;
   }
 
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp 
b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index b60b2f4..f02f785 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -117,7 +117,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
   {
     const auto ret = peer_->write(getResourceName());
     logger_->log_trace("result of writing resource name is %i", ret);
-    if (ret <= 0) {
+    if (ret == 0 || io::isError(ret)) {
       logger_->log_debug("result of writing resource name is %i", ret);
       // tearDown();
       return false;
@@ -126,7 +126,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
 
   {
     const auto ret = peer_->write(_currentVersion);
-    if (ret <= 0) {
+    if (ret == 0 || io::isError(ret)) {
       logger_->log_debug("result of writing version is %i", ret);
       return false;
     }
@@ -185,7 +185,7 @@ bool 
RawSiteToSiteClient::initiateCodecResourceNegotiation() {
 
   {
     const auto ret = peer_->write(getCodecResourceName());
-    if (ret <= 0) {
+    if (ret == 0 || io::isError(ret)) {
       logger_->log_debug("result of getCodecResourceName is %i", ret);
       return false;
     }
@@ -193,7 +193,7 @@ bool 
RawSiteToSiteClient::initiateCodecResourceNegotiation() {
 
   {
     const auto ret = peer_->write(_currentCodecVersion);
-    if (ret <= 0) {
+    if (ret == 0 || io::isError(ret)) {
       logger_->log_debug("result of _currentCodecVersion is %i", ret);
       return false;
     }
@@ -248,7 +248,7 @@ bool RawSiteToSiteClient::handShake() {
 
   {
     const auto ret = peer_->write(_commsIdentifier);
-    if (ret <= 0) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -268,7 +268,7 @@ bool RawSiteToSiteClient::handShake() {
 
   if (_currentVersion >= 3) {
     const auto ret = peer_->write(peer_->getURL());
-    if (ret <= 0) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -276,7 +276,7 @@ bool RawSiteToSiteClient::handShake() {
   {
     const auto size = gsl::narrow<uint32_t>(properties.size());
     const auto ret = peer_->write(size);
-    if (ret <= 0) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -285,13 +285,13 @@ bool RawSiteToSiteClient::handShake() {
   for (it = properties.begin(); it != properties.end(); it++) {
     {
       const auto ret = peer_->write(it->first);
-      if (ret <= 0) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
       const auto ret = peer_->write(it->second);
-      if (ret <= 0) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
@@ -412,7 +412,8 @@ bool 
RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     if (type >= MAX_REQUEST_TYPE)
       return -1;
 
-    return peer_->write(SiteToSiteRequest::RequestTypeStr[type]);
+    const auto write_result = 
peer_->write(SiteToSiteRequest::RequestTypeStr[type]);
+    return io::isError(write_result) ? -1 : gsl::narrow<int>(write_result);
   }
 
   int RawSiteToSiteClient::readRequestType(RequestType &type) {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp 
b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 60cef54..f12a96d 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -80,29 +80,22 @@ void SiteToSiteClient::deleteTransaction(const 
utils::Identifier& transactionID)
 
 int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& 
/*transaction*/, RespondCode code, std::string message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
   if (!resCode) {
-    // Not a valid respond code
     return -1;
   }
 
-  uint8_t codeSeq[3];
-  codeSeq[0] = CODE_SEQUENCE_VALUE_1;
-  codeSeq[1] = CODE_SEQUENCE_VALUE_2;
-  codeSeq[2] = (uint8_t) code;
-
-  int ret = peer_->write(codeSeq, 3);
-
-  if (ret != 3)
-    return -1;
+  {
+    const uint8_t codeSeq[3] { CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, 
static_cast<uint8_t>(code) };
+    const auto ret = peer_->write(codeSeq, 3);
+    if (ret != 3)
+      return -1;
+  }
 
   if (resCode->hasDescription) {
-    ret = peer_->write(message);
-    if (ret > 0) {
-      return (3 + ret);
-    } else {
-      return ret;
-    }
+    const auto ret = peer_->write(message);
+    if (io::isError(ret)) return -1;
+    if (ret == 0) return 0;
+    return 3 + gsl::narrow<int>(ret);
   } else {
     return 3;
   }
@@ -407,8 +400,6 @@ bool SiteToSiteClient::complete(const utils::Identifier& 
transactionID) {
 }
 
 int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, 
DataPacket *packet, const std::shared_ptr<core::FlowFile> &flowFile, const 
std::shared_ptr<core::ProcessSession> &session) {
-  int ret;
-
   if (peer_state_ != READY) {
     bootstrap();
   }
@@ -418,7 +409,6 @@ int16_t SiteToSiteClient::send(const utils::Identifier 
&transactionID, DataPacke
   }
 
   auto it = this->known_transactions_.find(transactionID);
-
   if (it == known_transactions_.end()) {
     return -1;
   }
@@ -435,30 +425,34 @@ int16_t SiteToSiteClient::send(const utils::Identifier 
&transactionID, DataPacke
   }
 
   if (transaction->current_transfers_ > 0) {
-    ret = writeResponse(transaction, CONTINUE_TRANSACTION, 
"CONTINUE_TRANSACTION");
+    const auto ret = writeResponse(transaction, CONTINUE_TRANSACTION, 
"CONTINUE_TRANSACTION");
     if (ret <= 0) {
       return -1;
     }
   }
   // start to read the packet
-  uint32_t numAttributes = gsl::narrow<uint32_t>(packet->_attributes.size());
-  ret = transaction->getStream().write(numAttributes);
-  if (ret != 4) {
-    return -1;
+  {
+    const auto numAttributes = 
gsl::narrow<uint32_t>(packet->_attributes.size());
+    const auto ret = transaction->getStream().write(numAttributes);
+    if (ret != 4) {
+      return -1;
+    }
   }
 
-  std::map<std::string, std::string>::iterator itAttribute;
-  for (itAttribute = packet->_attributes.begin(); itAttribute != 
packet->_attributes.end(); itAttribute++) {
-    ret = transaction->getStream().write(itAttribute->first, true);
-
-    if (ret <= 0) {
-      return -1;
+  for (const auto& attribute : packet->_attributes) {
+    {
+      const auto ret = transaction->getStream().write(attribute.first, true);
+      if (ret == 0 || io::isError(ret)) {
+        return -1;
+      }
     }
-    ret = transaction->getStream().write(itAttribute->second, true);
-    if (ret <= 0) {
-      return -1;
+    {
+      const auto ret = transaction->getStream().write(attribute.second, true);
+      if (ret == 0 || io::isError(ret)) {
+        return -1;
+      }
     }
-    logger_->log_debug("Site2Site transaction %s send attribute key %s value 
%s", transactionID.to_string(), itAttribute->first, itAttribute->second);
+    logger_->log_debug("Site2Site transaction %s send attribute key %s value 
%s", transactionID.to_string(), attribute.first, attribute.second);
   }
 
   bool flowfile_has_content = (flowFile != nullptr);
@@ -472,7 +466,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier 
&transactionID, DataPacke
   uint64_t len = 0;
   if (flowFile && flowfile_has_content) {
     len = flowFile->getSize();
-    ret = transaction->getStream().write(len);
+    const auto ret = transaction->getStream().write(len);
     if (ret != 8) {
       logger_->log_debug("Failed to write content size!");
       return -1;
@@ -493,20 +487,22 @@ int16_t SiteToSiteClient::send(const utils::Identifier 
&transactionID, DataPacke
     }
   } else if (packet->payload_.length() > 0) {
     len = packet->payload_.length();
-
-    ret = transaction->getStream().write(len);
-    if (ret != 8) {
-      return -1;
+    {
+      const auto ret = transaction->getStream().write(len);
+      if (ret != 8) {
+        return -1;
+      }
     }
-
-    ret = transaction->getStream().write(reinterpret_cast<uint8_t 
*>(const_cast<char*>(packet->payload_.c_str())), gsl::narrow<int>(len));
-    if (ret != gsl::narrow<int64_t>(len)) {
-      logger_->log_debug("Failed to write payload size!");
-      return -1;
+    {
+      const auto ret = transaction->getStream().write(reinterpret_cast<const 
uint8_t*>(packet->payload_.c_str()), gsl::narrow<size_t>(len));
+      if (ret != gsl::narrow<size_t>(len)) {
+        logger_->log_debug("Failed to write payload size!");
+        return -1;
+      }
     }
     packet->_size += len;
   } else if (flowFile && !flowfile_has_content) {
-    ret = transaction->getStream().write(len);  // Indicate zero length
+    const auto ret = transaction->getStream().write(len);  // Indicate zero 
length
     if (ret != 8) {
       logger_->log_debug("Failed to write content size (0)!");
       return -1;
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp 
b/libminifi/src/utils/ByteArrayCallback.cpp
index a64c3ac..c64e779 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -45,7 +45,7 @@ int64_t StreamOutputCallback::process(const 
std::shared_ptr<io::BaseStream>& str
   stream->seek(0);
   std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
   auto written = readFully(buffer.get(), size_);
-  stream->write(reinterpret_cast<uint8_t*>(buffer.get()), 
gsl::narrow<int>(written));
+  stream->write(reinterpret_cast<uint8_t*>(buffer.get()), written);
   return gsl::narrow<int64_t>(stream->size());
 }
 
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp 
b/libminifi/test/archive-tests/MergeFileTests.cpp
index 4ad473d..a0408ab 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -88,7 +88,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     REQUIRE(size_ + len <= capacity_);
     int total_read = 0;
     do {
-      const auto ret = input.read(end(), len);
+      const size_t ret{ input.read(end(), len) };
       if (ret == 0) break;
       if (minifi::io::isError(ret)) return -1;
       size_ += ret;
@@ -679,7 +679,7 @@ TEST_CASE_METHOD(MergeTestController, "Test Merge File 
Attributes Keeping All Un
 }
 
 void writeString(const std::string& str, const 
std::shared_ptr<minifi::io::BaseStream>& out) {
-  out->write(const_cast<uint8_t*>(reinterpret_cast<const 
uint8_t*>(str.data())), gsl::narrow<int>(str.length()));
+  out->write(reinterpret_cast<const uint8_t*>(str.data()), str.length());
 }
 
 TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp 
b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 1bf7b0c..88aa006 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -96,7 +96,7 @@ struct TestFlow{
     processor->onSchedule(processorContext.get(), new 
core::ProcessSessionFactory(processorContext));
   }
   std::shared_ptr<core::FlowFile> write(const std::string& data) {
-    minifi::io::BufferStream stream(reinterpret_cast<const 
uint8_t*>(data.c_str()), gsl::narrow<int>(data.length()));
+    minifi::io::BufferStream stream(reinterpret_cast<const 
uint8_t*>(data.c_str()), data.length());
     core::ProcessSession sessionGenFlowFile(inputContext);
     std::shared_ptr<core::FlowFile> flow = 
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     sessionGenFlowFile.importFrom(stream, flow);
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp 
b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index f63947b..50586ec 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -47,8 +47,7 @@ class ContentSessionController : public TestController {
 };
 
 const std::shared_ptr<minifi::io::BaseStream>& operator<<(const 
std::shared_ptr<minifi::io::BaseStream>& stream, const std::string& str) {
-  int length = gsl::narrow<int>(str.length());
-  REQUIRE(stream->write(const_cast<uint8_t*>(reinterpret_cast<const 
uint8_t*>(str.data())), length) == length);
+  REQUIRE(stream->write(reinterpret_cast<const uint8_t*>(str.data()), 
str.length()) == str.length());
   return stream;
 }
 
@@ -57,7 +56,7 @@ const std::shared_ptr<minifi::io::BaseStream>& 
operator>>(const std::shared_ptr<
   uint8_t buffer[4096]{};
   while (true) {
     const auto ret = stream->read(buffer, sizeof(buffer));
-    REQUIRE(!minifi::io::isError(ret));
+    REQUIRE_FALSE(minifi::io::isError(ret));
     if (ret == 0) { break; }
     str += std::string{reinterpret_cast<char*>(buffer), ret};
   }
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp 
b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 712d9f8..68f3d6c 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -65,7 +65,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") {
   // should not be able to write to the read stream
   // -1 will indicate that we were not able to write any data
 
-  REQUIRE(read_stream->write("other value") == -1);
+  REQUIRE(minifi::io::isError(read_stream->write("other value")));
 }
 
 TEST_CASE("Delete Claim", "[TestDBCR2]") {
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp 
b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 69336b8..d94dcce 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -45,7 +45,9 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Verify simple 
operation") {
   std::string content = "banana";
   minifi::io::RocksDbStream outStream("one", gsl::make_not_null(db.get()), 
true);
   outStream.write(content);
-  REQUIRE(outStream.write(content) > 0);
+  const auto second_write_result = outStream.write(content);
+  REQUIRE(second_write_result > 0);
+  REQUIRE_FALSE(minifi::io::isError(second_write_result));
   minifi::io::RocksDbStream inStream("one", gsl::make_not_null(db.get()));
   std::string str;
   inStream.read(str);
@@ -59,12 +61,14 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Write zero bytes") {
 
   minifi::io::RocksDbStream readonlyStream("two", 
gsl::make_not_null(db.get()), false);
 
-  REQUIRE(readonlyStream.write(nullptr, 0) == -1);
+  REQUIRE(minifi::io::isError(readonlyStream.write(nullptr, 0)));
 }
 
 TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
   minifi::io::RocksDbStream one("one", gsl::make_not_null(db.get()), true);
-  REQUIRE(one.write("banana") > 0);
+  const auto banana_write_result = one.write("banana");
+  REQUIRE_FALSE(minifi::io::isError(banana_write_result));
+  REQUIRE(banana_write_result > 0);
 
   minifi::io::RocksDbStream stream("one", gsl::make_not_null(db.get()));
 
diff --git a/libminifi/test/sql-tests/SQLTestPlan.h 
b/libminifi/test/sql-tests/SQLTestPlan.h
index f924d87..c69d847 100644
--- a/libminifi/test/sql-tests/SQLTestPlan.h
+++ b/libminifi/test/sql-tests/SQLTestPlan.h
@@ -55,7 +55,7 @@ class SQLTestPlan {
     if (content) {
       auto claim = 
std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo());
       auto content_stream = plan_->getContentRepo()->write(*claim);
-      int ret = 
content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())),
 content->length());
+      const auto ret = content_stream->write(reinterpret_cast<const 
uint8_t*>(content->c_str()), content->length());
       REQUIRE(ret == content->length());
       flow_file->setOffset(0);
       flow_file->setSize(content->length());
diff --git a/libminifi/test/unit/CRCTests.cpp b/libminifi/test/unit/CRCTests.cpp
index 17c62da..57da97a 100644
--- a/libminifi/test/unit/CRCTests.cpp
+++ b/libminifi/test/unit/CRCTests.cpp
@@ -26,7 +26,7 @@
 TEST_CASE("Test CRC1", "[testcrc1]") {
   org::apache::nifi::minifi::io::BufferStream base;
   
org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream>
 test(gsl::make_not_null(&base));
-  test.write(reinterpret_cast<uint8_t*>(const_cast<char*>("cow")), 3);
+  test.write(reinterpret_cast<const uint8_t*>("cow"), 3);
   REQUIRE(2580823964 == test.getCRC());
 }
 
@@ -35,7 +35,7 @@ TEST_CASE("Test CRC2", "[testcrc2]") {
   
org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream>
 test(gsl::make_not_null(&base));
   std::string fox = "the quick brown fox jumped over the brown fox";
   std::vector<uint8_t> charvect(fox.begin(), fox.end());
-  test.write(charvect, gsl::narrow<int>(charvect.size()));
+  test.write(charvect, charvect.size());
   REQUIRE(1922388889 == test.getCRC());
 }
 
@@ -74,8 +74,8 @@ TEST_CASE("CRCStream with initial crc = 0 is the same as 
without initial crc", "
   std::vector<uint8_t> textVector1(textString.begin(), textString.end());
   std::vector<uint8_t> textVector2(textString.begin(), textString.end());
 
-  test_noinit.write(textVector1, gsl::narrow<int>(textVector1.size()));
-  test_initzero.write(textVector2, gsl::narrow<int>(textVector2.size()));
+  test_noinit.write(textVector1, textVector1.size());
+  test_initzero.write(textVector2, textVector2.size());
   REQUIRE(test_noinit.getCRC() == test_initzero.getCRC());
 }
 
@@ -85,17 +85,17 @@ TEST_CASE("CRCStream: one long write is the same as writing 
in two pieces", "[in
   org::apache::nifi::minifi::io::BufferStream base_full;
   
org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream>
 test_full(gsl::make_not_null(&base_full), 0);
   std::vector<uint8_t> textVector_full(textString.begin(), textString.end());
-  test_full.write(textVector_full, gsl::narrow<int>(textVector_full.size()));
+  test_full.write(textVector_full, textVector_full.size());
 
   org::apache::nifi::minifi::io::BufferStream base_piece1;
   
org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream>
 test_piece1(gsl::make_not_null(&base_piece1), 0);
   std::vector<uint8_t> textVector_piece1(textString.begin(), 
textString.begin() + 15);
-  test_piece1.write(textVector_piece1, 
gsl::narrow<int>(textVector_piece1.size()));
+  test_piece1.write(textVector_piece1, textVector_piece1.size());
 
   org::apache::nifi::minifi::io::BufferStream base_piece2;
   
org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream>
 test_piece2(gsl::make_not_null(&base_piece2), test_piece1.getCRC());
   std::vector<uint8_t> textVector_piece2(textString.begin() + 15, 
textString.end());
-  test_piece2.write(textVector_piece2, 
gsl::narrow<int>(textVector_piece2.size()));
+  test_piece2.write(textVector_piece2, textVector_piece2.size());
 
   REQUIRE(test_full.getCRC() == test_piece2.getCRC());
 }
diff --git a/libminifi/test/unit/FileStreamTests.cpp 
b/libminifi/test/unit/FileStreamTests.cpp
index 57bae57..ed6f10d 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -53,7 +53,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
 
   stream.seek(4);
 
-  stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 4);
+  stream.write(reinterpret_cast<const uint8_t*>("file"), 4);
 
   stream.seek(0);
 
@@ -91,7 +91,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
 
   stream.seek(4);
 
-  stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 0);
+  stream.write(reinterpret_cast<const uint8_t*>("file"), 0);
 
   stream.seek(0);
 
@@ -276,7 +276,7 @@ TEST_CASE("Non-existing file read/write test") {
   minifi::io::FileStream stream(utils::file::concat_path(dir, 
"non_existing_file.txt"), 0, true);
   REQUIRE(test_controller.getLog().getInstance().contains("Error opening 
file", std::chrono::seconds(0)));
   REQUIRE(test_controller.getLog().getInstance().contains("No such file or 
directory", std::chrono::seconds(0)));
-  REQUIRE(stream.write("lorem ipsum", false) == -1);
+  REQUIRE(minifi::io::isError(stream.write("lorem ipsum", false)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error writing to 
file: invalid file stream", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
@@ -296,7 +296,7 @@ TEST_CASE("Existing file read/write test") {
   }
   minifi::io::FileStream stream(path_to_existing_file, 0, true);
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error opening 
file", std::chrono::seconds(0)));
-  REQUIRE_FALSE(stream.write("dolor sit amet", false) == -1);
+  REQUIRE_FALSE(minifi::io::isError(stream.write("dolor sit amet", false)));
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing 
to file", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
@@ -341,6 +341,6 @@ TEST_CASE("Readonly filestream write test") {
     outfile.close();
   }
   minifi::io::FileStream stream(path_to_file, 0, false);
-  REQUIRE(stream.write("dolor sit amet", false) == -1);
+  REQUIRE(minifi::io::isError(stream.write("dolor sit amet", false)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error writing to 
file: write call on file stream failed", std::chrono::seconds(0)));
 }
diff --git a/libminifi/test/unit/FlowFileSerializationTests.cpp 
b/libminifi/test/unit/FlowFileSerializationTests.cpp
index fef1327..5b7254b 100644
--- a/libminifi/test/unit/FlowFileSerializationTests.cpp
+++ b/libminifi/test/unit/FlowFileSerializationTests.cpp
@@ -35,7 +35,7 @@ std::shared_ptr<minifi::FlowFileRecord> createEmptyFlowFile() 
{
 TEST_CASE("Payload Serializer", "[testPayload]") {
   std::string content = "flowFileContent";
   auto contentStream = std::make_shared<minifi::io::BufferStream>();
-  contentStream->write(const_cast<uint8_t*>(reinterpret_cast<const 
uint8_t*>(content.data())), gsl::narrow<int>(content.length()));
+  contentStream->write(reinterpret_cast<const uint8_t*>(content.data()), 
content.length());
 
   auto result = std::make_shared<minifi::io::BufferStream>();
 
@@ -57,7 +57,7 @@ TEST_CASE("Payload Serializer", "[testPayload]") {
 TEST_CASE("FFv3 Serializer", "[testFFv3]") {
   std::string content = "flowFileContent";
   auto contentStream = std::make_shared<minifi::io::BufferStream>();
-  contentStream->write(const_cast<uint8_t*>(reinterpret_cast<const 
uint8_t*>(content.data())), gsl::narrow<int>(content.length()));
+  contentStream->write(reinterpret_cast<const uint8_t*>(content.data()), 
content.length());
 
   auto result = std::make_shared<minifi::io::BufferStream>();
 
diff --git a/libminifi/test/unit/SiteToSiteHelper.h 
b/libminifi/test/unit/SiteToSiteHelper.h
index 6e913b9..05c7938 100644
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -40,10 +40,10 @@ class SiteToSiteResponder : public minifi::io::BaseStream {
   }
 
   void push_response(const std::string& resp) {
-    server_responses_.write(reinterpret_cast<const uint8_t*>(resp.data()), 
gsl::narrow<int>(resp.length()));
+    server_responses_.write(reinterpret_cast<const uint8_t*>(resp.data()), 
resp.length());
   }
 
-  int write(const uint8_t *value, int size) override {
+  size_t write(const uint8_t *value, size_t size) override {
     client_responses_.push(std::string(reinterpret_cast<const char*>(value), 
size));
     return size;
   }
diff --git a/libminifi/test/unit/SocketTests.cpp 
b/libminifi/test/unit/SocketTests.cpp
index f4f3227..85970c9 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -27,17 +27,20 @@
 #include "io/StreamFactory.h"
 #include "io/Sockets.h"
 #include "utils/ThreadPool.h"
-using Sockets = org::apache::nifi::minifi::io::Socket;
+
+namespace minifi = org::apache::nifi::minifi;
+namespace io = minifi::io;
+using io::Socket;
 
 TEST_CASE("TestSocket", "[TestSocket1]") {
-  org::apache::nifi::minifi::io::Socket 
socket(std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()),
 Sockets::getMyHostName(), 8183);
+  Socket 
socket(std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()),
 Socket::getMyHostName(), 8183);
   REQUIRE(-1 == socket.initialize());
-  REQUIRE(socket.getHostname().rfind(Sockets::getMyHostName(), 0) == 0);
+  REQUIRE(socket.getHostname().rfind(Socket::getMyHostName(), 0) == 0);
   socket.close();
 }
 
 TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") {
-  org::apache::nifi::minifi::io::Socket 
socket(std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()),
 Sockets::getMyHostName(), 8183);
+  Socket 
socket(std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()),
 Socket::getMyHostName(), 8183);
   REQUIRE(-1 == socket.initialize());
 
   socket.write((const uint8_t*)nullptr, 0);
@@ -45,7 +48,7 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") {
   std::vector<uint8_t> buffer;
   buffer.push_back('a');
 
-  REQUIRE(-1 == socket.write(buffer, 1));
+  REQUIRE(io::isError(socket.write(buffer, 1)));
 
   socket.close();
 }
@@ -53,12 +56,12 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") {
 TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
   std::vector<uint8_t> buffer;
   buffer.push_back('a');
-  std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context 
= 
std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  org::apache::nifi::minifi::io::ServerSocket server(socket_context, 
Sockets::getMyHostName(), 9183, 1);
+  std::shared_ptr<io::SocketContext> socket_context = 
std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>());
+  io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1);
 
   REQUIRE(-1 != server.initialize());
 
-  org::apache::nifi::minifi::io::Socket client(socket_context, 
Sockets::getMyHostName(), 9183);
+  Socket client(socket_context, Socket::getMyHostName(), 9183);
 
   REQUIRE(-1 != client.initialize());
 
@@ -77,19 +80,19 @@ TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
 }
 
 TEST_CASE("TestGetHostName", "[TestSocket4]") {
-  REQUIRE(Sockets::getMyHostName().length() > 0);
+  REQUIRE(Socket::getMyHostName().length() > 0);
 }
 
 TEST_CASE("TestWriteEndian64", "[TestSocket5]") {
   std::vector<uint8_t> buffer;
   buffer.push_back('a');
-  std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context 
= 
std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<io::SocketContext> socket_context = 
std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::ServerSocket server(socket_context, 
Sockets::getMyHostName(), 9183, 1);
+  io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1);
 
   REQUIRE(-1 != server.initialize());
 
-  org::apache::nifi::minifi::io::Socket client(socket_context, 
Sockets::getMyHostName(), 9183);
+  Socket client(socket_context, Socket::getMyHostName(), 9183);
 
   REQUIRE(-1 != client.initialize());
 
@@ -110,12 +113,12 @@ TEST_CASE("TestWriteEndian32", "[TestSocket6]") {
   std::vector<uint8_t> buffer;
   buffer.push_back('a');
 
-  std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context 
= 
std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<io::SocketContext> socket_context = 
std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::ServerSocket server(socket_context, 
Sockets::getMyHostName(), 9183, 1);
+  io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1);
   REQUIRE(-1 != server.initialize());
 
-  org::apache::nifi::minifi::io::Socket client(socket_context, 
Sockets::getMyHostName(), 9183);
+  Socket client(socket_context, Socket::getMyHostName(), 9183);
 
   REQUIRE(-1 != client.initialize());
   {
@@ -145,13 +148,13 @@ TEST_CASE("TestSocketWriteTestAfterClose", 
"[TestSocket7]") {
   std::vector<uint8_t> buffer;
   buffer.push_back('a');
 
-  std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context 
= 
std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
+  std::shared_ptr<io::SocketContext> socket_context = 
std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::ServerSocket server(socket_context, 
Sockets::getMyHostName(), 9183, 1);
+  io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1);
 
   REQUIRE(-1 != server.initialize());
 
-  org::apache::nifi::minifi::io::Socket client(socket_context, 
Sockets::getMyHostName(), 9183);
+  Socket client(socket_context, Socket::getMyHostName(), 9183);
 
   REQUIRE(-1 != client.initialize());
 
@@ -166,7 +169,7 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket7]") 
{
 
   client.close();
 
-  REQUIRE(-1 == client.write(buffer, 1));
+  REQUIRE(io::isError(client.write(buffer, 1)));
 
   server.close();
 }
@@ -182,7 +185,7 @@ bool createSocket() {
   std::this_thread::sleep_for(std::chrono::milliseconds { distribution(seed) 
});
 
   for (int i = 0; i < 50; i++) {
-    std::shared_ptr<org::apache::nifi::minifi::io::TLSContext> socketA = 
std::make_shared<org::apache::nifi::minifi::io::TLSContext>(configuration);
+    std::shared_ptr<io::TLSContext> socketA = 
std::make_shared<io::TLSContext>(configuration);
     socketA->initialize();
   }
 
@@ -219,10 +222,10 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket8]") {
 TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") {
   std::shared_ptr<minifi::Configure> configure = 
std::make_shared<minifi::Configure>();
   configure->set("nifi.remote.input.secure", "false");
-  auto factory = minifi::io::StreamFactory::getInstance(configure);
-  std::string host = Sockets::getMyHostName();
-  minifi::io::Socket *socket = factory->createSocket(host, 10001).release();
-  minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
+  auto factory = io::StreamFactory::getInstance(configure);
+  std::string host = Socket::getMyHostName();
+  Socket *socket = factory->createSocket(host, 10001).release();
+  io::TLSSocket *tls = dynamic_cast<io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }
 
@@ -233,10 +236,10 @@ TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") {
 TEST_CASE("TestTLSContextCreationNullptr", "[TestSocket10]") {
   std::shared_ptr<minifi::Configure> configure = 
std::make_shared<minifi::Configure>();
   configure->set("nifi.remote.input.secure", "false");
-  auto factory = minifi::io::StreamFactory::getInstance(configure);
-  std::string host = Sockets::getMyHostName();
-  minifi::io::Socket *socket = factory->createSecureSocket(host, 10001, 
nullptr).release();
-  minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
+  auto factory = io::StreamFactory::getInstance(configure);
+  std::string host = Socket::getMyHostName();
+  io::Socket *socket = factory->createSecureSocket(host, 10001, 
nullptr).release();
+  io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }
 #endif  // OPENSSL_SUPPORT
diff --git a/libminifi/test/unit/ZlibStreamTests.cpp 
b/libminifi/test/unit/ZlibStreamTests.cpp
index 3418448..dbf3b31 100644
--- a/libminifi/test/unit/ZlibStreamTests.cpp
+++ b/libminifi/test/unit/ZlibStreamTests.cpp
@@ -35,17 +35,17 @@ TEST_CASE("gzip compression and decompression", "[basic]") {
   SECTION("Empty") {
   }
   SECTION("Simple content in one write") {
-    int length = gsl::narrow<int>(strlen("foobar"));
-    REQUIRE(length == 
compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foobar")), 
length));
+    const auto length = strlen("foobar");
+    REQUIRE(length == compressStream.write(reinterpret_cast<const 
uint8_t*>("foobar"), length));
     original += "foobar";
   }
   SECTION("Simple content in two writes") {
-    int foo_length = gsl::narrow<int>(strlen("foo"));
-    REQUIRE(foo_length == 
compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foo")), 
foo_length));
+    const auto foo_length = strlen("foo");
+    REQUIRE(foo_length == compressStream.write(reinterpret_cast<const 
uint8_t*>("foo"), foo_length));
     original += "foo";
 
-    int bar_length = gsl::narrow<int>(strlen("bar"));
-    REQUIRE(bar_length == 
compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("bar")), 
bar_length));
+    const auto bar_length = strlen("bar");
+    REQUIRE(bar_length == compressStream.write(reinterpret_cast<const 
uint8_t*>("bar"), bar_length));
     original += "bar";
   }
   SECTION("Large data") {
@@ -55,7 +55,7 @@ TEST_CASE("gzip compression and decompression", "[basic]") {
     for (size_t i = 0U; i < 1024U; i++) {
       std::generate(buf.begin(), buf.end(), [&](){return dist(gen);});
       original += std::string(reinterpret_cast<const char*>(buf.data()), 
buf.size());
-      REQUIRE(-1 != compressStream.write(buf.data(), 
gsl::narrow<int>(buf.size())));
+      REQUIRE_FALSE(io::isError(compressStream.write(buf.data(), buf.size())));
     }
   }
 
@@ -71,7 +71,7 @@ TEST_CASE("gzip compression and decompression", "[basic]") {
   io::BufferStream decompressBuffer;
   io::ZlibDecompressStream 
decompressStream(gsl::make_not_null(&decompressBuffer));
 
-  decompressStream.write(const_cast<uint8_t*>(compressBuffer.getBuffer()), 
gsl::narrow<int>(compressBuffer.size()));
+  decompressStream.write(compressBuffer.getBuffer(), compressBuffer.size());
 
   REQUIRE(decompressStream.isFinished());
   REQUIRE(original == std::string(reinterpret_cast<const 
char*>(decompressBuffer.getBuffer()), decompressBuffer.size()));
@@ -86,17 +86,17 @@ TEST_CASE("gzip compression and decompression pipeline", 
"[basic]") {
   SECTION("Empty") {
   }
   SECTION("Simple content in one write") {
-    int foobar_length = gsl::narrow<int>(strlen("foobar"));
-    REQUIRE(foobar_length == 
compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foobar")), 
foobar_length));
+    const auto foobar_length = strlen("foobar");
+    REQUIRE(foobar_length == compressStream.write(reinterpret_cast<const 
uint8_t*>("foobar"), foobar_length));
     original += "foobar";
   }
   SECTION("Simple content in two writes") {
-    int foo_length = gsl::narrow<int>(strlen("foo"));
-    REQUIRE(foo_length == 
compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foo")), 
foo_length));
+    const auto foo_length = strlen("foo");
+    REQUIRE(foo_length == compressStream.write(reinterpret_cast<const 
uint8_t*>("foo"), foo_length));
     original += "foo";
 
-    int bar_length = gsl::narrow<int>(strlen("bar"));
-    REQUIRE(bar_length == 
compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("bar")), 
bar_length));
+    const auto bar_length = strlen("bar");
+    REQUIRE(bar_length == compressStream.write(reinterpret_cast<const 
uint8_t*>("bar"), bar_length));
     original += "bar";
   }
   SECTION("Large data") {
@@ -106,8 +106,7 @@ TEST_CASE("gzip compression and decompression pipeline", 
"[basic]") {
     for (size_t i = 0U; i < 1024U; i++) {
       std::generate(buf.begin(), buf.end(), [&](){return dist(gen);});
       original += std::string(reinterpret_cast<const char*>(buf.data()), 
buf.size());
-      int buffer_size = gsl::narrow<int>(buf.size());
-      REQUIRE(buffer_size == compressStream.write(buf.data(), buffer_size));
+      REQUIRE(buf.size() == compressStream.write(buf.data(), buf.size()));
     }
   }
 
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index d3808a1..0b724d2 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -510,7 +510,7 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance 
*instance) {
     } else {
       file_buffer fb = file_to_buffer(ff->contentLocation);
       stream = std::make_shared<minifi::io::BufferStream>();
-      stream->write(fb.buffer, gsl::narrow<int>(fb.file_len));
+      stream->write(fb.buffer, fb.file_len);
       free(fb.buffer);
     }
   } else {
@@ -718,7 +718,7 @@ flow_file_record *invoke_ff(standalone_processor* proc, 
const flow_file_record *
     } else {
       ff_data->content_stream = std::make_shared<minifi::io::BufferStream>();
       file_buffer fb = file_to_buffer(input_ff->contentLocation);
-      ff_data->content_stream->write(fb.buffer, gsl::narrow<int>(fb.file_len));
+      ff_data->content_stream->write(fb.buffer, fb.file_len);
       free(fb.buffer);
     }
 
@@ -745,7 +745,7 @@ flow_file_record *invoke_chunk(standalone_processor* proc, 
uint8_t* buf, uint64_
 
   auto ff_data = std::make_shared<flowfile_input_params>();
   ff_data->content_stream = std::make_shared<minifi::io::BufferStream>();
-  ff_data->content_stream->write(buf, gsl::narrow<int>(size));
+  ff_data->content_stream->write(buf, size);
 
   plan->runNextProcessor(nullptr, ff_data);
   while (plan->runNextProcessor()) {

Reply via email to