This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit cf5c879d8326b8721e358873f51260f476a093e7
Author: Marton Szasz <[email protected]>
AuthorDate: Wed Mar 17 15:27:44 2021 +0100

    introduce isError, fix a few issues
---
 extensions/aws/processors/PutS3Object.h            |  2 +-
 extensions/libarchive/CompressContent.h            |  6 ++--
 extensions/libarchive/FocusArchiveEntry.cpp        |  2 +-
 extensions/librdkafka/PublishKafka.cpp             |  2 +-
 extensions/opc/src/putopc.cpp                      |  2 +-
 extensions/script/lua/LuaBaseStream.cpp            | 13 +++-----
 extensions/sftp/client/SFTPClient.cpp              |  2 +-
 .../processors/ExecuteProcess.cpp                  | 16 +++++-----
 .../standard-processors/processors/ExtractText.cpp |  2 +-
 .../standard-processors/processors/GetTCP.cpp      | 12 ++++----
 .../standard-processors/processors/PutFile.cpp     |  2 +-
 extensions/tensorflow/TFApplyGraph.cpp             | 20 +++++-------
 extensions/tensorflow/TFConvertImageToTensor.cpp   |  6 +---
 extensions/tensorflow/TFExtractTopLabels.cpp       | 18 +++++------
 libminifi/include/io/CRCStream.h                   |  2 +-
 libminifi/include/io/InputStream.h                 | 24 +++++++--------
 libminifi/include/io/Stream.h                      |  9 ++++++
 libminifi/include/io/StreamPipe.h                  |  2 +-
 libminifi/include/provenance/Provenance.h          |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    |  2 +-
 libminifi/src/FlowControlProtocol.cpp              | 22 ++++++-------
 libminifi/src/FlowFileRecord.cpp                   | 10 +++---
 libminifi/src/c2/ControllerSocketProtocol.cpp      | 32 ++++++++++---------
 libminifi/src/core/ProcessSessionReadCallback.cpp  |  2 +-
 libminifi/src/io/InputStream.cpp                   |  4 +--
 libminifi/src/provenance/Provenance.cpp            | 28 ++++++++---------
 libminifi/src/sitetosite/RawSocketProtocol.cpp     | 20 ++++++------
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 36 +++++++++++++---------
 libminifi/test/BufferReader.h                      |  2 +-
 .../test/archive-tests/CompressContentTests.cpp    |  2 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    |  5 +--
 .../test/rocksdb-tests/ContentSessionTests.cpp     |  2 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  6 ++--
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |  2 +-
 libminifi/test/unit/FileStreamTests.cpp            |  8 ++---
 nanofi/tests/CSite2SiteTests.cpp                   |  2 +-
 thirdparty/google-styleguide/run_linter.sh         |  9 +++---
 37 files changed, 171 insertions(+), 167 deletions(-)

diff --git a/extensions/aws/processors/PutS3Object.h 
b/extensions/aws/processors/PutS3Object.h
index e55ac1e..6cb8618 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -97,7 +97,7 @@ class PutS3Object : public S3Processor {
       while (read_size_ < flow_size_) {
         const auto next_read_size = (std::min)(flow_size_ - read_size_, 
BUFFER_SIZE);
         const auto read_ret = stream->read(buffer.data(), next_read_size);
-        if (read_ret == static_cast<size_t>(-1)) {
+        if (io::isError(read_ret)) {
           return -1;
         }
         if (read_ret > 0) {
diff --git a/extensions/libarchive/CompressContent.h 
b/extensions/libarchive/CompressContent.h
index df7fc75..f5e44fa 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -109,7 +109,7 @@ public:
       }
       while (read_size < flow_->getSize()) {
         const auto readret = stream->read(buffer, sizeof(buffer));
-        if (readret == static_cast<size_t>(-1)) {
+        if (io::isError(readret)) {
           status_ = -1;
           return -1;
         }
@@ -145,7 +145,7 @@ public:
       stream->seek(offset_);
       const auto readRet = stream->read(buffer_, sizeof(buffer_));
       read_size_ = readRet;
-      if (readRet != static_cast<size_t>(-1)) {
+      if (!io::isError(readRet)) {
         offset_ += readRet;
       }
       return gsl::narrow<int64_t>(readRet);
@@ -383,7 +383,7 @@ public:
           int64_t read_size = 0;
           while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
             const auto ret = inputStream->read(buffer.data(), buffer.size());
-            if (ret == static_cast<size_t>(-1)) {
+            if (io::isError(ret)) {
               return -1;
             } else if (ret == 0) {
               break;
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp 
b/extensions/libarchive/FocusArchiveEntry.cpp
index 9452dce..ea709bf 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -157,7 +157,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct 
archive * a, void *d,
   do {
     last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 
8196 - read);
     read += last_read;
-  } while (data->processor->isRunning() && last_read > 0 && last_read != 
static_cast<size_t>(-1) && read < 8196);
+  } while (data->processor->isRunning() && last_read > 0 && 
!io::isError(last_read) && read < 8196);
 
   if (!data->processor->isRunning()) {
     archive_set_error(a, EINTR, "Processor shut down during read");
diff --git a/extensions/librdkafka/PublishKafka.cpp 
b/extensions/librdkafka/PublishKafka.cpp
index bbb66a8..dd7bac8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -410,7 +410,7 @@ class ReadCallback : public InputStreamCallback {
 
     for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
       const auto readRet = stream->read(buffer.data(), buffer.size());
-      if (readRet == static_cast<size_t>(-1)) {
+      if (io::isError(readRet)) {
         status_ = -1;
         error_ = "Failed to read from stream";
         return read_size_;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index e613d1b..daf0242 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -429,7 +429,7 @@ namespace processors {
 
     do {
       const auto read = stream->read(buf_.data() + size, 1024);
-      if (read == static_cast<size_t>(-1)) return -1;
+      if (io::isError(read)) return -1;
       if (read == 0) break;
       size += read;
     } while (size < stream->size());
diff --git a/extensions/script/lua/LuaBaseStream.cpp 
b/extensions/script/lua/LuaBaseStream.cpp
index 1387fae..883a13c 100644
--- a/extensions/script/lua/LuaBaseStream.cpp
+++ b/extensions/script/lua/LuaBaseStream.cpp
@@ -52,16 +52,11 @@ std::string LuaBaseStream::read(size_t len) {
   //     0 <= n < s.size()."
   //
   // http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3337.pdf
-  auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), 
static_cast<int>(len));
-  if (read < 0) {
-    return nullptr;
-  }
-
-  if (gsl::narrow<size_t>(read) != len) {
-    buffer.resize(gsl::narrow<size_t>(read));
+  const auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), 
len);
+  if (!io::isError(read) && read != len) {
+    buffer.resize(read);
   }
-
-  return buffer;
+  return io::isError(read) ? std::string{} : buffer;
 }
 
 size_t LuaBaseStream::write(std::string buf) {
diff --git a/extensions/sftp/client/SFTPClient.cpp 
b/extensions/sftp/client/SFTPClient.cpp
index 1e76cb8..7af066d 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -567,7 +567,7 @@ bool SFTPClient::putFile(const std::string& path, 
io::BaseStream& input, bool ov
   uint64_t total_read = 0U;
   do {
     const auto read_ret = input.read(buf.data(), buf.size());
-    if (read_ret == static_cast<size_t>(-1)) {
+    if (io::isError(read_ret)) {
       last_error_.setLibssh2Error(LIBSSH2_FX_OK);
       logger_->log_error("Error while reading input");
       return false;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp 
b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 1dac5c6..cc9212d 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -160,11 +160,11 @@ void ExecuteProcess::onTrigger(core::ProcessContext 
*context, core::ProcessSessi
           while (1) {
             
std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
             char buffer[4096];
-            int numRead = read(_pipefd[0], buffer, sizeof(buffer));
+            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
             if (numRead <= 0)
               break;
-            logger_->log_debug("Execute Command Respond %d", numRead);
-            ExecuteProcess::WriteCallback callback(buffer, numRead);
+            logger_->log_debug("Execute Command Respond %zd", numRead);
+            ExecuteProcess::WriteCallback callback(buffer, 
gsl::narrow<uint64_t>(numRead));
             auto flowFile = session->create();
             if (!flowFile)
               continue;
@@ -177,13 +177,13 @@ void ExecuteProcess::onTrigger(core::ProcessContext 
*context, core::ProcessSessi
         } else {
           char buffer[4096];
           char *bufPtr = buffer;
-          int totalRead = 0;
+          size_t totalRead = 0;
           std::shared_ptr<core::FlowFile> flowFile = nullptr;
           while (true) {
-            int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - 
totalRead));
+            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - 
totalRead));
             if (numRead <= 0) {
               if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %d", totalRead);
+                logger_->log_debug("Execute Command Respond %zu", totalRead);
                 // child exits and close the pipe
                 ExecuteProcess::WriteCallback callback(buffer, totalRead);
                 if (!flowFile) {
@@ -200,9 +200,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext 
*context, core::ProcessSessi
               }
               break;
             } else {
-              if (numRead == static_cast<int>((sizeof(buffer) - totalRead))) {
+              if (numRead == static_cast<ssize_t>((sizeof(buffer) - 
totalRead))) {
                 // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %d", 
sizeof(buffer));
+                logger_->log_debug("Execute Command Max Respond %zu", 
sizeof(buffer));
                 ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
                   flowFile = session->create();
diff --git a/extensions/standard-processors/processors/ExtractText.cpp 
b/extensions/standard-processors/processors/ExtractText.cpp
index 0000b84..9774cf8 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -134,7 +134,7 @@ int64_t ExtractText::ReadCallback::process(const 
std::shared_ptr<io::BaseStream>
     const auto length = std::min(size_limit - read_size, buffer_.size());
     const auto ret = stream->read(buffer_, length);
 
-    if (ret == static_cast<size_t>(-1)) {
+    if (io::isError(ret)) {
       return -1;  // Stream error
     } else if (ret == 0) {
       break;  // End of stream, no more data
diff --git a/extensions/standard-processors/processors/GetTCP.cpp 
b/extensions/standard-processors/processors/GetTCP.cpp
index e55967f..cbbe535 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -167,12 +167,12 @@ void GetTCP::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, co
       do {
         if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
           buffer.resize(receive_buffer_size_);
-          int size_read = socket_ptr->read(buffer.data(), 
gsl::narrow<int>(receive_buffer_size_), false);
-          if (size_read >= 0) {
-            if (size_read > 0) {
+          const auto size_read = socket_ptr->read(buffer.data(), 
receive_buffer_size_, false);
+          if (size_read >= 0 && !io::isError(size_read)) {
+            if (size_read > 0 && !io::isError(size_read)) {
               // determine cut location
-              int startLoc = 0, i = 0;
-              for (; i < size_read; i++) {
+              size_t startLoc = 0;
+              for (size_t i = 0; i < size_read; i++) {
                 if (buffer.at(i) == endOfMessageByte && i > 0) {
                   if (i-startLoc > 0) {
                     handler_->handle(socket_ptr->getHostname(), 
buffer.data()+startLoc, (i-startLoc), true);
@@ -194,7 +194,7 @@ void GetTCP::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, co
               reconnects = 0;
             }
             socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == -2 && stay_connected_) {
+          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
             if (++reconnects > connection_attempt_limit_) {
               logger_->log_info("Too many reconnects, exiting thread");
               socket_ptr->close();
diff --git a/extensions/standard-processors/processors/PutFile.cpp 
b/extensions/standard-processors/processors/PutFile.cpp
index 2060867..db60c89 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -322,7 +322,7 @@ int64_t PutFile::ReadCallback::process(const 
std::shared_ptr<io::BaseStream>& st
 
   do {
     const auto read = stream->read(buffer, 1024);
-    if (read == static_cast<size_t>(-1)) return -1;
+    if (io::isError(read)) return -1;
     if (read == 0) break;
     tmp_file_os.write(reinterpret_cast<char *>(buffer), 
gsl::narrow<std::streamsize>(read));
     size += read;
diff --git a/extensions/tensorflow/TFApplyGraph.cpp 
b/extensions/tensorflow/TFApplyGraph.cpp
index 4419caa..14d349e 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -16,10 +16,12 @@
  */
 
 #include "TFApplyGraph.h"
-#include <core/ProcessContext.h>
-#include <core/ProcessSession.h>
 #include <tensorflow/cc/ops/standard_ops.h>
 
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -191,29 +193,23 @@ void TFApplyGraph::onTrigger(const 
std::shared_ptr<core::ProcessContext> &contex
 int64_t TFApplyGraph::GraphReadCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
   std::string graph_proto_buf;
   graph_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t 
*>(&graph_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t 
*>(&graph_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("GraphReadCallback failed to fully read flow file 
input stream");
   }
-
   graph_def_->ParseFromString(graph_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 int64_t TFApplyGraph::TensorReadCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t 
*>(&tensor_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t 
*>(&tensor_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow 
file input stream");
   }
-
   tensor_proto_->ParseFromString(tensor_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 int64_t TFApplyGraph::TensorWriteCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp 
b/extensions/tensorflow/TFConvertImageToTensor.cpp
index 5f94548..aea09f8 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -321,14 +321,10 @@ int64_t 
TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr
   if (tensor_->AllocatedBytes() < stream->size()) {
     throw std::runtime_error("Tensor is not big enough to hold FlowFile 
bytes");
   }
-
-  auto num_read = stream->read(tensor_->flat<unsigned char>().data(),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(tensor_->flat<unsigned char>().data(), 
stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow 
file input stream");
   }
-
   return num_read;
 }
 
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp 
b/extensions/tensorflow/TFExtractTopLabels.cpp
index f0853c1..b73a3e5 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -19,6 +19,8 @@
 
 #include "tensorflow/cc/ops/standard_ops.h"
 
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -124,7 +126,7 @@ void TFExtractTopLabels::onTrigger(const 
std::shared_ptr<core::ProcessContext> &
 }
 
 int64_t TFExtractTopLabels::LabelsReadCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
-  int64_t total_read = 0;
+  size_t total_read = 0;
   std::string label;
   uint64_t max_label_len = 65536;
   label.resize(max_label_len);
@@ -134,9 +136,8 @@ int64_t 
TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
   buf.resize(buf_size);
 
   while (total_read < stream->size()) {
-    auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), 
static_cast<int>(buf_size));
-
-    for (auto i = 0; i < read; i++) {
+    const auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), 
buf_size);
+    for (size_t i = 0; i < read; i++) {
       if (buf[i] == '\n' || total_read + i == stream->size()) {
         labels_->emplace_back(label.substr(0, label_size));
         label_size = 0;
@@ -149,21 +150,18 @@ int64_t 
TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
     total_read += read;
   }
 
-  return total_read;
+  return gsl::narrow<int64_t>(total_read);
 }
 
 int64_t TFExtractTopLabels::TensorReadCallback::process(const 
std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t 
*>(&tensor_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t 
*>(&tensor_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow 
file input stream");
   }
-
   tensor_proto_->ParseFromString(tensor_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 } /* namespace processors */
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index abe0762..b6ae751 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -86,7 +86,7 @@ class InputCRCStream : public virtual 
CRCStreamBase<StreamType>, public InputStr
 
   size_t read(uint8_t *buf, size_t buflen) override {
     const auto ret = child_stream_->read(buf, buflen);
-    if (ret > 0 && ret != static_cast<size_t>(-1)) {
+    if (ret > 0 && !io::isError(ret)) {
       crc_ = crc32(crc_, buf, ret);
     }
     return ret;
diff --git a/libminifi/include/io/InputStream.h 
b/libminifi/include/io/InputStream.h
index cebf266..4360acc 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -36,41 +36,41 @@ class InputStream : public virtual Stream {
     throw std::runtime_error("Querying size is not supported");
   }
   /**
-   * reads a byte array from the stream
+   * Reads a byte array from the stream. Use isError (Stream.h) to check for 
errors.
    * @param value reference in which will set the result
    * @param len length to read
-   * @return resulting read size or static_cast<size_t>(-1) on error or 
static_cast<size_t>(-2) for ClientSocket EAGAIN
+   * @return resulting read size or static_cast<size_t>(-1) on error or 
static_cast<size_t>(-2) on EAGAIN
    **/
   virtual size_t read(uint8_t *value, size_t len) = 0;
 
   size_t read(std::vector<uint8_t>& buffer, size_t len);
 
   /**
-   * read string from stream
+   * Read string from stream. Use isError (Stream.h) to check for errors.
    * @param str reference string
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or 
static_cast<size_t>(-2) on EAGAIN
    **/
   size_t read(std::string &str, bool widen = false);
 
   /**
-   * read a bool from stream
+   * Read a bool from stream. Use isError (Stream.h) to check for errors.
    * @param value reference to the output
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or 
static_cast<size_t>(-2) on EAGAIN
    **/
   size_t read(bool& value);
 
   /**
-   * read a uuid from stream
+   * Read a uuid from stream. Use isError (Stream.h) to check for errors.
    * @param value reference to the output
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or 
static_cast<size_t>(-2) on EAGAIN
    **/
   size_t read(utils::Identifier& value);
 
   /**
-  * reads sizeof(Integral) bytes from the stream
-  * @param value reference in which will set the result
-  * @return resulting read size
-  **/
+   * Reads sizeof(Integral) bytes from the stream. Use isError (Stream.h) to 
check for errors.
+   * @param value reference in which will set the result
+   * @return resulting read size or static_cast<size_t>(-1) on error or 
static_cast<size_t>(-2) on EAGAIN
+   **/
   template<typename Integral, typename = 
std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, 
bool>::value>>
   size_t read(Integral& value) {
     uint8_t buf[sizeof(Integral)]{};
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index cb528e5..8cbff10 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -24,6 +24,15 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
+inline bool isError(const size_t read_return) noexcept {
+  return read_return == static_cast<size_t>(-1)   // general error
+      || read_return == static_cast<size_t>(-2);  // Socket EAGAIN, to be 
refactored to eliminate this error condition
+}
+
+inline bool isError(const int write_return) noexcept {
+  return write_return == -1;
+}
+
 /**
  * All streams serialize/deserialize in big-endian
  */
diff --git a/libminifi/include/io/StreamPipe.h 
b/libminifi/include/io/StreamPipe.h
index 8c88568..abac015 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -50,7 +50,7 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& 
src, const std::shar
   int64_t totalTransferred = 0;
   while (true) {
     const auto readRet = src->read(buffer, sizeof(buffer));
-    if (readRet == static_cast<size_t>(-1)) return -1;
+    if (io::isError(readRet)) return -1;
     if (readRet == 0) break;
     auto remaining = readRet;
     int transferred = 0;
diff --git a/libminifi/include/provenance/Provenance.h 
b/libminifi/include/provenance/Provenance.h
index c2fd38c..ed1cadc 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -367,7 +367,7 @@ class ProvenanceEventRecord : public 
core::SerializableComponent {
 
     std::string uuid;
     const auto uuidret = outStream.read(uuid);
-    if (uuidret == 0 || uuidret == static_cast<size_t>(-1)) {
+    if (uuidret == 0 || io::isError(uuidret)) {
       return 0;
     }
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h 
b/libminifi/include/sitetosite/SiteToSiteClient.h
index 36c3d96..b372ca3 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -306,7 +306,7 @@ class ReadCallback : public InputStreamCallback {
     do {
       const auto readSize = stream->read(buffer, 8192);
       if (readSize == 0) break;
-      if (readSize == static_cast<size_t>(-1)) return -1;
+      if (io::isError(readSize)) return -1;
       const auto ret = _packet->transaction_->getStream().write(buffer, 
readSize);
       if (ret != readSize) {
         logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow 
Size " << readSize << " Failed " << ret;
diff --git a/libminifi/src/FlowControlProtocol.cpp 
b/libminifi/src/FlowControlProtocol.cpp
index 8d8ad4a..79334c2 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -71,26 +71,26 @@ int FlowControlProtocol::selectClient(int msec) {
 }
 
 int FlowControlProtocol::readData(uint8_t *buf, int buflen) {
+  gsl_Expects(buflen >= 0);
   int sendSize = buflen;
 
   while (buflen) {
-    int status;
-    status = selectClient(MAX_READ_TIMEOUT);
-    if (status <= 0) {
-      return status;
+    const auto selectstatus = selectClient(MAX_READ_TIMEOUT);
+    if (selectstatus <= 0) {
+      return selectstatus;
     }
 #ifdef WIN32
-    status = _read(_socket, buf, buflen);
+    const auto readstatus = _read(_socket, buf, buflen);
 #elif !defined(__MACH__)
-    status = read(_socket, buf, buflen);
+    const auto readstatus = read(_socket, buf, gsl::narrow<size_t>(buflen));
 #else
-    status = recv(_socket, buf, buflen, 0);
+    const auto readstatus = recv(_socket, buf, buflen, 0);
 #endif
-    if (status <= 0) {
-      return status;
+    if (readstatus <= 0) {
+      return gsl::narrow<int>(readstatus);
     }
-    buflen -= status;
-    buf += status;
+    buflen -= readstatus;
+    buf += readstatus;
   }
 
   return sendSize;
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index f7b1638..89096a9 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -186,14 +186,14 @@ std::shared_ptr<FlowFileRecord> 
FlowFileRecord::DeSerialize(io::InputStream& inS
 
   {
     const auto ret = inStream.read(file->uuid_);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return {};
     }
   }
 
   {
     const auto ret = inStream.read(container);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return {};
     }
   }
@@ -211,14 +211,14 @@ std::shared_ptr<FlowFileRecord> 
FlowFileRecord::DeSerialize(io::InputStream& inS
     std::string key;
     {
       const auto ret = inStream.read(key, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return {};
       }
     }
     std::string value;
     {
       const auto ret = inStream.read(value, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return {};
       }
     }
@@ -228,7 +228,7 @@ std::shared_ptr<FlowFileRecord> 
FlowFileRecord::DeSerialize(io::InputStream& inS
   std::string content_full_path;
   {
     const auto ret = inStream.read(content_full_path);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return {};
     }
   }
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp 
b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 73b74ff..d441182 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -102,7 +102,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string componentStr;
           const auto size = stream->read(componentStr);
-          if ( size != static_cast<size_t>(-1) ) {
+          if (!io::isError(size)) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->start();
@@ -116,7 +116,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string componentStr;
           const auto size = stream->read(componentStr);
-          if ( size != static_cast<size_t>(-1) ) {
+          if (!io::isError(size)) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->stop();
@@ -130,7 +130,7 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string connection;
           const auto size = stream->read(connection);
-          if ( size != static_cast<size_t>(-1) ) {
+          if (!io::isError(size)) {
             update_sink_->clearConnection(connection);
           }
         }
@@ -138,21 +138,25 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::UPDATE:
         {
           std::string what;
-          const auto size = stream->read(what);
-          if (size == static_cast<size_t>(-1)) {
-            logger_->log_debug("Connection broke");
-            break;
+          {
+            const auto size = stream->read(what);
+            if (io::isError(size)) {
+              logger_->log_debug("Connection broke");
+              break;
+            }
           }
           if (what == "flow") {
             std::string ff_loc;
-            const auto size = stream->read(ff_loc);
+            {
+              const auto size = stream->read(ff_loc);
+              if (io::isError(size)) {
+                logger_->log_debug("Connection broke");
+                break;
+              }
+            }
             std::ifstream tf(ff_loc);
             std::string configuration((std::istreambuf_iterator<char>(tf)),
                 std::istreambuf_iterator<char>());
-            if (size == static_cast<size_t>(-1)) {
-              logger_->log_debug("Connection broke");
-              break;
-            }
             update_sink_->applyUpdate("ControllerSocketProtocol", 
configuration);
           }
         }
@@ -161,14 +165,14 @@ void 
ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string what;
           const auto size = stream->read(what);
-          if (size == static_cast<size_t>(-1)) {
+          if (io::isError(size)) {
             logger_->log_debug("Connection broke");
             break;
           }
           if (what == "queue") {
             std::string connection;
             const auto size_ = stream->read(connection);
-            if (size_ == static_cast<size_t>(-1)) {
+            if (io::isError(size_)) {
               logger_->log_debug("Connection broke");
               break;
             }
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp 
b/libminifi/src/core/ProcessSessionReadCallback.cpp
index d62fac8..0d822c7 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -49,7 +49,7 @@ int64_t ProcessSessionReadCallback::process(const 
std::shared_ptr<io::BaseStream
   uint8_t buffer[8192];
   do {
     const auto read = stream->read(buffer, 8192);
-    if (read == static_cast<size_t>(-1)) return -1;
+    if (io::isError(read)) return -1;
     if (read == 0) break;
     if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
       return -1;
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index cd4166b..be0a273 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -35,7 +35,7 @@ size_t InputStream::read(std::vector<uint8_t>& buffer, size_t 
len) {
     buffer.resize(len);
   }
   const auto ret = read(buffer.data(), len);
-  if (ret == static_cast<size_t>(-1)) return ret;
+  if (io::isError(ret)) return ret;
   buffer.resize((std::max)(ret, size_t{0}));
   return ret;
 }
@@ -75,7 +75,7 @@ size_t InputStream::read(std::string &str, bool widen) {
     ret = read(len);
   }
 
-  if (ret <= 0) {
+  if (ret == 0 || isError(ret)) {
     return ret;
   }
 
diff --git a/libminifi/src/provenance/Provenance.cpp 
b/libminifi/src/provenance/Provenance.cpp
index 0dd9cfc..f3b0715 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -239,7 +239,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
 
   {
     const auto ret = outStream.read(uuid_);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -283,28 +283,28 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
 
   {
     const auto ret = outStream.read(this->_componentId);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
     const auto ret = outStream.read(this->_componentType);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
     const auto ret = outStream.read(this->flow_uuid_);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
     const auto ret = outStream.read(this->_details);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -322,14 +322,14 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
     std::string key;
     {
       const auto ret = outStream.read(key);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     std::string value;
     {
       const auto ret = outStream.read(value);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
@@ -338,7 +338,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
 
   {
     const auto ret = outStream.read(this->_contentFullPath);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -359,7 +359,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
 
   {
     const auto ret = outStream.read(this->_sourceQueueIdentifier);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -378,7 +378,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
       utils::Identifier parentUUID;
       {
         const auto ret = outStream.read(parentUUID);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -395,7 +395,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
       utils::Identifier childUUID;
       {
         const auto ret = outStream.read(childUUID);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -404,20 +404,20 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t 
*buffer, const size_t buff
   } else if (this->_eventType == ProvenanceEventRecord::SEND || 
this->_eventType == ProvenanceEventRecord::FETCH) {
     {
       const auto ret = outStream.read(this->_transitUri);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     {
       const auto ret = outStream.read(this->_transitUri);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
       const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp 
b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 9dce59d..463a96f 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -135,7 +135,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
   uint8_t statusCode;
   {
     const auto ret = peer_->read(statusCode);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       logger_->log_debug("result of writing version status code  %i", ret);
       return false;
     }
@@ -149,7 +149,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
       uint32_t serverVersion;
       {
         const auto ret = peer_->read(serverVersion);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -202,7 +202,7 @@ bool 
RawSiteToSiteClient::initiateCodecResourceNegotiation() {
   uint8_t statusCode;
   {
     const auto ret = peer_->read(statusCode);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -214,7 +214,7 @@ bool 
RawSiteToSiteClient::initiateCodecResourceNegotiation() {
       uint32_t serverVersion;
       {
         const auto ret = peer_->read(serverVersion);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -359,7 +359,7 @@ bool 
RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     uint32_t number;
     {
       const auto ret = peer_->read(number);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         tearDown();
         return false;
       }
@@ -369,7 +369,7 @@ bool 
RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       std::string host;
       {
         const auto ret = peer_->read(host);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -377,7 +377,7 @@ bool 
RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       uint32_t port;
       {
         const auto ret = peer_->read(port);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -385,7 +385,7 @@ bool 
RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       uint8_t secure;
       {
         const auto ret = peer_->read(secure);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -393,7 +393,7 @@ bool 
RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       uint32_t count;
       {
         const auto ret = peer_->read(count);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -422,7 +422,7 @@ bool 
RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     std::string requestTypeStr;
 
     const auto ret = peer_->read(requestTypeStr);
-    if (ret == 0 || ret == static_cast<size_t>(-1))
+    if (ret == 0 || io::isError(ret))
       return static_cast<int>(ret);
 
     for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp 
b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 7e4a38d..c7cd919 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -31,19 +31,25 @@ namespace sitetosite {
 
 int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& 
/*transaction*/, RespondCode &code, std::string &message) {
   uint8_t firstByte;
-  auto ret = peer_->read(firstByte);
-  if (ret == 0 || ret == static_cast<size_t>(-1) || firstByte != 
CODE_SEQUENCE_VALUE_1)
-    return -1;
+  {
+    const auto ret = peer_->read(firstByte);
+    if (ret == 0 || io::isError(ret) || firstByte != CODE_SEQUENCE_VALUE_1)
+      return -1;
+  }
 
   uint8_t secondByte;
-  ret = peer_->read(secondByte);
-  if (ret == 0 || ret == static_cast<size_t>(-1) || secondByte != 
CODE_SEQUENCE_VALUE_2)
-    return -1;
+  {
+    const auto ret = peer_->read(secondByte);
+    if (ret == 0 || io::isError(ret) || secondByte != CODE_SEQUENCE_VALUE_2)
+      return -1;
+  }
 
   uint8_t thirdByte;
-  ret = peer_->read(thirdByte);
-  if (ret == 0 || ret == static_cast<size_t>(-1))
-    return gsl::narrow_cast<int>(ret);
+  {
+    const auto ret = peer_->read(thirdByte);
+    if (ret == 0 || io::isError(ret))
+      return static_cast<int>(ret);
+  }
 
   code = (RespondCode) thirdByte;
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
@@ -51,8 +57,8 @@ int SiteToSiteClient::readResponse(const 
std::shared_ptr<Transaction>& /*transac
     return -1;
   }
   if (resCode->hasDescription) {
-    ret = peer_->read(message);
-    if (ret != static_cast<size_t>(-1))
+    const auto ret = peer_->read(message);
+    if (ret == 0 || !io::isError(ret))
       return -1;
   }
   return gsl::narrow<int>(3 + message.size());
@@ -584,7 +590,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& 
transactionID, DataPacke
   uint32_t numAttributes;
   {
     const auto ret = transaction->getStream().read(numAttributes);
-    if (ret == 0 || ret == static_cast<size_t>(-1) || numAttributes > 
MAX_NUM_ATTRIBUTES) {
+    if (ret == 0 || io::isError(ret) || numAttributes > MAX_NUM_ATTRIBUTES) {
       return false;
     }
   }
@@ -596,13 +602,13 @@ bool SiteToSiteClient::receive(const utils::Identifier& 
transactionID, DataPacke
     std::string value;
     {
       const auto ret = transaction->getStream().read(key, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
       const auto ret = transaction->getStream().read(value, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
@@ -613,7 +619,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& 
transactionID, DataPacke
   uint64_t len;
   {
     const auto ret = transaction->getStream().read(len);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index 312ca76..63bce60 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -33,7 +33,7 @@ class BufferReader : public 
org::apache::nifi::minifi::InputStreamCallback {
     while (remaining_len > 0) {
       const auto ret = input.read(tmpBuffer, std::min(remaining_len, 
sizeof(tmpBuffer)));
       if (ret == 0) break;
-      if (ret == static_cast<size_t>(-1)) return ret;
+      if (minifi::io::isError(ret)) return ret;
       remaining_len -= ret;
       total_read += ret;
       auto prevSize = buffer_.size();
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp 
b/libminifi/test/archive-tests/CompressContentTests.cpp
index 3d0da83..c11f47a 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -57,7 +57,7 @@ class ReadCallback: public minifi::InputStreamCallback {
     do {
       const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - 
read_size_);
       if (ret == 0) break;
-      if (ret == static_cast<size_t>(-1)) return -1;
+      if (minifi::io::isError(ret)) return -1;
       read_size_ += gsl::narrow<size_t>(ret);
       total_read += ret;
     } while (buffer_size_ != read_size_);
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp 
b/libminifi/test/archive-tests/MergeFileTests.cpp
index 074b153..4114672 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -90,7 +90,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     do {
       const auto ret = input.read(end(), len);
       if (ret == 0) break;
-      if (ret == static_cast<size_t>(-1)) return -1;
+      if (minifi::io::isError(ret)) return -1;
       size_ += ret;
       len -= ret;
       total_read += ret;
@@ -112,7 +112,8 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& 
input) {
    public:
     explicit ArchiveEntryReader(archive* arch) : arch(arch) {}
     size_t read(uint8_t* out, std::size_t len) {
-      return gsl::narrow_cast<size_t>(archive_read_data(arch, out, len));
+      const auto ret = archive_read_data(arch, out, len);
+      return ret < 0 ? static_cast<size_t>(-1) : gsl::narrow<size_t>(ret);
     }
    private:
     archive* arch;
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp 
b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index d2e6d51..77b7c73 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -58,7 +58,7 @@ const std::shared_ptr<minifi::io::BaseStream>& 
operator>>(const std::shared_ptr<
   while (true) {
     const auto ret = stream->read(buffer, sizeof(buffer));
     REQUIRE(ret >= 0);
-    REQUIRE(ret != static_cast<size_t>(-1));
+    REQUIRE(!minifi::io::isError(ret));
     if (ret == 0) { break; }
     str += std::string{reinterpret_cast<char*>(buffer), ret};
   }
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp 
b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 7b5c75c..4d5e5a3 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -103,8 +103,8 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
+  // error tell us we have an invalid stream
+  REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
 TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
@@ -140,7 +140,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
   std::string readstr;
 
   // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
 TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp 
b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 5e7c3b6..69336b8 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
 
   minifi::io::RocksDbStream nonExistingStream("two", 
gsl::make_not_null(db.get()));
 
-  REQUIRE(nonExistingStream.read(nullptr, 0) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(nonExistingStream.read(nullptr, 0)));
 }
diff --git a/libminifi/test/unit/FileStreamTests.cpp 
b/libminifi/test/unit/FileStreamTests.cpp
index 1ff4ab9..57bae57 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -173,7 +173,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(nullptr, stream.size()) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(stream.read(nullptr, stream.size())));
 
   data = verifybuffer.data();
 
@@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") {
   REQUIRE(test_controller.getLog().getInstance().contains("Error writing to 
file: invalid file stream", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE(stream.read(readBuffer, 1) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(stream.read(readBuffer, 1)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from 
file: invalid file stream", std::chrono::seconds(0)));
 }
 
@@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") {
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing 
to file", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE_FALSE(stream.read(readBuffer, 11) == static_cast<size_t>(-1));
+  REQUIRE_FALSE(minifi::io::isError(stream.read(readBuffer, 11)));
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading 
from file", std::chrono::seconds(0)));
   stream.seek(0);
-  REQUIRE(stream.read(nullptr, 11) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(stream.read(nullptr, 11)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from 
file: invalid buffer", std::chrono::seconds(0)));
 }
 
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index ad3c086..821d8e6 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -147,7 +147,7 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, 
TransferState& transfe
   while(!found_codec) {
     uint8_t handshake_data[1000];
     const auto actual_len = stream->read(handshake_data+read_len, 
1000-read_len);
-    if(actual_len == 0 || actual_len == static_cast<size_t>(-1)) {
+    if(actual_len == 0 || minifi::io::isError(actual_len)) {
       continue;
     }
     read_len += actual_len;
diff --git a/thirdparty/google-styleguide/run_linter.sh 
b/thirdparty/google-styleguide/run_linter.sh
index 5015c94..9fa4271 100755
--- a/thirdparty/google-styleguide/run_linter.sh
+++ b/thirdparty/google-styleguide/run_linter.sh
@@ -38,9 +38,8 @@ done
 [ x"$INCLUDE_DIRS" == x"" ] && echo "WARNING: No include directories 
specified."
 [ x"$SOURCE_DIRS" == x"" ] && echo "ERROR: No source directories specified." 
&& exit 1
 
-HEADERS=`find $INCLUDE_DIRS -name '*.h' | sort | uniq | tr '\n' ' '`
-SOURCES=`find $SOURCE_DIRS -name  '*.cpp' | sort | uniq | tr '\n' ' '`
+HEADERS=$(find $INCLUDE_DIRS -name '*.h' | sort | uniq | tr '\n' ' ')
+SOURCES=$(find $SOURCE_DIRS -name  '*.cpp' | sort | uniq | tr '\n' ' ')
 # this realpath alternative should work on mac
-alias prealpath="python -c 'import os, sys; 
print(os.path.realpath(sys.argv[1]))'"
-REPOSITORY="$(python -c 'import os, sys; print(os.path.realpath(sys.argv[1] + 
"../.."))' $(dirname "$0"))"
-python ${SCRIPT_DIR}/cpplint.py --linelength=200 --repository="$REPOSITORY" 
${HEADERS} ${SOURCES}
+REPOSITORY="$(python -c 'import os, sys; print(os.path.realpath(sys.argv[1] + 
"../.."))' "$(dirname "$0")")"
+python "${SCRIPT_DIR}"/cpplint.py --linelength=200 --repository="$REPOSITORY" 
${HEADERS} ${SOURCES}

Reply via email to