kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1376925327


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
      */
     static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
 
-    /**
-     * <code>internal.throw.on.fetch.stable.offset.unsupported</code>
-     * Whether or not the consumer should throw when the new stable offset feature is supported.
-     * If set to <code>true</code> then the client shall crash upon hitting it.
-     * The purpose of this flag is to prevent unexpected broker downgrade which makes
-     * the offset fetch protection against pending commit invalid. The safest approach
-     * is to fail fast to avoid introducing correctness issue.
-     *
-     * <p>
-     * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
-     *
-     */
-    static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
-

Review Comment:
   I moved this to `ConsumerUtils` because `LegacyKafkaConsumer` and `AsyncKafkaConsumer` couldn't access it via package-level visibility, since they're in the `internals` sub-package.
   
   @dajac: will this be considered a breaking change? I assumed not, because `THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` isn't public.
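   
   For reference, a minimal sketch of what the relocated constant might look like in `ConsumerUtils` (the class layout and visibility here are assumptions; only the constant's name and value come from the diff above):
   
   ```java
   package org.apache.kafka.clients.consumer.internals;
   
   public final class ConsumerUtils {
   
       /**
        * Internal flag controlling whether the consumer fails fast when the broker
        * does not support the stable offset feature. Not part of the public API and
        * subject to backward-incompatible change.
        */
       static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED =
           "internal.throw.on.fetch.stable.offset.unsupported";
   
       private ConsumerUtils() {
       }
   }
   ```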



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time,
             // no coordinator will be constructed for the default (null) group id
             if (!groupId.isPresent()) {
                 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-                //config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+                config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);

Review Comment:
   Moving the package-level variable and method from `ConsumerConfig` to `ConsumerUtils` also fixes this issue, so the previously commented-out `config.ignore()` call can be restored.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
         }
     }
 
-    // This is here temporary as we don't have public access to the ConsumerConfig in this module.
-    public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
-                                                                 Deserializer<?> keyDeserializer,
-                                                                 Deserializer<?> valueDeserializer) {
-        // validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
-        Map<String, Object> newConfigs = new HashMap<>(configs);
-        if (keyDeserializer != null)
-            newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-        else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-            throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-        if (valueDeserializer != null)
-            newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-        else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-            throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-        return newConfigs;
-    }
-

Review Comment:
   The original version of this method, as it appeared in `ConsumerConfig`, was moved to `ConsumerUtils`, so now it can be used from here too 😄
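   
   A hypothetical call site, assuming the method keeps the same signature after the move (the `userConfigs` map and `StringDeserializer` choices are illustrative, not from this PR):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.common.serialization.StringDeserializer;
   
   // Merge explicit deserializer instances into the raw config map; the method
   // throws ConfigException if a deserializer is null and no config entry
   // supplies one.
   Map<String, Object> userConfigs = new HashMap<>();
   userConfigs.put("bootstrap.servers", "localhost:9092");
   
   Map<String, Object> configs = ConsumerUtils.appendDeserializerToConfig(
       userConfigs, new StringDeserializer(), new StringDeserializer());
   ```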



##########
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java:
##########
@@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws Throwable {
                 () -> log.info("offsetsForTime = {}", offsetsForTime.result));
             // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
             // should work.
-            consumer.beginningOffsets(timestampsToSearch.keySet());
-            consumer.endOffsets(timestampsToSearch.keySet());
+            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(timestampsToSearch.keySet());
+            Map<TopicPartition, Long> endingOffsets = consumer.endOffsets(timestampsToSearch.keySet());
+            log.trace("beginningOffsets: {}, endingOffsets: {}", beginningOffsets, endingOffsets);

Review Comment:
   This is super annoying.
   
   I started getting errors from SpotBugs because the offsets methods were called but their return values were ignored. Assigning and logging the results is a very brute-force way of silencing the checker; I could not find a clean way to suppress the warning.
   
   I also don't know why this is suddenly being caught by SpotBugs. From its perspective, nothing has changed in the `KafkaConsumer` API, right?
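   
   One possibly cleaner alternative is sketched below; it assumes the `com.github.spotbugs:spotbugs-annotations` dependency is available to the tools module, and the bug-pattern name is my guess at the warning being raised:
   
   ```java
   import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
   
   // Suppress the ignored-return-value check for this one method instead of
   // introducing otherwise-unused locals and trace logging.
   @SuppressFBWarnings(
       value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
       justification = "Offsets are fetched only to verify the calls succeed")
   public void testConsume(final long prodTimeMs) throws Throwable {
       // ... existing test body ...
       consumer.beginningOffsets(timestampsToSearch.keySet());
       consumer.endOffsets(timestampsToSearch.keySet());
       // ... existing test body ...
   }
   ```
   
   An entry in a SpotBugs exclude filter (if the build uses one) would be another option that avoids touching the code at all.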



##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -281,7 +281,7 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object
 
             if (!options.hasDryRun()) {
                 for (final TopicPartition p : partitions) {
-                    client.position(p);
+                    long pos = client.position(p);

Review Comment:
   This is another hack for bypassing the SpotBugs error about ignored return values.
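   
   If the unused `pos` local ever trips another checker, the same approach as in `ClientCompatibilityTest` could be used (a sketch, assuming an slf4j-style `log` field is available in this class):
   
   ```java
   // Read and log the partition position so the return value is visibly consumed.
   final long position = client.position(p);
   log.debug("Topic partition {} is at position {}", p, position);
   ```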



##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -171,19 +170,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     producer
   }
 
-  def createAsyncConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
-                                valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
-                                configOverrides: Properties = new Properties,
-                                configsToRemove: List[String] = List()): PrototypeAsyncConsumer[K, V] = {
-    val props = new Properties
-    props ++= consumerConfig
-    props ++= configOverrides
-    configsToRemove.foreach(props.remove(_))
-    val consumer = new PrototypeAsyncConsumer[K, V](props, keyDeserializer, valueDeserializer)
-    consumers += consumer
-    consumer
-  }
-

Review Comment:
   This is no longer needed, as we can now create a `KafkaConsumer` directly.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -666,19 +651,6 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
         return newConfigs;
     }
 
-    boolean maybeOverrideEnableAutoCommit() {
-        Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-        boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-        if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
-            if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
-                enableAutoCommit = false;
-            } else if (enableAutoCommit) {
-                throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
-            }
-        }
-        return enableAutoCommit;
-    }
-

Review Comment:
   I also moved this to `ConsumerUtils` for the same reason as above.
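   
   For context, a sketch of the rule this method enforces; the `ConsumerUtils` method name and signature after the move are assumptions, not taken from this PR:
   
   ```java
   // With a default (null) group id:
   //   - enable.auto.commit left unset -> treated as false
   //   - enable.auto.commit=true       -> InvalidConfigurationException
   boolean enableAutoCommit = ConsumerUtils.maybeOverrideEnableAutoCommit(config);
   ```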


