[
https://issues.apache.org/jira/browse/MINIFICPP-301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251701#comment-16251701
]
ASF GitHub Bot commented on MINIFICPP-301:
------------------------------------------
Github user achristianson commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150892195
--- Diff: extensions/tensorflow/TFApplyGraph.cpp ---
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TFApplyGraph.h"
+
+#include "tensorflow/cc/ops/standard_ops.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property TFApplyGraph::InputNode( // NOLINT
+ "Input Node",
+ "The node of the TensorFlow graph to feed tensor inputs to", "");
+core::Property TFApplyGraph::OutputNode( // NOLINT
+ "Output Node",
+ "The node of the TensorFlow graph to read tensor outputs from", "");
+
+core::Relationship TFApplyGraph::Success( // NOLINT
+ "success",
+ "Successful graph application outputs");
+core::Relationship TFApplyGraph::Retry( // NOLINT
+ "retry",
+ "Inputs which fail graph application but may work if sent again");
+core::Relationship TFApplyGraph::Failure( // NOLINT
+ "failure",
+ "Failures which will not work if retried");
+
+void TFApplyGraph::initialize() {
+ std::set<core::Property> properties;
+ properties.insert(InputNode);
+ properties.insert(OutputNode);
+ setSupportedProperties(std::move(properties));
+
+ std::set<core::Relationship> relationships;
+ relationships.insert(Success);
+ relationships.insert(Retry);
+ relationships.insert(Failure);
+ setSupportedRelationships(std::move(relationships));
+}
+
+void TFApplyGraph::onSchedule(core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory) {
+ context->getProperty(InputNode.getName(), input_node_);
+
+ if (input_node_.empty()) {
+ logger_->log_error("Invalid input node");
+ }
+
+ context->getProperty(OutputNode.getName(), output_node_);
+
+ if (output_node_.empty()) {
+ logger_->log_error("Invalid output node");
+ }
+}
+
+void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext>
&context,
+ const std::shared_ptr<core::ProcessSession>
&session) {
+ auto flow_file = session->get();
+
+ if (!flow_file) {
+ return;
+ }
+
+ try {
+ // Read graph
+ std::string tf_type;
+ flow_file->getAttribute("tf.type", tf_type);
+
+ std::shared_ptr<tensorflow::GraphDef> graph_def;
+ uint32_t graph_version;
+
+ {
+ std::lock_guard<std::mutex> guard(graph_def_mtx_);
+
+ if ("graph" == tf_type) {
--- End diff --
Later down, when it pulls the context off the queue, it lets it expire if
the graph is outdated:
```c++
if (ctx->graph_version != graph_version) {
logger_->log_info("Allowing session with stale graph to expire");
ctx = nullptr;
}
```
All resources should be freed. Does that answer your question?
> Create processor to apply arbitrary Tensor Flow graphs to tensors
> -----------------------------------------------------------------
>
> Key: MINIFICPP-301
> URL: https://issues.apache.org/jira/browse/MINIFICPP-301
> Project: NiFi MiNiFi C++
> Issue Type: Improvement
> Reporter: Andrew Christianson
> Assignee: Andrew Christianson
>
> In many cases, it may be desirable to interpret/preprocess raw signal inputs
> on the edge, where MiNiFI runs, before sending semantic interpretations
> upstream.
> Tensor Flow is a data flow system for processing tensors, and many graphs
> exist or could be created which are well-suited to interpret signal inputs.
> It would therefore be useful to have a processor in MiNiFi - C++ which takes
> tensors (serialized as binary protocol buffers), feeds them into an input
> node on a supplied graph, reads tensors from an output node, and finally
> writes those output tensors as flow files containing binary protocol buffers.
> While there are many additional convenience utilities which would be helpful,
> such as converting various standard sensor types into tensors, the initial
> scope of this feature is a processor which processes arbitrary tensors
> through arbitrary graphs.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)