mjsax commented on code in PR #14548:
URL: https://github.com/apache/kafka/pull/14548#discussion_r1359477497
##########
build.gradle:
##########
@@ -2151,7 +2151,7 @@ project(':streams') {
task genStreamsConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
- mainClass = 'org.apache.kafka.streams.StreamsConfig'
+ mainClass = 'org.apache.kafka.streams.internals.InternalStreamsConfig'
Review Comment:
We use `main` to generate the config HTML for the webpage, so update to use
the internal one going forward.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -843,7 +843,7 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier) {
- this(new TopologyMetadata(topology.internalTopologyBuilder,
applicationConfigs), applicationConfigs, clientSupplier);
+ this(topology, new InternalStreamsConfig(applicationConfigs),
clientSupplier, Time.SYSTEM);
Review Comment:
Some side cleanup on constructor methods -- no public change -- just to make
the flow simpler
##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -1089,34 +1091,30 @@ public void
shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
@Test
public void shouldGetClientSupplierFromConfigForConstructor() {
Review Comment:
This and the next test needed an update, because the refactoring breaks the
mocking -- cf. the change in `MockClientSupplier`, too.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1171,72 +1170,6 @@ public class StreamsConfig extends AbstractConfig {
CONSUMER_EOS_OVERRIDES =
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
- public static class InternalConfig {
Review Comment:
Even if this class is public, it's called `InternalConfig` and thus
clearly internal -- I believe it's ok to move to a new sub-class without
deprecation (as a matter of fact, we won't need this helper class any longer,
and I only moved the members of this class to `InternalStreamsConfig`).
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -790,7 +790,7 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final Properties props,
final Time time) {
- this(topology, new StreamsConfig(props), time);
+ this(topology, new InternalStreamsConfig(props), null, time);
Review Comment:
Some side cleanup on constructor methods -- no public change -- just to make
the flow simpler
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -860,32 +860,37 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final Time time) {
- this(new TopologyMetadata(topology.internalTopologyBuilder,
applicationConfigs), applicationConfigs,
applicationConfigs.getKafkaClientSupplier(), time);
+ this(topology, new InternalStreamsConfig(applicationConfigs), null,
time);
Review Comment:
Some side cleanup on constructor methods -- no public change -- just to make
the flow simpler
##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+ private static final Logger log =
LoggerFactory.getLogger(StreamsConfig.class);
+
+ // This is settable in the main Streams config, but it's a private API for
now
+ public static final String TASK_ASSIGNOR_CLASS =
"__internal.task.assignor.class__";
+ // These are not settable in the main Streams config; they are set by the
StreamThread to pass internal
+ // state into the assignor.
+ public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR =
"__reference.container.instance__";
+ // This is settable in the main Streams config, but it's a private API for
testing
+ public static final String ASSIGNMENT_LISTENER = "__assignment.listener__";
+ // Private API used to control the emit latency for left/outer join
results (https://issues.apache.org/jira/browse/KAFKA-10847)
+ public static final String
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX =
"__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
+ // Private API used to control the emit latency for windowed aggregation
results for ON_WINDOW_CLOSE emit strategy
+ public static final String EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION
= "__emit.interval.ms.kstreams.windowed.aggregation__";
+ // Private API used to control the usage of consistency offset vectors
+ public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED =
"__iq.consistency.offset.vector.enabled__";
+ // Private API used to control the prefix of the auto created topics
+ public static final String TOPIC_PREFIX_ALTERNATIVE =
"__internal.override.topic.prefix__";
+ // Private API to enable the state updater (i.e. state updating on a
dedicated thread)
+ public static final String STATE_UPDATER_ENABLED =
"__state.updater.enabled__";
+
+
+
+ public InternalStreamsConfig(final StreamsConfig streamsConfig) {
+ super(streamsConfig.originals(), false);
+ }
+
+ public InternalStreamsConfig(final Map<?, ?> configs) {
+ super(configs, false);
+ }
+
+
+
+ /**
+ * Return a copy of the config definition.
+ *
+ * @return a copy of the config definition
+ */
+ @SuppressWarnings({"deprecation", "unused"})
+ public static ConfigDef configDef() {
+ return new ConfigDef(CONFIG);
+ }
+
+ public static boolean stateUpdaterEnabled(final Map<String, Object>
configs) {
Review Comment:
Dropped the `get` prefix during refactoring -- similarly for other methods
below.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1273,16 +1206,6 @@ public static String restoreConsumerPrefix(final String
consumerProp) {
return RESTORE_CONSUMER_PREFIX + consumerProp;
}
- /**
- * Prefix a client tag key with {@link #CLIENT_TAG_PREFIX}.
- *
- * @param clientTagKey client tag key
- * @return {@link #CLIENT_TAG_PREFIX} + {@code clientTagKey}
- */
- public static String clientTagPrefix(final String clientTagKey) {
Review Comment:
Just moved further below -- no change.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -240,14 +241,12 @@ public class StreamsConfig extends AbstractConfig {
public static final String CLIENT_TAG_PREFIX = "client.tag.";
/** {@code topology.optimization} */
- private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
Review Comment:
Just moved below -- the above `JavaDoc` belongs to
`TOPOLOGY_OPTIMIZATION_CONFIG` and we should fix this in any case...
##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+ private static final Logger log =
LoggerFactory.getLogger(StreamsConfig.class);
Review Comment:
To hide `InternalStreamsConfig` I pass in `StreamsConfig.class` -- not sure
if this is good or bad...
##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+ private static final Logger log =
LoggerFactory.getLogger(StreamsConfig.class);
+
+ // This is settable in the main Streams config, but it's a private API for
now
+ public static final String TASK_ASSIGNOR_CLASS =
"__internal.task.assignor.class__";
+ // These are not settable in the main Streams config; they are set by the
StreamThread to pass internal
+ // state into the assignor.
+ public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR =
"__reference.container.instance__";
+ // This is settable in the main Streams config, but it's a private API for
testing
+ public static final String ASSIGNMENT_LISTENER = "__assignment.listener__";
+ // Private API used to control the emit latency for left/outer join
results (https://issues.apache.org/jira/browse/KAFKA-10847)
+ public static final String
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX =
"__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
+ // Private API used to control the emit latency for windowed aggregation
results for ON_WINDOW_CLOSE emit strategy
+ public static final String EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION
= "__emit.interval.ms.kstreams.windowed.aggregation__";
+ // Private API used to control the usage of consistency offset vectors
+ public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED =
"__iq.consistency.offset.vector.enabled__";
+ // Private API used to control the prefix of the auto created topics
+ public static final String TOPIC_PREFIX_ALTERNATIVE =
"__internal.override.topic.prefix__";
+ // Private API to enable the state updater (i.e. state updating on a
dedicated thread)
+ public static final String STATE_UPDATER_ENABLED =
"__state.updater.enabled__";
+
+
+
+ public InternalStreamsConfig(final StreamsConfig streamsConfig) {
+ super(streamsConfig.originals(), false);
+ }
+
+ public InternalStreamsConfig(final Map<?, ?> configs) {
+ super(configs, false);
+ }
+
+
+
+ /**
+ * Return a copy of the config definition.
+ *
+ * @return a copy of the config definition
+ */
+ @SuppressWarnings({"deprecation", "unused"})
+ public static ConfigDef configDef() {
+ return new ConfigDef(CONFIG);
+ }
+
+ public static boolean stateUpdaterEnabled(final Map<String, Object>
configs) {
+ return getBoolean(configs, STATE_UPDATER_ENABLED, true);
+ }
+
+ public static boolean getBoolean(final Map<String, Object> configs, final
String key, final boolean defaultValue) {
Review Comment:
Kept the `get` here and below for similar ones, because `AbstractConfig`
also uses `get` -- not 100% sure what is cleaner.
##########
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InternalStreamsConfig extends StreamsConfig {
+ private static final Logger log =
LoggerFactory.getLogger(StreamsConfig.class);
+
+ // This is settable in the main Streams config, but it's a private API for
now
+ public static final String TASK_ASSIGNOR_CLASS =
"__internal.task.assignor.class__";
Review Comment:
Minor rename -- removed `INTERNAL_` prefix on the variable name to align with
all other internal configs -- also added `__` as prefix/suffix to the config
name.
##########
streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java:
##########
@@ -58,6 +60,7 @@ public void setCluster(final Cluster cluster) {
@Override
public Admin getAdmin(final Map<String, Object> config) {
+ ++adminCount;
Review Comment:
Minor change to fix the test as mentioned above.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##########
@@ -52,12 +51,6 @@
public class ClientUtils {
private static final Logger LOG =
LoggerFactory.getLogger(ClientUtils.class);
- public static final class QuietStreamsConfig extends StreamsConfig {
Review Comment:
The new `InternalStreamsConfig` is a "quiet config" and thus we don't need
this helper any longer
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -860,32 +860,37 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final Time time) {
- this(new TopologyMetadata(topology.internalTopologyBuilder,
applicationConfigs), applicationConfigs,
applicationConfigs.getKafkaClientSupplier(), time);
+ this(topology, new InternalStreamsConfig(applicationConfigs), null,
time);
}
private KafkaStreams(final Topology topology,
- final StreamsConfig applicationConfigs,
+ final InternalStreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier,
final Time time) throws StreamsException {
- this(new TopologyMetadata(topology.internalTopologyBuilder,
applicationConfigs), applicationConfigs, clientSupplier, time);
- }
-
- protected KafkaStreams(final TopologyMetadata topologyMetadata,
Review Comment:
Can remove this as part of the side cleanup
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]