Repository: kafka Updated Branches: refs/heads/trunk a10d7b1b7 -> de05c9d3a
MINOR: Add code quality checks (and suppressions) to checkstyle.xml Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Ewen Cheslack-Postava <[email protected]>, Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #2594 from dguy/checkstyle Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de05c9d3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de05c9d3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de05c9d3 Branch: refs/heads/trunk Commit: de05c9d3a0d79a555088afe9344b52e002d287f2 Parents: a10d7b1 Author: Damian Guy <[email protected]> Authored: Tue Feb 28 22:55:46 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Tue Feb 28 22:57:57 2017 +0000 ---------------------------------------------------------------------- build.gradle | 5 +- checkstyle/checkstyle.xml | 36 +++ checkstyle/suppressions.xml | 229 +++++++++++++++++++ .../kafka/common/protocol/types/Struct.java | 2 +- .../SaslClientCallbackHandler.java | 2 +- .../clients/producer/ProducerRecordTest.java | 12 +- .../org/apache/kafka/streams/StreamsConfig.java | 6 +- .../kstream/internals/SessionKeySerde.java | 6 +- .../streams/processor/TopologyBuilder.java | 4 +- .../KStreamAggregationDedupIntegrationTest.java | 12 +- .../KStreamAggregationIntegrationTest.java | 24 +- .../internals/StreamsMetadataStateTest.java | 7 +- .../kafka/test/ProcessorTopologyTestDriver.java | 2 +- 13 files changed, 307 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 417383d..fa2ed96 100644 --- a/build.gradle +++ b/build.gradle @@ -109,7 +109,8 @@ if (new File('.git').exists()) { 'gradlew.bat', '**/README.md', '**/id_rsa', - '**/id_rsa.pub' + '**/id_rsa.pub', + 'checkstyle/suppressions.xml' ]) } } @@ -272,6 +273,8 @@ subprojects { checkstyle { configFile = new File(rootDir, "checkstyle/checkstyle.xml") configProperties = [importControlFile: "$rootDir/checkstyle/import-control.xml"] + // version 7.x requires Java 8 + toolVersion = '6.19' } test.dependsOn('checkstyleMain', 'checkstyleTest') http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 51b613d..9a4a37f 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -93,5 +93,41 @@ <property name="illegalPattern" value="true"/> <property name="ignoreComments" value="true"/> </module> + + <!-- code quality --> + <module name="MethodLength"/> + <module name="ParameterNumber"> + <!-- default is 8 --> + <property name="max" value="10"/> + </module> + <module name="ClassDataAbstractionCoupling"> + <!-- default is 7 --> + <property name="max" value="15"/> + </module> + <module name="BooleanExpressionComplexity"> + <!-- default is 3 --> + <property name="max" value="4"/> + </module> + + <module name="ClassFanOutComplexity"> + <!-- default is 20 --> + <property name="max" value="35"/> + </module> + <module name="CyclomaticComplexity"> + <!-- default is 10--> + <property name="max" value="15"/> + </module> + <module name="JavaNCSS"> + <!-- default is 50 --> + <property name="methodMaximum" value="100"/> + </module> + <module name="NPathComplexity"> + <!-- default is 200 --> + <property name="max" value="500"/> + </module> + </module> + + <module name="SuppressionFilter"> + <property name="file" value="checkstyle/suppressions.xml"/> </module> </module> http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml new file mode 100644 index 0000000..9f13307 --- /dev/null +++ b/checkstyle/suppressions.xml @@ -0,0 +1,229 @@ +<?xml version="1.0"?> + +<!DOCTYPE suppressions PUBLIC + "-//Puppy Crawl//DTD Suppressions 1.1//EN" + "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd"> + +<suppressions> + + <!-- Clients --> + <suppress checks="ClassFanOutComplexity" + files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator).java"/> + <suppress checks="ClassFanOutComplexity" + files=".*/protocol/Errors.java"/> + + <suppress checks="MethodLength" + files="KerberosLogin.java"/> + + <suppress checks="ParameterNumber" + files="NetworkClient.java"/> + <suppress checks="ParameterNumber" + files="KafkaConsumer.java"/> + <suppress checks="ParameterNumber" + files="ConsumerCoordinator.java"/> + <suppress checks="ParameterNumber" + files="Fetcher.java"/> + <suppress checks="ParameterNumber" + files="ConfigDef.java"/> + + <suppress checks="ClassDataAbstractionCoupling" + files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse).java"/> + <suppress checks="ClassDataAbstractionCoupling" + files=".*/protocol/Errors.java"/> + + <suppress checks="BooleanExpressionComplexity" + files="KafkaLZ4BlockOutputStream.java"/> + + <suppress checks="CyclomaticComplexity" + files="ConsumerCoordinator.java"/> + <suppress checks="CyclomaticComplexity" + files="Fetcher.java"/> + <suppress checks="CyclomaticComplexity" + files="KafkaProducer.java"/> + <suppress checks="CyclomaticComplexity" + files="BufferPool.java"/> + <suppress checks="CyclomaticComplexity" + files="RecordAccumulator.java"/> + <suppress checks="CyclomaticComplexity" + files="ConfigDef.java"/> + <suppress checks="CyclomaticComplexity" + files="Selector.java"/> + <suppress checks="CyclomaticComplexity" + files="SslTransportLayer.java"/> + <suppress checks="CyclomaticComplexity" + files="KerberosLogin.java"/> + <suppress checks="CyclomaticComplexity" + files="AbstractRequest.java"/> + <suppress checks="CyclomaticComplexity" + files="AbstractResponse.java"/> + + <suppress checks="JavaNCSS" + files="KerberosLogin.java"/> + + <suppress checks="NPathComplexity" + files="BufferPool.java"/> + <suppress checks="NPathComplexity" + files="MetricName.java"/> + <suppress checks="NPathComplexity" + files="Node.java"/> + <suppress checks="NPathComplexity" + files="ConfigDef.java"/> + <suppress checks="NPathComplexity" + files="SslTransportLayer.java"/> + <suppress checks="NPathComplexity" + files="MetadataResponse.java"/> + <suppress checks="NPathComplexity" + files="KerberosLogin.java"/> + <suppress checks="NPathComplexity" + files="SslTransportLayer.java"/> + + <!-- clients tests --> + <suppress checks="ClassDataAbstractionCoupling" + files="(Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse)Test.java"/> + + <suppress checks="ClassFanOutComplexity" + files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/> + + <!-- Connect --> + <suppress checks="ClassFanOutComplexity" + files="DistributedHerder.java"/> + + <suppress checks="MethodLength" + files="KafkaConfigBackingStore.java"/> + + <suppress checks="ParameterNumber" + files="WorkerSourceTask.java"/> + <suppress checks="ParameterNumber" + files="WorkerCoordinator.java"/> + <suppress checks="ParameterNumber" + files="ConfigKeyInfo.java"/> + + <suppress checks="ClassDataAbstractionCoupling" + files="(RestServer|AbstractHerder|DistributedHerder).java"/> + + <suppress checks="BooleanExpressionComplexity" + files="JsonConverter.java"/> + + <suppress checks="CyclomaticComplexity" + files="ConnectRecord.java"/> + <suppress checks="CyclomaticComplexity" + files="JsonConverter.java"/> + <suppress checks="CyclomaticComplexity" + files="FileStreamSourceTask.java"/> + <suppress checks="CyclomaticComplexity" + files="DistributedHerder.java"/> + <suppress checks="CyclomaticComplexity" + files="KafkaConfigBackingStore.java"/> + + <suppress checks="JavaNCSS" + files="KafkaConfigBackingStore.java"/> + + <suppress checks="NPathComplexity" + files="ConnectRecord.java"/> + <suppress checks="NPathComplexity" + files="ConnectSchema.java"/> + <suppress checks="NPathComplexity" + files="FileStreamSourceTask.java"/> + <suppress checks="NPathComplexity" + files="JsonConverter.java"/> + <suppress checks="NPathComplexity" + files="DistributedHerder.java"/> + + <!-- connect tests--> + <suppress checks="ClassDataAbstractionCoupling" + files="(DistributedHerder|KafkaBasedLog)Test.java"/> + + <!-- Streams --> + <suppress checks="ClassFanOutComplexity" + files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread).java"/> + <suppress checks="ClassFanOutComplexity" + files="KStreamImpl.java"/> + <suppress checks="ClassFanOutComplexity" + files="KTableImpl.java"/> + <suppress checks="ClassFanOutComplexity" + files="StreamThread.java"/> + + <suppress checks="MethodLength" + files="StreamPartitionAssignor.java"/> + + <suppress checks="ParameterNumber" + files="StreamTask.java"/> + <suppress checks="ParameterNumber" + files="RocksDBWindowStoreSupplier.java"/> + + <suppress checks="ClassDataAbstractionCoupling" + files="(KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/> + + <suppress checks="CyclomaticComplexity" + files="TopologyBuilder.java"/> + <suppress checks="CyclomaticComplexity" + files="StreamPartitionAssignor.java"/> + <suppress checks="CyclomaticComplexity" + files="StreamThread.java"/> + + <suppress checks="JavaNCSS" + files="StreamPartitionAssignor.java"/> + + <suppress checks="NPathComplexity" + files="ProcessorStateManager.java"/> + <suppress checks="NPathComplexity" + files="StreamPartitionAssignor.java"/> + <suppress checks="NPathComplexity" + files="StreamThread.java"/> + + <!-- streams tests --> + <suppress checks="ClassFanOutComplexity" + files="(StreamTaskTest|ProcessorTopologyTestDriver).java"/> + + <suppress checks="MethodLength" + files="KStreamKTableJoinIntegrationTest.java"/> + <suppress checks="MethodLength" + files="KStreamKStreamJoinTest.java"/> + <suppress checks="MethodLength" + files="KStreamWindowAggregateTest.java"/> + + <suppress checks="ClassDataAbstractionCoupling" + files=".*/streams/.*/Test.java"/> + <suppress checks="ClassDataAbstractionCoupling" + files=".*/streams/.*test/.*.java"/> + + <suppress checks="BooleanExpressionComplexity" + files="SmokeTestDriver.java"/> + + <suppress checks="CyclomaticComplexity" + files="KStreamKStreamJoinTest.java"/> + <suppress checks="CyclomaticComplexity" + files="SmokeTestDriver.java"/> + + <suppress checks="JavaNCSS" + files="KStreamKStreamJoinTest.java"/> + <suppress checks="JavaNCSS" + files="SmokeTestDriver.java"/> + + <suppress checks="NPathComplexity" + files="KStreamKStreamJoinTest.java"/> + <suppress checks="NPathComplexity" + files="KStreamKStreamLeftJoinTest.java"/> + + <!-- Tools --> + <suppress checks="ClassDataAbstractionCoupling" + files="VerifiableConsumer.java"/> + + <suppress checks="CyclomaticComplexity" + files="StreamsResetter.java"/> + <suppress checks="CyclomaticComplexity" + files="ProducerPerformance.java"/> + + <suppress checks="NPathComplexity" + files="StreamsResetter.java"/> + <suppress checks="NPathComplexity" + files="ProducerPerformance.java"/> + + <!-- Log4J-Appender --> + <suppress checks="CyclomaticComplexity" + files="KafkaLog4jAppender.java"/> + + <suppress checks="NPathComplexity" + files="KafkaLog4jAppender.java"/> + +</suppressions> http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 1258763..c32aea7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -316,7 +316,7 @@ public class Struct { Field f = this.schema.get(i); if (f.type() instanceof ArrayOf) { if (this.get(f) != null) { - Object[] arrayObject = (Object []) this.get(f); + Object[] arrayObject = (Object[]) this.get(f); for (Object arrayItem: arrayObject) result = prime * result + arrayItem.hashCode(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index 3391ff3..7111bad 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -58,7 +58,7 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler { nc.setName(nc.getDefaultName()); } else if (callback instanceof PasswordCallback) { if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) { - char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray(); + char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray(); ((PasswordCallback) callback).setPassword(password); } else { String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" + http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java index 7d725fa..b5a7a60 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java @@ -26,24 +26,24 @@ public class ProducerRecordTest { @Test public void testEqualsAndHashCode() { - ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1 , "key", 1); + ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1, "key", 1); assertEquals(producerRecord, producerRecord); assertEquals(producerRecord.hashCode(), producerRecord.hashCode()); - ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1 , "key", 1); + ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1, "key", 1); assertEquals(producerRecord, equalRecord); assertEquals(producerRecord.hashCode(), equalRecord.hashCode()); - ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1 , "key", 1); + ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1, "key", 1); assertFalse(producerRecord.equals(topicMisMatch)); - ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2 , "key", 1); + ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2, "key", 1); assertFalse(producerRecord.equals(partitionMismatch)); - ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1 , "key-1", 1); + ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1, "key-1", 1); assertFalse(producerRecord.equals(keyMisMatch)); - ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1 , "key", 2); + ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2); assertFalse(producerRecord.equals(valueMisMatch)); ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 8c81318..0eb3f7b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -388,8 +388,7 @@ public class StreamsConfig extends AbstractConfig { // this is the list of configs for underlying clients // that streams prefer different default values private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES; - static - { + static { final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(); tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100"); @@ -397,8 +396,7 @@ public class StreamsConfig extends AbstractConfig { } private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; - static - { + static { final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index d4757ab..7eb8300 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -112,15 +112,15 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> { } } - public static long extractEnd(final byte [] binaryKey) { + public static long extractEnd(final byte[] binaryKey) { return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); } - public static long extractStart(final byte [] binaryKey) { + public static long extractStart(final byte[] binaryKey) { return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE); } - public static Window extractWindow(final byte [] binaryKey) { + public static Window extractWindow(final byte[] binaryKey) { final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE); final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 827a152..99f5d65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -294,7 +294,7 @@ public class TopologyBuilder { * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable} */ public enum AutoOffsetReset { - EARLIEST , LATEST + EARLIEST, LATEST } /** @@ -864,7 +864,7 @@ public class TopologyBuilder { } } - private Set<String> findSourceTopicsForProcessorParents(String [] parents) { + private Set<String> findSourceTopicsForProcessorParents(String[] parents) { final Set<String> sourceTopics = new HashSet<>(); for (String parent : parents) { NodeFactory nodeFactory = nodeFactories.get(parent); http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 039be44..dcaa222 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -131,8 +131,8 @@ public class KStreamAggregationDedupIntegrationTest { List<KeyValue<String, String>> results = receiveMessages( new StringDeserializer(), - new StringDeserializer() - , 5); + new StringDeserializer(), + 5); Collections.sort(results, new Comparator<KeyValue<String, String>>() { @Override @@ -181,8 +181,8 @@ public class KStreamAggregationDedupIntegrationTest { List<KeyValue<String, String>> windowedOutput = receiveMessages( new StringDeserializer(), - new StringDeserializer() - , 10); + new StringDeserializer(), + 10); Comparator<KeyValue<String, String>> comparator = @@ -233,8 +233,8 @@ public class KStreamAggregationDedupIntegrationTest { final List<KeyValue<String, Long>> results = receiveMessages( new StringDeserializer(), - new LongDeserializer() - , 5); + new LongDeserializer(), + 5); Collections.sort(results, new Comparator<KeyValue<String, Long>>() { @Override public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 2551717..4eb582c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -155,8 +155,8 @@ public class KStreamAggregationIntegrationTest { final List<KeyValue<String, String>> results = receiveMessages( new StringDeserializer(), - new StringDeserializer() - , 10); + new StringDeserializer(), + 10); Collections.sort(results, new Comparator<KeyValue<String, String>>() { @Override @@ -209,8 +209,8 @@ public class KStreamAggregationIntegrationTest { final List<KeyValue<String, String>> windowedOutput = receiveMessages( new StringDeserializer(), - new StringDeserializer() - , 15); + new StringDeserializer(), + 15); final Comparator<KeyValue<String, String>> comparator = @@ -263,8 +263,8 @@ public class KStreamAggregationIntegrationTest { final List<KeyValue<String, Integer>> results = receiveMessages( new StringDeserializer(), - new IntegerDeserializer() - , 10); + new IntegerDeserializer(), + 10); Collections.sort(results, new Comparator<KeyValue<String, Integer>>() { @Override @@ -313,8 +313,8 @@ public class KStreamAggregationIntegrationTest { final List<KeyValue<String, Integer>> windowedMessages = receiveMessages( new StringDeserializer(), - new IntegerDeserializer() - , 15); + new IntegerDeserializer(), + 15); final Comparator<KeyValue<String, Integer>> comparator = @@ -364,8 +364,8 @@ public class KStreamAggregationIntegrationTest { final List<KeyValue<String, Long>> results = receiveMessages( new StringDeserializer(), - new LongDeserializer() - , 10); + new LongDeserializer(), + 10); Collections.sort(results, new Comparator<KeyValue<String, Long>>() { @Override public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { @@ -406,8 +406,8 @@ public class KStreamAggregationIntegrationTest { final List<KeyValue<String, Long>> results = receiveMessages( new StringDeserializer(), - new LongDeserializer() - , 10); + new LongDeserializer(), + 10); Collections.sort(results, new Comparator<KeyValue<String, Long>>() { @Override public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index c121d96..c8ab6f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -62,7 +62,7 @@ public class StreamsMetadataStateTest { private TopicPartition topic4P0; private List<PartitionInfo> partitionInfos; private Cluster cluster; - private final String globalTable = "global-table";; + private final String globalTable = "global-table"; private StreamPartitioner<String, Object> partitioner; @Before @@ -200,8 +200,9 @@ public class StreamsMetadataStateTest { final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"), Collections.singleton(topic3P0)); - final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", - Serdes.String().serializer()); + final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", + "the-key", + Serdes.String().serializer()); assertEquals(expected, actual); } http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 5808e9a..b704aa7 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -202,7 +202,7 @@ public class ProcessorTopologyTestDriver { final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer(); for (final String topicName : globalTopology.sourceTopics()) { List<PartitionInfo> partitionInfos = new ArrayList<>(); - partitionInfos.add(new PartitionInfo(topicName , 1, null, null, null)); + partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null)); globalConsumer.updatePartitions(topicName, partitionInfos); final TopicPartition partition = new TopicPartition(topicName, 1); globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
