Repository: kafka Updated Branches: refs/heads/trunk f1ba4ff87 -> 1c6d5bbac
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 8a305b0..69530c1 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -29,7 +29,7 @@ public class Utils { private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)"); - public static String NL = System.getProperty("line.separator"); + public static final String NL = System.getProperty("line.separator"); /** * Turn the given UTF8 byte array into a string @@ -87,10 +87,10 @@ public class Utils { * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS) */ public static int readUnsignedIntLE(InputStream in) throws IOException { - return (in.read() << 8*0) - | (in.read() << 8*1) - | (in.read() << 8*2) - | (in.read() << 8*3); + return (in.read() << 8 * 0) + | (in.read() << 8 * 1) + | (in.read() << 8 * 2) + | (in.read() << 8 * 3); } /** @@ -102,10 +102,10 @@ public class Utils { * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS) */ public static int readUnsignedIntLE(byte[] buffer, int offset) { - return (buffer[offset++] << 8*0) - | (buffer[offset++] << 8*1) - | (buffer[offset++] << 8*2) - | (buffer[offset] << 8*3); + return (buffer[offset++] << 8 * 0) + | (buffer[offset++] << 8 * 1) + | (buffer[offset++] << 8 * 2) + | (buffer[offset] << 8 * 3); } /** @@ -136,10 +136,10 @@ public class Utils { * @param value The value to write */ public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException { - out.write(value >>> 8*0); - out.write(value >>> 8*1); - out.write(value >>> 8*2); - out.write(value >>> 8*3); + out.write(value >>> 8 * 0); + out.write(value >>> 8 * 1); + out.write(value >>> 8 * 2); + out.write(value >>> 8 * 3); } /** @@ -151,10 +151,10 @@ public class Utils { * @param value The value to write */ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) { - buffer[offset++] = (byte) (value >>> 8*0); - buffer[offset++] = (byte) (value >>> 8*1); - buffer[offset++] = (byte) (value >>> 8*2); - buffer[offset] = (byte) (value >>> 8*3); + buffer[offset++] = (byte) (value >>> 8 * 0); + buffer[offset++] = (byte) (value >>> 8 * 1); + buffer[offset++] = (byte) (value >>> 8 * 2); + buffer[offset] = (byte) (value >>> 8 * 3); } @@ -285,7 +285,7 @@ public class Utils { case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8; case 1: - h ^= (data[length & ~3] & 0xff); + h ^= data[length & ~3] & 0xff; h *= m; } @@ -348,11 +348,11 @@ public class Utils { public static <T> String join(Collection<T> list, String seperator) { StringBuilder sb = new StringBuilder(); Iterator<T> iter = list.iterator(); - while(iter.hasNext()) { + while (iter.hasNext()) { sb.append(iter.next()); - if(iter.hasNext()) - sb.append(seperator); + if (iter.hasNext()) + sb.append(seperator); } - return sb.toString(); + return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java new file mode 100644 index 0000000..13ce519 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.Test; + +import java.util.Arrays; + +public class ClientUtilsTest { + + @Test + public void testParseAndValidateAddresses() { + check("127.0.0.1:8000"); + check("mydomain.com:8080"); + check("[::1]:8000"); + check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000"); + } + + @Test(expected = ConfigException.class) + public void testNoPort() { + check("127.0.0.1"); + } + + private void check(String... url) { + ClientUtils.parseAndValidateAddresses(Arrays.asList(url)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 67bee40..8f1a7a6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.clients; import java.util.ArrayDeque; @@ -65,7 +81,7 @@ public class MockClient implements KafkaClient { @Override public List<ClientResponse> poll(long timeoutMs, long now) { - for(ClientResponse response: this.responses) + for (ClientResponse response: this.responses) if (response.request().hasCallback()) response.request().callback().onComplete(response); List<ClientResponse> copy = new ArrayList<ClientResponse>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 5debcd6..8b27889 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.clients; import static org.junit.Assert.assertEquals; @@ -9,7 +25,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index e51d2df..677edd3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.clients.consumer; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 864f1c7..090087a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.clients.consumer.internals; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java index 77b23e7..4ae43ed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java @@ -107,7 +107,7 @@ public class BufferPoolTest { private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) { final CountDownLatch latch = new CountDownLatch(1); - new Thread() { + Thread thread = new Thread() { public void run() { try { latch.await(); @@ -116,13 +116,14 @@ public class BufferPoolTest { } pool.deallocate(buffer); } - }.start(); + }; + thread.start(); return latch; } private CountDownLatch asyncAllocate(final BufferPool pool, final int size) { final CountDownLatch completed = new CountDownLatch(1); - new Thread() { + Thread thread = new Thread() { public void run() { try { pool.allocate(size); @@ -132,7 +133,8 @@ public class BufferPoolTest { completed.countDown(); } } - }.start(); + }; + thread.start(); return completed; } @@ -172,12 +174,12 @@ public class BufferPoolTest { try { for (int i = 0; i < iterations; i++) { int size; - if (TestUtils.random.nextBoolean()) + if (TestUtils.RANDOM.nextBoolean()) // allocate poolable size size = pool.poolableSize(); else // allocate a random size - size = TestUtils.random.nextInt((int) pool.totalMemory()); + size = TestUtils.RANDOM.nextInt((int) pool.totalMemory()); ByteBuffer buffer = pool.allocate(size); pool.deallocate(buffer); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 74605c3..743aa7e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -12,7 +12,7 @@ */ package org.apache.kafka.clients.producer; -import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.test.TestUtils; @@ -49,7 +49,7 @@ public class MetadataTest { } /** - * Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't + * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't * wait forever with a max timeout value of 0 * * @throws Exception @@ -68,9 +68,9 @@ public class MetadataTest { // expected } // now try with a higher timeout value once - final long TWO_SECOND_WAIT = 2000; + final long twoSecondWait = 2000; try { - metadata.awaitUpdate(metadata.requestUpdate(), TWO_SECOND_WAIT); + metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait); fail("Wait on metadata update was expected to timeout, but it didn't"); } catch (TimeoutException te) { // expected http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index d3377ef..75513b0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -32,6 +32,7 @@ public class MockProducerTest { private String topic = "topic"; @Test + @SuppressWarnings("unchecked") public void testAutoCompleteMock() throws Exception { MockProducer producer = new MockProducer(true); ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes()); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java index 82d8083..29c8417 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java @@ -31,7 +31,7 @@ public class PartitionerTest { private Node node0 = new Node(0, "localhost", 99); private Node node1 = new Node(1, "localhost", 100); private Node node2 = new Node(2, "localhost", 101); - private Node[] nodes = new Node[] { node0, node1, node2 }; + private Node[] nodes = new Node[] {node0, node1, node2}; private String topic = "test"; private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes), new PartitionInfo(topic, 1, node1, nodes, nodes), http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index e2bb8da..8333863 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -128,6 +128,7 @@ public class RecordAccumulatorTest { assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); } + @SuppressWarnings("unused") @Test public void testStressfulSituation() throws Exception { final int numThreads = 5; @@ -194,7 +195,7 @@ public class RecordAccumulatorTest { assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately - for (int i = 0; i < appends+1; i++) + for (int i = 0; i < appends + 1; i++) accum.append(tp2, key, value, CompressionType.NONE, null); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java index a3700a6..1e5d1c2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java @@ -78,7 +78,7 @@ public class RecordSendTest { /* create a new request result that will be completed after the given timeout */ public ProduceRequestResult asyncRequest(final long baseOffset, final RuntimeException error, final long timeout) { final ProduceRequestResult request = new ProduceRequestResult(); - new Thread() { + Thread thread = new Thread() { public void run() { try { sleep(timeout); @@ -87,7 +87,8 @@ public class RecordSendTest { e.printStackTrace(); } } - }.start(); + }; + thread.start(); return request; } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 888b929..558942a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -21,8 +21,8 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.common.Cluster; @@ -147,8 +147,8 @@ public class SenderTest { partResp.set("partition", part); partResp.set("error_code", (short) error); partResp.set("base_offset", offset); - response.set("partition_responses", new Object[] { partResp }); - struct.set("responses", new Object[] { response }); + response.set("partition_responses", new Object[] {partResp}); + struct.set("responses", new Object[] {response}); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index 3cfd36d..66442ed 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -14,87 +14,67 @@ package org.apache.kafka.common.config; import static org.junit.Assert.fail; -import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; import org.junit.Test; public class AbstractConfigTest { - @Test - public void testConfiguredInstances() { - testValidInputs(""); - testValidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter"); - testValidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter,org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter"); - testInvalidInputs(","); - testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter"); - testInvalidInputs("test1,test2"); - testInvalidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter,"); - } - - private void testValidInputs(String configValue) { - Properties props = new Properties(); - props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); - TestConfig config = new TestConfig(props); - try { - config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - } catch (ConfigException e) { - fail("No exceptions are expected here, valid props are :" + props); - } - } - - private void testInvalidInputs(String configValue) { - Properties props = new Properties(); - props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); - TestConfig config = new TestConfig(props); - try { - config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - fail("Expected a config exception due to invalid props :" + props); - } catch (ConfigException e) { - // this is good + @Test + public void testConfiguredInstances() { + testValidInputs(""); + testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter"); + testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter"); + testInvalidInputs(","); + testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter"); + testInvalidInputs("test1,test2"); + testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,"); } - } - - private static class TestConfig extends AbstractConfig { - - private static final ConfigDef config; - - public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; - private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters."; - static { - config = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG, - Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); + private void testValidInputs(String configValue) { + Properties props = new Properties(); + props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); + TestConfig config = new TestConfig(props); + try { + config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); + } catch (ConfigException e) { + fail("No exceptions are expected here, valid props are :" + props); + } } - public TestConfig(Map<? extends Object, ? extends Object> props) { - super(config, props); + private void testInvalidInputs(String configValue) { + Properties props = new Properties(); + props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); + TestConfig config = new TestConfig(props); + try { + config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); + fail("Expected a config exception due to invalid props :" + props); + } catch (ConfigException e) { + // this is good + } } - } - - public static class TestMetricsReporter implements MetricsReporter { - @Override - public void configure(Map<String, ?> configs) { - } + private static class TestConfig extends AbstractConfig { - @Override - public void init(List<KafkaMetric> metrics) { -} + private static final ConfigDef CONFIG; - @Override - public void metricChange(KafkaMetric metric) { - } + public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; + private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters."; + + static { + CONFIG = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG, + Type.LIST, + "", + Importance.LOW, + METRIC_REPORTER_CLASSES_DOC); + } - @Override - public void close() { + public TestConfig(Map<? extends Object, ? extends Object> props) { + super(CONFIG, props); + } } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 16d3fed..44c2ef0 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -16,7 +16,6 @@ import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -110,7 +109,7 @@ public class ConfigDefTest { @Test(expected = ConfigException.class) public void testInvalidDefaultRange() { - new ConfigDef().define("name", Type.INT, -1, Range.between(0,10), Importance.HIGH, "docs"); + new ConfigDef().define("name", Type.INT, -1, Range.between(0, 10), Importance.HIGH, "docs"); } @Test(expected = ConfigException.class) @@ -120,7 +119,7 @@ public class ConfigDefTest { @Test public void testValidators() { - testValidators(Type.INT, Range.between(0,10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11}); + testValidators(Type.INT, Range.between(0, 10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11}); testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default", new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs"}); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java new file mode 100644 index 0000000..7c7ead1 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import java.util.List; +import java.util.Map; + +public class FakeMetricsReporter implements MetricsReporter { + + @Override + public void configure(Map<String, ?> configs) {} + + @Override + public void init(List<KafkaMetric> metrics) {} + + @Override + public void metricChange(KafkaMetric metric) {} + + @Override + public void close() {} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 998a57c..544e120 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -36,7 +36,7 @@ import org.junit.Test; public class MetricsTest { - private static double EPS = 0.000001; + private static final double EPS = 0.000001; MockTime time = new MockTime(); Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time); @@ -71,7 +71,7 @@ public class MetricsTest { s.add(new MetricName("test.count", "grp1"), new Count()); s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT, new Percentile(new MetricName("test.median", "grp1"), 50.0), - new Percentile(new MetricName("test.perc99_9", "grp1"),99.9))); + new Percentile(new MetricName("test.perc99_9", "grp1"), 99.9))); Sensor s2 = metrics.sensor("test.sensor2"); s2.add(new MetricName("s2.total", "grp1"), new Total()); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java index 3be6b2d..a55cc32 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.Random; -import org.apache.kafka.common.metrics.stats.Histogram; import org.apache.kafka.common.metrics.stats.Histogram.BinScheme; import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme; import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index a14659a..0d030bc 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -40,7 +40,6 @@ import org.junit.Test; */ public class SelectorTest { - private static final List<NetworkSend> EMPTY = new ArrayList<NetworkSend>(); private static final int BUFFER_SIZE = 4 * 1024; private EchoServer server; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index 4480e9b..8b92634 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -23,12 +23,6 @@ import static org.junit.Assert.fail; import java.nio.ByteBuffer; import java.util.Arrays; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.SchemaException; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.protocol.types.Type; import org.junit.Before; import org.junit.Test; @@ -53,8 +47,8 @@ public class ProtocolSerializationTest { .set("int64", (long) 1) .set("string", "1") .set("bytes", "1".getBytes()) - .set("array", new Object[] { 1 }); - this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] { 1, 2, 3 })); + .set("array", new Object[] {1}); + this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3})); } @Test @@ -68,9 +62,9 @@ public class ProtocolSerializationTest { check(Type.STRING, "A\u00ea\u00f1\u00fcC"); check(Type.BYTES, ByteBuffer.allocate(0)); check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes())); - check(new ArrayOf(Type.INT32), new Object[] { 1, 2, 3, 4 }); + check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4}); check(new ArrayOf(Type.STRING), new Object[] {}); - check(new ArrayOf(Type.STRING), new Object[] { "hello", "there", "beautiful" }); + check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"}); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 94a1112..e343327 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -71,7 +71,7 @@ public class MemoryRecordsTest { public static Collection<Object[]> data() { List<Object[]> values = new ArrayList<Object[]>(); for (CompressionType type: CompressionType.values()) - values.add(new Object[] { type }); + values.add(new Object[] {type}); return values; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java index 2765913..957fc8f 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java @@ -63,7 +63,7 @@ public class RecordTest { @Test public void testChecksum() { assertEquals(record.checksum(), record.computeChecksum()); - assertEquals(record.checksum(), record.computeChecksum( + assertEquals(record.checksum(), Record.computeChecksum( this.key == null ? null : this.key.array(), this.value == null ? null : this.value.array(), this.compression, 0, -1)); @@ -102,7 +102,7 @@ public class RecordTest { for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload)) for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload)) for (CompressionType compression : CompressionType.values()) - values.add(new Object[] { key, value, compression }); + values.add(new Object[] {key, value, compression}); return values; } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index df37fc6..13237fd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -17,6 +17,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.Errors; import org.junit.Test; import java.lang.reflect.Method; @@ -31,7 +32,7 @@ import static org.junit.Assert.assertEquals; public class RequestResponseTest { @Test - public void testSerialization() throws Exception{ + public void testSerialization() throws Exception { List<AbstractRequestResponse> requestList = Arrays.asList( createRequestHeader(), createResponseHeader(), @@ -67,7 +68,7 @@ public class RequestResponseTest { } private AbstractRequestResponse createRequestHeader() { - return new RequestHeader((short)10, (short)1, "", 10); + return new RequestHeader((short) 10, (short) 1, "", 10); } private AbstractRequestResponse createResponseHeader() { @@ -79,7 +80,7 @@ public class RequestResponseTest { } private AbstractRequestResponse createConsumerMetadataResponse() { - return new ConsumerMetadataResponse((short)1, new Node(10, "host1", 2014)); + return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 2014)); } private AbstractRequestResponse createFetchRequest() { @@ -91,7 +92,7 @@ public class RequestResponseTest { private AbstractRequestResponse createFetchResponse() { Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>(); - responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData((short)0, 1000000, ByteBuffer.allocate(10))); + responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); return new FetchResponse(responseData); } @@ -100,7 +101,7 @@ public class RequestResponseTest { } private AbstractRequestResponse createHeartBeatResponse() { - return new HeartbeatResponse((short)0); + return new HeartbeatResponse(Errors.NONE.code()); } private AbstractRequestResponse createJoinGroupRequest() { @@ -108,7 +109,7 @@ public class RequestResponseTest { } private AbstractRequestResponse createJoinGroupResponse() { - return new JoinGroupResponse((short)0, 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1))); + return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1))); } private AbstractRequestResponse createListOffsetRequest() { @@ -119,7 +120,7 @@ public class RequestResponseTest { private AbstractRequestResponse createListOffsetResponse() { Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>(); - responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData((short)0, Arrays.asList(100L))); + responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L))); return new ListOffsetResponse(responseData); } @@ -145,7 +146,7 @@ public class RequestResponseTest { private AbstractRequestResponse createOffsetCommitResponse() { Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>(); - responseData.put(new TopicPartition("test", 0), (short)0); + responseData.put(new TopicPartition("test", 0), Errors.NONE.code()); return new OffsetCommitResponse(responseData); } @@ -155,19 +156,19 @@ public class RequestResponseTest { private AbstractRequestResponse createOffsetFetchResponse() { Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>(); - responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", (short)0)); + responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code())); return new OffsetFetchResponse(responseData); } private AbstractRequestResponse createProduceRequest() { Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>(); produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10)); - return new ProduceRequest((short)0, 5000, produceData); + return new ProduceRequest(Errors.NONE.code(), 5000, produceData); } private AbstractRequestResponse createProduceResponse() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>(); - responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000)); + responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000)); return new ProduceResponse(responseData); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index b6e1497..f5cd61c 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -35,13 +35,13 @@ public class SerializationTest { @Test public void testStringSerializer() { - String str = "my string"; + String str = "my string"; String mytopic = "testTopic"; List<String> encodings = new ArrayList<String>(); encodings.add("UTF8"); encodings.add("UTF-16"); - for ( String encoding : encodings) { + for (String encoding : encodings) { SerDeser<String> serDeser = getStringSerDeser(encoding); Serializer<String> serializer = serDeser.serializer; Deserializer<String> deserializer = serDeser.deserializer; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java deleted file mode 100644 index 6e37ea5..0000000 --- a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.utils; - -import org.apache.kafka.common.config.ConfigException; -import org.junit.Test; - -import java.util.Arrays; - -public class ClientUtilsTest { - - @Test - public void testParseAndValidateAddresses() { - check("127.0.0.1:8000"); - check("mydomain.com:8080"); - check("[::1]:8000"); - check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000"); - } - - @Test(expected = ConfigException.class) - public void testNoPort() { - check("127.0.0.1"); - } - - private void check(String... url) { - ClientUtils.parseAndValidateAddresses(Arrays.asList(url)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java index 6b32381..c39c3cf 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java @@ -25,7 +25,7 @@ public class CrcTest { @Test public void testUpdate() { - final byte bytes[] = "Any String you want".getBytes(); + final byte[] bytes = "Any String you want".getBytes(); final int len = bytes.length; Crc32 crc1 = new Crc32(); @@ -33,10 +33,10 @@ public class CrcTest { Crc32 crc3 = new Crc32(); crc1.update(bytes, 0, len); - for(int i = 0; i < len; i++) + for (int i = 0; i < len; i++) crc2.update(bytes[i]); - crc3.update(bytes, 0, len/2); - crc3.update(bytes, len/2, len-len/2); + crc3.update(bytes, 0, len / 2); + crc3.update(bytes, len / 2, len - len / 2); assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue()); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java index b24d4de..8cd19b2 100644 --- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java +++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java @@ -162,7 +162,6 @@ public class Microbenchmarks { for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { public void run() { - int sum = 0; long start = System.nanoTime(); for (int j = 0; j < iters; j++) map.get(keys.get(j % threads.size())); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 76a17e8..20dba7b 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -35,15 +35,15 @@ import org.apache.kafka.common.PartitionInfo; */ public class TestUtils { - public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir")); + public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir")); - public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; - public static String DIGITS = "0123456789"; - public static String LETTERS_AND_DIGITS = LETTERS + DIGITS; + public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + public static final String DIGITS = "0123456789"; + public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS; /* A consistent random number generator to make tests repeatable */ - public static final Random seededRandom = new Random(192348092834L); - public static final Random random = new Random(); + public static final Random SEEDED_RANDOM = new Random(192348092834L); + public static final Random RANDOM = new Random(); public static Cluster singletonCluster(String topic, int partitions) { return clusterWith(1, topic, partitions); @@ -92,7 +92,7 @@ public class TestUtils { */ public static byte[] randomBytes(int size) { byte[] bytes = new byte[size]; - seededRandom.nextBytes(bytes); + SEEDED_RANDOM.nextBytes(bytes); return bytes; } @@ -105,7 +105,7 @@ public class TestUtils { public static String randomString(int len) { StringBuilder b = new StringBuilder(); for (int i = 0; i < len; i++) - b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length()))); + b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length()))); return b.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java index facf509..7f45a90 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -17,9 +17,6 @@ package kafka.javaapi.consumer; -import kafka.common.TopicAndPartition; -import kafka.consumer.ConsumerThreadId; - import java.util.Map; import java.util.Set; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/message/CompressionFactory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala index c721040..b047f68 100644 --- a/core/src/main/scala/kafka/message/CompressionFactory.scala +++ b/core/src/main/scala/kafka/message/CompressionFactory.scala @@ -22,7 +22,7 @@ import java.util.zip.GZIPOutputStream import java.util.zip.GZIPInputStream import java.io.InputStream -import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream} +import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream} object CompressionFactory { http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/tools/KafkaMigrationTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index 7909d25..026d819 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; @SuppressWarnings({"unchecked", "rawtypes"}) public class KafkaMigrationTool { - private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName()); + private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName()); private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer"; private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig"; private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream"; @@ -194,7 +194,7 @@ public class KafkaMigrationTool kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07)); /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/ if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) { - logger.warn("Shallow iterator should not be used in the migration tool"); + log.warn("Shallow iterator should not be used in the migration tool"); kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false"); } Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07); @@ -230,7 +230,7 @@ public class KafkaMigrationTool try { ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07); } catch(Exception e) { - logger.error("Error while shutting down Kafka consumer", e); + log.error("Error while shutting down Kafka consumer", e); } for(MigrationThread migrationThread : migrationThreads) { migrationThread.shutdown(); @@ -241,7 +241,7 @@ public class KafkaMigrationTool for(ProducerThread producerThread : producerThreads) { producerThread.awaitShutdown(); } - logger.info("Kafka migration tool shutdown successfully"); + log.info("Kafka migration tool shutdown successfully"); } }); @@ -266,7 +266,7 @@ public class KafkaMigrationTool } catch (Throwable e){ System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e)); - logger.error("Kafka migration tool failed: ", e); + log.error("Kafka migration tool failed: ", e); } } @@ -388,7 +388,7 @@ public class KafkaMigrationTool KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest(); if(!data.equals(shutdownMessage)) { producer.send(data); - if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message()))); + if(logger.isDebugEnabled()) logger.debug(String.format("Sending message %s", new String(data.message()))); } else break; http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/utils/Crc32.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Crc32.java b/core/src/main/scala/kafka/utils/Crc32.java index af9fe0d..0e0e7bc 100644 --- a/core/src/main/scala/kafka/utils/Crc32.java +++ b/core/src/main/scala/kafka/utils/Crc32.java @@ -62,16 +62,16 @@ public class Crc32 implements Checksum { final int c1 =(b[off+1] ^ (localCrc >>>= 8)) & 0xff; final int c2 =(b[off+2] ^ (localCrc >>>= 8)) & 0xff; final int c3 =(b[off+3] ^ (localCrc >>>= 8)) & 0xff; - localCrc = (T[T8_7_start + c0] ^ T[T8_6_start + c1]) - ^ (T[T8_5_start + c2] ^ T[T8_4_start + c3]); + localCrc = (T[T8_7_START + c0] ^ T[T8_6_START + c1]) + ^ (T[T8_5_START + c2] ^ T[T8_4_START + c3]); final int c4 = b[off+4] & 0xff; final int c5 = b[off+5] & 0xff; final int c6 = b[off+6] & 0xff; final int c7 = b[off+7] & 0xff; - localCrc ^= (T[T8_3_start + c4] ^ T[T8_2_start + c5]) - ^ (T[T8_1_start + c6] ^ T[T8_0_start + c7]); + localCrc ^= (T[T8_3_START + c4] ^ T[T8_2_START + c5]) + ^ (T[T8_1_START + c6] ^ T[T8_0_START + c7]); off += 8; len -= 8; @@ -79,13 +79,13 @@ public class Crc32 implements Checksum { /* loop unroll - duff's device style */ switch(len) { - case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)]; - case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)]; - case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)]; - case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)]; - case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)]; - case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)]; - case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)]; + case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)]; + case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)]; + case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)]; + case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)]; + case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)]; + case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)]; + case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)]; default: /* nothing */ } @@ -96,21 +96,21 @@ public class Crc32 implements Checksum { @Override final public void update(int b) { - crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)]; + crc = (crc >>> 8) ^ T[T8_0_START + ((crc ^ b) & 0xff)]; } /* * CRC-32 lookup tables generated by the polynomial 0xEDB88320. * See also TestPureJavaCrc32.Table. */ - private static final int T8_0_start = 0*256; - private static final int T8_1_start = 1*256; - private static final int T8_2_start = 2*256; - private static final int T8_3_start = 3*256; - private static final int T8_4_start = 4*256; - private static final int T8_5_start = 5*256; - private static final int T8_6_start = 6*256; - private static final int T8_7_start = 7*256; + private static final int T8_0_START = 0*256; + private static final int T8_1_START = 1*256; + private static final int T8_2_START = 2*256; + private static final int T8_3_START = 3*256; + private static final int T8_4_START = 4*256; + private static final int T8_5_START = 5*256; + private static final int T8_6_START = 6*256; + private static final int T8_7_START = 7*256; private static final int[] T = new int[] { /* T8_0 */ http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index c79192c..0d66fe5 100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -22,7 +22,7 @@ import kafka.javaapi.FetchResponse; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; @@ -71,10 +71,9 @@ public class SimpleConsumerDemo { printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0)); System.out.println("Testing single multi-fetch"); - Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>() {{ - put(KafkaProperties.topic2, new ArrayList<Integer>(){{ add(0); }}); - put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }}); - }}; + Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>(); + topicMap.put(KafkaProperties.topic2, Collections.singletonList(0)); + topicMap.put(KafkaProperties.topic3, Collections.singletonList(0)); req = new FetchRequestBuilder() .clientId(KafkaProperties.clientId) .addFetch(KafkaProperties.topic2, 0, 0L, 100)
