This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c446e799be3 KAFKA-17010 Remove `DescribeLogDirsResponse#LogDirInfo`,
`DescribeLogDirsResponse#ReplicaInfo`, and `DescribeLogDirsResult#all` (#17953)
c446e799be3 is described below
commit c446e799be38eaa15be7650df4fc404d6abb6eed
Author: Chia-Chuan Yu <[email protected]>
AuthorDate: Thu Nov 28 04:42:34 2024 +0800
KAFKA-17010 Remove `DescribeLogDirsResponse#LogDirInfo`,
`DescribeLogDirsResponse#ReplicaInfo`, and `DescribeLogDirsResult#all` (#17953)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/DescribeLogDirsResult.java | 46 -------------
.../common/requests/DescribeLogDirsResponse.java | 66 -------------------
.../kafka/clients/admin/KafkaAdminClientTest.java | 77 ----------------------
docs/upgrade.html | 6 ++
4 files changed, 6 insertions(+), 189 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
index 1ed2d49c962..82b0f111f53 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -19,15 +19,11 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
@@ -43,36 +39,6 @@ public class DescribeLogDirsResult {
this.futures = futures;
}
- /**
- * Return a map from brokerId to future which can be used to check the
information of partitions on each individual broker.
- * @deprecated Deprecated Since Kafka 2.7. Use {@link #descriptions()}.
- */
- @Deprecated
- public Map<Integer, KafkaFuture<Map<String,
DescribeLogDirsResponse.LogDirInfo>>> values() {
- return descriptions().entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- entry -> entry.getValue().thenApply(this::convertMapValues)));
- }
-
- @SuppressWarnings("deprecation")
- private Map<String, DescribeLogDirsResponse.LogDirInfo>
convertMapValues(Map<String, LogDirDescription> map) {
- Stream<Map.Entry<String, LogDirDescription>> stream =
map.entrySet().stream();
- return stream.collect(Collectors.toMap(
- Map.Entry::getKey,
- infoEntry -> {
- LogDirDescription logDir = infoEntry.getValue();
- return new DescribeLogDirsResponse.LogDirInfo(logDir.error()
== null ? Errors.NONE : Errors.forException(logDir.error()),
-
logDir.replicaInfos().entrySet().stream().collect(Collectors.toMap(
- Map.Entry::getKey,
- replicaEntry -> new
DescribeLogDirsResponse.ReplicaInfo(
- replicaEntry.getValue().size(),
- replicaEntry.getValue().offsetLag(),
- replicaEntry.getValue().isFuture())
- )));
- }));
- }
-
/**
* Return a map from brokerId to future which can be used to check the
information of partitions on each individual broker.
* The result of the future is a map from broker log directory path to a
description of that log directory.
@@ -81,18 +47,6 @@ public class DescribeLogDirsResult {
return futures;
}
- /**
- * Return a future which succeeds only if all the brokers have responded
without error
- * @deprecated Deprecated Since Kafka 2.7. Use {@link #allDescriptions()}.
- */
- @Deprecated
- public KafkaFuture<Map<Integer, Map<String,
DescribeLogDirsResponse.LogDirInfo>>> all() {
- return allDescriptions().thenApply(map ->
map.entrySet().stream().collect(Collectors.toMap(
- Map.Entry::getKey,
- entry -> convertMapValues(entry.getValue())
- )));
- }
-
/**
* Return a future which succeeds only if all the brokers have responded
without error.
* The result of the future is a map from brokerId to a map from broker
log directory path
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index 0177b2f0f55..b2245d3edce 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -69,71 +68,6 @@ public class DescribeLogDirsResponse extends
AbstractResponse {
return new DescribeLogDirsResponse(new DescribeLogDirsResponseData(new
ByteBufferAccessor(buffer), version));
}
- // Note this class is part of the public API, reachable from
Admin.describeLogDirs()
- /**
- * Possible error code:
- *
- * KAFKA_STORAGE_ERROR (56)
- * UNKNOWN (-1)
- *
- * @deprecated Deprecated Since Kafka 2.7.
- * Use {@link
org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
- * and {@link
org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to
access the replacement
- * class {@link org.apache.kafka.clients.admin.LogDirDescription}.
- */
- @Deprecated
- public static class LogDirInfo {
- public final Errors error;
- public final Map<TopicPartition, ReplicaInfo> replicaInfos;
-
- public LogDirInfo(Errors error, Map<TopicPartition, ReplicaInfo>
replicaInfos) {
- this.error = error;
- this.replicaInfos = replicaInfos;
- }
-
- @Override
- public String toString() {
- return "(error=" +
- error +
- ", replicas=" +
- replicaInfos +
- ")";
- }
- }
-
- // Note this class is part of the public API, reachable from
Admin.describeLogDirs()
-
- /**
- * @deprecated Deprecated Since Kafka 2.7.
- * Use {@link
org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
- * and {@link
org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to
access the replacement
- * class {@link org.apache.kafka.clients.admin.ReplicaInfo}.
- */
- @Deprecated
- public static class ReplicaInfo {
-
- public final long size;
- public final long offsetLag;
- public final boolean isFuture;
-
- public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
- this.size = size;
- this.offsetLag = offsetLag;
- this.isFuture = isFuture;
- }
-
- @Override
- public String toString() {
- return "(size=" +
- size +
- ", offsetLag=" +
- offsetLag +
- ", isFuture=" +
- isFuture +
- ")";
- }
- }
-
@Override
public boolean shouldClientThrottle(short version) {
return version >= 1;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 17e0a20dbf9..782dd00d0a3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2320,50 +2320,6 @@ public class KafkaAdminClientTest {
}
}
- @SuppressWarnings("deprecation")
- @Test
- public void testDescribeLogDirsDeprecated() throws ExecutionException,
InterruptedException {
- Set<Integer> brokers = singleton(0);
- TopicPartition tp = new TopicPartition("topic", 12);
- String logDir = "/var/data/kafka";
- Errors error = Errors.NONE;
- int offsetLag = 24;
- long partitionSize = 1234567890;
-
- try (AdminClientUnitTestEnv env = mockClientEnv()) {
- env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().prepareResponseFrom(
- prepareDescribeLogDirsResponse(error, logDir, tp,
partitionSize, offsetLag),
- env.cluster().nodeById(0));
-
- DescribeLogDirsResult result =
env.adminClient().describeLogDirs(brokers);
-
- Map<Integer, KafkaFuture<Map<String,
DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
- assertEquals(brokers, deprecatedValues.keySet());
- assertNotNull(deprecatedValues.get(0));
- assertDescriptionContains(deprecatedValues.get(0).get(), logDir,
tp, error, offsetLag, partitionSize);
-
- Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>
deprecatedAll = result.all().get();
- assertEquals(brokers, deprecatedAll.keySet());
- assertDescriptionContains(deprecatedAll.get(0), logDir, tp, error,
offsetLag, partitionSize);
- }
- }
-
- @SuppressWarnings("deprecation")
- private static void assertDescriptionContains(Map<String,
DescribeLogDirsResponse.LogDirInfo> descriptionsMap,
- String logDir, TopicPartition tp,
Errors error,
- int offsetLag, long partitionSize) {
- assertNotNull(descriptionsMap);
- assertEquals(singleton(logDir), descriptionsMap.keySet());
- assertEquals(error, descriptionsMap.get(logDir).error);
- Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>
allReplicaInfos =
- descriptionsMap.get(logDir).replicaInfos;
- assertEquals(singleton(tp), allReplicaInfos.keySet());
- assertEquals(partitionSize, allReplicaInfos.get(tp).size);
- assertEquals(offsetLag, allReplicaInfos.get(tp).offsetLag);
- assertFalse(allReplicaInfos.get(tp).isFuture);
- }
-
@Test
public void testDescribeLogDirsOfflineDir() throws ExecutionException,
InterruptedException {
Set<Integer> brokers = singleton(0);
@@ -2396,39 +2352,6 @@ public class KafkaAdminClientTest {
}
}
- @SuppressWarnings("deprecation")
- @Test
- public void testDescribeLogDirsOfflineDirDeprecated() throws
ExecutionException, InterruptedException {
- Set<Integer> brokers = singleton(0);
- String logDir = "/var/data/kafka";
- Errors error = Errors.KAFKA_STORAGE_ERROR;
-
- try (AdminClientUnitTestEnv env = mockClientEnv()) {
- env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().prepareResponseFrom(
- prepareDescribeLogDirsResponse(error, logDir, emptyList()),
- env.cluster().nodeById(0));
-
- DescribeLogDirsResult result =
env.adminClient().describeLogDirs(brokers);
-
- Map<Integer, KafkaFuture<Map<String,
DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
- assertEquals(brokers, deprecatedValues.keySet());
- assertNotNull(deprecatedValues.get(0));
- Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap =
deprecatedValues.get(0).get();
- assertEquals(singleton(logDir), valuesMap.keySet());
- assertEquals(error, valuesMap.get(logDir).error);
- assertEquals(emptySet(),
valuesMap.get(logDir).replicaInfos.keySet());
-
- Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>
deprecatedAll = result.all().get();
- assertEquals(brokers, deprecatedAll.keySet());
- Map<String, DescribeLogDirsResponse.LogDirInfo> allMap =
deprecatedAll.get(0);
- assertNotNull(allMap);
- assertEquals(singleton(logDir), allMap.keySet());
- assertEquals(error, allMap.get(logDir).error);
- assertEquals(emptySet(), allMap.get(logDir).replicaInfos.keySet());
- }
- }
-
@Test
public void testDescribeReplicaLogDirs() throws ExecutionException,
InterruptedException {
TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 12, 1);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 9cc3aca61c3..4ed85d69c65 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -38,6 +38,12 @@
<li>The <code>bufferpool-wait-time-total</code>,
<code>io-waittime-total</code>, and <code>iotime-total</code> metrics were
removed.
Please use
<code>bufferpool-wait-time-ns-total</code>, <code>io-wait-time-ns-total</code>,
and <code>io-time-ns-total</code> metrics as replacements, respectively.
</li>
+ <li>The
<code>kafka.common.requests.DescribeLogDirsResponse.LogDirInfo</code> class was
removed. Please use the
<code>kafka.clients.admin.DescribeLogDirsResult.descriptions()</code> class
+ and
<code>kafka.clients.admin.DescribeLogDirsResult.allDescriptions()</code>instead.
+ </li>
+ <li>The
<code>kafka.common.requests.DescribeLogDirsResponse.ReplicaInfo</code> class
was removed. Please use the
<code>kafka.clients.admin.DescribeLogDirsResult.descriptions()</code> class
+ and
<code>kafka.clients.admin.DescribeLogDirsResult.allDescriptions()</code>instead.
+ </li>
</ul>
</li>
<li><b>Broker</b>