http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/InvokeHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp index 295560f..fd39a64 100644 --- a/libminifi/src/processors/InvokeHTTP.cpp +++ b/libminifi/src/processors/InvokeHTTP.cpp @@ -51,73 +51,37 @@ namespace processors { const char *InvokeHTTP::ProcessorName = "InvokeHTTP"; -core::Property InvokeHTTP::Method( - "HTTP Method", - "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). " - "Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.", - "GET"); -core::Property InvokeHTTP::URL( - "Remote URL", - "Remote URL which will be connected to, including scheme, host, port, path.", - ""); -core::Property InvokeHTTP::ConnectTimeout( - "Connection Timeout", "Max wait time for connection to remote service.", - "5 secs"); -core::Property InvokeHTTP::ReadTimeout( - "Read Timeout", "Max wait time for response from remote service.", - "15 secs"); -core::Property InvokeHTTP::DateHeader( - "Include Date Header", "Include an RFC-2616 Date header in the request.", - "True"); -core::Property InvokeHTTP::FollowRedirects( - "Follow Redirects", "Follow HTTP redirects issued by remote server.", - "True"); -core::Property InvokeHTTP::AttributesToSend( - "Attributes to Send", - "Regular expression that defines which attributes to send as HTTP" - " headers in the request. If not defined, no attributes are sent as headers.", - ""); -core::Property InvokeHTTP::SSLContext( - "SSL Context Service", - "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", - ""); -core::Property InvokeHTTP::ProxyHost( - "Proxy Host", - "The fully qualified hostname or IP address of the proxy server", ""); -core::Property InvokeHTTP::ProxyPort("Proxy Port", - "The port of the proxy server", ""); -core::Property InvokeHTTP::ProxyUser( - "invokehttp-proxy-user", - "Username to set when authenticating against proxy", ""); -core::Property InvokeHTTP::ProxyPassword( - "invokehttp-proxy-password", - "Password to set when authenticating against proxy", ""); -core::Property InvokeHTTP::ContentType( - "Content-type", - "The Content-Type to specify for when content is being transmitted through a PUT, " - "POST or PATCH. In the case of an empty value after evaluating an expression language expression, " - "Content-Type defaults to", - "application/octet-stream"); -core::Property InvokeHTTP::SendBody( - "send-message-body", - "If true, sends the HTTP message body on POST/PUT/PATCH requests (default). " - "If false, suppresses the message body and content-type header for these requests.", - "true"); - -core::Property InvokeHTTP::PropPutOutputAttributes( - "Put Response Body in Attribute", - "If set, the response body received back will be put into an attribute of the original " - "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ", - ""); -core::Property InvokeHTTP::AlwaysOutputResponse( - "Always Output Response", - "Will force a response FlowFile to be generated and routed to the 'Response' relationship " - "regardless of what the server status code received is ", - "false"); -core::Property InvokeHTTP::PenalizeOnNoRetry( - "Penalize on \"No Retry\"", - "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", - "false"); +core::Property InvokeHTTP::Method("HTTP Method", "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). " + "Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.", + "GET"); +core::Property InvokeHTTP::URL("Remote URL", "Remote URL which will be connected to, including scheme, host, port, path.", ""); +core::Property InvokeHTTP::ConnectTimeout("Connection Timeout", "Max wait time for connection to remote service.", "5 secs"); +core::Property InvokeHTTP::ReadTimeout("Read Timeout", "Max wait time for response from remote service.", "15 secs"); +core::Property InvokeHTTP::DateHeader("Include Date Header", "Include an RFC-2616 Date header in the request.", "True"); +core::Property InvokeHTTP::FollowRedirects("Follow Redirects", "Follow HTTP redirects issued by remote server.", "True"); +core::Property InvokeHTTP::AttributesToSend("Attributes to Send", "Regular expression that defines which attributes to send as HTTP" + " headers in the request. If not defined, no attributes are sent as headers.", + ""); +core::Property InvokeHTTP::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", ""); +core::Property InvokeHTTP::ProxyHost("Proxy Host", "The fully qualified hostname or IP address of the proxy server", ""); +core::Property InvokeHTTP::ProxyPort("Proxy Port", "The port of the proxy server", ""); +core::Property InvokeHTTP::ProxyUser("invokehttp-proxy-user", "Username to set when authenticating against proxy", ""); +core::Property InvokeHTTP::ProxyPassword("invokehttp-proxy-password", "Password to set when authenticating against proxy", ""); +core::Property InvokeHTTP::ContentType("Content-type", "The Content-Type to specify for when content is being transmitted through a PUT, " + "POST or PATCH. In the case of an empty value after evaluating an expression language expression, " + "Content-Type defaults to", + "application/octet-stream"); +core::Property InvokeHTTP::SendBody("send-message-body", "If true, sends the HTTP message body on POST/PUT/PATCH requests (default). " + "If false, suppresses the message body and content-type header for these requests.", + "true"); + +core::Property InvokeHTTP::PropPutOutputAttributes("Put Response Body in Attribute", "If set, the response body received back will be put into an attribute of the original " + "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ", + ""); +core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will force a response FlowFile to be generated and routed to the 'Response' relationship " + "regardless of what the server status code received is ", + "false"); +core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false"); const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code"; const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message"; @@ -128,31 +92,22 @@ const char* InvokeHTTP::REMOTE_DN = "invokehttp.remote.dn"; const char* InvokeHTTP::EXCEPTION_CLASS = "invokehttp.java.exception.class"; const char* InvokeHTTP::EXCEPTION_MESSAGE = "invokehttp.java.exception.message"; -core::Relationship InvokeHTTP::Success("success", - "All files are routed to success"); +core::Relationship InvokeHTTP::Success("success", "All files are routed to success"); -core::Relationship InvokeHTTP::RelResponse("response", - "Represents a response flowfile"); +core::Relationship InvokeHTTP::RelResponse("response", "Represents a response flowfile"); -core::Relationship InvokeHTTP::RelRetry( - "retry", - "The original FlowFile will be routed on any status code that can be retried " - "(5xx status codes). It will have new attributes detailing the request."); +core::Relationship InvokeHTTP::RelRetry("retry", "The original FlowFile will be routed on any status code that can be retried " + "(5xx status codes). It will have new attributes detailing the request."); -core::Relationship InvokeHTTP::RelNoRetry( - "no retry", - "The original FlowFile will be routed on any status code that should NOT " - "be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request."); +core::Relationship InvokeHTTP::RelNoRetry("no retry", "The original FlowFile will be routed on any status code that should NOT " + "be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request."); -core::Relationship InvokeHTTP::RelFailure( - "failure", - "The original FlowFile will be routed on any type of connection failure, " - "timeout or general exception. It will have new attributes detailing the request."); +core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will be routed on any type of connection failure, " + "timeout or general exception. It will have new attributes detailing the request."); void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) { std::string my_method = method; - std::transform(my_method.begin(), my_method.end(), my_method.begin(), - ::toupper); + std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper); if (my_method == "POST") { curl_easy_setopt(curl, CURLOPT_POST, 1); } else if (my_method == "PUT") { @@ -190,19 +145,14 @@ void InvokeHTTP::initialize() { setSupportedRelationships(relationships); } -void InvokeHTTP::onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory) { +void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { if (!context->getProperty(Method.getName(), method_)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - Method.getName().c_str(), Method.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", Method.getName().c_str(), Method.getValue().c_str()); return; } if (!context->getProperty(URL.getName(), url_)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - URL.getName().c_str(), URL.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", URL.getName().c_str(), URL.getValue().c_str()); return; } @@ -213,9 +163,7 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, // set the timeout in curl options. } else { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str()); return; } @@ -224,67 +172,43 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::Property::StringToInt(timeoutStr, read_timeout_); } else { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str()); } std::string dateHeaderStr; if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - DateHeader.getName().c_str(), DateHeader.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", DateHeader.getName().c_str(), DateHeader.getValue().c_str()); } - date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr, - date_header_include_); + date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr, date_header_include_); - if (!context->getProperty(PropPutOutputAttributes.getName(), - put_attribute_name_)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - PropPutOutputAttributes.getName().c_str(), - PropPutOutputAttributes.getValue().c_str()); + if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName().c_str(), PropPutOutputAttributes.getValue().c_str()); } - if (!context->getProperty(AttributesToSend.getName(), - attribute_to_send_regex_)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - AttributesToSend.getName().c_str(), - AttributesToSend.getValue().c_str()); + if (!context->getProperty(AttributesToSend.getName(), attribute_to_send_regex_)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str()); } std::string always_output_response = "false"; - if (!context->getProperty(AlwaysOutputResponse.getName(), - always_output_response)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - AttributesToSend.getName().c_str(), - AttributesToSend.getValue().c_str()); + if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str()); } - utils::StringUtils::StringToBool(always_output_response, - always_output_response_); + utils::StringUtils::StringToBool(always_output_response, always_output_response_); std::string penalize_no_retry = "false"; if (!context->getProperty(PenalizeOnNoRetry.getName(), penalize_no_retry)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - AttributesToSend.getName().c_str(), - AttributesToSend.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str()); } utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_); std::string context_name; - if (context->getProperty(SSLContext.getName(), context_name) - && !IsNullOrEmpty(context_name)) { - std::shared_ptr<core::controller::ControllerService> service = context - ->getControllerService(context_name); + if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) { + std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name); if (nullptr != service) { - ssl_context_service_ = std::static_pointer_cast< - minifi::controllers::SSLContextService>(service); + ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); } } } @@ -293,8 +217,7 @@ InvokeHTTP::~InvokeHTTP() { curl_global_cleanup(); } -inline bool InvokeHTTP::matches(const std::string &value, - const std::string &sregex) { +inline bool InvokeHTTP::matches(const std::string &value, const std::string &sregex) { if (sregex == ".*") return true; @@ -322,9 +245,7 @@ bool InvokeHTTP::emitFlowFile(const std::string &method) { return ("POST" == method || "PUT" == method || "PATCH" == method); } -struct curl_slist *InvokeHTTP::build_header_list( - CURL *curl, std::string regex, - const std::map<std::string, std::string> &attributes) { +struct curl_slist *InvokeHTTP::build_header_list(CURL *curl, std::string regex, const std::map<std::string, std::string> &attributes) { struct curl_slist *list = NULL; if (curl) { for (auto attribute : attributes) { @@ -345,8 +266,7 @@ bool InvokeHTTP::isSecure(const std::string &url) { } CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) { - minifi::controllers::SSLContextService *ssl_context_service = - static_cast<minifi::controllers::SSLContextService*>(param); + minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param); if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) { return CURLE_FAILED_INIT; } @@ -354,26 +274,20 @@ CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) { } void InvokeHTTP::configure_secure_connection(CURL *http_session) { - logger_->log_debug("InvokeHTTP -- Using certificate file %s", - ssl_context_service_->getCertificateFile()); + logger_->log_debug("InvokeHTTP -- Using certificate file %s", ssl_context_service_->getCertificateFile()); curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L); - curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, - &InvokeHTTP::configure_ssl_context); - curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, - static_cast<void*>(ssl_context_service_.get())); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &InvokeHTTP::configure_ssl_context); + curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get())); } -void InvokeHTTP::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->get()); +void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); logger_->log_info("onTrigger InvokeHTTP with %s", method_.c_str()); if (flowFile == nullptr) { if (!emitFlowFile(method_)) { - logger_->log_info("InvokeHTTP -- create flow file with %s", - method_.c_str()); + logger_->log_info("InvokeHTTP -- create flow file with %s", method_.c_str()); flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); } else { logger_->log_info("exiting because method is %s", method_.c_str()); @@ -402,11 +316,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_); } HTTPRequestResponse content; - curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, - &HTTPRequestResponse::recieve_write); + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &HTTPRequestResponse::recieve_write); - curl_easy_setopt(http_session, CURLOPT_WRITEDATA, - static_cast<void*>(&content)); + curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content)); if (emitFlowFile(method_)) { logger_->log_info("InvokeHTTP -- reading flowfile"); @@ -419,12 +331,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, callbackObj->pos = 0; logger_->log_info("InvokeHTTP -- Setting callback"); curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, - (curl_off_t)callback->getBufferSize()); - curl_easy_setopt(http_session, CURLOPT_READFUNCTION, - &HTTPRequestResponse::send_write); - curl_easy_setopt(http_session, CURLOPT_READDATA, - static_cast<void*>(callbackObj)); + curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize()); + curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &HTTPRequestResponse::send_write); + curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj)); } else { logger_->log_error("InvokeHTTP -- no resource claim"); } @@ -434,9 +343,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, } // append all headers - struct curl_slist *headers = build_header_list(http_session, - attribute_to_send_regex_, - flowFile->getAttributes()); + struct curl_slist *headers = build_header_list(http_session, attribute_to_send_regex_, flowFile->getAttributes()); curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, headers); logger_->log_info("InvokeHTTP -- curl performed"); @@ -459,10 +366,8 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, flowFile->addAttribute(REQUEST_URL, url_); flowFile->addAttribute(TRANSACTION_ID, tx_id); - bool isSuccess = ((int32_t) (http_code / 100)) == 2 - && res != CURLE_ABORTED_BY_CALLBACK; - bool output_body_to_requestAttr = (!isSuccess || putToAttribute) - && flowFile != nullptr; + bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK; + bool output_body_to_requestAttr = (!isSuccess || putToAttribute) && flowFile != nullptr; bool output_body_to_content = isSuccess && !putToAttribute; bool body_empty = IsNullOrEmpty(content.data); @@ -471,11 +376,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, if (output_body_to_content) { if (flowFile != nullptr) { - response_flow = std::static_pointer_cast<FlowFileRecord>( - session->create(flowFile)); + response_flow = std::static_pointer_cast<FlowFileRecord>(session->create(flowFile)); } else { - response_flow = std::static_pointer_cast<FlowFileRecord>( - session->create()); + response_flow = std::static_pointer_cast<FlowFileRecord>(session->create()); } std::string ct = content_type; @@ -484,28 +387,22 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, response_flow->addAttribute(STATUS_MESSAGE, response_body); response_flow->addAttribute(REQUEST_URL, url_); response_flow->addAttribute(TRANSACTION_ID, tx_id); - io::DataStream stream((const uint8_t*) content.data.data(), - content.data.size()); + io::DataStream stream((const uint8_t*) content.data.data(), content.data.size()); // need an import from the data stream. session->importFrom(stream, response_flow); } else { logger_->log_info("Cannot output body to content"); - response_flow = std::static_pointer_cast<FlowFileRecord>( - session->create()); + response_flow = std::static_pointer_cast<FlowFileRecord>(session->create()); } route(flowFile, response_flow, session, context, isSuccess, http_code); } else { - logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n", - curl_easy_strerror(res)); + logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n", curl_easy_strerror(res)); } curl_slist_free_all(headers); curl_easy_cleanup(http_session); } -void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, - std::shared_ptr<FlowFileRecord> &response, - core::ProcessSession *session, - core::ProcessContext *context, bool isSuccess, +void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess, int statusCode) { // check if we should yield the processor if (!isSuccess && request == nullptr) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp index 9fce69a..c26b41d 100644 --- a/libminifi/src/processors/ListenHTTP.cpp +++ b/libminifi/src/processors/ListenHTTP.cpp @@ -42,38 +42,20 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ListenHTTP::BasePath("Base Path", - "Base path for incoming connections", - "contentListener"); -core::Property ListenHTTP::Port( - "Listening Port", "The Port to listen on for incoming connections", ""); -core::Property ListenHTTP::AuthorizedDNPattern( - "Authorized DN Pattern", - "A Regular Expression to apply against the Distinguished Name of incoming" - " connections. If the Pattern does not match the DN, the connection will be refused.", - ".*"); -core::Property ListenHTTP::SSLCertificate( - "SSL Certificate", - "File containing PEM-formatted file including TLS/SSL certificate and key", - ""); -core::Property ListenHTTP::SSLCertificateAuthority( - "SSL Certificate Authority", - "File containing trusted PEM-formatted certificates", ""); -core::Property ListenHTTP::SSLVerifyPeer( - "SSL Verify Peer", - "Whether or not to verify the client's certificate (yes/no)", "no"); -core::Property ListenHTTP::SSLMinimumVersion( - "SSL Minimum Version", - "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", - "SSL2"); -core::Property ListenHTTP::HeadersAsAttributesRegex( - "HTTP Headers to receive as Attributes (Regex)", - "Specifies the Regular Expression that determines the names of HTTP Headers that" - " should be passed along as FlowFile attributes", - ""); - -core::Relationship ListenHTTP::Success("success", - "All files are routed to success"); +core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener"); +core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", ""); +core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming" + " connections. If the Pattern does not match the DN, the connection will be refused.", + ".*"); +core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); +core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); +core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); +core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); +core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that" + " should be passed along as FlowFile attributes", + ""); + +core::Relationship ListenHTTP::Success("success", "All files are routed to success"); void ListenHTTP::initialize() { logger_->log_info("Initializing ListenHTTP"); @@ -95,14 +77,11 @@ void ListenHTTP::initialize() { setSupportedRelationships(relationships); } -void ListenHTTP::onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory) { +void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { std::string basePath; if (!context->getProperty(BasePath.getName(), basePath)) { - logger_->log_info( - "%s attribute is missing, so default value of %s will be used", - BasePath.getName().c_str(), BasePath.getValue().c_str()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str()); basePath = BasePath.getValue(); } @@ -111,26 +90,20 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, std::string listeningPort; if (!context->getProperty(Port.getName(), listeningPort)) { - logger_->log_error("%s attribute is missing or invalid", - Port.getName().c_str()); + logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str()); return; } std::string authDNPattern; - if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) - && !authDNPattern.empty()) { - logger_->log_info("ListenHTTP using %s: %s", - AuthorizedDNPattern.getName().c_str(), - authDNPattern.c_str()); + if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) { + logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str()); } std::string sslCertFile; - if (context->getProperty(SSLCertificate.getName(), sslCertFile) - && !sslCertFile.empty()) { - logger_->log_info("ListenHTTP using %s: %s", - SSLCertificate.getName().c_str(), sslCertFile.c_str()); + if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) { + logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str()); } // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set @@ -139,12 +112,8 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, std::string sslMinVer; if (!sslCertFile.empty()) { - if (context->getProperty(SSLCertificateAuthority.getName(), - sslCertAuthorityFile) - && !sslCertAuthorityFile.empty()) { - logger_->log_info("ListenHTTP using %s: %s", - SSLCertificateAuthority.getName().c_str(), - sslCertAuthorityFile.c_str()); + if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) { + logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str()); } if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) { @@ -158,26 +127,19 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, } if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) { - logger_->log_info("ListenHTTP using %s: %s", - SSLMinimumVersion.getName().c_str(), sslMinVer.c_str()); + logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str()); } } std::string headersAsAttributesPattern; - if (context->getProperty(HeadersAsAttributesRegex.getName(), - headersAsAttributesPattern) - && !headersAsAttributesPattern.empty()) { - logger_->log_info("ListenHTTP using %s: %s", - HeadersAsAttributesRegex.getName().c_str(), - headersAsAttributesPattern.c_str()); + if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) { + logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str()); } auto numThreads = getMaxConcurrentTasks(); - logger_->log_info( - "ListenHTTP starting HTTP server on port %s and path %s with %d threads", - listeningPort.c_str(), basePath.c_str(), numThreads); + logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads); // Initialize web server std::vector<std::string> options; @@ -231,19 +193,15 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, } _server.reset(new CivetServer(options)); - _handler.reset( - new Handler(context, sessionFactory, std::move(authDNPattern), - std::move(headersAsAttributesPattern))); + _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern))); _server->addHandler(basePath, _handler.get()); } ListenHTTP::~ListenHTTP() { } -void ListenHTTP::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->get()); +void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -251,10 +209,7 @@ void ListenHTTP::onTrigger(core::ProcessContext *context, } } -ListenHTTP::Handler::Handler(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory, - std::string &&authDNPattern, - std::string &&headersAsAttributesPattern) +ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern) : _authDNRegex(std::move(authDNPattern)), _headersAsAttributesRegex(std::move(headersAsAttributesPattern)), logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) { @@ -268,11 +223,9 @@ void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { "Content-Length: 0\r\n\r\n"); } -bool ListenHTTP::Handler::handlePost(CivetServer *server, - struct mg_connection *conn) { +bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) { auto req_info = mg_get_request_info(conn); - logger_->log_info("ListenHTTP handling POST request of length %d", - req_info->content_length); + logger_->log_info("ListenHTTP handling POST request of length %d", req_info->content_length); // If this is a two-way TLS connection, authorize the peer against the configured pattern if (req_info->is_ssl && req_info->client_cert != nullptr) { @@ -280,8 +233,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n" "Content-Type: text/html\r\n" "Content-Length: 0\r\n\r\n"); - logger_->log_warn("ListenHTTP client DN not authorized: %s", - req_info->client_cert->subject); + logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject); return true; } } @@ -337,8 +289,8 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, return true; } -ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) : - logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { +ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) + : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { _conn = conn; _reqInfo = reqInfo; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp index e7d2e7b..054d585 100644 --- a/libminifi/src/processors/ListenSyslog.cpp +++ b/libminifi/src/processors/ListenSyslog.cpp @@ -35,37 +35,18 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ListenSyslog::RecvBufSize( - "Receive Buffer Size", - "The size of each buffer used to receive Syslog messages.", "65507 B"); -core::Property ListenSyslog::MaxSocketBufSize( - "Max Size of Socket Buffer", - "The maximum size of the socket buffer that should be used.", "1 MB"); -core::Property ListenSyslog::MaxConnections( - "Max Number of TCP Connections", - "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", - "2"); -core::Property ListenSyslog::MaxBatchSize( - "Max Batch Size", - "The maximum number of Syslog events to add to a single FlowFile.", "1"); -core::Property ListenSyslog::MessageDelimiter( - "Message Delimiter", - "Specifies the delimiter to place between Syslog messages when multiple " - "messages are bundled together (see <Max Batch Size> core::Property).", - "\n"); -core::Property ListenSyslog::ParseMessages( - "Parse Messages", - "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", - "false"); -core::Property ListenSyslog::Protocol("Protocol", - "The protocol for Syslog communication.", - "UDP"); -core::Property ListenSyslog::Port("Port", "The port for Syslog communication.", - "514"); -core::Relationship ListenSyslog::Success("success", - "All files are routed to success"); -core::Relationship ListenSyslog::Invalid("invalid", - "SysLog message format invalid"); +core::Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B"); +core::Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB"); +core::Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2"); +core::Property ListenSyslog::MaxBatchSize("Max Batch Size", "The maximum number of Syslog events to add to a single FlowFile.", "1"); +core::Property ListenSyslog::MessageDelimiter("Message Delimiter", "Specifies the delimiter to place between Syslog messages when multiple " + "messages are bundled together (see <Max Batch Size> core::Property).", + "\n"); +core::Property ListenSyslog::ParseMessages("Parse Messages", "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false"); +core::Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP"); +core::Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514"); +core::Relationship ListenSyslog::Success("success", "All files are routed to success"); +core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); void ListenSyslog::initialize() { // Set the supported properties @@ -140,8 +121,7 @@ void ListenSyslog::runThread() { if (_protocol == "TCP") listen(sockfd, 5); _serverSocket = sockfd; - logger_->log_error("ListenSysLog Server socket %d bind OK to port %d", - _serverSocket, portno); + logger_->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); } FD_ZERO(&_readfds); FD_SET(_serverSocket, &_readfds); @@ -171,14 +151,11 @@ void ListenSyslog::runThread() { socklen_t clilen; struct sockaddr_in cli_addr; clilen = sizeof(cli_addr); - int newsockfd = accept(_serverSocket, - reinterpret_cast<struct sockaddr *>(&cli_addr), - &clilen); + int newsockfd = accept(_serverSocket, reinterpret_cast<struct sockaddr *>(&cli_addr), &clilen); if (newsockfd > 0) { if (_clientSockets.size() < _maxConnections) { _clientSockets.push_back(newsockfd); - logger_->log_info("ListenSysLog new client socket %d connection", - newsockfd); + logger_->log_info("ListenSysLog new client socket %d connection", newsockfd); continue; } else { close(newsockfd); @@ -188,10 +165,8 @@ void ListenSyslog::runThread() { socklen_t clilen; struct sockaddr_in cli_addr; clilen = sizeof(cli_addr); - int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, - (struct sockaddr *) &cli_addr, &clilen); - if (recvlen > 0 - && (recvlen + getEventQueueByteSize()) <= _recvBufSize) { + int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, (struct sockaddr *) &cli_addr, &clilen); + if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) { uint8_t *payload = new uint8_t[recvlen]; memcpy(payload, _buffer, recvlen); putEvent(payload, recvlen); @@ -205,8 +180,7 @@ void ListenSyslog::runThread() { int recvlen = readline(clientSocket, _buffer, sizeof(_buffer)); if (recvlen <= 0) { close(clientSocket); - logger_->log_info("ListenSysLog client socket %d close", - clientSocket); + logger_->log_info("ListenSysLog client socket %d close", clientSocket); it = _clientSockets.erase(it); } else { if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) { @@ -253,8 +227,7 @@ int ListenSyslog::readline(int fd, char *bufptr, size_t len) { return -1; } -void ListenSyslog::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::string value; bool needResetServerSocket = false; if (context->getProperty(Protocol.getName(), value)) { @@ -275,8 +248,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context, _messageDelimiter = value; } if (context->getProperty(ParseMessages.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, - _parseMessages); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, _parseMessages); } if (context->getProperty(Port.getName(), value)) { int64_t oldPort = _port; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp index d2dcd10..e308901 100644 --- a/libminifi/src/processors/LogAttribute.cpp +++ b/libminifi/src/processors/LogAttribute.cpp @@ -39,27 +39,14 @@ namespace apache { namespace nifi { namespace minifi { namespace processors { -core::Property LogAttribute::LogLevel( - "Log Level", "The Log Level to use when logging the Attributes", "info"); -core::Property LogAttribute::AttributesToLog( - "Attributes to Log", - "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", - ""); -core::Property LogAttribute::AttributesToIgnore( - "Attributes to Ignore", - "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", - ""); -core::Property LogAttribute::LogPayload( - "Log Payload", - "If true, the FlowFile's payload will be logged, in addition to its attributes;" - "otherwise, just the Attributes will be logged.", - "false"); -core::Property LogAttribute::LogPrefix( - "Log prefix", - "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", - ""); -core::Relationship LogAttribute::Success( - "success", "success operational on the flow record"); +core::Property LogAttribute::LogLevel("Log Level", "The Log Level to use when logging the Attributes", "info"); +core::Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", ""); +core::Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", ""); +core::Property LogAttribute::LogPayload("Log Payload", "If true, the FlowFile's payload will be logged, in addition to its attributes;" + "otherwise, just the Attributes will be logged.", + "false"); +core::Property LogAttribute::LogPrefix("Log prefix", "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", ""); +core::Relationship LogAttribute::Success("success", "success operational on the flow record"); void LogAttribute::initialize() { // Set the supported properties @@ -76,8 +63,7 @@ void LogAttribute::initialize() { setSupportedRelationships(relationships); } -void LogAttribute::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::string dashLine = "--------------------------------------------------"; LogAttrLevel level = LogAttrLevelInfo; bool logPayload = false; @@ -96,8 +82,7 @@ void LogAttribute::onTrigger(core::ProcessContext *context, dashLine = "-----" + value + "-----"; } if (context->getProperty(LogPayload.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, - logPayload); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, logPayload); } message << "Logging for flow file " << "\n"; @@ -105,10 +90,8 @@ void LogAttribute::onTrigger(core::ProcessContext *context, message << "\nStandard FlowFile Attributes"; message << "\n" << "UUID:" << flow->getUUIDStr(); message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate()); - message << "\n" << "lineageStartDate:" - << getTimeStr(flow->getlineageStartDate()); - message << "\n" << "Size:" << flow->getSize() << " Offset:" - << flow->getOffset(); + message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate()); + message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset(); message << "\nFlowFile Attributes Map Content"; std::map<std::string, std::string> attrs = flow->getAttributes(); std::map<std::string, std::string>::iterator it; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp index d8832c7..0aba3d7 100644 --- a/libminifi/src/processors/PutFile.cpp +++ b/libminifi/src/processors/PutFile.cpp @@ -38,19 +38,12 @@ namespace nifi { namespace minifi { namespace processors { -core::Property PutFile::Directory("Output Directory", - "The output directory to which to put files", - "."); -core::Property PutFile::ConflictResolution( - "Conflict Resolution Strategy", - "Indicates what should happen when a file with the same name already exists in the output directory", - CONFLICT_RESOLUTION_STRATEGY_FAIL); - -core::Relationship PutFile::Success("success", - "All files are routed to success"); -core::Relationship PutFile::Failure( - "failure", - "Failed files (conflict, write failure, etc.) are transferred to failure"); +core::Property PutFile::Directory("Output Directory", "The output directory to which to put files", "."); +core::Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", + CONFLICT_RESOLUTION_STRATEGY_FAIL); + +core::Relationship PutFile::Success("success", "All files are routed to success"); +core::Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure"); void PutFile::initialize() { // Set the supported properties @@ -65,28 +58,23 @@ void PutFile::initialize() { setSupportedRelationships(relationships); } -void PutFile::onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory) { +void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { if (!context->getProperty(Directory.getName(), directory_)) { logger_->log_error("Directory attribute is missing or invalid"); } - if (!context->getProperty(ConflictResolution.getName(), - conflict_resolution_)) { - logger_->log_error( - "Conflict Resolution Strategy attribute is missing or invalid"); + if (!context->getProperty(ConflictResolution.getName(), conflict_resolution_)) { + logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid"); } } -void PutFile::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { if (IsNullOrEmpty(directory_) || IsNullOrEmpty(conflict_resolution_)) { context->yield(); return; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->get()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -111,16 +99,13 @@ void PutFile::onTrigger(core::ProcessContext *context, destFileSs << directory_ << "/" << filename; std::string destFile = destFileSs.str(); - logger_->log_info("PutFile writing file %s into directory %s", - filename.c_str(), directory_.c_str()); + logger_->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory_.c_str()); // If file exists, apply conflict resolution strategy struct stat statResult; if (stat(destFile.c_str(), &statResult) == 0) { - logger_->log_info( - "Destination file %s exists; applying Conflict Resolution Strategy: %s", - destFile.c_str(), conflict_resolution_.c_str()); + logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflict_resolution_.c_str()); if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) { putFile(session, flowFile, tmpFile, destFile); @@ -134,9 +119,7 @@ void PutFile::onTrigger(core::ProcessContext *context, } } -bool PutFile::putFile(core::ProcessSession *session, - std::shared_ptr<FlowFileRecord> flowFile, - const std::string &tmpFile, const std::string &destFile) { +bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile) { ReadCallback cb(tmpFile, destFile); session->read(flowFile, &cb); @@ -149,8 +132,7 @@ bool PutFile::putFile(core::ProcessSession *session, return false; } -PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, - const std::string &destFile) +PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile) : _tmpFile(tmpFile), _tmpFileOs(tmpFile), _destFile(destFile), @@ -170,25 +152,19 @@ void PutFile::ReadCallback::process(std::ifstream *stream) { bool PutFile::ReadCallback::commit() { bool success = false; - logger_->log_info("PutFile committing put file operation to %s", - _destFile.c_str()); + logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str()); if (_writeSucceeded) { _tmpFileOs.close(); if (rename(_tmpFile.c_str(), _destFile.c_str())) { - logger_->log_info( - "PutFile commit put file operation to %s failed because rename() call failed", - _destFile.c_str()); + logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str()); } else { success = true; - logger_->log_info("PutFile commit put file operation to %s succeeded", - _destFile.c_str()); + logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str()); } } else { - logger_->log_error( - "PutFile commit put file operation to %s failed because write failed", - _destFile.c_str()); + logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str()); } return success; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index abb02ca..d4f1b80 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -46,16 +46,11 @@ namespace nifi { namespace minifi { namespace processors { -core::Property TailFile::FileName( - "File to Tail", - "Fully-qualified filename of the file that should be tailed", ""); -core::Property TailFile::StateFile( - "State File", - "Specifies the file that should be used for storing state about" - " what data has been ingested so that upon restart NiFi can resume from where it left off", - "TailFileState"); -core::Relationship TailFile::Success("success", - "All files are routed to success"); +core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", ""); +core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about" + " what data has been ingested so that upon restart NiFi can resume from where it left off", + "TailFileState"); +core::Relationship TailFile::Success("success", "All files are routed to success"); void TailFile::initialize() { // Set the supported properties @@ -84,8 +79,7 @@ void TailFile::parseStateFileLine(char *buf) { ++line; char first = line[0]; - if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') - || (first == '=')) { + if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) { return; } @@ -125,8 +119,7 @@ void TailFile::recoverState() { return; } char buf[BUFFER_SIZE]; - for (file.getline(buf, BUFFER_SIZE); file.good(); - file.getline(buf, BUFFER_SIZE)) { + for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) { parseStateFileLine(buf); } } @@ -142,12 +135,10 @@ void TailFile::storeState() { file.close(); } -static bool sortTailMatchedFileItem(TailMatchedFileItem i, - TailMatchedFileItem j) { +static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) { return (i.modifiedTime < j.modifiedTime); } -void TailFile::checkRollOver(const std::string &fileLocation, - const std::string &fileName) { +void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) { struct stat statbuf; std::vector<TailMatchedFileItem> matchedFiles; std::string fullPath = fileLocation + "/" + _currentTailFileName; @@ -157,8 +148,7 @@ void TailFile::checkRollOver(const std::string &fileLocation, // there are new input for the current tail file return; - uint64_t modifiedTimeCurrentTailFile = - ((uint64_t) (statbuf.st_mtime) * 1000); + uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000); std::string pattern = fileName; std::size_t found = fileName.find_last_of("."); if (found != std::string::npos) @@ -176,10 +166,8 @@ void TailFile::checkRollOver(const std::string &fileLocation, if (!(entry->d_type & DT_DIR)) { std::string fileName = d_name; std::string fileFullName = fileLocation + "/" + d_name; - if (fileFullName.find(pattern) != std::string::npos - && stat(fileFullName.c_str(), &statbuf) == 0) { - if (((uint64_t) (statbuf.st_mtime) * 1000) - >= modifiedTimeCurrentTailFile) { + if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) { + if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) { TailMatchedFileItem item; item.fileName = fileName; item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); @@ -191,18 +179,14 @@ void TailFile::checkRollOver(const std::string &fileLocation, closedir(d); // Sort the list based on modified time - std::sort(matchedFiles.begin(), matchedFiles.end(), - sortTailMatchedFileItem); - for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); - it != matchedFiles.end(); ++it) { + std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem); + for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it != matchedFiles.end(); ++it) { TailMatchedFileItem item = *it; if (item.fileName == _currentTailFileName) { ++it; if (it != matchedFiles.end()) { TailMatchedFileItem nextItem = *it; - logger_->log_info("TailFile File Roll Over from %s to %s", - _currentTailFileName.c_str(), - nextItem.fileName.c_str()); + logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str()); _currentTailFileName = nextItem.fileName; _currentTailFilePosition = 0; storeState(); @@ -215,8 +199,7 @@ void TailFile::checkRollOver(const std::string &fileLocation, } } -void TailFile::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::lock_guard<std::mutex> tail_lock(tail_file_mutex_); std::string value; std::string fileLocation = ""; @@ -245,8 +228,7 @@ void TailFile::onTrigger(core::ProcessContext *context, context->yield(); return; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) return; std::size_t found = _currentTailFileName.find_last_of("."); @@ -256,12 +238,8 @@ void TailFile::onTrigger(core::ProcessContext *context, flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); session->import(fullPath, flowFile, true, this->_currentTailFilePosition); session->transfer(flowFile, Success); - logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), - flowFile->getSize()); - std::string logName = baseName + "." - + std::to_string(_currentTailFilePosition) + "-" - + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." - + extension; + logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize()); + std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension; flowFile->updateKeyedAttribute(FILENAME, logName); this->_currentTailFilePosition += flowFile->getSize(); storeState(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/provenance/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index 083d0b2..ff6a149 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -34,49 +34,37 @@ namespace nifi { namespace minifi { namespace provenance { -const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY+1] = -{ "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", - "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE", - "ADDINFO", "REPLAY"}; +const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED", + "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" }; // DeSerialize -bool ProvenanceEventRecord::DeSerialize( - const std::shared_ptr<core::Repository> &repo, std::string key) { +bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key) { std::string value; bool ret; ret = repo->Get(key, value); if (!ret) { - logger_->log_error("NiFi Provenance Store event %s can not found", - key.c_str()); + logger_->log_error("NiFi Provenance Store event %s can not found", key.c_str()); return false; } else { - logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), - value.length()); + logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length()); } - org::apache::nifi::minifi::io::DataStream stream( - (const uint8_t*) value.data(), value.length()); + org::apache::nifi::minifi::io::DataStream stream((const uint8_t*) value.data(), value.length()); ret = DeSerialize(stream); if (ret) { - logger_->log_debug( - "NiFi Provenance retrieve event %s size %d eventType %d success", - _eventIdStr.c_str(), stream.getSize(), _eventType); + logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), stream.getSize(), _eventType); } else { - logger_->log_debug( - "NiFi Provenance retrieve event %s size %d eventType %d fail", - _eventIdStr.c_str(), stream.getSize(), _eventType); + logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), stream.getSize(), _eventType); } return ret; } -bool ProvenanceEventRecord::Serialize( - const std::shared_ptr<core::Repository> &repo) { - +bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &repo) { org::apache::nifi::minifi::io::DataStream outStream; int ret; @@ -170,9 +158,7 @@ bool ProvenanceEventRecord::Serialize( return false; } - if (this->_eventType == ProvenanceEventRecord::FORK - || this->_eventType == ProvenanceEventRecord::CLONE - || this->_eventType == ProvenanceEventRecord::JOIN) { + if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) { // write UUIDs uint32_t number = this->_parentUuids.size(); ret = write(number, &outStream); @@ -196,8 +182,7 @@ bool ProvenanceEventRecord::Serialize( return false; } } - } else if (this->_eventType == ProvenanceEventRecord::SEND - || this->_eventType == ProvenanceEventRecord::FETCH) { + } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { ret = writeUTF(this->_transitUri, &outStream); if (ret <= 0) { return false; @@ -213,19 +198,15 @@ bool ProvenanceEventRecord::Serialize( } } // Persistent to the DB - if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), - outStream.getSize())) { - logger_->log_debug("NiFi Provenance Store event %s size %d success", - _eventIdStr.c_str(), outStream.getSize()); + if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { + logger_->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), outStream.getSize()); } else { - logger_->log_error("NiFi Provenance Store event %s size %d fail", - _eventIdStr.c_str(), outStream.getSize()); + logger_->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), outStream.getSize()); } return true; } -bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, - const int bufferSize) { +bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { int ret; org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize); @@ -325,9 +306,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, return false; } - if (this->_eventType == ProvenanceEventRecord::FORK - || this->_eventType == ProvenanceEventRecord::CLONE - || this->_eventType == ProvenanceEventRecord::JOIN) { + if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) { // read UUIDs uint32_t number = 0; ret = read(number, &outStream); @@ -356,8 +335,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, } this->addChildUuid(childUUID); } - } else if (this->_eventType == ProvenanceEventRecord::SEND - || this->_eventType == ProvenanceEventRecord::FETCH) { + } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { ret = readUTF(this->_transitUri, &outStream); if (ret <= 0) { return false; @@ -386,8 +364,7 @@ void ProvenanceReporter::commit() { } } -void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, - std::string detail) { +void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::string detail) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, flow); if (event) { @@ -396,9 +373,7 @@ void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, - core::Relationship relation, std::string detail, - uint64_t processingDuration) { +void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow); if (event) { @@ -409,10 +384,8 @@ void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, - std::string detail) { - ProvenanceEventRecord *event = allocate( - ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); +void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); if (event) { event->setDetails(detail); @@ -420,11 +393,8 @@ void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, - std::string detail, - uint64_t processingDuration) { - ProvenanceEventRecord *event = allocate( - ProvenanceEventRecord::CONTENT_MODIFIED, flow); +void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow); if (event) { event->setDetails(detail); @@ -433,8 +403,7 @@ void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, - std::shared_ptr<core::FlowFile> child) { +void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, parent); if (event) { @@ -444,10 +413,7 @@ void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, } } -void ProvenanceReporter::join( - std::vector<std::shared_ptr<core::FlowFile> > parents, - std::shared_ptr<core::FlowFile> child, std::string detail, - uint64_t processingDuration) { +void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child); if (event) { @@ -463,10 +429,7 @@ void ProvenanceReporter::join( } } -void ProvenanceReporter::fork( - std::vector<std::shared_ptr<core::FlowFile> > child, - std::shared_ptr<core::FlowFile> parent, std::string detail, - uint64_t processingDuration) { +void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, parent); if (event) { @@ -482,8 +445,7 @@ void ProvenanceReporter::fork( } } -void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, - std::string detail) { +void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::string detail) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, flow); if (event) { @@ -492,8 +454,7 @@ void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, - std::string reason) { +void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string reason) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow); if (event) { @@ -503,9 +464,7 @@ void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, - std::string transitUri, std::string detail, - uint64_t processingDuration, bool force) { +void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow); if (event) { @@ -522,11 +481,7 @@ void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, - std::string transitUri, - std::string sourceSystemFlowFileIdentifier, - std::string detail, - uint64_t processingDuration) { +void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, flow); if (event) { @@ -538,9 +493,7 @@ void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, - std::string transitUri, std::string detail, - uint64_t processingDuration) { +void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow); if (event) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/provenance/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp index 77de5ba..e4a8ffa 100644 --- a/libminifi/src/provenance/ProvenanceRepository.cpp +++ b/libminifi/src/provenance/ProvenanceRepository.cpp @@ -40,14 +40,11 @@ void ProvenanceRepository::run() { for (it->SeekToFirst(); it->Valid(); it->Next()) { ProvenanceEventRecord eventRead; std::string key = it->key().ToString(); - if (eventRead.DeSerialize( - reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), - it->value().size())) { + if (eventRead.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size())) { if ((curTime - eventRead.getEventTime()) > max_partition_millis_) purgeList.push_back(key); } else { - logger_->log_debug("NiFi Provenance retrieve event %s fail", - key.c_str()); + logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str()); purgeList.push_back(key); } } @@ -56,8 +53,7 @@ void ProvenanceRepository::run() { for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { std::string eventId = *itPurge; - logger_->log_info("ProvenanceRepository Repo Purge %s", - eventId.c_str()); + logger_->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str()); Delete(eventId); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/test/Server.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp index bb3e682..39efb12 100644 --- a/libminifi/test/Server.cpp +++ b/libminifi/test/Server.cpp @@ -50,8 +50,7 @@ typedef enum { } FlowControlMsgType; // FlowControl Protocol Msg Type String -static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { - "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" }; +static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" }; // Flow Control Msg Type to String inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) { @@ -83,10 +82,8 @@ typedef enum { } FlowControlMsgID; // FlowControl Protocol Msg ID String -static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { - "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", - "REPORT_INTERVAL", "PROCESSOR_NAME" - "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" }; +static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", "REPORT_INTERVAL", "PROCESSOR_NAME" + "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" }; #define TYPE_HDR_LEN 4 // Fix Hdr Type #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes @@ -122,9 +119,7 @@ typedef enum { } FlowControlRespCode; // FlowControl Resp Code str -static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", - "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", - "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" }; +static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" }; // Flow Control Resp Code to String inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) { @@ -332,8 +327,7 @@ int main(int argc, char *argv[]) { exit(1); } - if (signal(SIGINT, sigHandler) == SIG_ERR - || signal(SIGTERM, sigHandler) == SIG_ERR) { + if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) { return -1; } @@ -360,13 +354,10 @@ int main(int argc, char *argv[]) { FlowControlProtocolHeader hdr; int status = readHdr(newsockfd, &hdr); if (status > 0) { - printf("Flow Control Protocol receive MsgType %s\n", - FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber); - printf("Flow Control Protocol receive Resp Code %s\n", - FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - printf("Flow Control Protocol receive Payload len %d\n", - hdr.payloadLen); + printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen); if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) { printf("Flow Control Protocol Register Req receive\n"); uint8_t *payload = new uint8_t[hdr.payloadLen]; @@ -384,12 +375,10 @@ int main(int argc, char *argv[]) { } else if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) { uint32_t len; payloadPtr = decode(payloadPtr, len); - printf("Flow Control Protocol receive YAML name length %d\n", - len); + printf("Flow Control Protocol receive YAML name length %d\n", len); std::string flowName = (const char *) payloadPtr; payloadPtr += len; - printf("Flow Control Protocol receive YAML name %s\n", - flowName.c_str()); + printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str()); } else { break; } @@ -399,11 +388,9 @@ int main(int argc, char *argv[]) { // Calculate the total payload msg size char *ymlContent; uint32_t yamlLen = readYAML(&ymlContent); - uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, - 0); + uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0); if (yamlLen > 0) - payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT, - yamlLen); + payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT, yamlLen); uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; uint8_t *data = new uint8_t[size]; @@ -444,12 +431,10 @@ int main(int argc, char *argv[]) { if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) { uint32_t len; payloadPtr = decode(payloadPtr, len); - printf("Flow Control Protocol receive YAML name length %d\n", - len); + printf("Flow Control Protocol receive YAML name length %d\n", len); std::string flowName = (const char *) payloadPtr; payloadPtr += len; - printf("Flow Control Protocol receive YAML name %s\n", - flowName.c_str()); + printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str()); } else { break; } @@ -475,16 +460,11 @@ int main(int argc, char *argv[]) { propertyValue2 = "41"; flag = 0; } - uint32_t payloadSize = FlowControlMsgIDEncodingLen( - PROCESSOR_NAME, processor.size() + 1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, - propertyName1.size() + 1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, - propertyValue1.size() + 1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, - propertyName2.size() + 1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, - propertyValue2.size() + 1); + uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size() + 1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size() + 1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size() + 1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size() + 1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size() + 1); uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; uint8_t *data = new uint8_t[size]; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 7b1ac6b..e675043 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -36,40 +36,40 @@ class LogTestController { public: static LogTestController& getInstance() { - static LogTestController instance; - return instance; + static LogTestController instance; + return instance; } - + template<typename T> void setTrace() { setLevel<T>(spdlog::level::trace); } - + template<typename T> void setDebug() { setLevel<T>(spdlog::level::debug); } - + template<typename T> void setInfo() { setLevel<T>(spdlog::level::info); } - + template<typename T> void setWarn() { setLevel<T>(spdlog::level::warn); } - + template<typename T> void setError() { setLevel<T>(spdlog::level::err); } - + template<typename T> void setOff() { setLevel<T>(spdlog::level::off); } - + template<typename T> void setLevel(spdlog::level::level_enum level) { logging::LoggerFactory<T>::getLogger(); @@ -77,17 +77,17 @@ class LogTestController { modified_loggers.push_back(name); setLevel(name, level); } - + bool contains(const std::string &ending) { - return contains(log_output, ending); + return contains(log_output, ending); } - + bool contains(const std::ostringstream &stream, const std::string &ending) { std::string str = stream.str(); logger_->log_info("Looking for %s in %s.", ending, str); return (ending.length() > 0 && str.find(ending) != std::string::npos); } - + void reset() { for (auto const & name : modified_loggers) { setLevel(name, spdlog::level::err); @@ -95,35 +95,40 @@ class LogTestController { modified_loggers = std::vector<std::string>(); resetStream(log_output); } - + inline void resetStream(std::ostringstream &stream) { stream.str(""); stream.clear(); } - + std::ostringstream log_output; - + std::shared_ptr<logging::Logger> logger_; private: - class TestBootstrapLogger: public logging::Logger { - public: - TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger):Logger(logger){}; - }; + class TestBootstrapLogger : public logging::Logger { + public: + TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger) + : Logger(logger) { + } + ; + }; LogTestController() { - std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>(); - logger_properties->set("logger.root", "ERROR,ostream"); - logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO"); - logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG"); - std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>(); - dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true)); - dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance()); - logger_properties->add_sink("ostream", dist_sink); - logging::LoggerConfiguration::getConfiguration().initialize(logger_properties); - logger_ = logging::LoggerFactory<LogTestController>::getLogger(); + std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>(); + logger_properties->set("logger.root", "ERROR,ostream"); + logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO"); + logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG"); + std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>(); + dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true)); + dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance()); + logger_properties->add_sink("ostream", dist_sink); + logging::LoggerConfiguration::getConfiguration().initialize(logger_properties); + logger_ = logging::LoggerFactory<LogTestController>::getLogger(); } LogTestController(LogTestController const&); LogTestController& operator=(LogTestController const&); - ~LogTestController() {}; + ~LogTestController() { + } + ; void setLevel(const std::string name, spdlog::level::level_enum level) { logger_->log_info("Setting log level for %s to %s", name, spdlog::level::to_str(level));
