kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1376925327
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
      */
     static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
 
-    /**
-     * <code>internal.throw.on.fetch.stable.offset.unsupported</code>
-     * Whether or not the consumer should throw when the new stable offset feature is supported.
-     * If set to <code>true</code> then the client shall crash upon hitting it.
-     * The purpose of this flag is to prevent unexpected broker downgrade which makes
-     * the offset fetch protection against pending commit invalid. The safest approach
-     * is to fail fast to avoid introducing correctness issue.
-     *
-     * <p>
-     * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
-     *
-     */
-    static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
-

Review Comment:
   I moved this to `ConsumerUtils` because `LegacyKafkaConsumer` and `AsyncKafkaConsumer` couldn't access it via package-level visibility since they're in the `internals` sub-package. @dajac, will this be considered a breaking change? I assumed not because `THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` isn't public.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time,
         // no coordinator will be constructed for the default (null) group id
         if (!groupId.isPresent()) {
             config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-            //config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+            config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);

Review Comment:
   Moving the package-level variable and method from `ConsumerConfig` also fixes this issue.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
         }
     }
 
-    // This is here temporary as we don't have public access to the ConsumerConfig in this module.
-    public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
-                                                                 Deserializer<?> keyDeserializer,
-                                                                 Deserializer<?> valueDeserializer) {
-        // validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
-        Map<String, Object> newConfigs = new HashMap<>(configs);
-        if (keyDeserializer != null)
-            newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-        else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-            throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-        if (valueDeserializer != null)
-            newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-        else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-            throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-        return newConfigs;
-    }
-

Review Comment:
   The original version of this method as it appeared in `ConsumerConfig` was moved to `ConsumerUtils` so now it can be used from here too 😄

##########
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java:
##########
@@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws Throwable {
             () -> log.info("offsetsForTime = {}", offsetsForTime.result));
         // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
         // should work.
-        consumer.beginningOffsets(timestampsToSearch.keySet());
-        consumer.endOffsets(timestampsToSearch.keySet());
+        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(timestampsToSearch.keySet());
+        Map<TopicPartition, Long> endingOffsets = consumer.endOffsets(timestampsToSearch.keySet());
+        log.trace("beginningOffsets: {}, endingOffsets: {}", beginningOffsets, endingOffsets);

Review Comment:
   This is super annoying. I started getting errors from SpotBugs because the offsets methods were called but the return value was being ignored. This is a very brute force way of silencing the checker. I could not find a clean way to ignore the warning. I also don't know why this is suddenly being caught by SpotBugs. From its perspective, nothing has changed in the `KafkaConsumer` API, right?

##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -281,7 +281,7 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object
 
         if (!options.hasDryRun()) {
             for (final TopicPartition p : partitions) {
-                client.position(p);
+                long pos = client.position(p);

Review Comment:
   This is another hack for bypassing the error from SpotBugs about ignored return values.
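For the ignored-return-value warnings in the two comments above, one possible alternative to an unused local variable is to hand the result to a check, so the value is actually used. The sketch below is only an illustration: `ReturnValueSketch`, `OffsetSource`, and `checkEndOffsets` are hypothetical names, the interface is a stand-in for the consumer, and whether SpotBugs accepts this depends on which detector fired, which the comments do not say.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch: none of these names come from the PR.
final class ReturnValueSketch {

    // Stand-in for the consumer call; the real method lives on the Kafka consumer.
    interface OffsetSource {
        Map<String, Long> endOffsets(Set<String> partitions);
    }

    // Passes the return value to a null check instead of storing it in an
    // otherwise unused local variable, then reports how many entries came back.
    static int checkEndOffsets(OffsetSource source, Set<String> partitions) {
        Map<String, Long> offsets =
            Objects.requireNonNull(source.endOffsets(partitions), "endOffsets returned null");
        return offsets.size();
    }

    public static void main(String[] args) {
        // A fake source that always reports one partition.
        OffsetSource fake = partitions -> Collections.singletonMap("topic-0", 42L);
        System.out.println(checkEndOffsets(fake, Collections.singleton("topic-0")));
    }
}
```

The trade-off is that the check becomes part of the tool's behavior, whereas the `log.trace` and `long pos` changes in the diff leave behavior untouched.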
##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -171,19 +170,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     producer
   }
 
-  def createAsyncConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
-                                valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
-                                configOverrides: Properties = new Properties,
-                                configsToRemove: List[String] = List()): PrototypeAsyncConsumer[K, V] = {
-    val props = new Properties
-    props ++= consumerConfig
-    props ++= configOverrides
-    configsToRemove.foreach(props.remove(_))
-    val consumer = new PrototypeAsyncConsumer[K, V](props, keyDeserializer, valueDeserializer)
-    consumers += consumer
-    consumer
-  }
-

Review Comment:
   This is no longer needed as we can create `KafkaConsumer` directly

##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -666,19 +651,6 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
         return newConfigs;
     }
 
-    boolean maybeOverrideEnableAutoCommit() {
-        Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-        boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-        if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
-            if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
-                enableAutoCommit = false;
-            } else if (enableAutoCommit) {
-                throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
-            }
-        }
-        return enableAutoCommit;
-    }
-

Review Comment:
   I also moved this to `ConsumerUtils` for the same reason as above.
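For readers skimming the hunk, the rule that `maybeOverrideEnableAutoCommit` applies can be restated as a small standalone sketch. This is not the code that landed in `ConsumerUtils`. `AutoCommitSketch` is a hypothetical name, a plain `Map` stands in for `ConsumerConfig`, the group id is read from the user-supplied settings instead of the parsed config, and `IllegalArgumentException` replaces `InvalidConfigurationException` so the example has no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained sketch; the real helper reads a ConsumerConfig.
final class AutoCommitSketch {

    static final String GROUP_ID = "group.id";
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // originals: settings the user supplied explicitly.
    // parsedEnableAutoCommit: the parsed value, with the default already applied.
    static boolean maybeOverrideEnableAutoCommit(Map<String, Object> originals,
                                                 boolean parsedEnableAutoCommit) {
        boolean hasGroupId = originals.get(GROUP_ID) != null;
        if (hasGroupId)
            return parsedEnableAutoCommit;
        if (!originals.containsKey(ENABLE_AUTO_COMMIT))
            return false; // not set explicitly: quietly disable auto-commit
        if (parsedEnableAutoCommit)
            // The removed method throws InvalidConfigurationException here.
            throw new IllegalArgumentException(
                ENABLE_AUTO_COMMIT + " cannot be set to true when default group id (null) is used.");
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> noGroup = new HashMap<>();
        System.out.println(maybeOverrideEnableAutoCommit(noGroup, true)); // false

        Map<String, Object> withGroup = new HashMap<>();
        withGroup.put(GROUP_ID, "my-group");
        System.out.println(maybeOverrideEnableAutoCommit(withGroup, true)); // true
    }
}
```

The point of the three branches is that a consumer without a group id only fails when the user explicitly asked for auto-commit; relying on the default silently turns it off.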
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org