[
https://issues.apache.org/jira/browse/MINIFICPP-558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686769#comment-16686769
]
ASF GitHub Bot commented on MINIFICPP-558:
------------------------------------------
Github user arpadboda commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/437#discussion_r233507600
--- Diff: extensions/coap/protocols/CoapC2Protocol.cpp ---
@@ -0,0 +1,353 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CoapC2Protocol.h"
+#include "c2/PayloadSerializer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+#include "coap_functions.h"
+#include "io/BaseStream.h"
+
+CoapProtocol::CoapProtocol(std::string name, utils::Identifier uuid)
+ : RESTSender(name, uuid),
+ require_registration_(false),
+ logger_(logging::LoggerFactory<CoapProtocol>::getLogger()) {
+}
+
+CoapProtocol::~CoapProtocol() {
+}
+
+void CoapProtocol::initialize(const
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const
std::shared_ptr<Configure> &configure) {
+ RESTSender::initialize(controller, configure);
+ if (configure->get("nifi.c2.coap.connector.service",
controller_service_name_)) {
+ auto service =
controller->getControllerService(controller_service_name_);
+ coap_service_ =
std::static_pointer_cast<controllers::CoapConnectorService>(service);
+ } else {
+ logger_->log_info("No CoAP connector configured, so using default
service");
+ coap_service_ =
std::make_shared<controllers::CoapConnectorService>("cs", configure);
+ coap_service_->onEnable();
+ }
+}
+
+C2Payload CoapProtocol::consumePayload(const std::string &url, const
C2Payload &payload, Direction direction, bool async) {
+ return RESTSender::consumePayload(url, payload, direction, false);
+}
+
+int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const
C2Payload &payload) {
+ auto ident = payload.getIdentifier();
+ auto state = payload.getStatus().getState();
+ stream->writeUTF(ident);
+ uint8_t payloadState = 0;
+ switch (state) {
+ case state::UpdateState::NESTED:
+ case state::UpdateState::INITIATE:
+ case state::UpdateState::FULLY_APPLIED:
+ case state::UpdateState::READ_COMPLETE:
+ payloadState = 0;
+ break;
+ case state::UpdateState::NOT_APPLIED:
+ case state::UpdateState::PARTIALLY_APPLIED:
+ payloadState = 1;
+ break;
+ case state::UpdateState::READ_ERROR:
+ payloadState = 2;
+ break;
+ case state::UpdateState::SET_ERROR:
+ payloadState = 3;
+ break;
+ }
+ stream->write(&payloadState, 1);
+ return 0;
+}
+
+int CoapProtocol::writeHeartbeat(io::BaseStream *stream, const C2Payload
&payload) {
+ bool byte;
+ uint16_t size = 0;
+
+ std::string deviceIdent;
+ // device identifier
+ auto deviceInfo = getPayload("deviceInfo", payload);
+ if (deviceInfo) {
+ for (const auto &component : deviceInfo->getContent()) {
+ if (!getString(&component, "identifier", &deviceIdent)) {
+ break;
+ }
+ }
+ }
+ stream->writeUTF(deviceIdent, false);
+ std::string agentIdent;
+ // agent identifier
+ auto agentInfo = getPayload("agentInfo", payload);
+ if (agentInfo) {
+ for (const auto &component : agentInfo->getContent()) {
+ if (!getString(&component, "identifier", &agentIdent)) {
+ break;
+ }
+ }
+ }
+ if (agentIdent.empty()) {
+ return -1;
+ }
+ stream->writeUTF(agentIdent, false);
+
+ auto flowInfo = getPayload("flowInfo", payload);
+
+ if (flowInfo != nullptr) {
+
+ auto components = getPayload("components", flowInfo);
+
+ auto queues = getPayload("queues", flowInfo);
+
+ auto versionedFlowSnapshotURI = getPayload("versionedFlowSnapshotURI",
flowInfo);
+
+ if (components && queues && versionedFlowSnapshotURI) {
+ byte = true;
+ stream->write(byte);
+ size = components->getNestedPayloads().size();
+ stream->write(size);
+ // write statuses
+ for (const auto &component : components->getNestedPayloads()) {
+ stream->writeUTF(component.getLabel(), false);
+ for (const auto &cmp : component.getContent()) {
+ auto exists = cmp.operation_arguments.find("running");
+ byte = 0x00;
+ if (exists != cmp.operation_arguments.end()) {
+ const auto &node = exists->second.getValue();
+ if (auto sub_type =
std::dynamic_pointer_cast<state::response::BoolValue>(node)) {
+ byte = sub_type->getValue();
+ }
+ }
+ stream->write(byte);
+ }
+ }
+
+ size = queues->getNestedPayloads().size();
+ stream->write(size);
+ // write statuses
+ for (const auto &component : queues->getNestedPayloads()) {
+ stream->writeUTF(component.getLabel(), false);
+ for (const auto &cmp : component.getContent()) {
+ uint64_t datasize = 0, datasizemax = 0, size = 0, sizemax = 0;
+ getLong(&cmp, "dataSize", &datasize);
+ getLong(&cmp, "dataSizeMax", &datasizemax);
+ getLong(&cmp, "size", &size);
+ getLong(&cmp, "sizeMax", &sizemax);
+ stream->write(datasize);
+ stream->write(datasizemax);
+ stream->write(size);
+ stream->write(sizemax);
+ }
+ }
+ std::string bucketId = "default", flowid = "";
+ for (const auto &cmp : versionedFlowSnapshotURI->getContent()) {
+ auto bid = cmp.operation_arguments.find("bucketId");
+ if (bid != cmp.operation_arguments.end()) {
+ bucketId = bid->second.to_string();
+ }
+
+ auto flowId = cmp.operation_arguments.find("flowId");
+ if (flowId != cmp.operation_arguments.end()) {
+ flowid = flowId->second.to_string();
+ }
+ }
+ stream->writeUTF(bucketId);
+ stream->writeUTF(flowid);
+
+ } else {
+ byte = false;
+ stream->write(byte);
+ }
+
+ } else {
+ byte = false;
+ stream->write(byte);
+ }
+ return 0;
+}
+
+Operation CoapProtocol::getOperation(int type) {
+ switch (type) {
+ case 0:
+ return ACKNOWLEDGE;
+ case 1:
+ return HEARTBEAT;
+ case 2:
+ return CLEAR;
+ case 3:
+ return DESCRIBE;
+ case 4:
+ return RESTART;
+ case 5:
+ return START;
+ case 6:
+ return UPDATE;
+ case 7:
+ return STOP;
+ }
+ return ACKNOWLEDGE;
+}
+
+C2Payload CoapProtocol::serialize(const C2Payload &payload) {
+ if (nullptr == coap_service_) {
+ return C2Payload(payload.getOperation(),
state::UpdateState::READ_ERROR, true);
+ }
+
+ if (require_registration_) {
+ logger_->log_debug("Server requested agent registration, so
attempting");
+ auto response = RESTSender::consumePayload(rest_uri_, payload,
TRANSMIT, false);
+ if (response.getStatus().getState() == state::UpdateState::READ_ERROR)
{
+ logger_->log_trace("Could not register");
+ return C2Payload(payload.getOperation(),
state::UpdateState::READ_COMPLETE, true);
+ } else {
+ logger_->log_trace("Registered agent.");
+ }
+ require_registration_ = false;
+
+ return C2Payload(payload.getOperation(),
state::UpdateState::READ_COMPLETE, true);
+
+ }
+
+ uint16_t version = 0;
+ uint8_t payload_type = 0;
+ uint64_t payload_u64 = 0;
+ uint16_t size = 0;
+ io::BaseStream stream;
+
+ stream.write(version);
+ std::string endpoint = "heartbeat";
+ switch (payload.getOperation()) {
+ case ACKNOWLEDGE:
+ endpoint = "acknowledge";
+ payload_type = 0;
+ stream.write(&payload_type, 1);
+ if (writeAcknowledgement(&stream, payload) != 0) {
+ return C2Payload(payload.getOperation(),
state::UpdateState::READ_ERROR, true);
+ }
+ break;
+ case HEARTBEAT:
+ payload_type = 1;
+ stream.write(&payload_type, 1);
+ if (writeHeartbeat(&stream, payload) != 0) {
+ return C2Payload(payload.getOperation(),
state::UpdateState::READ_ERROR, true);
+ }
+ break;
+ };
+
+ size_t bsize = stream.getSize();
+ auto buffer = (unsigned char*) stream.getBuffer();
+
+ controllers::CoapConnectorService::CoAPMessage
message(coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, buffer, bsize));
+
+ if (message.isRegistrationRequest()) {
+ require_registration_ = true;
+ } else if (message.getSize() > 0) {
+ io::DataStream byteStream(message.getData(), message.getSize());
+ io::BaseStream responseStream(&byteStream);
+ responseStream.read(version);
+ responseStream.read(size);
+ C2Payload new_payload(payload.getOperation(),
state::UpdateState::NESTED, true);
+ for (int i = 0; i < size; i++) {
+
+ uint8_t operationType;
+ uint16_t argsize = 0;
+ std::string operand, id;
+ responseStream.read(operationType);
+ responseStream.readUTF(id, false);
+ responseStream.readUTF(operand, false);
+
+ auto newOp = getOperation(operationType);
+ C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE,
true);
+ nested_payload.setIdentifier(id);
+ C2ContentResponse new_command(newOp);
+ new_command.delay = 0;
+ new_command.required = true;
+ new_command.ttl = -1;
+ new_command.name = operand;
+ new_command.ident = id;
+ responseStream.read(argsize);
+ for (int j = 0; j < argsize; j++) {
+ std::string key, value;
+ responseStream.readUTF(key);
+ responseStream.readUTF(value);
+ new_command.operation_arguments[key] = value;
+ }
+
+ nested_payload.addContent(std::move(new_command));
+ new_payload.addPayload(std::move(nested_payload));
+ }
+ return new_payload;
+ }
+
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR,
true);
+}
+
+const C2Payload * const CoapProtocol::getPayload(const std::string &name,
const C2Payload * const payload) {
+ if (payload->getLabel() == name) {
+ return payload;
+ }
+ for (const auto &nested_payload : payload->getNestedPayloads()) {
+ if (nested_payload.getLabel() == name) {
+ return &nested_payload;
+ }
+ }
+ return nullptr;
+}
+
+int8_t CoapProtocol::getLong(const C2ContentResponse * const cmp, const
std::string &name, uint64_t *value) {
+ auto exists = cmp->operation_arguments.find(name);
+ if (exists != cmp->operation_arguments.end()) {
+ const auto &node = exists->second.getValue();
+ if (auto sub_type =
std::dynamic_pointer_cast<state::response::Int64Value>(node)) {
+ *value = sub_type->getValue();
+ return 0;
+ }
+ }
+ return -1;
+}
+
+int8_t CoapProtocol::getString(const C2ContentResponse * const cmp, const
std::string &name, std::string *value) {
+ auto exists = cmp->operation_arguments.find(name);
+ if (exists != cmp->operation_arguments.end()) {
+ const auto &node = exists->second.getValue();
+ *value = node->getStringValue();
+ return 0;
+ }
+ return -1;
+}
+
+const C2Payload * const CoapProtocol::getPayload(const std::string &name,
const C2Payload &payload) {
+ if (payload.getLabel() == name) {
--- End diff --
```
return getPayload(name, &payload);
```
Since the pointer version already exists, why duplicate the logic here?
> Move PayloadSerializer in preparation for Coap
> ----------------------------------------------
>
> Key: MINIFICPP-558
> URL: https://issues.apache.org/jira/browse/MINIFICPP-558
> Project: NiFi MiNiFi C++
> Issue Type: Bug
> Reporter: Mr TheSegfault
> Assignee: Mr TheSegfault
> Priority: Major
> Fix For: 0.6.0
>
>
> Move PayloadSerializer
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)