Github user apiri commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/321#discussion_r186547479
--- Diff: extensions/mqtt/protocol/PayloadSerializer.h ---
@@ -0,0 +1,365 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_
+#define EXTENSIONS_MQTT_PROTOCOL_PAYLOADSERIALIZER_H_
+
+#include "c2/C2Protocol.h"
+#include "io/BaseStream.h"
+#include "core/state/Value.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+namespace mqtt {
+
+class PayloadSerializer {
+ public:
+
+ /**
+ * Static function that serializes the value nodes
+ */
+ static void serializeValueNode(state::response::ValueNode &value,
std::shared_ptr<io::BaseStream> stream) {
+ auto base_type = value.getValue();
+ uint8_t type = 0x00;
+ if (auto sub_type =
std::dynamic_pointer_cast<state::response::IntValue>(base_type)) {
+ type = 1;
+ stream->write(&type, 1);
+ uint32_t value = sub_type->getValue();
+ stream->write(value);
+ } else if (auto sub_type =
std::dynamic_pointer_cast<state::response::Int64Value>(base_type)) {
+ type = 2;
+ stream->write(&type, 1);
+ uint64_t value = sub_type->getValue();
+ stream->write(value);
+ } else if (auto sub_type =
std::dynamic_pointer_cast<state::response::BoolValue>(base_type)) {
+ type = 3;
+ stream->write(&type, 1);
+ if (sub_type->getValue()) {
+ type = 1;
+
+ } else
+ type = 0;
+ stream->write(&type, 1);
+ } else {
+ auto str = base_type->getStringValue();
+ type = 4;
+ stream->write(&type, 1);
+ stream->writeUTF(str);
+ }
+ }
+ static void serialize(uint8_t op, const C2Payload &payload,
std::shared_ptr<io::BaseStream> stream) {
+ uint8_t st;
+ stream->write(payload.getNestedPayloads().size());
+ for (auto nested_payload : payload.getNestedPayloads()) {
+ switch (nested_payload.getOperation()) {
+ case Operation::ACKNOWLEDGE:
+ op = 1;
+ break;
+ case Operation::HEARTBEAT:
+ op = 2;
+ break;
+ case Operation::RESTART:
+ op = 3;
+ break;
+ case Operation::DESCRIBE:
+ op = 4;
+ break;
+ case Operation::STOP:
+ op = 5;
+ break;
+ case Operation::START:
+ op = 6;
+ break;
+ case Operation::UPDATE:
+ op = 7;
+ break;
+ default:
+ op = 2;
+ break;
+ }
+ stream->write(&op, 1);
+ stream->write(&st, 1);
+ stream->writeUTF(nested_payload.getLabel());
+ stream->writeUTF(nested_payload.getIdentifier());
+ const std::vector<C2ContentResponse> &content =
nested_payload.getContent();
+ stream->write(content.size());
+ for (const auto &payload_content : content) {
+ stream->writeUTF(payload_content.name);
+ stream->write(payload_content.operation_arguments.size());
+ for (auto content : payload_content.operation_arguments) {
+ stream->writeUTF(content.first);
+ serializeValueNode(content.second, stream);
+ }
+ }
+ if (nested_payload.getNestedPayloads().size() > 0) {
+ serialize(op, nested_payload, stream);
+ } else {
+ size_t size = 0;
+ stream->write(size);
+ }
+ }
+ }
+
+ static uint8_t opToInt(Operation opt) {
+ uint8_t op;
+
+ switch (opt) {
+ case Operation::ACKNOWLEDGE:
+ op = 1;
+ break;
+ case Operation::HEARTBEAT:
+ op = 2;
+ break;
+ case Operation::RESTART:
+ op = 3;
+ break;
+ case Operation::DESCRIBE:
+ op = 4;
+ break;
+ case Operation::STOP:
+ op = 5;
+ break;
+ case Operation::START:
+ op = 6;
+ break;
+ case Operation::UPDATE:
+ op = 7;
+ break;
+ default:
+ op = 2;
+ break;
+ }
+ return op;
+ }
+ static std::shared_ptr<io::BaseStream> serialize(const C2Payload
&payload) {
+ std::shared_ptr<io::BaseStream> stream =
std::make_shared<io::BaseStream>();
+ uint8_t op, st = 0;
+
+ switch (payload.getOperation()) {
--- End diff --
opToInt?
---