[
https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15220957#comment-15220957
]
Bill Bejeck commented on KAFKA-3338:
------------------------------------
I'm wondering if we need to provide a Serde for the print/writeAsText
functionality as Serdes are only required when reading from/writing
to a topic or when using a state store.
To give you an idea of where I'm coming from, here's the integration test for
the implementation of print()
{code}
KStream<String,StockTransaction> transactionKStream =
kStreamBuilder.stream(stringSerde,transactionSerde,"stocks");
transactionKStream.through(stringSerde, transactionSerde,"stocks-out").print()
.map((k,v)-> new KeyValue<>(v.getSymbol(),v)).print()
.aggregateByKey(StockTransactionCollector::new,
(k, v, stockTransactionCollector) ->
stockTransactionCollector.add(v),
TumblingWindows.of("stock-summaries").with(10000),
stringSerde,collectorSerde).print();
.to(windowedSerde,collectorSerde,"transaction-summary");
{code}
This allows me to see all elements from each stream (in order) on stdout.
Thoughts?
> Add print and writeAsText functions to the Streams DSL
> ------------------------------------------------------
>
> Key: KAFKA-3338
> URL: https://issues.apache.org/jira/browse/KAFKA-3338
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Guozhang Wang
> Assignee: Bill Bejeck
> Labels: newbie++
> Fix For: 0.10.1.0
>
>
> We want to provide some REPL-like pattern for users for better debuggability.
> More concretely, we want to allow users to easily inspect their intermediate
> data streams in the topology while running the application. Theoretically
> this can be done by using a break point, or by calling System.out.println()
> inside the operator, or through a finer grained trace-level logging. But more
> user-friendly API would be to add a print() function to the KStream / KTable
> object like:
> {code}
> // Prints the elements in this stream to the stdout, i.e. "System.out" of the
> JVM
> KStream#print(/* optional serde */);
> KTable#print(/* optional serde */);
> // Writes the stream as text file(s) to the specified location.
> KStream#writeAsText(String filePath, /* optional serde */);
> KTable#writeAsText(String filePath, /* optional serde */);
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)