This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c4a769bc8be MINOR: Rename ambiguous method name (#19875)
c4a769bc8be is described below
commit c4a769bc8be54fe07e0ed5f05f00b48aa02a8404
Author: hgh1472 <[email protected]>
AuthorDate: Sat Jun 7 04:03:51 2025 +0900
MINOR: Rename ambiguous method name (#19875)
While reading through the code, I found the method name
`maybeThrowInvalidGroupIdException` to be somewhat ambiguous and not fully
descriptive of its purpose: it throws when no group id is configured, not
when the group id is invalid. So I renamed the method to
`throwIfGroupIdNotDefined` to make its purpose clearer and more
self-explanatory. If there were another reason for the original naming,
I’d be happy to hear about it.
Reviewers: Lianet Magrans <[email protected]>
---
.../clients/consumer/internals/AsyncKafkaConsumer.java | 14 +++++++-------
.../clients/consumer/internals/ClassicKafkaConsumer.java | 14 +++++++-------
2 files changed, 14 insertions(+), 14 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 29843c765c3..6f1f8c8bc64 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -954,7 +954,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commit(final CommitEvent commitEvent) {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
offsetCommitCallbackInvoker.executeCallbacks();
if (commitEvent.offsets().isPresent() &&
commitEvent.offsets().get().isEmpty()) {
@@ -1083,7 +1083,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
long start = time.nanoseconds();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
if (partitions.isEmpty()) {
return Collections.emptyMap();
}
@@ -1107,7 +1107,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- private void maybeThrowInvalidGroupIdException() {
+ private void throwIfGroupIdNotDefined() {
if (groupMetadata.get().isEmpty()) {
throw new InvalidGroupIdException("To use the group management or
offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the
consumer configuration.");
@@ -1346,7 +1346,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public ConsumerGroupMetadata groupMetadata() {
acquireAndEnsureOpen();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
return groupMetadata.get().get();
} finally {
release();
@@ -2028,7 +2028,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private void subscribeInternal(Pattern pattern,
Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
if (pattern == null || pattern.toString().isEmpty())
throw new IllegalArgumentException("Topic pattern to subscribe
to cannot be " + (pattern == null ?
"null" : "empty"));
@@ -2052,7 +2052,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
Optional<ConsumerRebalanceListener>
listener) {
acquireAndEnsureOpen();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
throwIfSubscriptionPatternIsInvalid(pattern);
log.info("Subscribing to regular expression {}", pattern);
applicationEventHandler.addAndGet(new
TopicRe2JPatternSubscriptionChangeEvent(
@@ -2076,7 +2076,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private void subscribeInternal(Collection<String> topics,
Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
if (topics == null)
throw new IllegalArgumentException("Topic collection to
subscribe to cannot be null");
if (topics.isEmpty()) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 3a50ff037ab..0e4119b9e33 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -477,7 +477,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private void subscribeInternal(Collection<String> topics,
Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
if (topics == null)
throw new IllegalArgumentException("Topic collection to
subscribe to cannot be null");
if (topics.isEmpty()) {
@@ -558,7 +558,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* configured at-least one partition
assignment strategy
*/
private void subscribeInternal(Pattern pattern,
Optional<ConsumerRebalanceListener> listener) {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
if (pattern == null || pattern.toString().isEmpty())
throw new IllegalArgumentException("Topic pattern to subscribe to
cannot be " + (pattern == null ?
"null" : "empty"));
@@ -742,7 +742,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
long commitStart = time.nanoseconds();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
offsets.forEach(this::updateLastSeenEpochIfNewer);
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets),
time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis()
+ "ms expired before successfully " +
@@ -768,7 +768,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata>
offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
log.debug("Committing offsets: {}", offsets);
offsets.forEach(this::updateLastSeenEpochIfNewer);
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
@@ -889,7 +889,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
long start = time.nanoseconds();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
final Map<TopicPartition, OffsetAndMetadata> offsets;
offsets = coordinator.fetchCommittedOffsets(partitions,
time.timer(timeout));
if (offsets == null) {
@@ -1078,7 +1078,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public ConsumerGroupMetadata groupMetadata() {
acquireAndEnsureOpen();
try {
- maybeThrowInvalidGroupIdException();
+ throwIfGroupIdNotDefined();
return coordinator.groupMetadata();
} finally {
release();
@@ -1272,7 +1272,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + "
configuration property");
}
- private void maybeThrowInvalidGroupIdException() {
+ private void throwIfGroupIdNotDefined() {
if (groupId.isEmpty())
throw new InvalidGroupIdException("To use the group management or
offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in
the consumer configuration.");