fgerlits commented on code in PR #1850: URL: https://github.com/apache/nifi-minifi-cpp/pull/1850#discussion_r1822781298
########## extensions/librdkafka/migrators/PublishKafkaMigrator.cpp: ########## @@ -0,0 +1,83 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PublishKafkaMigrator.h" + +#include "core/Resource.h" +#include "core/flow/FlowSchema.h" +#include "controllers/SSLContextService.h" +#include "../PublishKafka.h" + + +namespace org::apache::nifi::minifi::kafka::migration { + +namespace { +constexpr std::string_view DEPRECATED_MESSAGE_KEY_FIELD = "Message Key Field"; +constexpr std::string_view DEPRECATED_SECURITY_CA = "Security CA"; +constexpr std::string_view DEPRECATED_SECURITY_CERT = "Security Cert"; +constexpr std::string_view DEPRECATED_SECURITY_PRIVATE_KEY = "Security Private Key"; +constexpr std::string_view DEPRECATED_SECURITY_PASS_PHRASE = "Security Pass Phrase"; + +void migrateKafkaPropertyToSSLContextService( + const std::string_view deprecated_publish_kafka_property, + const std::string_view ssl_context_service_property, + core::flow::Node& publish_kafka_properties, + core::flow::Node& ssl_controller_service_properties) { + const auto security_ca = publish_kafka_properties.getMember(deprecated_publish_kafka_property); + if (const auto security_ca_str = security_ca ? security_ca.getString() : std::nullopt) { + ssl_controller_service_properties.addMember(ssl_context_service_property, *security_ca_str); + } + + std::ignore = publish_kafka_properties.remove(deprecated_publish_kafka_property); +} +} // namespace + +void PublishKafkaMigrator::migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) { + auto publish_kafka_processors = getProcessors(root_node, schema, "PublishKafka"); + for (auto& publish_kafka_processor : publish_kafka_processors) { + auto publish_kafka_properties = publish_kafka_processor[schema.processor_properties]; + if (publish_kafka_properties.remove(DEPRECATED_MESSAGE_KEY_FIELD)) { + logger_->log_warn("Removed deprecated property \"{}\" from {}", DEPRECATED_MESSAGE_KEY_FIELD, *publish_kafka_processor[schema.identifier].getString()); + } + if (publish_kafka_properties.contains(DEPRECATED_SECURITY_CA) || + publish_kafka_properties.contains(DEPRECATED_SECURITY_CERT) || + publish_kafka_properties.contains(DEPRECATED_SECURITY_PRIVATE_KEY) || + publish_kafka_properties.contains(DEPRECATED_SECURITY_PASS_PHRASE)) { + std::string publish_kafka_id_str = publish_kafka_processor[schema.identifier].getString().value_or(std::string{utils::IdGenerator::getIdGenerator()->generate().to_string()}); + auto ssl_context_service_name = fmt::format("GeneratedSSLContextServiceFor_{}", publish_kafka_id_str); + auto root_group = root_node[schema.root_group]; + auto controller_services = root_group[schema.controller_services]; + auto ssl_controller_service = *controller_services.pushBack(); + ssl_controller_service.addMember(schema.name[0], ssl_context_service_name); + ssl_controller_service.addMember(schema.identifier[0], utils::IdGenerator::getIdGenerator()->generate().to_string().c_str()); + ssl_controller_service.addMember(schema.type[0], "SSLContextService"); + + publish_kafka_properties.addMember(processors::PublishKafka::SSLContextService.name, ssl_context_service_name); + auto ssl_controller_service_properties = ssl_controller_service.addObject(schema.controller_service_properties[0]); + + migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CA, controllers::SSLContextService::CACertificate.name, publish_kafka_properties, *ssl_controller_service_properties); + migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CERT, controllers::SSLContextService::ClientCertificate.name, publish_kafka_properties, *ssl_controller_service_properties); + migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PRIVATE_KEY, controllers::SSLContextService::PrivateKey.name, publish_kafka_properties, *ssl_controller_service_properties); + migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PASS_PHRASE, controllers::SSLContextService::Passphrase.name, publish_kafka_properties, *ssl_controller_service_properties); + + logger_->log_warn("Removed deprecated Security Properties from {} and replaced them with SSLContextService", *publish_kafka_processor[schema.identifier].getString()); Review Comment: ```suggestion logger_->log_warn("Removed deprecated Security Properties from {} and replaced them with SSLContextService", publish_kafka_id_str); ``` ########## libminifi/src/core/flow/StructuredConfiguration.cpp: ########## @@ -1014,4 +1016,20 @@ std::string StructuredConfiguration::serialize(const core::ProcessGroup& process return flow_serializer_->serialize(process_group, schema_, sensitive_values_encryptor_, {}); } +void StructuredConfiguration::migrate(Node& root_node, const FlowSchema& schema) const { + const auto flow_migrator_classes = ClassLoader::getDefaultClassLoader().getAll(ResourceType::FlowMigrator); + + for (auto& flow_migrator_class : flow_migrator_classes) { + if (const auto flow_migrator = ClassLoader::getDefaultClassLoader().instantiate<FlowMigrator>(flow_migrator_class, flow_migrator_class)) { + try { + flow_migrator->migrate(root_node, schema); + } catch (const std::exception& exception) { + logger_->log_error("Caught Exception during flow {}::migration, type: {}, what: {}", flow_migrator->getName(), typeid(exception).name(), exception.what()); + } + } else { + logger_->log_error("ResourceType::FlowMigrator is not a core::flow::FlowMigrator"); Review Comment: Can we log the name of the flow migrator class? ```suggestion logger_->log_error("{} is registered as a flow migrator, but it is not a subclass of core::flow::FlowMigrator", flow_migrator_class); ``` ########## extensions/librdkafka/PublishKafka.cpp: ########## @@ -641,14 +636,6 @@ std::optional<utils::net::SslData> PublishKafka::getSslData(core::ProcessContext } utils::net::SslData ssl_data; - if (auto security_ca = context.getProperty(SecurityCA)) - ssl_data.ca_loc = *security_ca; - if (auto security_cert = context.getProperty(SecurityCert)) - ssl_data.cert_loc = *security_cert; - if (auto security_private_key = context.getProperty(SecurityPrivateKey)) - ssl_data.key_loc = *security_private_key; - if (auto security_private_key_pass = context.getProperty(SecurityPrivateKeyPassWord)) - ssl_data.key_pw = *security_private_key_pass; return ssl_data; } Review Comment: Can we get rid of the `getSslData()` function completely (from both here and `KafkaProcessorBase`)? As far as I can see, it's only called from one place, and its implementation is one line, which could be inlined. ########## extensions/librdkafka/migrators/PublishKafkaMigrator.cpp: ########## @@ -0,0 +1,83 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PublishKafkaMigrator.h" + +#include "core/Resource.h" +#include "core/flow/FlowSchema.h" +#include "controllers/SSLContextService.h" +#include "../PublishKafka.h" + + +namespace org::apache::nifi::minifi::kafka::migration { + +namespace { +constexpr std::string_view DEPRECATED_MESSAGE_KEY_FIELD = "Message Key Field"; +constexpr std::string_view DEPRECATED_SECURITY_CA = "Security CA"; +constexpr std::string_view DEPRECATED_SECURITY_CERT = "Security Cert"; +constexpr std::string_view DEPRECATED_SECURITY_PRIVATE_KEY = "Security Private Key"; +constexpr std::string_view DEPRECATED_SECURITY_PASS_PHRASE = "Security Pass Phrase"; + +void migrateKafkaPropertyToSSLContextService( + const std::string_view deprecated_publish_kafka_property, + const std::string_view ssl_context_service_property, + core::flow::Node& publish_kafka_properties, + core::flow::Node& ssl_controller_service_properties) { + const auto security_ca = publish_kafka_properties.getMember(deprecated_publish_kafka_property); Review Comment: I would rename `security_ca` to something like `property_value`. ########## libminifi/include/core/flow/FlowMigrator.h: ########## @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + + +#include "core/Core.h" +#include "core/flow/Node.h" +#include "core/flow/FlowSchema.h" + +namespace org::apache::nifi::minifi::core::flow { + +class FlowMigrator : public CoreComponent { + public: + explicit FlowMigrator(const std::string_view name, const utils::Identifier& uuid = {}) : core::CoreComponent(name, uuid) {} + + virtual void migrate(Node& flow_root, const FlowSchema& schema) = 0; + + protected: + void doOnProcessGroup(Node& process_group, const FlowSchema& schema, auto func) const { + func(process_group, schema); + const auto process_group_children = process_group[schema.process_groups]; + if (process_group.isSequence()) { + for (auto child_process_group_node : process_group_children) { + doOnProcessGroup(child_process_group_node, schema, func); + } + } + } + + [[nodiscard]] std::vector<Node> getProcessors(const Node& root_node, const FlowSchema& schema, const std::string_view processor_to_get) const { + std::vector<Node> processors; + auto root_group = root_node[schema.root_group]; + doOnProcessGroup(root_group, schema, [&processors, processor_to_get](auto process_group, const FlowSchema& schema) { Review Comment: I'm genuinely impressed that the compiler can deduce this `auto` type, but spelling out `const Node&` (or is it `Node` by value?) would be helpful to us non-compilers. ########## extensions/librdkafka/tests/PublishKafkaMigratorTests.cpp: ########## @@ -0,0 +1,197 @@ +/** +* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "PublishKafka.h" +#include "unit/ConfigurationTestController.h" +#include "core/flow/AdaptiveConfiguration.h" + +#include "yaml-cpp/yaml.h" + +namespace org::apache::nifi::minifi::test { + +TEST_CASE("PublishKafkaMigratorTest yaml") { + static constexpr std::string_view ORIGINAL_YAML = R"( +MiNiFi Config Version: 3 +Flow Controller: + name: MiNiFi Flow +Processors: + - name: Get files from /tmp/input + id: 7fd166aa-0662-4c42-affa-88f6fb39807f + class: org.apache.nifi.processors.standard.GetFile + scheduling period: 2 sec + scheduling strategy: TIMER_DRIVEN + Properties: + Input Directory: /tmp/input + - name: Publish messages to Kafka topic test + id: 8a534b4a-2b4a-4e1e-ab07-8a09fa08f848 + class: org.apache.nifi.processors.standard.PublishKafka + scheduling strategy: EVENT_DRIVEN + auto-terminated relationships list: + - success + - failure + Properties: + Message Key Field: foo + Batch Size: 10 + Client Name: test-client + Compress Codec: none + Delivery Guarantee: 1 + Known Brokers: kafka-broker:9092 + Message Timeout: 12 sec + Request Timeout: 10 sec + Topic Name: test + Security CA: /tmp/resources/certs/ca-cert + Security Cert: /tmp/resources/certs/client_test_client_client.pem + Security Pass Phrase: abcdefgh + Security Private Key: /tmp/resources/certs/client_test_client_client.key +Connections: + - name: GetFile/success/PublishKafka + id: 1edd529e-eee9-4b05-9e35-f1607bb0243b + source id: 7fd166aa-0662-4c42-affa-88f6fb39807f + source relationship name: success + destination id: 8a534b4a-2b4a-4e1e-ab07-8a09fa08f848 +Controller Services: [] +Remote Processing Groups: [] +)"; + ConfigurationTestController test_controller; + std::string serialized_flow_definition; + SECTION("YamlConfiguration") { + core::YamlConfiguration yaml_config(test_controller.getContext()); + auto root_flow_definition = yaml_config.getRootFromPayload(std::string{ORIGINAL_YAML}); + REQUIRE(root_flow_definition); + serialized_flow_definition = yaml_config.serialize(*root_flow_definition); + } + SECTION("Adaptive Yaml Configuration") { + core::flow::AdaptiveConfiguration adaptive_configuration(test_controller.getContext()); + auto root_flow_definition = adaptive_configuration.getRootFromPayload(std::string{ORIGINAL_YAML}); + REQUIRE(root_flow_definition); + serialized_flow_definition = adaptive_configuration.serialize(*root_flow_definition); + } + YAML::Node migrated_flow = YAML::Load(std::string{serialized_flow_definition}); + CHECK(migrated_flow["Controller Services"].IsSequence()); + CHECK(migrated_flow["Controller Services"].size() == 1); + CHECK(migrated_flow["Controller Services"][0]["name"].as<std::string>() == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848"); + CHECK(migrated_flow["Controller Services"][0]["class"].as<std::string>() == "SSLContextService"); + CHECK(migrated_flow["Controller Services"][0]["Properties"].IsMap()); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as<std::string>() == "/tmp/resources/certs/ca-cert"); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["Client Certificate"].as<std::string>() == "/tmp/resources/certs/client_test_client_client.pem"); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["Private Key"].as<std::string>() == "/tmp/resources/certs/client_test_client_client.key"); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as<std::string>() == "/tmp/resources/certs/ca-cert"); Review Comment: "CA Certificate" is checked twice, and "Passphrase" isn't checked (also in the JSON test below). ########## extensions/librdkafka/tests/PublishKafkaMigratorTests.cpp: ########## @@ -0,0 +1,197 @@ +/** +* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "PublishKafka.h" +#include "unit/ConfigurationTestController.h" +#include "core/flow/AdaptiveConfiguration.h" + +#include "yaml-cpp/yaml.h" + +namespace org::apache::nifi::minifi::test { + +TEST_CASE("PublishKafkaMigratorTest yaml") { + static constexpr std::string_view ORIGINAL_YAML = R"( +MiNiFi Config Version: 3 +Flow Controller: + name: MiNiFi Flow +Processors: + - name: Get files from /tmp/input + id: 7fd166aa-0662-4c42-affa-88f6fb39807f + class: org.apache.nifi.processors.standard.GetFile + scheduling period: 2 sec + scheduling strategy: TIMER_DRIVEN + Properties: + Input Directory: /tmp/input + - name: Publish messages to Kafka topic test + id: 8a534b4a-2b4a-4e1e-ab07-8a09fa08f848 + class: org.apache.nifi.processors.standard.PublishKafka + scheduling strategy: EVENT_DRIVEN + auto-terminated relationships list: + - success + - failure + Properties: + Message Key Field: foo + Batch Size: 10 + Client Name: test-client + Compress Codec: none + Delivery Guarantee: 1 + Known Brokers: kafka-broker:9092 + Message Timeout: 12 sec + Request Timeout: 10 sec + Topic Name: test + Security CA: /tmp/resources/certs/ca-cert + Security Cert: /tmp/resources/certs/client_test_client_client.pem + Security Pass Phrase: abcdefgh + Security Private Key: /tmp/resources/certs/client_test_client_client.key +Connections: + - name: GetFile/success/PublishKafka + id: 1edd529e-eee9-4b05-9e35-f1607bb0243b + source id: 7fd166aa-0662-4c42-affa-88f6fb39807f + source relationship name: success + destination id: 8a534b4a-2b4a-4e1e-ab07-8a09fa08f848 +Controller Services: [] +Remote Processing Groups: [] +)"; + ConfigurationTestController test_controller; + std::string serialized_flow_definition; + SECTION("YamlConfiguration") { + core::YamlConfiguration yaml_config(test_controller.getContext()); + auto root_flow_definition = yaml_config.getRootFromPayload(std::string{ORIGINAL_YAML}); + REQUIRE(root_flow_definition); + serialized_flow_definition = yaml_config.serialize(*root_flow_definition); + } + SECTION("Adaptive Yaml Configuration") { + core::flow::AdaptiveConfiguration adaptive_configuration(test_controller.getContext()); + auto root_flow_definition = adaptive_configuration.getRootFromPayload(std::string{ORIGINAL_YAML}); + REQUIRE(root_flow_definition); + serialized_flow_definition = adaptive_configuration.serialize(*root_flow_definition); + } + YAML::Node migrated_flow = YAML::Load(std::string{serialized_flow_definition}); + CHECK(migrated_flow["Controller Services"].IsSequence()); + CHECK(migrated_flow["Controller Services"].size() == 1); + CHECK(migrated_flow["Controller Services"][0]["name"].as<std::string>() == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848"); + CHECK(migrated_flow["Controller Services"][0]["class"].as<std::string>() == "SSLContextService"); + CHECK(migrated_flow["Controller Services"][0]["Properties"].IsMap()); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as<std::string>() == "/tmp/resources/certs/ca-cert"); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["Client Certificate"].as<std::string>() == "/tmp/resources/certs/client_test_client_client.pem"); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["Private Key"].as<std::string>() == "/tmp/resources/certs/client_test_client_client.key"); + CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as<std::string>() == "/tmp/resources/certs/ca-cert"); + + CHECK(!migrated_flow["Processors"][1]["Properties"]["Message Key Field"].IsDefined()); + + CHECK(!migrated_flow["Processors"][1]["Properties"]["Security CA"].IsDefined()); + CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Cert"].IsDefined()); + CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Pass Phrase"].IsDefined()); + CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Private Key"].IsDefined()); Review Comment: minor, but I find `CHECK_FALSE(...)` more readable than `CHECK(!...)` ########## extensions/librdkafka/migrators/PublishKafkaMigrator.cpp: ########## @@ -0,0 +1,83 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PublishKafkaMigrator.h" + +#include "core/Resource.h" +#include "core/flow/FlowSchema.h" +#include "controllers/SSLContextService.h" +#include "../PublishKafka.h" + + +namespace org::apache::nifi::minifi::kafka::migration { + +namespace { +constexpr std::string_view DEPRECATED_MESSAGE_KEY_FIELD = "Message Key Field"; +constexpr std::string_view DEPRECATED_SECURITY_CA = "Security CA"; +constexpr std::string_view DEPRECATED_SECURITY_CERT = "Security Cert"; +constexpr std::string_view DEPRECATED_SECURITY_PRIVATE_KEY = "Security Private Key"; +constexpr std::string_view DEPRECATED_SECURITY_PASS_PHRASE = "Security Pass Phrase"; + +void migrateKafkaPropertyToSSLContextService( + const std::string_view deprecated_publish_kafka_property, + const std::string_view ssl_context_service_property, + core::flow::Node& publish_kafka_properties, + core::flow::Node& ssl_controller_service_properties) { + const auto security_ca = publish_kafka_properties.getMember(deprecated_publish_kafka_property); + if (const auto security_ca_str = security_ca ? security_ca.getString() : std::nullopt) { + ssl_controller_service_properties.addMember(ssl_context_service_property, *security_ca_str); + } + + std::ignore = publish_kafka_properties.remove(deprecated_publish_kafka_property); +} +} // namespace + +void PublishKafkaMigrator::migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) { + auto publish_kafka_processors = getProcessors(root_node, schema, "PublishKafka"); + for (auto& publish_kafka_processor : publish_kafka_processors) { + auto publish_kafka_properties = publish_kafka_processor[schema.processor_properties]; + if (publish_kafka_properties.remove(DEPRECATED_MESSAGE_KEY_FIELD)) { + logger_->log_warn("Removed deprecated property \"{}\" from {}", DEPRECATED_MESSAGE_KEY_FIELD, *publish_kafka_processor[schema.identifier].getString()); + } + if (publish_kafka_properties.contains(DEPRECATED_SECURITY_CA) || + publish_kafka_properties.contains(DEPRECATED_SECURITY_CERT) || + publish_kafka_properties.contains(DEPRECATED_SECURITY_PRIVATE_KEY) || + publish_kafka_properties.contains(DEPRECATED_SECURITY_PASS_PHRASE)) { + std::string publish_kafka_id_str = publish_kafka_processor[schema.identifier].getString().value_or(std::string{utils::IdGenerator::getIdGenerator()->generate().to_string()}); Review Comment: Can it happen that the processor doesn't have an ID? If not, then we could replace the generation of a fake ID with an assertion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
