This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e6f8f5d MINOR: Remove unused variables, methods, parameters, unthrown
exceptions, and fix typos (#9457)
e6f8f5d is described below
commit e6f8f5d0ae7777f15fb4a6d7744cd58c195aac31
Author: Lee Dongjin <[email protected]>
AuthorDate: Wed Mar 10 14:21:30 2021 +0900
MINOR: Remove unused variables, methods, parameters, unthrown exceptions,
and fix typos (#9457)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/clients/ClientUtilsTest.java | 2 +-
.../kafka/connect/file/FileStreamSinkTaskTest.java | 2 +-
.../resources/ConnectorPluginsResourceTest.java | 3 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 18 +++---
.../unit/kafka/network/SocketServerTest.scala | 2 +-
.../integration/QueryableStateIntegrationTest.java | 67 +---------------------
.../apache/kafka/streams/test/TestRecordTest.java | 4 +-
.../kafka/tools/VerifiableLog4jAppender.java | 2 +-
8 files changed, 17 insertions(+), 83 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index 2f0a590..dc85a52 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -36,7 +36,7 @@ public class ClientUtilsTest {
private HostResolver hostResolver = new DefaultHostResolver();
@Test
- public void testParseAndValidateAddresses() throws UnknownHostException {
+ public void testParseAndValidateAddresses() {
checkWithoutLookup("127.0.0.1:8000");
checkWithoutLookup("localhost:8080");
checkWithoutLookup("[::1]:8000");
diff --git
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
index 2a60ee9..3878530 100644
---
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
+++
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
@@ -47,7 +47,7 @@ public class FileStreamSinkTaskTest {
private String outputFile;
@BeforeEach
- public void setup() throws Exception {
+ public void setup() {
os = new ByteArrayOutputStream();
printStream = new PrintStream(os);
task = new FileStreamSinkTask(printStream);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index e458c5f..148b782 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -427,8 +427,7 @@ public class ConnectorPluginsResourceTest {
}
public static class MockConnectorPluginDesc extends PluginDesc<Connector> {
- public MockConnectorPluginDesc(Class<? extends Connector> klass,
String version)
- throws Exception {
+ public MockConnectorPluginDesc(Class<? extends Connector> klass,
String version) {
super(klass, version, new MockPluginClassLoader(null, new URL[0]));
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 87cb257..3b3bf00 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -189,7 +189,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testDescribeLogDirs(): Unit = {
client = Admin.create(createConfig)
val topic = "topic"
- val leaderByPartition = createTopic(topic, numPartitions = 10,
replicationFactor = 1)
+ val leaderByPartition = createTopic(topic, numPartitions = 10)
val partitionsByBroker = leaderByPartition.groupBy { case (_, leaderId) =>
leaderId }.map { case (k, v) =>
k -> v.keys.toSeq
}
@@ -217,7 +217,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testDescribeReplicaLogDirs(): Unit = {
client = Admin.create(createConfig)
val topic = "topic"
- val leaderByPartition = createTopic(topic, numPartitions = 10,
replicationFactor = 1)
+ val leaderByPartition = createTopic(topic, numPartitions = 10)
val replicas = leaderByPartition.map { case (partition, brokerId) =>
new TopicPartitionReplica(topic, partition, brokerId)
}.toSeq
@@ -255,7 +255,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(exception.getCause.isInstanceOf[UnknownTopicOrPartitionException])
}
- createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
+ createTopic(topic, replicationFactor = brokerCount)
servers.foreach { server =>
val logDir = server.logManager.getLog(tp).get.dir.getParent
assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0,
server.config.brokerId)), logDir)
@@ -332,7 +332,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val topic2 = "describe-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- createTopic(topic2, numPartitions = 1, replicationFactor = 1)
+ createTopic(topic2)
// Describe topics and broker
val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER,
servers(1).config.brokerId.toString)
@@ -395,10 +395,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Create topics
val topic1 = "create-partitions-topic-1"
- createTopic(topic1, numPartitions = 1, replicationFactor = 1)
+ createTopic(topic1)
val topic2 = "create-partitions-topic-2"
- createTopic(topic2, numPartitions = 1, replicationFactor = 2)
+ createTopic(topic2, replicationFactor = 2)
// assert that both the topics have 1 partition
val topic1_metadata = getTopicMetadata(client, topic1)
@@ -683,7 +683,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
@Test
def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
- val leaders = createTopic(topic, numPartitions = 1, replicationFactor =
brokerCount)
+ val leaders = createTopic(topic, replicationFactor = brokerCount)
val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset:
Long): Unit = {
@@ -731,7 +731,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
@Test
def testAlterLogDirsAfterDeleteRecords(): Unit = {
client = Admin.create(createConfig)
- createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
+ createTopic(topic, replicationFactor = brokerCount)
val expectedLEO = 100
val producer = createProducer()
sendRecords(producer, expectedLEO, topicPartition)
@@ -1628,7 +1628,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Create topics
val topic = "list-reassignments-no-reassignments"
- createTopic(topic, numPartitions = 1, replicationFactor = 3)
+ createTopic(topic, replicationFactor = 3)
val tp = new TopicPartition(topic, 0)
val reassignmentsMap =
client.listPartitionReassignments(Set(tp).asJava).reassignments().get()
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d323030..de84efa 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1743,7 +1743,7 @@ class SocketServerTest {
// In each iteration, SocketServer processes at most connectionQueueSize
(1 in this test)
// new connections and then does poll() to process data from existing
connections. So for
// 5 connections, we expect 5 iterations. Since we stop when the 5th
connection is processed,
- // we can safely check that there were atleast 4 polls prior to the 5th
connection.
+ // we can safely check that there were at least 4 polls prior to the 5th
connection.
val pollCount = testableSelector.operationCounts(SelectorOperation.Poll)
assertTrue(pollCount >= numConnections - 1, s"Connections created too
quickly: $pollCount")
verifyAcceptorBlockedPercent("PLAINTEXT", expectBlocked = true)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index a7f413a..d776cc2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -83,7 +83,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -118,7 +117,6 @@ import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class QueryableStateIntegrationTest {
@@ -1153,53 +1151,6 @@ public class QueryableStateIntegrationTest {
assertThat(countState, equalTo(expectedCount));
}
- /**
- * Verify that the new count is greater than or equal to the previous
count.
- * Note: this method changes the values in expectedWindowState and
expectedCount
- *
- * @param keys All the keys we ever expect to find
- * @param expectedWindowedCount Expected windowed count
- * @param expectedCount Expected count
- * @param windowStore Window Store
- * @param keyValueStore Key-value store
- */
- private void verifyGreaterOrEqual(final String[] keys,
- final Map<String, Long>
expectedWindowedCount,
- final Map<String, Long> expectedCount,
- final ReadOnlyWindowStore<String, Long>
windowStore,
- final ReadOnlyKeyValueStore<String,
Long> keyValueStore) {
- final Map<String, Long> windowState = new HashMap<>();
- final Map<String, Long> countState = new HashMap<>();
-
- for (final String key : keys) {
- final Map<String, Long> map = fetchMap(windowStore, key);
- windowState.putAll(map);
- final Long value = keyValueStore.get(key);
- if (value != null) {
- countState.put(key, value);
- }
- }
-
- for (final Map.Entry<String, Long> actualWindowStateEntry :
windowState.entrySet()) {
- if
(expectedWindowedCount.containsKey(actualWindowStateEntry.getKey())) {
- final Long expectedValue =
expectedWindowedCount.get(actualWindowStateEntry.getKey());
- assertTrue(actualWindowStateEntry.getValue() >= expectedValue);
- }
- // return this for next round of comparisons
- expectedWindowedCount.put(actualWindowStateEntry.getKey(),
actualWindowStateEntry.getValue());
- }
-
- for (final Map.Entry<String, Long> actualCountStateEntry :
countState.entrySet()) {
- if (expectedCount.containsKey(actualCountStateEntry.getKey())) {
- final Long expectedValue =
expectedCount.get(actualCountStateEntry.getKey());
- assertTrue(actualCountStateEntry.getValue() >= expectedValue);
- }
- // return this for next round of comparisons
- expectedCount.put(actualCountStateEntry.getKey(),
actualCountStateEntry.getValue());
- }
-
- }
-
private void waitUntilAtLeastNumRecordProcessed(final String topic,
final int numRecs) throws
Exception {
final long timeout = DEFAULT_TIMEOUT_MS;
@@ -1227,17 +1178,6 @@ public class QueryableStateIntegrationTest {
return Collections.emptySet();
}
- private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long>
store,
- final String key) {
- final WindowStoreIterator<Long> fetch =
- store.fetch(key, ofEpochMilli(0),
ofEpochMilli(System.currentTimeMillis()));
- if (fetch.hasNext()) {
- final KeyValue<Long, Long> next = fetch.next();
- return Collections.singletonMap(key, next.value);
- }
- return Collections.emptyMap();
- }
-
/**
* A class that periodically produces records in a separate thread
*/
@@ -1246,7 +1186,6 @@ public class QueryableStateIntegrationTest {
private final List<String> inputValues;
private final int numIterations;
private int currIteration = 0;
- boolean shutdown = false;
ProducerRunnable(final String topic,
final List<String> inputValues,
@@ -1264,10 +1203,6 @@ public class QueryableStateIntegrationTest {
return currIteration;
}
- synchronized void shutdown() {
- shutdown = true;
- }
-
@Override
public void run() {
final Properties producerConfig = new Properties();
@@ -1279,7 +1214,7 @@ public class QueryableStateIntegrationTest {
try (final KafkaProducer<String, String> producer =
new KafkaProducer<>(producerConfig, new
StringSerializer(), new StringSerializer())) {
- while (getCurrIteration() < numIterations && !shutdown) {
+ while (getCurrIteration() < numIterations) {
for (final String value : inputValues) {
producer.send(new ProducerRecord<>(topic, value));
}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
index 0999614..b16ea03 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
@@ -42,7 +42,7 @@ public class TestRecordTest {
private final Headers headers = new RecordHeaders(
new Header[]{
new RecordHeader("foo", "value".getBytes()),
- new RecordHeader("bar", (byte[]) null),
+ new RecordHeader("bar", null),
new RecordHeader("\"A\\u00ea\\u00f1\\u00fcC\"",
"value".getBytes())
});
private final Instant recordTime = Instant.parse("2019-06-01T10:00:00Z");
@@ -100,7 +100,7 @@ public class TestRecordTest {
final Headers headers2 = new RecordHeaders(
new Header[]{
new RecordHeader("foo", "value".getBytes()),
- new RecordHeader("bar", (byte[]) null),
+ new RecordHeader("bar", null),
});
final TestRecord<String, Integer> headerMismatch = new
TestRecord<>(key, value, headers2, recordTime);
assertNotEquals(testRecord, headerMismatch);
diff --git
a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
index 534c21b..d390926 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -236,7 +236,7 @@ public class VerifiableLog4jAppender {
PropertyConfigurator.configure(props);
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) {
final VerifiableLog4jAppender appender = createFromArgs(args);
boolean infinite = appender.maxMessages < 0;