szaszm commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r680863837



##########
File path: docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
##########
@@ -135,12 +135,13 @@ zookeeper.connection.timeout.ms=6000
 # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
 group.initial.rebalance.delay.ms=0
 
-security.inter.broker.protocol=SSL
-listeners=PLAINTEXT://:9092,SSL://:9093
+listeners=SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093
+advertised.listeners=SSL://kafka-broker:9093,SSL_HOST://localhost:29093
+listener.security.protocol.map=SSL:SSL,SSL_HOST:SSL
 
 # SSL
 ssl.protocol = TLS
-ssl.enabled.protocols=TLSv1.2
+ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

Review comment:
       I'd rather test the original, stricter protocol suite. Proving that high-security configurations work is more valuable than proving that low-security ones do: you can always move to a higher security level, but a lower one may not be acceptable to some users.
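
For reference, the stricter configuration being asked for is just the original pair of lines from this diff restored in server-ssl.properties:

```properties
ssl.protocol = TLS
ssl.enabled.protocols=TLSv1.2
```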

##########
File path: docker/test/integration/minifi/core/SingleNodeDockerCluster.py
##########
@@ -225,21 +224,32 @@ def deploy_kafka_broker(self):
         logging.info('Adding container \'%s\'', zookeeper.name)
         self.containers[zookeeper.name] = zookeeper
 
+    def deploy_kafka_broker(self):
+        logging.info('Creating and running docker containers for kafka broker...')
+        self.deploy_zookeeper()
+
         test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
         broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
         broker = self.client.containers.run(
             broker_image[0],
             detach=True,
             name='kafka-broker',
             network=self.network.name,
-            ports={'9092/tcp': 9092, '29092/tcp': 29092},
+            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093},
             environment=[
                 "KAFKA_BROKER_ID=1",
-                'ALLOW_PLAINTEXT_LISTENER: "yes"',
-                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://0.0.0.0:29092",
-                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL",
-                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://localhost:29092",
-                "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"])
+                "ALLOW_PLAINTEXT_LISTENER=yes",
+                "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true",
+                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092",
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL",
+                "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SSL",
+                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093",
+                "KAFKA_HEAP_OPTS=-Xms512m -Xmx1g",
+                "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
+                "SSL_CLIENT_AUTH=none"],
+            volumes=self.vols,
+            sysctls={"net.ipv6.conf.all.disable_ipv6": "1"})

Review comment:
       Is there a reason for disabling IPv6 in the container?
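
As an aside, the listener wiring in the hunk above can be sanity-checked offline. A small sketch (plain Python, using the exact values from this diff) that parses `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP` and verifies that every listener name used in `KAFKA_LISTENERS` and `KAFKA_ADVERTISED_LISTENERS` has a protocol mapping:

```python
# Values copied from the environment list in deploy_kafka_broker above.
LISTENERS = "PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092"
ADVERTISED = "PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093"
PROTOCOL_MAP = "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL"


def parse_protocol_map(value):
    """'NAME:PROTOCOL,...' -> {'NAME': 'PROTOCOL', ...}"""
    return dict(entry.split(":", 1) for entry in value.split(","))


def listener_names(value):
    """'NAME://host:port,...' -> ['NAME', ...]"""
    return [entry.split("://", 1)[0] for entry in value.split(",")]


def undefined_listeners(listeners, protocol_map):
    """Listener names that have no entry in the security protocol map."""
    return sorted(set(listener_names(listeners)) - set(parse_protocol_map(protocol_map)))


if __name__ == "__main__":
    print(undefined_listeners(LISTENERS, PROTOCOL_MAP))   # []
    print(undefined_listeners(ADVERTISED, PROTOCOL_MAP))  # []
```

Both checks come back empty for the configuration in this PR, i.e. every listener (including the new SSL_HOST one) is covered by the map.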

##########
File path: extensions/librdkafka/tests/ConsumeKafkaTests.cpp
##########
@@ -586,4 +598,19 @@ TEST_CASE_METHOD(ConsumeKafkaContinuousPublishingTest, "ConsumeKafka can spend n
   //  I tried adding a wait time for more than "session.timeout.ms" inbetween tests, but it was not sufficient
 }
 
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "ConsumeKafka can communicate with the broker via SSL.", "[ConsumeKafka][Kafka][SSL]") {
+  auto run_tests = [&] (const std::vector<std::string>& messages_on_topic) {
+    single_consumer_with_plain_text_test(true, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9093", ConsumeKafka::SECURITY_PROTOCOL_SSL, "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+  };
+  };
+  const auto get_current_timestamp = [] {
+    const std::time_t result = std::time(nullptr);
+    std::stringstream time_stream;
+    time_stream << std::asctime(std::localtime(&result));
+    return time_stream.str();
+  };

Review comment:
       I'd prefer a more C++-style way of doing this, using the chrono, date and date-tz libraries. In addition to the suggestion below, you need to link the date and date-tz targets to the test and include "date/date.h" and "date/tz.h".
   ```suggestion
     const auto get_current_timestamp = [] {
       return date::format("%a %b %e %H:%M:%S %Y", date::make_zoned(date::current_zone(), std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now())));
     };
   ```
   
   The C++20 version would look like this, but currently only MSVC implements the timezone and format support required:
   ```
     const auto get_current_timestamp = [] {
       return std::format("{:%a %b %e %H:%M:%S %Y}", std::chrono::zoned_time{std::chrono::current_zone(), std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now())});
     };
   ```
   
   If you don't want to introduce the dependencies just to save a few lines, that's also fine with me.
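   
   Linking the targets would look roughly like this in the test's CMake file; the test target name below is a placeholder, and `date::date`/`date::date-tz` are the usual exported target names of the HowardHinnant/date library (verify against how this repo imports it):
   ```cmake
   # Hypothetical target name; substitute the actual ConsumeKafka test target.
   target_link_libraries(ConsumeKafkaTests PRIVATE date::date date::date-tz)
   ```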

##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -185,7 +205,11 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout
+    SessionTimeout,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassword

Review comment:
       NiFi uses SSLContextService to provide these details. We seem to have the same thing implemented in MiNiFi C++, so I would prefer to take the same approach here.
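
To illustrate the suggested direction with a sketch (the service field names below are placeholders, not the real SSLContextService API; the `ssl.*` keys are librdkafka's actual SSL configuration properties): instead of four separate processor properties, the values would come from one configured SSL context service and be mapped onto the librdkafka configuration.

```python
# Stand-in for an SSLContextService; the dict keys are placeholders.
ssl_context_service = {
    "ca_certificate": "/etc/ssl/ca.pem",
    "client_certificate": "/etc/ssl/client.pem",
    "private_key": "/etc/ssl/client.key",
    "passphrase": "secret",
}


def rdkafka_ssl_config(service):
    """Map SSL-context-style values onto librdkafka's ssl.* properties."""
    return {
        "security.protocol": "ssl",
        "ssl.ca.location": service["ca_certificate"],
        "ssl.certificate.location": service["client_certificate"],
        "ssl.key.location": service["private_key"],
        "ssl.key.password": service["passphrase"],
    }


if __name__ == "__main__":
    print(rdkafka_ssl_config(ssl_context_service)["ssl.ca.location"])  # /etc/ssl/ca.pem
```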




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
