Repository: apex-malhar Updated Branches: refs/heads/master 10dd94ef5 -> b5c003c94
APEXMALHAR-2455 Create example for Kafka 0.9 API exactly-once output Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b5c003c9 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b5c003c9 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b5c003c9 Branch: refs/heads/master Commit: b5c003c94d19b1308f62dc91515e82a660024264 Parents: 10dd94e Author: Oliver Winke <[email protected]> Authored: Mon Apr 24 16:58:54 2017 -0700 Committer: Oliver Winke <[email protected]> Committed: Thu May 4 14:25:15 2017 -0700 ---------------------------------------------------------------------- examples/kafka/README.md | 75 +++++++++ examples/kafka/logicalDAGKafkaExactlyOnce.png | Bin 0 -> 87679 bytes examples/kafka/pom.xml | 1 + .../kafka/exactlyonceoutput/Application.java | 94 +++++++++++ .../BatchSequenceGenerator.java | 105 +++++++++++++ .../PassthroughFailOperator.java | 155 +++++++++++++++++++ .../exactlyonceoutput/ValidationToFile.java | 132 ++++++++++++++++ .../properties-KafkaExactlyOnceOutput.xml | 114 ++++++++++++++ .../exactlyonceoutput/ApplicationTest.java | 150 ++++++++++++++++++ 9 files changed, 826 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/README.md ---------------------------------------------------------------------- diff --git a/examples/kafka/README.md b/examples/kafka/README.md index 1a7a9c4..5454edc 100644 --- a/examples/kafka/README.md +++ b/examples/kafka/README.md @@ -14,3 +14,78 @@ them out to a Kafka topic. Each line of the input file is considered a separate message. The topic name, the name of the directory that is monitored for input files, and other parameters are configurable in `META_INF/properties-hdfs2kafka.xml`. 
+## Kafka exactly-once output example (Kafka 0.9 API) + +This application verifies exactly-once semantics by writing a defined sequence of input data to two Kafka +output operators -- one that guarantees those semantics and one that does not, +each writing to a different topic. It deliberately causes the intermediate pass-through +operator to fail causing it to be restarted and some tuples to be reprocessed. +Then a KafkaInputOperator reads tuples from both topics to verify that the former topic has no duplicates +but the latter does and writes a single line to a HDFS file with the verification results +of the following form: + + Duplicates: exactly-once: 0, at-least-once: 5 + +NOTE: KafkaInputOperator guarantees at-least-once semantics; in most scenarios +it also yields exactly-once results, though in rare corner cases duplicate processing +may occur. When this happens validation in this example will output wrong results. + +**DAG of this application:** + + + +Plain text representation of DAG: + + sequenceGenerator --> passthrough ==> {kafkaExactlyOnceOutputOperator, kafkaOutputOperator(at-least-once)} + + {kafkaTopicExactly, kafkaTopicAtLeast} --> validationToFile + + +**Running the Application** + +***Run Test*** + +The application can be run in local mode which will write the validation file + to target/validation.txt + + +***Run on Cluster*** + +To run the application on a cluster a running Kafka service is needed. + A local Kafka single-node instance can easily be deployed + (see [kafka.apache.org/quickstart](https://kafka.apache.org/quickstart)). + +By default Kafka creates topics automatically when a message + to a non-existing topic arrives. 
If disabled, manual creation of the two
+ topics is needed:
+```shell
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic exactly-once
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic at-least-once
+```
+
+Kafka topics should be cleared/deleted after every run in order for
+ validation to work correctly
+
+Enable topic deletion in Kafka's server.properties file:
+```
+delete.topic.enable=true
+```
+
+Delete topics:
+```shell
+bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic exactly-once
+bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic at-least-once
+```
+
+Check if deletion was successful:
+```shell
+bin/kafka-topics.sh --list --zookeeper localhost:2181
+```
+
+****properties:****
+
+By default the Kafka broker is set to 'localhost:9092'. To set a different broker
+ address change the value in properties.xml as well as in Application.java
+ The directory for the validation file and the number of tuples to be generated
+ can also be changed in properties.xml
+ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/logicalDAGKafkaExactlyOnce.png ---------------------------------------------------------------------- diff --git a/examples/kafka/logicalDAGKafkaExactlyOnce.png b/examples/kafka/logicalDAGKafkaExactlyOnce.png new file mode 100644 index 0000000..0cc8004 Binary files /dev/null and b/examples/kafka/logicalDAGKafkaExactlyOnce.png differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml index 497a39e..7e607e3 100644 --- a/examples/kafka/pom.xml +++ b/examples/kafka/pom.xml @@ -36,6 +36,7 @@ kafka2hdfs is a example show how to read lines from a Kafka topic using the new (0.9) Kafka input operator and write them out to HDFS. 
hdfs2kafka is a simple application to transfer data from HDFS to Kafka + KafkaExactlyOnceOutput example demonstrates exactly once behavior using KafkaSinglePortExactlyOnceOutputOperator (0.9) </description> <dependencies> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java new file mode 100644 index 0000000..da03645 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.kafka.exactlyonceoutput; + +import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator; +import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; +import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +/** + * Kafka exactly-once example (Kafka 0.9 API) + * + * This application verifies exactly-once semantics by writing a defined sequence of input data to two Kafka + * output operators -- one that guarantees those semantics (using KafkaSingleExactlyOnceOutputOperator) and one that does not, + * each writing to a different topic. It deliberately causes the intermediate pass-through + * operator to fail causing it to be restarted and some tuples to be reprocessed. + * Then a KafkaInputOperator reads tuples from both topics to verify that the former topic has no duplicates + * but the latter does and writes a single line to a HDFS file with the verification results + * of the following form: + * + * Duplicates: exactly-once: 0, at-least-once: 5 + * + * Operators: + * + * **BatchSequenceGenerator:** + * Generates a sequence of numbers starting going from 1 to maxTuplesTotal which can be set in the properties. + * + * **PassthroughFailOperator:** + * This operator kills itself after a defined number of processed tuples by intentionally throwing an exception. + * STRAM will redeploy the operator on a new container. The exception only needs to be thrown once, so a file is + * written to HDFS just before throwing the exception and its presence is checked after restart to determine + * if the exception was already thrown. + * + * **KafkaSinglePortExactlyOnceOutputOperator:** + * Topic, bootstrap.servers, serializer and deserializer are set in properties.xml. 
+ * The topic names should not be changed for this application. + * + * **ValidationToFile:** + * Puts values of input into list depending on topic. If value of maxTuplesTotal is reached it will calculate duplicates + * and write validation output to HDFS. (output line will be printed in container dt.log as well). + */ + +@ApplicationAnnotation(name = "KafkaExactlyOnceOutput") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + + BatchSequenceGenerator sequenceGenerator = dag.addOperator("sequenceGenerator", BatchSequenceGenerator.class); + PassthroughFailOperator passthroughFailOperator = dag.addOperator("passthrough", PassthroughFailOperator.class); + KafkaSinglePortExactlyOnceOutputOperator<String> kafkaExactlyOnceOutputOperator = + dag.addOperator("kafkaExactlyOnceOutputOperator", KafkaSinglePortExactlyOnceOutputOperator.class); + KafkaSinglePortOutputOperator kafkaOutputOperator = + dag.addOperator("kafkaOutputOperator", KafkaSinglePortOutputOperator.class); + + dag.addStream("sequenceToPassthrough", sequenceGenerator.out, passthroughFailOperator.input); + dag.addStream("linesToKafka", passthroughFailOperator.output, kafkaOutputOperator.inputPort, + kafkaExactlyOnceOutputOperator.inputPort); + + KafkaSinglePortInputOperator kafkaInputTopicExactly = dag.addOperator("kafkaTopicExactly", KafkaSinglePortInputOperator.class); + kafkaInputTopicExactly.setInitialOffset(KafkaSinglePortInputOperator.InitialOffset.EARLIEST.name()); + + KafkaSinglePortInputOperator kafkaInputTopicAtLeast = dag.addOperator("kafkaTopicAtLeast", KafkaSinglePortInputOperator.class); + kafkaInputTopicAtLeast.setInitialOffset(KafkaSinglePortInputOperator.InitialOffset.EARLIEST.name()); + + ValidationToFile validationToFile = dag.addOperator("validationToFile", ValidationToFile.class); + + dag.addStream("messagesFromExactly", kafkaInputTopicExactly.outputPort, validationToFile.topicExactlyInput); + 
dag.addStream("messagesFromAtLeast", kafkaInputTopicAtLeast.outputPort, validationToFile.topicAtLeastInput); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java new file mode 100644 index 0000000..1654434 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.kafka.exactlyonceoutput; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +/** + * Simple operator that emits Strings from 1 to maxTuplesTotal + */ +public class BatchSequenceGenerator extends BaseOperator implements InputOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(BatchSequenceGenerator.class); + + // properties + + @Min(1) + private int maxTuplesTotal; // max number of tuples in total + @Min(1) + private int maxTuples; // max number of tuples per window + + private int sleepTime; + + private int numTuplesTotal = 0; + + //start with empty windows to ensure tests run reliable + private int emptyWindowsCount = 0; + + // transient fields + + private transient int numTuples = 0; // number emitted in current window + + public final transient DefaultOutputPort<String> out = new DefaultOutputPort<>(); + + @Override + public void beginWindow(long windowId) + { + numTuples = 0; + super.beginWindow(windowId); + LOG.debug("beginWindow: " + windowId); + ++emptyWindowsCount; + } + + @Override + public void emitTuples() + { + + if (numTuplesTotal < maxTuplesTotal && numTuples < maxTuples && emptyWindowsCount > 3) { + ++numTuplesTotal; + ++numTuples; + out.emit(String.valueOf(numTuplesTotal)); + LOG.debug("Line emitted: " + numTuplesTotal); + + try { + // avoid repeated calls to this function + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + + } + + public int getMaxTuples() + { + return maxTuples; + } + + public void setMaxTuples(int v) + { + maxTuples = v; + } + + public int getMaxTuplesTotal() + { + return maxTuplesTotal; + } + + public void setMaxTuplesTotal(int v) + { + maxTuplesTotal = v; + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java new file mode 100644 index 0000000..487c773 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.kafka.exactlyonceoutput; + +import java.io.IOException; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * To produce an exactly-once scenario the PassthroughFailOperator kills itself after a certain number + * of processed lines by throwing an exception. YARN will deploy the Operator in a new container, + * hence not checkpointed tuples will be passed to the OutputOperators more than once. + */ +public class PassthroughFailOperator extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(PassthroughFailOperator.class); + private boolean killed; + + @NotNull + private int tuplesUntilKill; + + //start with empty windows to ensure tests run reliable + private int emptyWindowsCount = 0; + + @NotNull + private String directoryPath; + + private String filePath; + private transient FileSystem hdfs; + private transient Path filePathObj; + + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + /** + * Loads file from HDFS and sets {@link #killed} flag if it already exists + * + * @param context + */ + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + String appId = context.getValue(Context.DAGContext.APPLICATION_ID); + filePath = directoryPath + "/" + appId; + + LOG.info("FilePath: " + filePath); + filePathObj = new Path(filePath); + try { + hdfs = FileSystem.newInstance(filePathObj.toUri(), new Configuration()); + } catch (IOException e) { + e.printStackTrace(); + } + + try { + if (hdfs.exists(filePathObj)) { + killed = true; + LOG.info("file already exists -> 
Operator has been killed before"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + LOG.debug("WindowId: " + windowId); + ++emptyWindowsCount; + } + + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { + /** + * Creates file on HDFS identified by ApplicationId to save killed state, if operator has not been killed yet. + * Throws Exception to kill operator. + * + * @param line + */ + @Override + public void process(String line) + { + if (emptyWindowsCount > 3) { + LOG.debug("LINE " + line); + if (killed) { + output.emit(line); + } else if (tuplesUntilKill > 0) { + output.emit(line); + tuplesUntilKill--; + } else { + try { + hdfs.createNewFile(filePathObj); + LOG.info("Created file " + filePath); + } catch (IOException e) { + e.printStackTrace(); + } + //kill operator + LOG.info("Operator intentionally killed through exception"); + RuntimeException e = new RuntimeException("Exception to intentionally kill operator"); + throw e; + } + } + } + }; + + public String getDirectoryPath() + { + return directoryPath; + } + + public void setDirectoryPath(String directoryPath) + { + this.directoryPath = directoryPath; + } + + public int getTuplesUntilKill() + { + return tuplesUntilKill; + } + + public void setTuplesUntilKill(int tuplesUntilKill) + { + this.tuplesUntilKill = tuplesUntilKill; + } + +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java new file mode 100644 index 0000000..0dd7818 --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.kafka.exactlyonceoutput;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;

/**
 * Collects the messages read back from the 'exactly-once' and 'at-least-once' topics and, once
 * both topics have delivered the final sequence value (maxTuplesTotal), counts duplicates per
 * topic and writes a single validation line to the output file.
 */
public class ValidationToFile extends AbstractSingleFileOutputOperator<byte[]>
{
  private static final Logger LOG = LoggerFactory.getLogger(ValidationToFile.class);

  private String latestExactlyValue;
  private String latestAtLeastValue;

  // polled by ApplicationTest from a different thread, hence volatile
  static volatile boolean validationDone = false;

  @NotNull
  private String maxTuplesTotal;  // final value of the generated sequence, as a String

  List<String> exactlyList;   // every message seen on the exactly-once topic
  List<String> atLeastList;   // every message seen on the at-least-once topic

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);
    exactlyList = new ArrayList<>();
    atLeastList = new ArrayList<>();
  }

  // unused; present only because the base class expects an input port
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
  {
    @Override
    public void process(byte[] tuple)
    {
    }
  };

  public final transient DefaultInputPort<byte[]> topicExactlyInput = new DefaultInputPort<byte[]>()
  {
    @Override
    public void process(byte[] tuple)
    {
      // messages were produced with Kafka's StringSerializer, i.e. UTF-8 bytes
      String message = new String(tuple, StandardCharsets.UTF_8);
      latestExactlyValue = message;
      exactlyList.add(message);
      processTuple(tuple);
    }

    @Override
    public StreamCodec<byte[]> getStreamCodec()
    {
      if (ValidationToFile.this.streamCodec == null) {
        return super.getStreamCodec();
      } else {
        return streamCodec;
      }
    }
  };

  public final transient DefaultInputPort<byte[]> topicAtLeastInput = new DefaultInputPort<byte[]>()
  {
    @Override
    public void process(byte[] tuple)
    {
      String message = new String(tuple, StandardCharsets.UTF_8);
      latestAtLeastValue = message;
      atLeastList.add(message);
      processTuple(tuple);
    }
  };

  @Override
  protected byte[] getBytesForTuple(byte[] tuple)
  {
    // write nothing until both topics have delivered the final sequence value
    if (latestExactlyValue == null || latestAtLeastValue == null) {
      return new byte[0];
    }
    if (!latestExactlyValue.equals(maxTuplesTotal) || !latestAtLeastValue.equals(maxTuplesTotal)) {
      return new byte[0];
    }

    // duplicates = total messages minus distinct messages
    Set<String> exactlySet = new HashSet<>(exactlyList);
    Set<String> atLeastSet = new HashSet<>(atLeastList);
    int numDuplicatesExactly = exactlyList.size() - exactlySet.size();
    int numDuplicatesAtLeast = atLeastList.size() - atLeastSet.size();

    String result = "Duplicates: exactly-once: " + numDuplicatesExactly
        + ", at-least-once: " + numDuplicatesAtLeast;
    LOG.info(result);
    validationDone = true;
    return result.getBytes(StandardCharsets.UTF_8);
  }

  public String getMaxTuplesTotal()
  {
    return maxTuplesTotal;
  }

  public void setMaxTuplesTotal(String maxTuplesTotal)
  {
    this.maxTuplesTotal = maxTuplesTotal;
  }
}
+ +--> +<configuration> + + <property> + <name>dt.operator.*.attr.MEMORY_MB</name> + <value>512</value> + </property> + + <!-- set maxTuplesTotal for both BatchSequenceGenerator and ValidationToFile --> + <property> + <name>dt.operator.(sequenceGenerator)|(validationToFile).prop.maxTuplesTotal</name> + <value>20</value> + </property> + <!-- max tuples per window --> + <property> + <name>dt.operator.sequenceGenerator.prop.maxTuples</name> + <value>5</value> + </property> + <property> + <name>dt.operator.passthrough.prop.directoryPath</name> + <value>/tmp/kafka_exactly</value> + </property> + <property> + <name>dt.operator.passthrough.prop.tuplesUntilKill</name> + <value>5</value> + </property> + + <!-- Do not change any topics for this application--> + <!-- KafkaExactlyOnceOutputOperator --> + <property> + <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.topic</name> + <value>exactly-once</value> + </property> + <property> + <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(bootstrap.servers)</name> + <value>localhost:9092</value> + </property> + <property> + <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(value.serializer)</name> + <value>org.apache.kafka.common.serialization.StringSerializer</value> + </property> + <property> + <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(value.deserializer)</name> + <value>org.apache.kafka.common.serialization.StringDeserializer</value> + </property> + + <!-- KafkaOutputOperator --> + <property> + <name>dt.operator.kafkaOutputOperator.prop.topic</name> + <value>at-least-once</value> + </property> + <property> + <name>dt.operator.kafkaOutputOperator.prop.properties(bootstrap.servers)</name> + <value>localhost:9092</value> + </property> + <property> + <name>dt.operator.kafkaOutputOperator.prop.properties(key.serializer)</name> + <value>org.apache.kafka.common.serialization.StringSerializer</value> + </property> + <property> + 
<name>dt.operator.kafkaOutputOperator.prop.properties(value.serializer)</name> + <value>org.apache.kafka.common.serialization.StringSerializer</value> + </property> + + <!-- Validation: kafka input operator (0.9) --> + <property> + <name>dt.operator.kafkaTopicExactly.prop.topics</name> + <value>exactly-once</value> + </property> + <property> + <name>dt.operator.kafkaTopicExactly.prop.clusters</name> + <value>localhost:9092</value> <!-- broker (NOT zookeeper) address --> +</property> + + <property> + <name>dt.operator.kafkaTopicAtLeast.prop.topics</name> + <value>at-least-once</value> + </property> + <property> + <name>dt.operator.kafkaTopicAtLeast.prop.clusters</name> + <value>localhost:9092</value> <!-- broker (NOT zookeeper) address --> + </property> + + <!-- ValidationToFile --> + <property> + <name>dt.operator.validationToFile.prop.outputFileName</name> + <value>validation.txt</value> + </property> + <property> + <name>dt.operator.validationToFile.prop.filePath</name> + <value>/tmp/exactlyonceoutput</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java new file mode 100644 index 0000000..cc4f63c --- /dev/null +++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.kafka.exactlyonceoutput; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +import info.batey.kafka.unit.KafkaUnit; +import info.batey.kafka.unit.KafkaUnitRule; + +import static org.junit.Assert.fail; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest +{ + private static final String directory = "target/exactlyonceoutput"; + private String tuplesUntilKill; + + private static final int zkPort = 2181; + private static final int brokerPort = 9092; + + private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); + + // broker port must match properties.xml + @Rule + public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort); + + //remove '@After' to keep validation output file + @Before + @After + public void cleanup() + { + FileUtils.deleteQuietly(new File(directory)); + } + + @Test + public void testApplication() throws IOException, Exception + { + try { + createTopics(); + + // run app asynchronously; terminate after results are checked + Configuration conf = getConfig(); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(new Application(), conf); + ValidationToFile validationToFile = (ValidationToFile)lma.getDAG().getOperatorMeta("validationToFile").getOperator(); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // waits for the validation application to be done before shutting it down and checking its output + int count = 1; + int maxSleepRounds = 300; + while (!validationToFile.validationDone) { + logger.info("Sleeping ...."); + Thread.sleep(500); + if (count > maxSleepRounds) { + fail("validationDone flag did not get set to true in ValidationToFile operator"); + } + count++; + } + lc.shutdown(); + checkOutput(); + } catch (ConstraintViolationException e) { + fail("constraint violations: " + e.getConstraintViolations()); + } + } + + private Configuration getConfig() + { + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-KafkaExactlyOnceOutput.xml")); + conf.set("dt.operator.passthrough.prop.directoryPath", directory); + conf.set("dt.operator.validationToFile.prop.filePath", directory); + tuplesUntilKill = 
conf.get("dt.operator.passthrough.prop.tuplesUntilKill"); + return conf; + } + + private void createTopics() throws Exception + { + KafkaUnit ku = kafkaUnitRule.getKafkaUnit(); + ku.createTopic("exactly-once"); + ku.createTopic("at-least-once"); + } + + private void checkOutput() throws IOException + { + String validationOutput = ""; + File folder = new File(directory); + + FilenameFilter filenameFilter = new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + if (name.split("_")[0].equals("validation.txt")) { + return true; + } + return false; + } + }; + File validationFile = folder.listFiles(filenameFilter)[0]; + try (FileInputStream inputStream = new FileInputStream(validationFile)) { + while (validationOutput.isEmpty()) { + validationOutput = IOUtils.toString(inputStream); + logger.info("Validation output: {}", validationOutput); + } + } + + Assert.assertTrue(validationOutput.contains("exactly-once: 0")); + + //assert works only for tuplesUntilKill values low enough to kill operator before checkpointing + Assert.assertEquals("Duplicates: exactly-once: 0, at-least-once: " + tuplesUntilKill, validationOutput); + } +}
