Repository: nifi-minifi-cpp Updated Branches: refs/heads/master ee4bb1353 -> ee39673c4
receive flow files Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/14ad33eb Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/14ad33eb Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/14ad33eb Branch: refs/heads/master Commit: 14ad33eb196a5a596bc77cfa389df2b3244881b6 Parents: 19b74bf Author: Kathik Narayanan <[email protected]> Authored: Mon Dec 19 14:18:20 2016 -0500 Committer: Kathik Narayanan <[email protected]> Committed: Mon Dec 19 14:18:20 2016 -0500 ---------------------------------------------------------------------- libminifi/src/FlowControlProtocol.cpp | 2 ++ libminifi/src/FlowController.cpp | 21 ++++++++++++++++----- libminifi/src/RemoteProcessorGroupPort.cpp | 1 + main/MiNiFiMain.cpp | 2 +- 4 files changed, 20 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/14ad33eb/libminifi/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index 011ebcf..3a041c7 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -252,6 +252,7 @@ void FlowControlProtocol::run(FlowControlProtocol *protocol) int FlowControlProtocol::sendRegisterReq() { + _logger->log_info("registering"); if (_registered) { _logger->log_info("Already registered"); @@ -261,6 +262,7 @@ int FlowControlProtocol::sendRegisterReq() uint16_t port = this->_serverPort; if (this->_socket <= 0) + _logger->log_info("connecting to nifi %s %s",_serverName.c_str(), port); this->_socket = connectServer(_serverName.c_str(), port); if (this->_socket <= 0) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/14ad33eb/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 3598716..cac33d8 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -562,6 +562,7 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGro _logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str()); YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>(); + YAML::Node outputPorts = rpgNode["Output Ports"].as<YAML::Node>(); ProcessGroup *group = NULL; // generate the random UUID @@ -603,6 +604,16 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGro this->parsePortYaml(&currPort, group, SEND); } // for node } + if (outputPorts.IsSequence()) { + for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) { + _logger->log_debug("Got a current port, iterating..."); + + YAML::Node currPort = portIter->as<YAML::Node>(); + + this->parsePortYaml(&currPort, group, RECEIVE); + } // for node + } + } } } @@ -799,13 +810,13 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T return; } - YAML::Node inputPortsObj = portNode->as<YAML::Node>(); + YAML::Node portsObj = portNode->as<YAML::Node>(); // generate the random UIID uuid_generate(uuid); - auto portId = inputPortsObj["id"].as<std::string>(); - auto nameStr = inputPortsObj["name"].as<std::string>(); + auto portId = portsObj["id"].as<std::string>(); + auto nameStr = portsObj["name"].as<std::string>(); uuid_parse(portId.c_str(), uuid); port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); @@ -826,7 +837,7 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T // add processor to parent parent->addProcessor(processor); processor->setScheduledState(RUNNING); - auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>(); + auto rawMaxConcurrentTasks = portsObj["max concurrent tasks"].as<std::string>(); int64_t maxConcurrentTasks; if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { processor->setMaxConcurrentTasks(maxConcurrentTasks); @@ -1164,7 +1175,7 @@ void FlowController::load(ConfigFormat configFormat) { parseProcessorNodeYaml(processorsNode, this->_root); parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root); parseConnectionYaml(&connectionsNode, this->_root); - + _logger->log_error("finished loading the yaml"); _initialized = true; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/14ad33eb/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 9d849ae..c9a89f3 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -52,6 +52,7 @@ void RemoteProcessorGroupPort::initialize() void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session) { + _logger->log_error("Remote Process Group Triggered"); std::string value; if (!_transmitting) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/14ad33eb/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 11e8f00..ace58f3 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -66,7 +66,7 @@ void sigHandler(int signal) int main(int argc, char **argv) { Logger *logger = Logger::getLogger(); - logger->setLogLevel(info); + logger->setLogLevel(trace); // assumes POSIX compliant environment std::string minifiHome;
