[
https://issues.apache.org/jira/browse/FLINK-6539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16011791#comment-16011791
]
ASF GitHub Bot commented on FLINK-6539:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3911#discussion_r116659709
--- Diff:
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
---
@@ -17,49 +17,74 @@
package org.apache.flink.streaming.examples.kafka;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
/**
- * Read Strings from Kafka and print them to standard out.
- * Note: On a cluster, DataStream.print() will print to the TaskManager's
.out file!
- *
- * Please pass the following arguments to run the example:
- * --topic test --bootstrap.servers localhost:9092
--zookeeper.connect localhost:2181 --group.id myconsumer
+ * An example that shows how to read from and write to Kafka. This will
read String messages
+ * from the input topic, prefix them by a configured prefix and output to
the output topic.
*
+ * <p>Example usage:
+ * --input-topic test-input --output-topic test-outpu
--bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181
--group.id myconsumer
*/
-public class ReadFromKafka {
+public class Kafka010Example {
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool =
ParameterTool.fromArgs(args);
- if(parameterTool.getNumberOfParameters() < 4) {
- System.out.println("Missing parameters!\nUsage: Kafka
--topic <topic> " +
- "--bootstrap.servers <kafka brokers>
--zookeeper.connect <zk quorum> --group.id <some id>");
+ if (parameterTool.getNumberOfParameters() < 5) {
+ System.out.println("Missing parameters!\n" +
+ "Usage: Kafka --input-topic <topic>
--output-topic <topic> " +
+ "--bootstrap.servers <kafka brokers> " +
+ "--zookeeper.connect <zk quorum>
--group.id <some id> [--prefix <prefix>]");
return;
}
+ String prefix = parameterTool.get("prefix", "PREFIX:");
+
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
10000));
- env.enableCheckpointing(5000); // create a checkpoint every 5
seconds
- env.getConfig().setGlobalJobParameters(parameterTool); // make
parameters available in the web interface
+// env.enableCheckpointing(5000); // create a checkpoint every 5
seconds
--- End diff --
By the way, was there any reason to not enable checkpointing in the e2e
tests?
> Add automated end-to-end tests
> ------------------------------
>
> Key: FLINK-6539
> URL: https://issues.apache.org/jira/browse/FLINK-6539
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> We should add simple tests that exercise all the paths that a user would use
> when starting a cluster and submitting a program. Preferably with a simple
> batch program and a streaming program that uses Kafka.
> This would have catched some of the bugs that we now discovered right before
> the release.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)