Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147740763
--- Diff: extensions/http-curl/client/HTTPCallback.h ---
@@ -0,0 +1,187 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+
+#include "concurrentqueue.h"
+#include <thread>
+#include <mutex>
+#include <vector>
+#include <condition_variable>
+
+#include "utils/ByteArrayCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * will stream as items are processed.
+ */
+class HttpStreamingCallback : public ByteInputCallBack {
+ public:
+ HttpStreamingCallback()
+ : ptr(nullptr),
+ is_alive_(true) {
+ previous_pos_ = 0;
+ rolling_count_ = 0;
+ }
+
+ virtual ~HttpStreamingCallback() {
+
+ }
+
+ void close() {
+ is_alive_ = false;
+ cv.notify_all();
+ }
+
+ virtual void seek(size_t pos) {
+ if ((pos - previous_pos_) >= current_vec_.size() ||
current_vec_.size() == 0)
+ load_buffer();
+ }
+
+ virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+
+ std::vector<char> vec;
+
+ if (stream->getSize() > 0) {
+ vec.resize(stream->getSize());
+
+ stream->readData(reinterpret_cast<uint8_t*>(vec.data()),
stream->getSize());
+ }
+
+ size_t added_size = vec.size();
+
+ byte_arrays_.enqueue(std::move(vec));
+
+ cv.notify_all();
+
+ return added_size;
+
+ }
+
+ virtual int64_t process(uint8_t *vector, size_t size) {
+
+ std::vector<char> vec;
+
+ if (size > 0) {
+ vec.resize(size);
+
+ memcpy(vec.data(), vector, size);
+
+ size_t added_size = vec.size();
+
+ byte_arrays_.enqueue(std::move(vec));
--- End diff --
byte_arrays_ is a lock free queue. There can be concerns with using lock
free queues like moodycamel, particularly with ordering, but I've mitigated
this by how I'm using it.
---