fgerlits commented on a change in pull request #1252:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1252#discussion_r814630485
##########
File path: docker/test/integration/minifi/core/TransientMinifiContainer.py
##########
@@ -0,0 +1,8 @@
+from .MinifiContainer import MinifiContainer
Review comment:
I know you were just following existing practice by not including one,
but I think these Python files should start with a license header, too. I
would add it to new files (and maybe also to old files in a separate pull
request).
##########
File path: extensions/libarchive/BinFiles.cpp
##########
@@ -346,10 +346,10 @@ void BinFiles::restore(const
std::shared_ptr<core::FlowFile>& flowFile) {
file_store_.put(flowFile);
}
-std::set<std::shared_ptr<core::Connectable>> BinFiles::getOutGoingConnections(const std::string &relationship) const {
+std::set<core::Connectable*> BinFiles::getOutGoingConnections(const std::string &relationship) const {
auto result = core::Connectable::getOutGoingConnections(relationship);
if (relationship == Self.getName()) {
- result.insert(std::static_pointer_cast<core::Connectable>(std::const_pointer_cast<core::Processor>(shared_from_this())));
+ result.insert(const_cast<BinFiles*>(this));
Review comment:
Ugh, `const_cast`. This was an existing problem, and your change has just
made it visible, but it would be good to fix it, e.g. by changing the return
type to `std::set<const core::Connectable*>` or by making
`getOutGoingConnections()` non-const.
I'm not sure how many additional changes either of those options would snowball
into...
Same comment in `DefragmentText.cpp`.
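To illustrate the pointer-to-const option, here is a minimal sketch. `Node` and `SelfLoopNode` are hypothetical stand-ins for `core::Connectable` and `BinFiles`, and the `"self"` relationship name is made up; the real classes have more going on, so treat this only as the shape of the idea.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical stand-in for core::Connectable; not the real MiNiFi class.
class Node {
 public:
  virtual ~Node() = default;

  // Returning pointers-to-const lets a const member function hand out
  // `this` without a const_cast.
  virtual std::set<const Node*> getOutGoingConnections(const std::string& /*relationship*/) const {
    return {};
  }
};

// Hypothetical stand-in for BinFiles, which routes one relationship back to itself.
class SelfLoopNode : public Node {
 public:
  std::set<const Node*> getOutGoingConnections(const std::string& relationship) const override {
    auto result = Node::getOutGoingConnections(relationship);
    if (relationship == "self") {
      result.insert(this);  // `this` is `const SelfLoopNode*` here, no cast needed
    }
    return result;
  }
};

inline void exampleUsage() {
  SelfLoopNode node;
  assert(node.getOutGoingConnections("self").count(&node) == 1);
}
```

Any caller that needs to modify the returned objects would have to change as well, which is where the snowballing could come from.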
##########
File path: docker/test/integration/minifi/core/TransientMinifiContainer.py
##########
@@ -0,0 +1,8 @@
+from .MinifiContainer import MinifiContainer
+
+
+class TransientMinifiContainer(MinifiContainer):
+ def __init__(self, config_dir, name, vols, network, image_store, command=None):
+ if not command:
+ command = ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml " + MinifiContainer.MINIFI_ROOT + "/conf && /opt/minifi/minifi-current/bin/minifi.sh start && sleep 10 && /opt/minifi/minifi-current/bin/minifi.sh stop && sleep 100"]
Review comment:
Since the working directory is already set, this could be written
more simply as
```suggestion
command = ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml ./conf/ && ./bin/minifi.sh start && sleep 10 && ./bin/minifi.sh stop && sleep 100"]
```
##########
File path: libminifi/include/core/ProcessGroup.h
##########
@@ -176,29 +170,27 @@ class ProcessGroup : public CoreComponent {
parent_process_group_ = parent;
}
// get parent process group
- ProcessGroup *getParent(void) {
+ ProcessGroup *getParent() {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
// Add processor
- void addProcessor(const std::shared_ptr<Processor>& processor);
- // Remove processor
- void removeProcessor(const std::shared_ptr<Processor>& processor);
+ void addProcessor(std::unique_ptr<Processor> processor);
// Add child processor group
void addProcessGroup(std::unique_ptr<ProcessGroup> child);
// ! Add connections
- void addConnection(const std::shared_ptr<Connection>& connection);
+ void addConnection(std::unique_ptr<Connection>&& connection);
Review comment:
this would be better taken by value, too
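A minimal sketch of the by-value form, with hypothetical `Connection` and `Group` types standing in for the real classes. With a by-value `std::unique_ptr` parameter, the caller's pointer is guaranteed to be empty after the call; with an rvalue reference, that depends on what the function body happens to do.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Connection {};  // hypothetical stand-in for minifi::Connection

// Hypothetical stand-in for ProcessGroup.
class Group {
 public:
  // By-value sink: the caller has to std::move, and ownership has already
  // been transferred by the time the function body runs.
  void addConnection(std::unique_ptr<Connection> connection) {
    assert(connection);
    connections_.push_back(std::move(connection));
  }

  std::size_t size() const { return connections_.size(); }

 private:
  std::vector<std::unique_ptr<Connection>> connections_;
};

inline void exampleUsage() {
  Group group;
  auto connection = std::make_unique<Connection>();
  group.addConnection(std::move(connection));
  assert(connection == nullptr);  // guaranteed for a moved-from unique_ptr
  assert(group.size() == 1);
}
```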
##########
File path: libminifi/src/core/yaml/YamlConfiguration.cpp
##########
@@ -697,10 +689,21 @@ PropertyValue
YamlConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo
if (defaultType == typeid(int64_t)) {
coercedValue = propertyValueNode.as<int64_t>();
} else if (defaultType == typeid(uint64_t)) {
- try {
- coercedValue = propertyValueNode.as<uint64_t>();
- } catch (...) {
- coercedValue = propertyValueNode.as<std::string>();
+ const auto uValue = propertyValueNode.as<uint64_t>(0);
+
+ // parsing uint64_t may have failed
+ if (uValue == 0) {
+ const auto sValue = propertyValueNode.as<std::string>();
+
+ // parsing uint64_t did not fail, the node was a 0
+ if (sValue == "0") {
+ coercedValue = uValue;
+ } else {
+ // parsing uint64_t really failed
+ coercedValue = sValue;
+ }
+ } else {
+ coercedValue = uValue;
Review comment:
I agree that using exceptions for flow control is not nice, but the new
code is so much more complicated than the old code that I would prefer to keep
the old version.
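If a version without exceptions is still wanted, one possible middle ground is to read the node as a string once and parse it separately. This is only a sketch: `parseUint64` is a hypothetical helper, not part of yaml-cpp or MiNiFi, and `std::from_chars` accepts only plain decimal digits, which may be stricter than what `as<uint64_t>()` accepts.

```cpp
#include <cassert>
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper: returns the value only if the whole text is a
// decimal unsigned integer that fits into 64 bits.
inline std::optional<std::uint64_t> parseUint64(std::string_view text) {
  std::uint64_t value = 0;
  const char* const end = text.data() + text.size();
  const auto [ptr, ec] = std::from_chars(text.data(), end, value);
  if (ec != std::errc{} || ptr != end) {
    return std::nullopt;  // not a number, out of range, or trailing characters
  }
  return value;
}

inline void exampleUsage() {
  assert(parseUint64("0").has_value());  // a real zero is not mistaken for a failure
  assert(*parseUint64("42") == std::uint64_t{42});
  assert(!parseUint64("10 MB").has_value());
}
```

The caller would then assign the parsed value when there is one and fall back to the string otherwise.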
##########
File path: libminifi/include/core/state/nodes/QueueMetrics.h
##########
@@ -57,16 +57,16 @@ class QueueMetrics : public ResponseNode {
return "QueueMetrics";
}
- void addConnection(const std::shared_ptr<minifi::Connection> &connection) {
+ void addConnection(std::unique_ptr<minifi::Connection> connection) {
if (nullptr != connection) {
- connections.insert(std::make_pair(connection->getName(), connection));
+ connections.insert(std::make_pair(connection->getName(),
std::move(connection)));
}
}
std::vector<SerializedResponseNode> serialize() {
std::vector<SerializedResponseNode> serialized;
- for (auto conn : connections) {
- auto connection = conn.second;
+ for (const auto& conn : connections) {
+ auto& connection = conn.second;
Review comment:
minor, but this would look nicer as
```suggestion
for (const auto& [_, connection] : connections) {
```
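For reference, a small self-contained sketch of that loop form (C++17). `Connection`, its `queued` field and `queuedCounts` are hypothetical; the point is only that the structured binding names the mapped value directly, with `_` marking the unused key by convention.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

struct Connection {  // hypothetical stand-in for minifi::Connection
  int queued = 0;
};

// Collects the queued count of every connection, in key order.
inline std::vector<int> queuedCounts(const std::map<std::string, std::unique_ptr<Connection>>& connections) {
  std::vector<int> counts;
  for (const auto& [_, connection] : connections) {
    assert(connection);  // `connection` is a const reference to the unique_ptr
    counts.push_back(connection->queued);
  }
  return counts;
}
```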
##########
File path: libminifi/include/core/repository/VolatileRepository.h
##########
@@ -203,6 +195,8 @@ VolatileRepository<T>::~VolatileRepository() {
for (auto ent : value_vector_) {
delete ent;
}
+
+ stop();
Review comment:
Here, too, `stop()` is called twice now. It is enough to call it from
the `Repository` destructor.
##########
File path: extensions/rocksdb-repos/FlowFileRepository.h
##########
@@ -75,24 +75,28 @@ class FlowFileRepository : public core::Repository, public
std::enable_shared_fr
int64_t maxPartitionBytes =
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod =
FLOWFILE_REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
- Repository(repo_name.length() > 0 ? repo_name :
core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis,
maxPartitionBytes, purgePeriod),
- checkpoint_dir_(checkpoint_dir),
+ Repository(repo_name.length() > 0 ? repo_name :
core::getClassName<FlowFileRepository>(), std::move(directory),
maxPartitionMillis, maxPartitionBytes, purgePeriod),
+ checkpoint_dir_(std::move(checkpoint_dir)),
content_repo_(nullptr),
checkpoint_(nullptr),
logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
- db_ = NULL;
+ db_ = nullptr;
}
- virtual bool isNoop() {
+ ~FlowFileRepository() override {
+ stop();
+ }
Review comment:
What is the point of this? The `Repository` destructor will do the same
thing, so we call `stop()` twice. I understand that is harmless, but it does
not seem useful.
Same comment at `~ProvenanceRepository`, `~TestRepository` and
`~TestFlowRepository`: all these destructors could be removed.
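A small sketch of what I mean, using hypothetical stand-in classes and a counter that exists only for illustration. It assumes `stop()` is not overridden in the derived class; if it were, the base destructor would run only the base version, and the picture would be different.

```cpp
#include <cassert>

// Hypothetical stand-in for core::Repository.
class Repository {
 public:
  explicit Repository(int& stop_calls) : stop_calls_(stop_calls) {}
  virtual ~Repository() { stop(); }

  void stop() { ++stop_calls_; }  // counts calls, for illustration only

 private:
  int& stop_calls_;
};

// Derived class with its own destructor: stop() runs twice.
class RedundantRepository : public Repository {
 public:
  using Repository::Repository;
  ~RedundantRepository() override { stop(); }
};

// Derived class without a destructor: ~Repository() still calls stop().
class LeanRepository : public Repository {
 public:
  using Repository::Repository;
};

inline void exampleUsage() {
  int redundant_calls = 0;
  { RedundantRepository repo{redundant_calls}; }
  assert(redundant_calls == 2);

  int lean_calls = 0;
  { LeanRepository repo{lean_calls}; }
  assert(lean_calls == 1);
}
```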
##########
File path: extensions/script/python/PythonProcessor.cpp
##########
@@ -32,33 +32,30 @@ namespace python {
namespace py = pybind11;
namespace core = org::apache::nifi::minifi::core;
-PythonProcessor::PythonProcessor(std::shared_ptr<core::Processor> proc) {
- processor_ =
std::dynamic_pointer_cast<python::processors::ExecutePythonProcessor>(proc);
+PythonProcessor::PythonProcessor(core::Processor* proc) {
+ processor_ = dynamic_cast<python::processors::ExecutePythonProcessor*>(proc);
}
void PythonProcessor::setSupportsDynamicProperties() {
if (!processor_) {
throw std::runtime_error("Access of Processor after it has been released");
}
Review comment:
This check was pointless before, since we held on to the Processor, but
it's also pointless now, as nothing will set `processor_` to null. It can be
removed; also in `setDescription()`.
##########
File path: libminifi/include/agent/build_description.h
##########
@@ -136,15 +138,19 @@ class BuildDescription {
}
auto obj =
core::ClassLoader::getDefaultClassLoader().instantiate(class_name, class_name);
- std::shared_ptr<core::ConfigurableComponent> component =
std::dynamic_pointer_cast<core::ConfigurableComponent>(obj);
+ std::unique_ptr<core::ConfigurableComponent> component{
+ dynamic_cast<core::ConfigurableComponent*>(obj.get()) ?
+ dynamic_cast<core::ConfigurableComponent*>(obj.release()) :
+ nullptr
+ };
Review comment:
can we use `dynamic_unique_cast` here?
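For context, such a helper could look roughly like the sketch below. This is an illustrative implementation written for this comment, and the project's own `dynamic_unique_cast` may differ in signature and details; `Object`, `Component` and `Unrelated` are hypothetical types.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Illustrative implementation; the project's own helper may differ.
template <typename Target, typename Source>
std::unique_ptr<Target> dynamic_unique_cast(std::unique_ptr<Source> source) {
  if (auto* target = dynamic_cast<Target*>(source.get())) {
    (void)source.release();  // ownership moves to the returned pointer
    return std::unique_ptr<Target>{target};
  }
  return nullptr;  // `source` still owns the object and deletes it on return
}

struct Object {
  virtual ~Object() = default;
};
struct Component : Object {};
struct Unrelated : Object {};

inline void exampleUsage() {
  std::unique_ptr<Object> obj = std::make_unique<Component>();
  auto component = dynamic_unique_cast<Component>(std::move(obj));
  assert(component != nullptr);
  assert(obj == nullptr);
}
```

One difference from the hand-written version in the diff: with this sketch, the object is destroyed when the cast fails, instead of staying in `obj`.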
##########
File path: extensions/script/python/PythonProcessor.h
##########
@@ -36,25 +36,16 @@ namespace py = pybind11;
*/
class PythonProcessor {
public:
- explicit PythonProcessor(std::shared_ptr<core::Processor> proc);
+ explicit PythonProcessor(core::Processor* proc);
void setSupportsDynamicProperties();
void setDecription(const std::string &desc);
void addProperty(const std::string &name, const std::string &description,
const std::string &defaultvalue, bool required, bool el);
- /**
- * Sometimes we want to release shared pointers to core resources when
- * we know they are no longer in need. This method is for those times.
- *
- * For example, we do not want to hold on to shared pointers to FlowFiles
- * after an onTrigger call, because doing so can be very expensive in terms
- * of repository resources.
- */
- void releaseCoreResources();
private:
- std::shared_ptr<core::Processor> processor_;
+ core::Processor* processor_;
Review comment:
you could change the type to `ExecutePythonProcessor*`, then three
`dynamic_cast`s could be deleted
##########
File path: extensions/script/python/PythonProcessor.cpp
##########
@@ -32,33 +32,30 @@ namespace python {
namespace py = pybind11;
namespace core = org::apache::nifi::minifi::core;
-PythonProcessor::PythonProcessor(std::shared_ptr<core::Processor> proc) {
- processor_ =
std::dynamic_pointer_cast<python::processors::ExecutePythonProcessor>(proc);
+PythonProcessor::PythonProcessor(core::Processor* proc) {
+ processor_ = dynamic_cast<python::processors::ExecutePythonProcessor*>(proc);
Review comment:
a post-condition check could be added here, e.g. `gsl_Ensures(processor_)`
##########
File path: extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
##########
@@ -50,10 +50,10 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base
{
}
protected:
- void updateProperties(std::shared_ptr<minifi::FlowController>
flow_controller) override {
-
std::dynamic_pointer_cast<minifi::state::ProcessorController>(flow_controller->getComponents("TailFile1")[0])
+ void updateProperties(std::unique_ptr<minifi::FlowController>&
flow_controller) override {
Review comment:
`std::unique_ptr<minifi::FlowController>&` is a strange parameter type.
I think `FlowController&`, with a dereference at the calling site, would be
better.
`const FlowController&` would be even better, if all the member functions
that the various `updateProperties` implementations call on `flow_controller`
can be made `const`.
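A minimal sketch of the reference form. `FlowController` here is a hypothetical stand-in with a made-up `applyUpdate()` member, and `updateProperties` is shown as a free function instead of the virtual member in the test classes.

```cpp
#include <cassert>
#include <memory>

struct FlowController {  // hypothetical stand-in for minifi::FlowController
  int updates = 0;
  void applyUpdate() { ++updates; }
};

// Takes the object itself, so it no longer matters how the caller owns it
// (unique_ptr, shared_ptr or a local variable).
inline void updateProperties(FlowController& flow_controller) {
  flow_controller.applyUpdate();
}

inline void exampleUsage() {
  auto controller = std::make_unique<FlowController>();
  updateProperties(*controller);  // dereference at the calling site
  assert(controller->updates == 1);
}
```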
##########
File path: extensions/test-processors/LogOnDestructionProcessor.h
##########
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Processor.h"
+
+#pragma once
+
+namespace org::apache::nifi::minifi::processors {
+
+class LogOnDestructionProcessor : public core::Processor {
+ public:
+ explicit LogOnDestructionProcessor(const std::string& name, const
utils::Identifier& uuid = utils::Identifier())
+ : Processor(name, uuid) {
+ }
+
+ ~LogOnDestructionProcessor() override {
+ logger_->log_error("LogOnDestructionProcessor is being destructed");
Review comment:
very minor, but this is not an error; I would change it to `log_info`
##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -91,33 +91,17 @@ ProcessGroup::~ProcessGroup() {
}
}
-bool ProcessGroup::isRootProcessGroup() {
- return (type_ == ROOT_PROCESS_GROUP);
-}
-
bool ProcessGroup::isRemoteProcessGroup() {
return (type_ == REMOTE_PROCESS_GROUP);
}
-void ProcessGroup::addProcessor(const std::shared_ptr<Processor>& processor) {
+void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
+ gsl_Expects(processor);
+ const auto name = processor->getName();
std::lock_guard<std::recursive_mutex> lock(mutex_);
-
- if (processors_.find(processor) == processors_.end()) {
- // We do not have the same processor in this process group yet
- processors_.insert(processor);
- logger_->log_debug("Add processor %s into process group %s",
processor->getName(), name_);
- }
-}
-
-void ProcessGroup::removeProcessor(const std::shared_ptr<Processor>&
processor) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
-
- if (processors_.find(processor) != processors_.end()) {
- // We do have the same processor in this process group yet
- processors_.erase(processor);
- logger_->log_debug("Remove processor %s from process group %s",
processor->getName(), name_);
- }
+ processors_.insert(std::move(processor));
+ logger_->log_debug("Add processor %s into process group %s", name, name_);
Review comment:
Can this function be called more than once with the same processor?
Before this PR, we only printed the log at the first (attempted) insertion;
now we log it every time. Maybe do something like this?
```suggestion
const auto [iter, inserted] = processors_.insert(std::move(processor));
if (inserted) {
  logger_->log_debug("Add processor %s into process group %s", name, name_);
} else {
  logger_->log_debug("Not adding processor %s into process group %s, as it is already there", name, name_);
}
```
(or without the `else` branch if you want to preserve the exact previous
behavior)
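The `inserted` flag comes from the pair returned by `std::set::insert`. A stand-alone sketch with a set of strings, where `addName` is a hypothetical helper. If `processors_` is a set of `std::unique_ptr`, a second insertion of the same object would need a second owner of the same raw pointer, so the `else` branch may turn out to be unreachable in practice.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

// Returns true only when `name` was not in the set yet.
inline bool addName(std::set<std::string>& names, std::string name) {
  const auto [iter, inserted] = names.insert(std::move(name));
  (void)iter;  // position of the new or already-present element, unused here
  return inserted;
}

inline void exampleUsage() {
  std::set<std::string> names;
  assert(addName(names, "GetTCP"));   // first insertion succeeds
  assert(!addName(names, "GetTCP"));  // duplicate is rejected
  assert(names.size() == 1);
}
```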
##########
File path: extensions/standard-processors/processors/GetTCP.h
##########
@@ -269,10 +268,10 @@ class GetTCP : public core::Processor, public
state::response::MetricsNodeSource
std::shared_ptr<minifi::controllers::SSLContextService> ssl_service_;
- // last listing time for root directory ( if recursive, we will consider the
root
- // as the top level time.
-
std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<GetTCP>::getLogger();
+
+ // thread pool must be the last, as it is first to be destructed before
other members, otherwise segfault is possible
Review comment:
Where would the segfault happen? It would be useful to add that info to
the comment, as it isn't obvious to me.
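For readers of the comment, the rule it relies on is that non-static data members are destroyed in reverse order of declaration. A small sketch with hypothetical `Tracer` and `Owner` types makes the order observable; it does not show where the segfault itself would come from.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Records its name on destruction so the order can be observed.
class Tracer {
 public:
  Tracer(std::string name, std::vector<std::string>& log) : name_(std::move(name)), log_(log) {}
  ~Tracer() { log_.push_back(name_); }

 private:
  std::string name_;
  std::vector<std::string>& log_;
};

// Hypothetical stand-in for GetTCP with two of its members.
class Owner {
 public:
  explicit Owner(std::vector<std::string>& log) : logger_("logger", log), thread_pool_("thread_pool", log) {}

 private:
  Tracer logger_;
  Tracer thread_pool_;  // declared last, therefore destroyed first
};

inline void exampleUsage() {
  std::vector<std::string> log;
  { Owner owner{log}; }
  assert((log == std::vector<std::string>{"thread_pool", "logger"}));
}
```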
##########
File path: extensions/test-processors/LogOnDestructionProcessor.h
##########
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Processor.h"
+
+#pragma once
Review comment:
this should be before the `#include`s