mjsax commented on code in PR #12944:
URL: https://github.com/apache/kafka/pull/12944#discussion_r1040281808


##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -1088,6 +1088,28 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
         }
     }
 
+    @Test
+    public void shouldGetClientSupplierFromConfigForConstructor() {
+        final StreamsConfig config = new StreamsConfig(props);
+        final StreamsConfig mockConfig = spy(config);
+        when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier);
+
+        new KafkaStreams(getBuilderWithSource().build(), mockConfig);
+        // It's called once in above when mock
+        verify(mockConfig, times(2)).getKafkaClientSupplier();

Review Comment:
   Why `times(2)` and not `times(1)`?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -806,6 +812,11 @@ public class StreamsConfig extends AbstractConfig {
                     LogAndFailExceptionHandler.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+            .define(DEFAULT_CLIENT_SUPPLIER_CONFIG,
+                    Type.CLASS,
+                    DefaultKafkaClientSupplier.class.getName(),
+                    Importance.MEDIUM,

Review Comment:
   I think this should be `LOW` importance.



##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -1088,6 +1088,28 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
         }
     }
 
+    @Test
+    public void shouldGetClientSupplierFromConfigForConstructor() {

Review Comment:
   Should we also test that a supplier passed into the constructor overrides the config?
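
   To make the expected precedence concrete, here is a minimal, self-contained sketch. It does not use the real `KafkaStreams` or `KafkaClientSupplier` types; `ClientSupplier`, `NamedSupplier`, and `FakeStreams` are hypothetical stand-ins, and the null-check resolution is only an assumption about how the constructor could behave.

```java
import java.util.Objects;

// Hypothetical stand-in for a client supplier; not the Kafka interface.
interface ClientSupplier {
    String name();
}

// Hypothetical supplier that only carries a label so instances can be told apart.
class NamedSupplier implements ClientSupplier {
    private final String name;

    NamedSupplier(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

// Hypothetical stand-in for the class under test.
class FakeStreams {
    final ClientSupplier supplier;

    // Config-only constructor: falls back to the supplier resolved from the config.
    FakeStreams(final ClientSupplier fromConfig) {
        this(fromConfig, null);
    }

    // An explicitly passed supplier wins over the one resolved from the config.
    FakeStreams(final ClientSupplier fromConfig, final ClientSupplier explicit) {
        this.supplier = explicit != null ? explicit : Objects.requireNonNull(fromConfig);
    }
}
```

   A test along these lines would build one instance per constructor and assert which supplier each one ends up holding.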



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1727,6 +1738,15 @@ public static Set<String> verifyTopologyOptimizationConfigs(final String config)
         return verifiedConfigs;
     }
 
+    /**
+     * Return configured KafkaClientSupplier
+     * @return Configured KafkaClientSupplier
+     */
+    public KafkaClientSupplier getKafkaClientSupplier() {

Review Comment:
   Was this part of the KIP, and do we need it? `StreamsConfig` is public API, and we cannot just change it.

   Also, the client supplier does not implement `Configurable`, so it does not need to be configured.

   I am OK with reusing `getConfiguredInstance`, but that would need a small change to the KIP.
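
   To illustrate the `Configurable` point, here is a rough, self-contained sketch of the pattern as I understand it: instantiate the class reflectively and call `configure` only if the instance opts in. The `Configurable` interface below is a local stand-in for the Kafka one, and `PlainSupplier`, `ConfigurableSupplier`, and `InstanceFactory` are hypothetical names; this is not the actual `AbstractConfig` code.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in for org.apache.kafka.common.Configurable (assumption for this sketch).
interface Configurable {
    void configure(Map<String, ?> configs);
}

// Hypothetical supplier that does not implement Configurable.
class PlainSupplier {
    boolean configured = false;
}

// Hypothetical supplier that opts in to configuration.
class ConfigurableSupplier implements Configurable {
    boolean configured = false;

    @Override
    public void configure(final Map<String, ?> configs) {
        configured = true;
    }
}

class InstanceFactory {
    // Create an instance via the no-arg constructor; configure it only if it is Configurable.
    static <T> T newConfiguredInstance(final Class<T> clazz, final Map<String, ?> configs) {
        final T instance;
        try {
            instance = clazz.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + clazz.getName(), e);
        }
        if (instance instanceof Configurable) {
            ((Configurable) instance).configure(configs);
        }
        return instance;
    }
}
```

   With this shape, a supplier that does not implement `Configurable` is simply instantiated and returned, so going through such a helper is harmless for it.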



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
