This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch minifi-api
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 046d50a5c00687a8a97cd25d3662945b3defdc9c
Author: Adam Debreceni <[email protected]>
AuthorDate: Mon Dec 2 13:43:24 2024 +0100

    Remove more duplicates, rebase fix
---
 .../minifi-cpp/core/ProcessSessionReadCallback.h   |  44 ----
 .../include/serialization/FlowFileSerializer.h     |   2 +-
 libminifi/include/core/Scheduling.h                |  57 -----
 libminifi/include/core/ValidationResult.h          |  29 ---
 libminifi/include/utils/MinifiConcurrentQueue.h    | 253 ---------------------
 libminifi/include/utils/StagingQueue.h             |   2 +-
 utils/include/io/StreamCallback.h                  |  33 ---
 utils/include/io/StreamPipe.h                      |   2 +-
 utils/include/io/StreamSlice.h                     |   2 +-
 9 files changed, 4 insertions(+), 420 deletions(-)

diff --git a/core/include/minifi-cpp/core/ProcessSessionReadCallback.h b/core/include/minifi-cpp/core/ProcessSessionReadCallback.h
deleted file mode 100644
index 55389ac45..000000000
--- a/core/include/minifi-cpp/core/ProcessSessionReadCallback.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * @file ProcessSessionReadCallback.h
- * ProcessSessionReadCallback class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <memory>
-#include <string>
-
-#include "core/logging/LoggerFactory.h"
-#include "FlowFileRecord.h"
-#include "io/InputStream.h"
-
-namespace org::apache::nifi::minifi::core {
-class ProcessSessionReadCallback {
- public:
-  ProcessSessionReadCallback(std::filesystem::path temp_file, std::filesystem::path dest_file, std::shared_ptr<logging::Logger> logger);
-  ~ProcessSessionReadCallback();
-  int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
-  bool commit();
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-  std::ofstream tmp_file_os_;
-  bool write_succeeded_ = false;
-  std::filesystem::path tmp_file_;
-  std::filesystem::path dest_file_;
-};
-}  // namespace org::apache::nifi::minifi::core
diff --git a/extension-utils/include/serialization/FlowFileSerializer.h b/extension-utils/include/serialization/FlowFileSerializer.h
index 6948a0042..e8a612799 100644
--- a/extension-utils/include/serialization/FlowFileSerializer.h
+++ b/extension-utils/include/serialization/FlowFileSerializer.h
@@ -21,7 +21,7 @@
 #include <memory>
 #include <utility>
 #include <functional>
-#include "io/StreamCallback.h"
+#include "minifi-cpp/io/StreamCallback.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/core/Scheduling.h b/libminifi/include/core/Scheduling.h
deleted file mode 100644
index 9a46b6cb7..000000000
--- a/libminifi/include/core/Scheduling.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_
-#define LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-/*
- * Indicates the valid values for the state of a entity
- * with respect to scheduling the entity to run.
- */
-enum ScheduledState {
-  /**
-   * Entity cannot be scheduled to run
-   */
-  DISABLED,
-  /**
-   * Entity can be scheduled to run but currently is not
-   */
-  STOPPED,
-  /**
-   * Entity is currently scheduled to run
-   */
-  RUNNING
-};
-
-enum SchedulingStrategy {
-  EVENT_DRIVEN,
-  TIMER_DRIVEN,
-  CRON_DRIVEN
-};
-
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_
diff --git a/libminifi/include/core/ValidationResult.h b/libminifi/include/core/ValidationResult.h
deleted file mode 100644
index 9a6378147..000000000
--- a/libminifi/include/core/ValidationResult.h
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <string>
-
-namespace org::apache::nifi::minifi::core {
-
-struct ValidationResult {
-  bool valid;
-  std::string subject;
-  std::string input;
-};
-
-}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h b/libminifi/include/utils/MinifiConcurrentQueue.h
deleted file mode 100644
index 34c9bca27..000000000
--- a/libminifi/include/utils/MinifiConcurrentQueue.h
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_
-#define LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_
-
-
-#include <algorithm>
-#include <chrono>
-#include <deque>
-#include <mutex>
-#include <condition_variable>
-#include <utility>
-#include <stdexcept>
-#include <atomic>
-
-#include "utils/TryMoveCall.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-
-// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
-// Guarantees elements to be dequeued in order of insertion
-template <typename T>
-class ConcurrentQueue {
- public:
-  ConcurrentQueue() = default;
-
-  ConcurrentQueue(const ConcurrentQueue& other) = delete;
-  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
-  ConcurrentQueue(ConcurrentQueue&& other)
-    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
-
-  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
-    if (this != &other) {
-      std::lock(mtx_, other.mtx_);
-      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
-      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
-      queue_.swap(other.queue_);
-    }
-    return *this;
-  }
-
-  virtual ~ConcurrentQueue() = default;
-
-  bool tryDequeue(T& out) {
-    std::unique_lock<std::mutex> lck(mtx_);
-    return tryDequeueImpl(lck, out);
-  }
-
-  template<typename Functor>
-  bool consume(Functor&& fun) {
-    std::unique_lock<std::mutex> lck(mtx_);
-    return consumeImpl(std::move(lck), std::forward<Functor>(fun));
-  }
-
-  bool empty() const {
-    std::unique_lock<std::mutex> lck(mtx_);
-    return emptyImpl(lck);
-  }
-
-  size_t size() const {
-    std::lock_guard<std::mutex> guard(mtx_);
-    return queue_.size();
-  }
-
-  void clear() {
-    std::lock_guard<std::mutex> guard(mtx_);
-    queue_.clear();
-  }
-
-  template<typename Functor>
-  void remove(Functor fun) {
-    std::lock_guard<std::mutex> guard(mtx_);
-    queue_.erase(std::remove_if(queue_.begin(), queue_.end(), fun), queue_.end());
-  }
-
-  template <typename... Args>
-  void enqueue(Args&&... args) {
-    std::lock_guard<std::mutex> guard(mtx_);
-    queue_.emplace_back(std::forward<Args>(args)...);
-  }
-
- private:
-  ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
-    : queue_(std::move(other.queue_)) {}
-
- protected:
-  void checkLock(std::unique_lock<std::mutex>& lck) const {
-    if (!lck.owns_lock()) {
-      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
-    }
-  }
-
-  // Warning: this function copies if T is not nothrow move constructible
-  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
-    checkLock(lck);
-    if (queue_.empty()) {
-      return false;
-    }
-    out = std::move_if_noexcept(queue_.front());
-    queue_.pop_front();
-    return true;
-  }
-
-  // Warning: this function copies if T is not nothrow move constructible
-  template<typename Functor>
-  bool consumeImpl(std::unique_lock<std::mutex>&& lock_to_adopt, Functor&& fun) {
-    std::unique_lock<std::mutex> lock(std::move(lock_to_adopt));
-    checkLock(lock);
-    if (queue_.empty()) {
-      return false;
-    }
-    T elem = std::move_if_noexcept(queue_.front());
-    queue_.pop_front();
-    lock.unlock();
-    TryMoveCall<Functor, T>::call(std::forward<Functor>(fun), elem);
-    return true;
-  }
-
-  bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
-    checkLock(lck);
-    return queue_.empty();
-  }
-
-  mutable std::mutex mtx_;
-
- private:
-  std::deque<T> queue_;
-};
-
-
-// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data
-// Stopping interrupts all consumers without a chance to consume remaining elements in the queue although elements can still be enqueued
-// Started means queued elements can be consumed/dequeued and dequeueWait* calls can block
-template <typename T>
-class ConditionConcurrentQueue : private ConcurrentQueue<T> {
- public:
-  explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
-
-  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
-  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
-  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
-  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
-
-  using ConcurrentQueue<T>::size;
-  using ConcurrentQueue<T>::empty;
-  using ConcurrentQueue<T>::clear;
-
-  template <typename... Args>
-  void enqueue(Args&&... args) {
-    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
-    if (running_) {
-      cv_.notify_one();
-    }
-  }
-
-  bool dequeueWait(T& out) {
-    std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
-  }
-
-  template<typename Functor>
-  bool consumeWait(Functor&& fun) {
-    std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
-  }
-
-  template< class Rep, class Period >
-  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
-    std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
-  }
-
-  bool dequeueWaitUntil(T& out, const std::chrono::system_clock::time_point& time) {
-    std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait_until(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
-  }
-
-  template<typename Functor, class Rep, class Period>
-  bool consumeWaitFor(Functor&& fun, const std::chrono::duration<Rep, Period>& time) {
-    std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
-  }
-
-  template<typename Functor>
-  bool consumeWaitUntil(Functor&& fun, const std::chrono::system_clock::time_point& time) {
-    std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait_until(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
-  }
-
-
-  bool tryDequeue(T& out) {
-    std::unique_lock<std::mutex> lck(this->mtx_);
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
-  }
-
-  void stop() {
-    // this lock ensures that other threads did not yet
-    // check the running_ condition (as they all acquire
-    // the lock before the check) or already unlocked and
-    // are waiting, thus receiving the notify_all
-    // TODO(adebreceni): investigate a waiting_ counter
-    //   approach that would render the locking here unnecessary
-    std::lock_guard<std::mutex> guard(this->mtx_);
-    running_ = false;
-    cv_.notify_all();
-  }
-
-  void start() {
-    running_ = true;
-  }
-
-  bool isRunning() const {
-    return running_;  // In case it's not running no notifications are generated, dequeueing fails instead of blocking to avoid hanging threads
-  }
-
-  using ConcurrentQueue<T>::remove;
-
- private:
-  std::atomic<bool> running_;
-  std::condition_variable cv_;
-};
-
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_
diff --git a/libminifi/include/utils/StagingQueue.h b/libminifi/include/utils/StagingQueue.h
index 6751c5301..f0dc3b8b3 100644
--- a/libminifi/include/utils/StagingQueue.h
+++ b/libminifi/include/utils/StagingQueue.h
@@ -20,7 +20,7 @@
 #include <mutex>
 #include <atomic>
 #include <utility>
-#include "MinifiConcurrentQueue.h"
+#include "utils/MinifiConcurrentQueue.h"
 
 namespace org {
 namespace apache {
diff --git a/utils/include/io/StreamCallback.h b/utils/include/io/StreamCallback.h
deleted file mode 100644
index 6d4e6c00a..000000000
--- a/utils/include/io/StreamCallback.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <functional>
-#include <memory>
-
-namespace org::apache::nifi::minifi::io {
-
-class InputStream;
-class OutputStream;
-
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-using InputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream)>;
-using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<OutputStream>& output_stream)>;
-using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;
-
-}  // namespace org::apache::nifi::minifi::io
diff --git a/utils/include/io/StreamPipe.h b/utils/include/io/StreamPipe.h
index c24d04bfa..877630bcb 100644
--- a/utils/include/io/StreamPipe.h
+++ b/utils/include/io/StreamPipe.h
@@ -25,7 +25,7 @@
 #include <utility>
 #include "InputStream.h"
 #include "OutputStream.h"
-#include "StreamCallback.h"
+#include "minifi-cpp/io/StreamCallback.h"
 
 namespace org::apache::nifi::minifi {
 namespace internal {
diff --git a/utils/include/io/StreamSlice.h b/utils/include/io/StreamSlice.h
index ec179480e..f11a2aba4 100644
--- a/utils/include/io/StreamSlice.h
+++ b/utils/include/io/StreamSlice.h
@@ -20,7 +20,7 @@
 #include <algorithm>
 #include <memory>
 
-#include "StreamCallback.h"
+#include "minifi-cpp/io/StreamCallback.h"
 #include "InputStream.h"
 
 namespace org::apache::nifi::minifi::io {

Reply via email to