mjsax commented on code in PR #14548:
URL: https://github.com/apache/kafka/pull/14548#discussion_r1359477497


##########
build.gradle:
##########
@@ -2151,7 +2151,7 @@ project(':streams') {
 
   task genStreamsConfigDocs(type: JavaExec) {
     classpath = sourceSets.main.runtimeClasspath
-    mainClass = 'org.apache.kafka.streams.StreamsConfig'
+    mainClass = 'org.apache.kafka.streams.internals.InternalStreamsConfig'

Review Comment:
   We use `main` to generate the config HTML for the webpage, so update to use 
internal one going forward



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -843,7 +843,7 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final StreamsConfig applicationConfigs,
                         final KafkaClientSupplier clientSupplier) {
-        this(new TopologyMetadata(topology.internalTopologyBuilder, 
applicationConfigs), applicationConfigs, clientSupplier);
+        this(topology, new InternalStreamsConfig(applicationConfigs), 
clientSupplier, Time.SYSTEM);

Review Comment:
   Some side cleanup on constructor methods -- no public change -- just to make 
the flow simpler



##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -1089,34 +1091,30 @@ public void 
shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
 
     @Test
     public void shouldGetClientSupplierFromConfigForConstructor() {

Review Comment:
   This and the next test needed an update, because the refactoring breaks the 
mocking -- cf. the change in `MockClientSupplier`, too.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1171,72 +1170,6 @@ public class StreamsConfig extends AbstractConfig {
         CONSUMER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
-    public static class InternalConfig {

Review Comment:
   Even if this class is public, it's called `InternalConfig` and thus 
clearly internal -- I believe it's ok to move it to the new sub-class without 
deprecation (as a matter of fact, we won't need this helper class any longer, 
and I only moved the members of this class to `InternalStreamsConfig`).



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -790,7 +790,7 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final Properties props,
                         final Time time) {
-        this(topology, new StreamsConfig(props), time);
+        this(topology, new InternalStreamsConfig(props), null, time);

Review Comment:
   Some side cleanup on constructor methods -- no public change -- just to make 
the flow simpler



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -860,32 +860,37 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final StreamsConfig applicationConfigs,
                         final Time time) {
-        this(new TopologyMetadata(topology.internalTopologyBuilder, 
applicationConfigs), applicationConfigs, 
applicationConfigs.getKafkaClientSupplier(), time);
+        this(topology, new InternalStreamsConfig(applicationConfigs), null, 
time);

Review Comment:
   Some side cleanup on constructor methods -- no public change -- just to make 
the flow simpler



##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+    private static final Logger log = 
LoggerFactory.getLogger(StreamsConfig.class);
+
+    // This is settable in the main Streams config, but it's a private API for 
now
+    public static final String TASK_ASSIGNOR_CLASS = 
"__internal.task.assignor.class__";
+    // These are not settable in the main Streams config; they are set by the 
StreamThread to pass internal
+    // state into the assignor.
+    public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = 
"__reference.container.instance__";
+    // This is settable in the main Streams config, but it's a private API for 
testing
+    public static final String ASSIGNMENT_LISTENER = "__assignment.listener__";
+    // Private API used to control the emit latency for left/outer join 
results (https://issues.apache.org/jira/browse/KAFKA-10847)
+    public static final String 
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = 
"__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
+    // Private API used to control the emit latency for windowed aggregation 
results for ON_WINDOW_CLOSE emit strategy
+    public static final String EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION 
= "__emit.interval.ms.kstreams.windowed.aggregation__";
+    // Private API used to control the usage of consistency offset vectors
+    public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = 
"__iq.consistency.offset.vector.enabled__";
+    // Private API used to control the prefix of the auto created topics
+    public static final String TOPIC_PREFIX_ALTERNATIVE = 
"__internal.override.topic.prefix__";
+    // Private API to enable the state updater (i.e. state updating on a 
dedicated thread)
+    public static final String STATE_UPDATER_ENABLED = 
"__state.updater.enabled__";
+
+
+
+    public InternalStreamsConfig(final StreamsConfig streamsConfig) {
+        super(streamsConfig.originals(), false);
+    }
+
+    public InternalStreamsConfig(final Map<?, ?> configs) {
+        super(configs, false);
+    }
+
+
+
+    /**
+     * Return a copy of the config definition.
+     *
+     * @return a copy of the config definition
+     */
+    @SuppressWarnings({"deprecation", "unused"})
+    public static ConfigDef configDef() {
+        return new ConfigDef(CONFIG);
+    }
+
+    public static boolean stateUpdaterEnabled(final Map<String, Object> 
configs) {

Review Comment:
   Dropped the `get` prefix during refactoring -- similarly for the other 
methods below.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1273,16 +1206,6 @@ public static String restoreConsumerPrefix(final String 
consumerProp) {
         return RESTORE_CONSUMER_PREFIX + consumerProp;
     }
 
-    /**
-     * Prefix a client tag key with {@link #CLIENT_TAG_PREFIX}.
-     *
-     * @param clientTagKey client tag key
-     * @return {@link #CLIENT_TAG_PREFIX} + {@code clientTagKey}
-     */
-    public static String clientTagPrefix(final String clientTagKey) {

Review Comment:
   Just moved further below -- no change.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -240,14 +241,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String CLIENT_TAG_PREFIX = "client.tag.";
 
     /** {@code topology.optimization} */
-    private static final String CONFIG_ERROR_MSG = "Acceptable values are:"

Review Comment:
   Just moved below -- the above `JavaDoc` belongs to 
`TOPOLOGY_OPTIMIZATION_CONFIG` and we should fix this in any case...



##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+    private static final Logger log = 
LoggerFactory.getLogger(StreamsConfig.class);

Review Comment:
   To hide `InternalStreamsConfig` I pass in `StreamsConfig.class` -- not sure 
if this is good or bad...



##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+    private static final Logger log = 
LoggerFactory.getLogger(StreamsConfig.class);
+
+    // This is settable in the main Streams config, but it's a private API for 
now
+    public static final String TASK_ASSIGNOR_CLASS = 
"__internal.task.assignor.class__";
+    // These are not settable in the main Streams config; they are set by the 
StreamThread to pass internal
+    // state into the assignor.
+    public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = 
"__reference.container.instance__";
+    // This is settable in the main Streams config, but it's a private API for 
testing
+    public static final String ASSIGNMENT_LISTENER = "__assignment.listener__";
+    // Private API used to control the emit latency for left/outer join 
results (https://issues.apache.org/jira/browse/KAFKA-10847)
+    public static final String 
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = 
"__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
+    // Private API used to control the emit latency for windowed aggregation 
results for ON_WINDOW_CLOSE emit strategy
+    public static final String EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION 
= "__emit.interval.ms.kstreams.windowed.aggregation__";
+    // Private API used to control the usage of consistency offset vectors
+    public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = 
"__iq.consistency.offset.vector.enabled__";
+    // Private API used to control the prefix of the auto created topics
+    public static final String TOPIC_PREFIX_ALTERNATIVE = 
"__internal.override.topic.prefix__";
+    // Private API to enable the state updater (i.e. state updating on a 
dedicated thread)
+    public static final String STATE_UPDATER_ENABLED = 
"__state.updater.enabled__";
+
+
+
+    public InternalStreamsConfig(final StreamsConfig streamsConfig) {
+        super(streamsConfig.originals(), false);
+    }
+
+    public InternalStreamsConfig(final Map<?, ?> configs) {
+        super(configs, false);
+    }
+
+
+
+    /**
+     * Return a copy of the config definition.
+     *
+     * @return a copy of the config definition
+     */
+    @SuppressWarnings({"deprecation", "unused"})
+    public static ConfigDef configDef() {
+        return new ConfigDef(CONFIG);
+    }
+
+    public static boolean stateUpdaterEnabled(final Map<String, Object> 
configs) {
+        return getBoolean(configs, STATE_UPDATER_ENABLED, true);
+    }
+
+    public static boolean getBoolean(final Map<String, Object> configs, final 
String key, final boolean defaultValue) {

Review Comment:
   Kept the `get` here and below for similar ones, because `AbstractConfig` 
also uses `get` -- not 100% sure what is cleaner.



##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+    private static final Logger log = 
LoggerFactory.getLogger(StreamsConfig.class);
+
+    // This is settable in the main Streams config, but it's a private API for 
now
+    public static final String TASK_ASSIGNOR_CLASS = 
"__internal.task.assignor.class__";

Review Comment:
   Minor rename -- removed the `INTERNAL_` prefix on the variable name to align 
with all other internal configs -- also added `__` as prefix/suffix to the 
config name.



##########
streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java:
##########
@@ -58,6 +60,7 @@ public void setCluster(final Cluster cluster) {
 
     @Override
     public Admin getAdmin(final Map<String, Object> config) {
+        ++adminCount;

Review Comment:
   Minor change to fix the test as mentioned above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##########
@@ -52,12 +51,6 @@
 public class ClientUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClientUtils.class);
 
-    public static final class QuietStreamsConfig extends StreamsConfig {

Review Comment:
   The new `InternalStreamsConfig` is a "quiet config" and thus we don't need 
this helper any longer



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -860,32 +860,37 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final StreamsConfig applicationConfigs,
                         final Time time) {
-        this(new TopologyMetadata(topology.internalTopologyBuilder, 
applicationConfigs), applicationConfigs, 
applicationConfigs.getKafkaClientSupplier(), time);
+        this(topology, new InternalStreamsConfig(applicationConfigs), null, 
time);
     }
 
     private KafkaStreams(final Topology topology,
-                         final StreamsConfig applicationConfigs,
+                         final InternalStreamsConfig applicationConfigs,
                          final KafkaClientSupplier clientSupplier,
                          final Time time) throws StreamsException {
-        this(new TopologyMetadata(topology.internalTopologyBuilder, 
applicationConfigs), applicationConfigs, clientSupplier, time);
-    }
-
-    protected KafkaStreams(final TopologyMetadata topologyMetadata,

Review Comment:
   Can remove this as part of the side cleanup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to