This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 237af21 MINOR: Java8 cleanup (#6598) 237af21 is described below commit 237af219aaf59e7c10ed77864320d5c231a77844 Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Fri Apr 19 18:44:27 2019 -0700 MINOR: Java8 cleanup (#6598) Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .../kafka/streams/tests/StreamsUpgradeTest.java | 53 ++++++++++------------ 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 3e719cf..140f291 100644 --- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,17 +16,17 @@ */ package org.apache.kafka.streams.tests; -import java.util.Properties; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import java.util.Properties; + public class StreamsUpgradeTest { @@ -59,40 +59,33 @@ public class StreamsUpgradeTest { final KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - streams.close(); - System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); - System.out.flush(); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + 
streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + })); } private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { - return new ProcessorSupplier<K, V>() { - public Processor<K, V> get() { - return new AbstractProcessor<K, V>() { - private int numRecordsProcessed = 0; - - @Override - public void init(final ProcessorContext context) { - System.out.println("initializing processor: topic=data taskId=" + context.taskId()); - numRecordsProcessed = 0; - } + return () -> new AbstractProcessor<K, V>() { + private int numRecordsProcessed = 0; - @Override - public void process(final K key, final V value) { - numRecordsProcessed++; - if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); - } - } + @Override + public void init(final ProcessorContext context) { + System.out.println("initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } - @Override - public void close() {} - }; + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } } + + @Override + public void close() {} }; } }