[
https://issues.apache.org/jira/browse/KAFKA-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344494#comment-16344494
]
ASF GitHub Bot commented on KAFKA-4772:
---------------------------------------
guozhangwang closed pull request #2704: KAFKA-4772: [WIP] Use KStreamPeek to
replace KeyValuePrinter
URL: https://github.com/apache/kafka/pull/2704
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79abbb558eb..62fbecdf527 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -192,7 +192,7 @@ public void print(Serde<K> keySerde, Serde<V> valSerde) {
public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
{
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
- topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde,
streamName), this.name);
+ topology.addProcessor(name, new KeyValuePrinter<>(System.out,
keySerde, valSerde, streamName), this.name);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
index e193e52ec34..e4b142d92eb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -18,104 +18,63 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.io.PrintStream;
+class KeyValuePrinter<K, V> extends KStreamPeek {
-class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
-
- private final PrintStream printStream;
+ private PrintStream printStream;
private Serde<?> keySerde;
private Serde<?> valueSerde;
private String streamName;
-
- KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?>
valueSerde, String streamName) {
+ public KeyValuePrinter(final PrintStream printStream, final Serde<?>
keySerde, final Serde<?> valueSerde, final String streamName) {
+ super(null);
+ this.printStream = printStream;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.streamName = streamName;
- if (printStream == null) {
- this.printStream = System.out;
- } else {
- this.printStream = printStream;
- }
- }
-
- KeyValuePrinter(PrintStream printStream, String streamName) {
- this(printStream, null, null, streamName);
- }
-
- KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde, String streamName)
{
- this(System.out, keySerde, valueSerde, streamName);
}
@Override
public Processor<K, V> get() {
- return new KeyValuePrinterProcessor(this.printStream, this.keySerde,
this.valueSerde, this.streamName);
+ return new KStreamPeekProcessor();
}
-
- private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
- private final PrintStream printStream;
- private Serde<?> keySerde;
- private Serde<?> valueSerde;
- private ProcessorContext processorContext;
- private String streamName;
-
- private KeyValuePrinterProcessor(PrintStream printStream, Serde<?>
keySerde, Serde<?> valueSerde, String streamName) {
- this.printStream = printStream;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
- this.streamName = streamName;
- }
+ private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+ ForeachAction<K, V> action = printAction(context(), printStream,
keySerde, valueSerde, streamName);
@Override
- public void init(ProcessorContext context) {
- this.processorContext = context;
-
- if (this.keySerde == null) {
- keySerde = this.processorContext.keySerde();
- }
-
- if (this.valueSerde == null) {
- valueSerde = this.processorContext.valueSerde();
- }
- }
-
- @Override
- public void process(K key, V value) {
- K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
- V valueToPrint = (V) maybeDeserialize(value,
valueSerde.deserializer());
-
- printStream.println("[" + this.streamName + "]: " + keyToPrint + "
, " + valueToPrint);
-
- this.processorContext.forward(key, value);
+ public void process(final K key, final V value) {
+ action.apply(key, value);
+ context().forward(key, value);
}
+ }
-
- private Object maybeDeserialize(Object receivedElement,
Deserializer<?> deserializer) {
- if (receivedElement == null) {
- return null;
+ private static <K, V> ForeachAction<K, V> printAction(final
ProcessorContext context, final PrintStream printStream, final Serde<?>
keySerde, final Serde<?> valueSerde, final String streamName) {
+ return new ForeachAction<K, V>() {
+ @Override
+ public void apply(final K key, final V value) {
+ K keyToPrint = (K) maybeDeserialize(key,
keySerde.deserializer());
+ V valueToPrint = (V) maybeDeserialize(value,
valueSerde.deserializer());
+ printStream.println("[" + streamName + "]: " + keyToPrint + "
, " + valueToPrint);
}
- if (receivedElement instanceof byte[]) {
- return deserializer.deserialize(this.processorContext.topic(),
(byte[]) receivedElement);
- }
+ private Object maybeDeserialize(Object receivedElement,
Deserializer<?> deserializer) {
+ if (receivedElement == null) {
+ return null;
+ }
- return receivedElement;
- }
+ if (receivedElement instanceof byte[]) {
+ return deserializer.deserialize(context.topic(), (byte[])
receivedElement);
+ }
- @Override
- public void close() {
- if (this.printStream == System.out) {
- this.printStream.flush();
- } else {
- this.printStream.close();
+ return receivedElement;
}
- }
+ };
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Exploit #peek to implement #print() and other methods
> -----------------------------------------------------
>
> Key: KAFKA-4772
> URL: https://issues.apache.org/jira/browse/KAFKA-4772
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: james chien
> Priority: Minor
> Labels: beginner, newbie
> Fix For: 0.11.0.0
>
>
> From: https://github.com/apache/kafka/pull/2493#pullrequestreview-22157555
> Things that I can think of:
> - print / writeAsText can be a special impl of peek; KStreamPrint etc can be
> removed.
> - consider collapsing KStreamPeek with KStreamForeach, using a flag parameter
> indicating whether the acted-upon key-value pair should still be forwarded.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)