[
https://issues.apache.org/jira/browse/KAFKA-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344493#comment-16344493
]
ASF GitHub Bot commented on KAFKA-4772:
---------------------------------------
guozhangwang closed pull request #2703: KAFKA-4772: Use peek to implement print
URL: https://github.com/apache/kafka/pull/2703
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79abbb558eb..68592b076f9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -192,10 +192,9 @@ public void print(Serde<K> keySerde, Serde<V> valSerde) {
public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
{
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
- topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde,
streamName), this.name);
+ peek(new PrintAction<>(System.out, keySerde, valSerde, streamName),
name);
}
-
@Override
public void writeAsText(String filePath) {
writeAsText(filePath, null, null, null);
@@ -226,7 +225,8 @@ public void writeAsText(String filePath, String streamName,
Serde<K> keySerde, S
try {
PrintStream printStream = new PrintStream(new
FileOutputStream(filePath));
- topology.addProcessor(name, new KeyValuePrinter<>(printStream,
keySerde, valSerde, streamName), this.name);
+ peek(new PrintAction<>(printStream, keySerde, valSerde,
streamName), name);
+
} catch (FileNotFoundException e) {
String message = "Unable to write stream to file at [" + filePath
+ "] " + e.getMessage();
@@ -320,11 +320,12 @@ public void foreach(ForeachAction<? super K, ? super V>
action) {
@Override
public KStream<K, V> peek(final ForeachAction<? super K, ? super V>
action) {
- Objects.requireNonNull(action, "action can't be null");
- final String name = topology.newName(PEEK_NAME);
+ return peek(action, topology.newName(PEEK_NAME));
+ }
+ private KStream<K, V> peek(final ForeachAction<? super K, ? super V>
action, final String name) {
+ Objects.requireNonNull(action, "action can't be null");
topology.addProcessor(name, new KStreamPeek<>(action), this.name);
-
return new KStreamImpl<>(topology, name, sourceNodes,
repartitionRequired);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3f801436dd1..a3433827fd1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -140,7 +140,7 @@ public void print(Serde<K> keySerde, Serde<V> valSerde) {
public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
{
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
- topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde,
streamName), this.name);
+ topology.addProcessor(name, new KStreamPeek<>(new
PrintAction<>(System.out, keySerde, valSerde, streamName)), this.name);
}
@Override
@@ -171,7 +171,7 @@ public void writeAsText(String filePath, String streamName,
Serde<K> keySerde, S
streamName = (streamName == null) ? this.name : streamName;
try {
PrintStream printStream = new PrintStream(new
FileOutputStream(filePath));
- topology.addProcessor(name, new KeyValuePrinter<>(printStream,
keySerde, valSerde, streamName), this.name);
+ topology.addProcessor(name, new KStreamPeek<>(new
PrintAction<>(printStream, keySerde, valSerde, streamName)), this.name);
} catch (FileNotFoundException e) {
String message = "Unable to write stream to file at [" + filePath
+ "] " + e.getMessage();
throw new TopologyBuilderException(message);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
deleted file mode 100644
index e193e52ec34..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.io.PrintStream;
-
-
-class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
-
- private final PrintStream printStream;
- private Serde<?> keySerde;
- private Serde<?> valueSerde;
- private String streamName;
-
-
- KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?>
valueSerde, String streamName) {
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
- this.streamName = streamName;
- if (printStream == null) {
- this.printStream = System.out;
- } else {
- this.printStream = printStream;
- }
- }
-
- KeyValuePrinter(PrintStream printStream, String streamName) {
- this(printStream, null, null, streamName);
- }
-
- KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde, String streamName)
{
- this(System.out, keySerde, valueSerde, streamName);
- }
-
- @Override
- public Processor<K, V> get() {
- return new KeyValuePrinterProcessor(this.printStream, this.keySerde,
this.valueSerde, this.streamName);
- }
-
-
- private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
- private final PrintStream printStream;
- private Serde<?> keySerde;
- private Serde<?> valueSerde;
- private ProcessorContext processorContext;
- private String streamName;
-
- private KeyValuePrinterProcessor(PrintStream printStream, Serde<?>
keySerde, Serde<?> valueSerde, String streamName) {
- this.printStream = printStream;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
- this.streamName = streamName;
- }
-
- @Override
- public void init(ProcessorContext context) {
- this.processorContext = context;
-
- if (this.keySerde == null) {
- keySerde = this.processorContext.keySerde();
- }
-
- if (this.valueSerde == null) {
- valueSerde = this.processorContext.valueSerde();
- }
- }
-
- @Override
- public void process(K key, V value) {
- K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
- V valueToPrint = (V) maybeDeserialize(value,
valueSerde.deserializer());
-
- printStream.println("[" + this.streamName + "]: " + keyToPrint + "
, " + valueToPrint);
-
- this.processorContext.forward(key, value);
- }
-
-
- private Object maybeDeserialize(Object receivedElement,
Deserializer<?> deserializer) {
- if (receivedElement == null) {
- return null;
- }
-
- if (receivedElement instanceof byte[]) {
- return deserializer.deserialize(this.processorContext.topic(),
(byte[]) receivedElement);
- }
-
- return receivedElement;
- }
-
- @Override
- public void close() {
- if (this.printStream == System.out) {
- this.printStream.flush();
- } else {
- this.printStream.close();
- }
- }
- }
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintAction.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintAction.java
new file mode 100644
index 00000000000..e03d6e40cf0
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintAction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.ForeachAction;
+
+import java.io.PrintStream;
+
+final class PrintAction<K, V> implements ForeachAction<K, V> {
+
+ private PrintStream printStream;
+ private Serde<?> keySerde;
+ private Serde<?> valueSerde;
+ private String streamName;
+
+ PrintAction(final PrintStream printStream, final Serde<?> keySerde, final
Serde<?> valueSerde, final String streamName){
+ this.printStream = printStream;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ this.streamName = streamName;
+ }
+
+ @Override
+ public void apply(final K key, final V value) {
+ @SuppressWarnings("unchecked")
+ K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
+ @SuppressWarnings("unchecked")
+ V valueToPrint = (V) maybeDeserialize(value,
valueSerde.deserializer());
+ printStream.println("[" + streamName + "]: " + keyToPrint + " , " +
valueToPrint);
+ }
+
+ private Object maybeDeserialize(Object receivedElement, Deserializer<?>
deserializer) {
+ if (receivedElement == null) {
+ return null;
+ }
+
+ if (receivedElement instanceof byte[]) {
+ return deserializer.deserialize(null, (byte[]) receivedElement);
+ }
+
+ return receivedElement;
+ }
+
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
deleted file mode 100644
index 6c0162c9dd1..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.junit.After;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class KeyValuePrinterProcessorTest {
-
- private final String topicName = "topic";
- private final Serde<String> stringSerde = Serdes.String();
- private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- private final KStreamBuilder builder = new KStreamBuilder();
- private final PrintStream printStream = new PrintStream(baos);
-
- private KStreamTestDriver driver = null;
-
- @After
- public void cleanup() {
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
- @Test
- public void testPrintKeyValueDefaultSerde() throws Exception {
-
- KeyValuePrinter<String, String> keyValuePrinter = new
KeyValuePrinter<>(printStream, null);
- String[] suppliedKeys = {"foo", "bar", null};
- String[] suppliedValues = {"value1", "value2", "value3"};
- String[] expectedValues = {"[null]: foo , value1", "[null]: bar ,
value2", "[null]: null , value3"};
-
-
- KStream<String, String> stream = builder.stream(stringSerde,
stringSerde, topicName);
- stream.process(keyValuePrinter);
-
- driver = new KStreamTestDriver(builder);
- for (int i = 0; i < suppliedKeys.length; i++) {
- driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
- }
-
- String[] capturedValues = new String(baos.toByteArray(),
Charset.forName("UTF-8")).split("\n");
-
- for (int i = 0; i < capturedValues.length; i++) {
- assertEquals(capturedValues[i], expectedValues[i]);
- }
- }
-
- @Test
- public void testPrintKeyValuesWithName() throws Exception {
-
- KeyValuePrinter<String, String> keyValuePrinter = new
KeyValuePrinter<>(printStream, "test-stream");
- String[] suppliedKeys = {"foo", "bar", null};
- String[] suppliedValues = {"value1", "value2", "value3"};
- String[] expectedValues = {"[test-stream]: foo , value1",
"[test-stream]: bar , value2", "[test-stream]: null , value3"};
-
-
- KStream<String, String> stream = builder.stream(stringSerde,
stringSerde, topicName);
- stream.process(keyValuePrinter);
-
- driver = new KStreamTestDriver(builder);
- for (int i = 0; i < suppliedKeys.length; i++) {
- driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
- }
-
- String[] capturedValues = new String(baos.toByteArray(),
Charset.forName("UTF-8")).split("\n");
-
- for (int i = 0; i < capturedValues.length; i++) {
- assertEquals(capturedValues[i], expectedValues[i]);
- }
- }
-
-
- @Test
- public void testPrintKeyValueWithProvidedSerde() throws Exception {
-
- Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new
MockSerializer(), new MockDeserializer());
- KeyValuePrinter<String, MockObject> keyValuePrinter = new
KeyValuePrinter<>(printStream, stringSerde, mockObjectSerde, null);
- KStream<String, MockObject> stream = builder.stream(stringSerde,
mockObjectSerde, topicName);
-
- stream.process(keyValuePrinter);
-
- driver = new KStreamTestDriver(builder);
-
- String suppliedKey = null;
- byte[] suppliedValue = "{\"name\":\"print\",
\"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
-
- driver.process(topicName, suppliedKey, suppliedValue);
- String expectedPrintedValue = "[null]: null , name:print label:test";
- String capturedValue = new String(baos.toByteArray(),
Charset.forName("UTF-8")).trim();
-
- assertEquals(capturedValue, expectedPrintedValue);
-
- }
-
- private static class MockObject {
- public String name;
- public String label;
-
- public MockObject() {
- }
-
- MockObject(String name, String label) {
- this.name = name;
- this.label = label;
- }
-
- @Override
- public String toString() {
- return "name:" + name + " label:" + label;
- }
- }
-
-
- private static class MockDeserializer implements Deserializer<MockObject> {
-
- private com.fasterxml.jackson.databind.ObjectMapper objectMapper = new
com.fasterxml.jackson.databind.ObjectMapper();
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
-
- }
-
- @Override
- public MockObject deserialize(String topic, byte[] data) {
- MockObject mockObject;
- try {
- mockObject = objectMapper.readValue(data, MockObject.class);
- } catch (Exception e) {
- throw new SerializationException(e);
- }
- return mockObject;
- }
-
- @Override
- public void close() {
-
- }
- }
-
-
- private static class MockSerializer implements Serializer<MockObject> {
- private final com.fasterxml.jackson.databind.ObjectMapper objectMapper
= new com.fasterxml.jackson.databind.ObjectMapper();
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
-
- }
-
- @Override
- public byte[] serialize(String topic, MockObject data) {
- try {
- return objectMapper.writeValueAsBytes(data);
- } catch (Exception e) {
- throw new SerializationException("Error serializing JSON
message", e);
- }
- }
-
- @Override
- public void close() {
-
- }
- }
-
-
-}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Exploit #peek to implement #print() and other methods
> -----------------------------------------------------
>
> Key: KAFKA-4772
> URL: https://issues.apache.org/jira/browse/KAFKA-4772
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: james chien
> Priority: Minor
> Labels: beginner, newbie
> Fix For: 0.11.0.0
>
>
> From: https://github.com/apache/kafka/pull/2493#pullrequestreview-22157555
> Things that I can think of:
> - print / writeAsText can be a special impl of peek; KStreamPrint etc. can be
> removed.
> - consider collapsing KStreamPeek and KStreamForeach, with a flag parameter
> indicating whether the processed key-value pair should still be forwarded.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)