[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312318#comment-16312318
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
------------------------------------------

GitHub user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117
  
    --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
    @@ -0,0 +1,158 @@
    +/**
    + * @file AbstractMQTTProcessor.cpp
    + * AbstractMQTTProcessor class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "AbstractMQTTProcessor.h"
    +#include <stdio.h>
    +#include <memory>
    +#include <string>
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
    +core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
    +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
    +core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
    +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
    +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
    +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
    +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
    +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
    +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
    +
    +void AbstractMQTTProcessor::initialize() {
    +  // Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(BrokerURL);
    +  properties.insert(CleanSession);
    +  properties.insert(ClientID);
    +  properties.insert(UserName);
    +  properties.insert(PassWord);
    +  properties.insert(KeepLiveInterval);
    +  properties.insert(ConnectionTimeOut);
    +  properties.insert(QOS);
    +  properties.insert(Topic);
    +  setSupportedProperties(properties);
    +  // Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  relationships.insert(Failure);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
    +  std::string value;
    +  int64_t valInt;
    +  value = "";
    +  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
    +    uri_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
    +  }
    +  value = "";
    +  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
    +    clientID_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
    +  }
    +  value = "";
    +  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
    +    topic_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
    +  }
    +  value = "";
    +  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
    +    userName_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
    +  }
    +  value = "";
    +  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
    +    passWord_ = value;
    +    logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
    +  }
    +  value = "";
    +  if (context->getProperty(CleanSession.getName(), value) && 
!value.empty() &&
    +      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
cleanSession_)) {
    +    logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", 
cleanSession_);
    +  }
    +  value = "";
    +  if (context->getProperty(KeepLiveInterval.getName(), value) && 
!value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && 
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      keepAliveInterval_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", 
keepAliveInterval_);
    +    }
    +  }
    +  value = "";
    +  if (context->getProperty(ConnectionTimeOut.getName(), value) && 
!value.empty()) {
    +    core::TimeUnit unit;
    +    if (core::Property::StringToTime(value, valInt, unit) && 
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
    +      connectionTimeOut_ = valInt/1000;
    +      logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", 
connectionTimeOut_);
    --- End diff --
    
    In the cases where you are logging an int64_t value, please use %lld (or the portable PRId64 macro from <cinttypes>) instead of %d.


> MQTT framework
> --------------
>
>                 Key: MINIFICPP-342
>                 URL: https://issues.apache.org/jira/browse/MINIFICPP-342
>             Project: NiFi MiNiFi C++
>          Issue Type: New Feature
>    Affects Versions: 0.3.0
>            Reporter: bqiu
>            Assignee: bqiu
>             Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to