Gerrrr commented on code in PR #12760:
URL: https://github.com/apache/kafka/pull/12760#discussion_r1003148402


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -345,13 +353,13 @@ def start_all_nodes_with(self, version):
                     second_monitor.wait_until(self.processed_data_msg,
                                               timeout_sec=60,
                                              err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account))
-                    if KafkaVersion(version).supports_fk_joins():
+                    if 'test.run_fk_join' in extra_properties:

Review Comment:
   nit: let's also check that the corresponding value is `True`
   
   ```suggestion
                       if extra_properties.get('test.run_fk_join', False):
   ```



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -370,20 +378,26 @@ def start_all_nodes_with(self, version):
                         third_monitor.wait_until(self.processed_data_msg,
                                                  timeout_sec=60,
                                                 err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node3.account))
-                        if KafkaVersion(version).supports_fk_joins():
+                        if 'test.run_fk_join' in extra_properties:

Review Comment:
   nit: let's also check that the corresponding value is `True`
   
   ```suggestion
                           if extra_properties.get('test.run_fk_join', False):
   ```



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -454,6 +470,11 @@ def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
                                            timeout_sec=60,
                                           err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node.account))
 
+                        if "test.run_fk_join" in extra_properties:
+                            monitor.wait_until(self.processed_fk_msg,
+                                                     timeout_sec=60,

Review Comment:
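   nit: indentation — please align the continuation arguments (`timeout_sec`, `err_msg`) with the first argument of `monitor.wait_until(`.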



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -323,14 +331,14 @@ def start_all_nodes_with(self, version):
                 monitor.wait_until(self.processed_data_msg,
                                    timeout_sec=60,
                                   err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
-                if KafkaVersion(version).supports_fk_joins():
+                if 'test.run_fk_join' in extra_properties:

Review Comment:
   nit: let's also check that the corresponding value is `True`
   
   ```suggestion
                   if extra_properties.get('test.run_fk_join', False):
   ```



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -454,6 +470,11 @@ def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
                                            timeout_sec=60,
                                           err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node.account))
 
+                        if "test.run_fk_join" in extra_properties:

Review Comment:
   nit: let's also check that the corresponding value is `True`
   
   ```suggestion
                           if extra_properties.get('test.run_fk_join', False):
   ```



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -308,11 +314,13 @@ def get_version_string(self, version):
         else:
             return "Kafka version: " + version
 
-    def start_all_nodes_with(self, version):
+    def start_all_nodes_with(self, version, extra_properties = None):

Review Comment:
   nit: [PEP8 recommends avoiding spaces around](https://peps.python.org/pep-0008/#other-recommendations) `=` in method arguments. A linter might complain here, so I suggest removing the spaces.



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -370,20 +378,26 @@ def start_all_nodes_with(self, version):
                         third_monitor.wait_until(self.processed_data_msg,
                                                  timeout_sec=60,
                                                 err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node3.account))
-                        if KafkaVersion(version).supports_fk_joins():
+                        if 'test.run_fk_join' in extra_properties:
                             third_monitor.wait_until(self.processed_fk_msg,
                             timeout_sec=60,
                             err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node2.account))
 
     @staticmethod
-    def prepare_for(processor, version):
+    def prepare_for(processor, version, extra_properties):
         processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+        for k, v in extra_properties.items():
+            processor.set_config(k, v)
         if version == str(DEV_VERSION):
             processor.set_version("")  # set to TRUNK
         else:
             processor.set_version(version)
 
-    def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
+    def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter, extra_properties=None):
+

Review Comment:
   nit: please remove this unnecessary empty line after the method signature.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to