[ 
https://issues.apache.org/jira/browse/MINIFICPP-301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251701#comment-16251701
 ] 

ASF GitHub Bot commented on MINIFICPP-301:
------------------------------------------

Github user achristianson commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150892195
  
    --- Diff: extensions/tensorflow/TFApplyGraph.cpp ---
    @@ -0,0 +1,227 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include "TFApplyGraph.h"
    +
    +#include "tensorflow/cc/ops/standard_ops.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property TFApplyGraph::InputNode(  // NOLINT
    +    "Input Node",
    +    "The node of the TensorFlow graph to feed tensor inputs to", "");
    +core::Property TFApplyGraph::OutputNode(  // NOLINT
    +    "Output Node",
    +    "The node of the TensorFlow graph to read tensor outputs from", "");
    +
    +core::Relationship TFApplyGraph::Success(  // NOLINT
    +    "success",
    +    "Successful graph application outputs");
    +core::Relationship TFApplyGraph::Retry(  // NOLINT
    +    "retry",
    +    "Inputs which fail graph application but may work if sent again");
    +core::Relationship TFApplyGraph::Failure(  // NOLINT
    +    "failure",
    +    "Failures which will not work if retried");
    +
    +void TFApplyGraph::initialize() {
    +  std::set<core::Property> properties;
    +  properties.insert(InputNode);
    +  properties.insert(OutputNode);
    +  setSupportedProperties(std::move(properties));
    +
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Retry);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(std::move(relationships));
    +}
    +
    +void TFApplyGraph::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
    +  context->getProperty(InputNode.getName(), input_node_);
    +
    +  if (input_node_.empty()) {
    +    logger_->log_error("Invalid input node");
    +  }
    +
    +  context->getProperty(OutputNode.getName(), output_node_);
    +
    +  if (output_node_.empty()) {
    +    logger_->log_error("Invalid output node");
    +  }
    +}
    +
    +void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context,
    +                             const std::shared_ptr<core::ProcessSession> 
&session) {
    +  auto flow_file = session->get();
    +
    +  if (!flow_file) {
    +    return;
    +  }
    +
    +  try {
    +    // Read graph
    +    std::string tf_type;
    +    flow_file->getAttribute("tf.type", tf_type);
    +
    +    std::shared_ptr<tensorflow::GraphDef> graph_def;
    +    uint32_t graph_version;
    +
    +    {
    +      std::lock_guard<std::mutex> guard(graph_def_mtx_);
    +
    +      if ("graph" == tf_type) {
    --- End diff --
    
    Later down, when it pulls the context off the queue, it lets it expire if 
the graph is outdated:
    
    ```c++
          if (ctx->graph_version != graph_version) {
             logger_->log_info("Allowing session with stale graph to expire");
             ctx = nullptr;
           }
    ```
    
    All resources should be freed. Does that answer your question?


> Create processor to apply arbitrary Tensor Flow graphs to tensors
> -----------------------------------------------------------------
>
>                 Key: MINIFICPP-301
>                 URL: https://issues.apache.org/jira/browse/MINIFICPP-301
>             Project: NiFi MiNiFi C++
>          Issue Type: Improvement
>            Reporter: Andrew Christianson
>            Assignee: Andrew Christianson
>
> In many cases, it may be desirable to interpret/preprocess raw signal inputs 
> on the edge, where MiNiFI runs, before sending semantic interpretations 
> upstream.
> Tensor Flow is a data flow system for processing tensors, and many graphs 
> exist or could be created which are well-suited to interpret signal inputs. 
> It would therefore be useful to have a processor in MiNiFi - C++ which takes 
> tensors (serialized as binary protocol buffers), feeds them into an input 
> node on a supplied graph, reads tensors from an output node, and finally 
> writes those output tensors as flow files containing binary protocol buffers.
> While there are many additional convenience utilities which would be helpful, 
> such as converting various standard sensor types into tensors, the initial 
> scope of this feature is a processor which processes arbitrary tensors 
> through arbitrary graphs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to