Repository: kafka Updated Branches: refs/heads/trunk d2792e356 -> d9b784e14
KAFKA-4796; Fix some findbugs warnings in Kafka Java client Author: Colin P. Mccabe <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2593 from cmccabe/KAFKA-4796 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9b784e1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9b784e1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9b784e1 Branch: refs/heads/trunk Commit: d9b784e1470714c8b04e7c3d74f626a96ca1591e Parents: d2792e3 Author: Colin P. Mccabe <[email protected]> Authored: Sat Mar 4 00:52:26 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Sat Mar 4 00:52:26 2017 +0000 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 2 +- .../producer/internals/DefaultPartitioner.java | 4 +- .../apache/kafka/common/config/ConfigDef.java | 7 +- .../apache/kafka/common/protocol/Protocol.java | 5 -- .../kafka/common/protocol/types/Struct.java | 2 +- .../org/apache/kafka/common/utils/Bytes.java | 3 +- .../org/apache/kafka/common/utils/Utils.java | 67 +++++--------------- .../org/apache/kafka/clients/MockClient.java | 2 +- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/RecordAccumulatorTest.java | 2 +- .../clients/producer/internals/SenderTest.java | 4 +- .../types/ProtocolSerializationTest.java | 12 ++++ .../kafka/common/security/JaasContextTest.java | 4 +- .../security/scram/ScramMessagesTest.java | 2 +- .../common/utils/AbstractIteratorTest.java | 2 +- .../apache/kafka/common/utils/UtilsTest.java | 64 +++++++++++++++++++ .../org/apache/kafka/test/TestSslUtils.java | 4 +- 17 files changed, 113 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 9f13307..a39695f 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -32,7 +32,7 @@ files=".*/protocol/Errors.java"/> <suppress checks="BooleanExpressionComplexity" - files="KafkaLZ4BlockOutputStream.java"/> + files="(Utils|KafkaLZ4BlockOutputStream).java"/> <suppress checks="CyclomaticComplexity" files="ConsumerCoordinator.java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java index 086534a..9d4ecbf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java @@ -18,9 +18,9 @@ package org.apache.kafka.clients.producer.internals; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Partitioner; @@ -73,7 +73,7 @@ public class DefaultPartitioner implements Partitioner { private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { - counter = new AtomicInteger(new Random().nextInt()); + counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index f7cb8a9..3396f63 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -72,7 +72,10 @@ import java.util.Set; * functionality for accessing configs. */ public class ConfigDef { - + /** + * A unique Java object which represents the lack of a default value.<p> + * The 'new' here is intentional. + */ public static final Object NO_DEFAULT_VALUE = new String(""); private final Map<String, ConfigKey> configKeys; @@ -816,7 +819,7 @@ public class ConfigDef { public void ensureValid(String name, Object o) { if (o == null) - throw new ConfigException(name, o, "Value must be non-null"); + throw new ConfigException(name, null, "Value must be non-null"); Number n = (Number) o; if (min != null && n.doubleValue() < min.doubleValue()) throw new ConfigException(name, o, "Value must be at least " + min); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 25d380b..3343133 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -1090,11 +1090,6 @@ public class Protocol { Type innerType = ((ArrayOf) field.type).type(); if (!subTypes.containsKey(field.name)) subTypes.put(field.name, innerType); - } else if (field.type instanceof Schema) { - b.append(field.name); - b.append(" "); - if (!subTypes.containsKey(field.name)) - subTypes.put(field.name, field.type); } else { b.append(field.name); b.append(" "); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index c32aea7..325690d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -349,7 +349,7 @@ public class Struct { } else { Object thisField = this.get(f); Object otherField = other.get(f); - result = (thisField == null && otherField == null) || thisField.equals(otherField); + return (thisField == null) ? (otherField == null) : thisField.equals(otherField); } if (!result) return false; http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java index e28d925..4099155 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.utils; +import java.io.Serializable; import java.util.Arrays; import java.util.Comparator; @@ -138,7 +139,7 @@ public class Bytes implements Comparable<Bytes> { */ public final static Comparator<byte[]> BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator(); - private interface ByteArrayComparator extends Comparator<byte[]> { + private interface ByteArrayComparator extends Comparator<byte[]>, Serializable { int compare(final byte[] buffer1, int offset1, int length1, final byte[] buffer2, int offset2, int length2); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 20ab814..ed5eddb 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -103,16 +103,6 @@ public class Utils { } /** - * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes - * - * @param buffer The buffer to read from - * @return The integer read, as a long to avoid signedness - */ - public static long readUnsignedInt(ByteBuffer buffer) { - return buffer.getInt() & 0xffffffffL; - } - - /** * Read an unsigned integer from the given position without modifying the buffers position * * @param buffer the buffer to read from @@ -130,28 +120,13 @@ public class Utils { * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS) */ public static int readUnsignedIntLE(InputStream in) throws IOException { - return (in.read() << 8 * 0) - | (in.read() << 8 * 1) - | (in.read() << 8 * 2) - | (in.read() << 8 * 3); + return in.read() + | (in.read() << 8) + | (in.read() << 16) + | (in.read() << 24); } /** - * Get the little-endian value of an integer as a byte array. - * @param val The value to convert to a little-endian array - * @return The little-endian encoded array of bytes for the value - */ - public static byte[] toArrayLE(int val) { - return new byte[] { - (byte) (val >> 8 * 0), - (byte) (val >> 8 * 1), - (byte) (val >> 8 * 2), - (byte) (val >> 8 * 3) - }; - } - - - /** * Read an unsigned integer stored in little-endian format from a byte array * at a given offset. * @@ -160,20 +135,10 @@ public class Utils { * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS) */ public static int readUnsignedIntLE(byte[] buffer, int offset) { - return (buffer[offset++] << 8 * 0) - | (buffer[offset++] << 8 * 1) - | (buffer[offset++] << 8 * 2) - | (buffer[offset] << 8 * 3); - } - - /** - * Write the given long value as a 4 byte unsigned integer. Overflow is ignored. - * - * @param buffer The buffer to write to - * @param value The value to write - */ - public static void writeUnsignedInt(ByteBuffer buffer, long value) { - buffer.putInt((int) (value & 0xffffffffL)); + return (buffer[offset] << 0 & 0xff) + | ((buffer[offset + 1] & 0xff) << 8) + | ((buffer[offset + 2] & 0xff) << 16) + | ((buffer[offset + 3] & 0xff) << 24); } /** @@ -194,10 +159,10 @@ public class Utils { * @param value The value to write */ public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException { - out.write(value >>> 8 * 0); - out.write(value >>> 8 * 1); - out.write(value >>> 8 * 2); - out.write(value >>> 8 * 3); + out.write(value); + out.write(value >>> 8); + out.write(value >>> 16); + out.write(value >>> 24); } /** @@ -209,10 +174,10 @@ public class Utils { * @param value The value to write */ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) { - buffer[offset++] = (byte) (value >>> 8 * 0); - buffer[offset++] = (byte) (value >>> 8 * 1); - buffer[offset++] = (byte) (value >>> 8 * 2); - buffer[offset] = (byte) (value >>> 8 * 3); + buffer[offset] = (byte) value; + buffer[offset + 1] = (byte) (value >>> 8); + buffer[offset + 2] = (byte) (value >>> 16); + buffer[offset + 3] = (byte) (value >>> 24); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index f97e407..7e05881 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -47,7 +47,7 @@ public class MockClient implements KafkaClient { } }; - private class FutureResponse { + private static class FutureResponse { public final AbstractResponse responseBody; public final boolean disconnected; public final RequestMatcher requestMatcher; http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 3eb6561..45ee29a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -483,7 +483,7 @@ public class AbstractCoordinatorTest { return new SyncGroupResponse(error, ByteBuffer.allocate(0)); } - public class DummyCoordinator extends AbstractCoordinator { + public static class DummyCoordinator extends AbstractCoordinator { private int onJoinPrepareInvokes = 0; private int onJoinCompleteInvokes = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index b96594f..1cb510e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -84,7 +84,7 @@ public class RecordAccumulatorTest { public void testFull() throws Exception { long now = time.milliseconds(); int batchSize = 1024; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10L * batchSize, CompressionType.NONE, 10L, 100L, metrics, time); int appends = batchSize / msgSize; for (int i = 0; i < appends; i++) { // append to the first batch http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index e9a7188..50ea219 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -151,7 +151,7 @@ public class SenderTest { sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request String id = client.requests().peek().destination(); - Node node = new Node(Integer.valueOf(id), "localhost", 0); + Node node = new Node(Integer.parseInt(id), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); client.disconnect(id); @@ -210,7 +210,7 @@ public class SenderTest { sender.run(time.milliseconds()); // send produce request String id = client.requests().peek().destination(); assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey()); - Node node = new Node(Integer.valueOf(id), "localhost", 0); + Node node = new Node(Integer.parseInt(id), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index 74c9302..1c14e82 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.protocol.types; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -258,4 +259,15 @@ public class ProtocolSerializationTest { assertEquals("The object read back should be the same as what was written.", obj, result); } + @Test + public void testStructEquals() { + Schema schema = new Schema(new Field("field1", Type.NULLABLE_STRING), new Field("field2", Type.NULLABLE_STRING)); + Struct emptyStruct1 = new Struct(schema); + Struct emptyStruct2 = new Struct(schema); + assertEquals(emptyStruct1, emptyStruct2); + + Struct mostlyEmptyStruct = new Struct(schema).set("field1", "foo"); + assertNotEquals(emptyStruct1, mostlyEmptyStruct); + assertNotEquals(mostlyEmptyStruct, emptyStruct1); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java index 8d98a11..30799c5 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java @@ -58,8 +58,8 @@ public class JaasContextTest { } @After - public void tearDown() { - jaasConfigFile.delete(); + public void tearDown() throws Exception { + Files.delete(jaasConfigFile.toPath()); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java index 53939ef..de97ce2 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java @@ -252,7 +252,7 @@ public class ScramMessagesTest { checkServerFinalMessage(m, null, serverSignature); // Default format used by Kafka clients for final message with error - str = String.format("e=other-error", serverSignature); + str = "e=other-error"; m = createScramMessage(ServerFinalMessage.class, str); checkServerFinalMessage(m, "other-error", null); m = new ServerFinalMessage(m.toBytes()); http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java index 96adbbe..5ddab74 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java @@ -52,7 +52,7 @@ public class AbstractIteratorTest { iter.next(); } - class ListIterator<T> extends AbstractIterator<T> { + static class ListIterator<T> extends AbstractIterator<T> { private List<T> list; private int position = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index c3f69fa..5f36c1c 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.utils; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; @@ -35,6 +37,7 @@ import org.junit.Test; import static org.apache.kafka.common.utils.Utils.formatAddress; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -333,4 +336,65 @@ public class UtilsTest { assertEquals(closeablesWithException[i].closeException, suppressed[i - 1]); } } + + @Test + public void testReadUnsignedIntLEFromArray() { + byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05}; + assertEquals(0x04030201, Utils.readUnsignedIntLE(array1, 0)); + assertEquals(0x05040302, Utils.readUnsignedIntLE(array1, 1)); + + byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6}; + assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(array2, 0)); + assertEquals(0xf6f5f4f3, Utils.readUnsignedIntLE(array2, 2)); + } + + @Test + public void testReadUnsignedIntLEFromInputStream() throws IOException { + byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09}; + ByteArrayInputStream is1 = new ByteArrayInputStream(array1); + assertEquals(0x04030201, Utils.readUnsignedIntLE(is1)); + assertEquals(0x08070605, Utils.readUnsignedIntLE(is1)); + + byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6, (byte) 0xf7, (byte) 0xf8}; + ByteArrayInputStream is2 = new ByteArrayInputStream(array2); + assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(is2)); + assertEquals(0xf8f7f6f5, Utils.readUnsignedIntLE(is2)); + } + + @Test + public void testWriteUnsignedIntLEToArray() { + int value1 = 0x04030201; + + byte[] array1 = new byte[4]; + Utils.writeUnsignedIntLE(array1, 0, value1); + assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04}, array1); + + array1 = new byte[8]; + Utils.writeUnsignedIntLE(array1, 2, value1); + assertArrayEquals(new byte[] {0, 0, 0x01, 0x02, 0x03, 0x04, 0, 0}, array1); + + int value2 = 0xf4f3f2f1; + + byte[] array2 = new byte[4]; + Utils.writeUnsignedIntLE(array2, 0, value2); + assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, array2); + + array2 = new byte[8]; + Utils.writeUnsignedIntLE(array2, 2, value2); + assertArrayEquals(new byte[] {0, 0, (byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, 0, 0}, array2); + } + + @Test + public void testWriteUnsignedIntLEToOutputStream() throws IOException { + int value1 = 0x04030201; + ByteArrayOutputStream os1 = new ByteArrayOutputStream(); + Utils.writeUnsignedIntLE(os1, value1); + Utils.writeUnsignedIntLE(os1, value1); + assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, os1.toByteArray()); + + int value2 = 0xf4f3f2f1; + ByteArrayOutputStream os2 = new ByteArrayOutputStream(); + Utils.writeUnsignedIntLE(os2, value2); + assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, os2.toByteArray()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 7b78d3e..f4f8818 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -152,10 +152,8 @@ public class TestSslUtils { public static <T extends Certificate> void createTrustStore( String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException { KeyStore ks = KeyStore.getInstance("JKS"); - try { - FileInputStream in = new FileInputStream(filename); + try (FileInputStream in = new FileInputStream(filename)) { ks.load(in, password.value().toCharArray()); - in.close(); } catch (EOFException e) { ks = createEmptyKeyStore(); }
