MLHR-1956 Added POJO Kafka Output operator with autometrics and batch processing
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/24fdbdfc Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/24fdbdfc Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/24fdbdfc Branch: refs/heads/master Commit: 24fdbdfcfe37ef211563173952a676a73110dcd6 Parents: 6ee61fc Author: Chaitanya <[email protected]> Authored: Wed Jan 6 10:52:04 2016 +0530 Committer: Chaitanya <[email protected]> Committed: Wed Jan 6 10:52:04 2016 +0530 ---------------------------------------------------------------------- .../kafka/AbstractKafkaOutputOperator.java | 19 ++ .../contrib/kafka/POJOKafkaOutputOperator.java | 261 +++++++++++++++++++ .../contrib/kafka/KafkaOutputOperatorTest.java | 67 +++++ 3 files changed, 347 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java index 77bf42b..f0835c4 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java @@ -151,4 +151,23 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator public void endWindow() { } + + /** + * Return the additional producer properties + * @return producerProperties + */ + public String getProducerProperties() + { + return producerProperties; + } + + /** + * Specify the additional producer properties as a comma-separated string in the + * form of key1=value1,key2=value2,key3=value3,.. 
+ * @param producerProperties Given properties as string + */ + public void setProducerProperties(String producerProperties) + { + this.producerProperties = producerProperties; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java new file mode 100644 index 0000000..5c1f695 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.kafka; + +import java.lang.reflect.Field; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import org.apache.commons.lang3.ClassUtils; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.lib.util.PojoUtils; + +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +/** + * POJOKafkaOutputOperator extends AbstractKafkaOutputOperator. It receives POJOs + * from upstream, converts them to Kafka messages and writes them to a Kafka topic. + * <p> + * <br> + * Ports:<br> + * <b>Input</b>: Has only one input port<br> + * <b>Output</b>: No Output Port <br> + * <br> + * Properties:<br> + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:Port3,...<br> + * <b>keyField</b>: Specifies the field whose value determines how tuples are distributed across Kafka partitions. <br> + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. 
By default, + * the value is true <br> + * <b>batchSize</b>: Specifies the batch size.<br> + * <br> + * <br> + * </p> + * + * @displayName POJO Kafka Output + * @category Messaging + * @tags Output operator + * + */ +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object> +{ + @AutoMetric + private long outputMessagesPerSec; + @AutoMetric + private long outputBytesPerSec; + protected final String BROKER_KEY = "metadata.broker.list"; + protected final String BATCH_NUM_KEY = "batch.num.messages"; + protected final String PRODUCER_KEY = "producer.type"; + protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms"; + protected final String ASYNC_PRODUCER_TYPE = "async"; + private long messageCount; + private long byteCount; + private String brokerList; + private double windowTimeSec; + private String keyField = ""; + protected boolean isBatchProcessing = true; + @Min(2) + protected int batchSize; + protected transient PojoUtils.Getter keyMethod; + protected transient Class<?> pojoClass; + + public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() { + @Override + public void setup(Context.PortContext context) + { + if (context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) { + pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS); + } + } + + @Override + public void process(Object tuple) + { + processTuple(tuple); + } + }; + + /** + * setup producer configuration. 
+ * @return ProducerConfig + */ + @Override + protected ProducerConfig createKafkaProducerConfig() + { + if (brokerList != null) { + getConfigProperties().setProperty(BROKER_KEY, brokerList); + } + if (isBatchProcessing) { + if (batchSize != 0) { + getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize)); + } + getConfigProperties().setProperty(PRODUCER_KEY, ASYNC_PRODUCER_TYPE); + } + return super.createKafkaProducerConfig(); + } + + @Override + public void setup(Context.OperatorContext context) + { + if (isBatchProcessing) { + getConfigProperties().setProperty(QUEUE_BUFFER_KEY, String.valueOf(context.getValue( + Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS))); + } + super.setup(context); + windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) + * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0; + if (pojoClass != null && keyField != "") { + try { + keyMethod = generateGetterForKeyField(); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Field " + keyField + " is invalid: " + e); + } + } + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + outputMessagesPerSec = 0; + outputBytesPerSec = 0; + messageCount = 0; + byteCount = 0; + } + + /** + * Write the incoming tuple to Kafka + * @param tuple incoming tuple + */ + protected void processTuple(Object tuple) + { + // Get the getter method from the keyField + if (keyMethod == null && keyField != "") { + pojoClass = tuple.getClass(); + try { + keyMethod = generateGetterForKeyField(); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Field " + keyField + " is invalid: " + e); + } + } + + // Convert the given tuple to KeyedMessage + KeyedMessage msg; + if (keyMethod != null) { + msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple); + } else { + msg = new KeyedMessage(getTopic(), tuple, tuple); + } + + getProducer().send(msg); + messageCount++; + if 
(tuple instanceof byte[]) { + byteCount += ((byte[])tuple).length; + } + } + + @Override + public void endWindow() + { + super.endWindow(); + outputBytesPerSec = (long)(byteCount / windowTimeSec); + outputMessagesPerSec = (long)(messageCount / windowTimeSec); + } + + private PojoUtils.Getter generateGetterForKeyField() throws NoSuchFieldException, SecurityException + { + Field f = pojoClass.getDeclaredField(keyField); + Class c = ClassUtils.primitiveToWrapper(f.getType()); + PojoUtils.Getter classGetter = PojoUtils.createGetter(pojoClass, keyField, c); + return classGetter; + } + + /** + * Returns the broker list of kafka clusters + * @return the broker list + */ + public String getBrokerList() + { + return brokerList; + } + + /** + * Sets the broker list with the given list + * @param brokerList + */ + public void setBrokerList(@NotNull String brokerList) + { + this.brokerList = brokerList; + } + + /** + * Returns whether messages are written in batches. + * @return isBatchProcessing + */ + public boolean isBatchProcessing() + { + return isBatchProcessing; + } + + /** + * Specifies whether messages should be written in batches. + * @param batchProcessing given batchProcessing + */ + public void setBatchProcessing(boolean batchProcessing) + { + isBatchProcessing = batchProcessing; + } + + /** + * Returns the batch size + * @return batch size + */ + public int getBatchSize() + { + return batchSize; + } + + /** + * Sets the batch size + * @param batchSize batch size + */ + public void setBatchSize(int batchSize) + { + this.batchSize = batchSize; + } + + /** + * Returns the key field + * @return the key field + */ + public String getKeyField() + { + return keyField; + } + + /** + * Sets the key field; the value of this field is used as the key of the messages written to Kafka. 
+ * @param keyField the key field + */ + public void setKeyField(String keyField) + { + this.keyField = keyField; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java index a09fdc5..cc1f267 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java @@ -182,5 +182,72 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase listener.close(); } + /** + * Test AbstractKafkaOutputOperator (i.e. an output adapter for Kafka, aka producer). + * This module sends data into kafka message bus. + * + * [Generate tuple] ==> [send tuple through Kafka output adapter(i.e. producer) into Kafka message bus] + * ==> [receive data in outside Kafka listener (i.e. consumer)] + * + * @throws Exception + */ + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testPOJOKafkaOutputOperator() throws Exception + { + tupleCount = 0; + //initialize the latch to synchronize the threads + latch = new CountDownLatch(maxTuple); + // Setup a message listener to receive the message + KafkaTestConsumer listener = new KafkaTestConsumer("topic1"); + listener.setLatch(latch); + new Thread(listener).start(); + + // Create DAG for testing. 
+ LocalMode lma = LocalMode.newInstance(); + + StreamingApplication app = new StreamingApplication() { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + DAG dag = lma.getDAG(); + StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class); + POJOKafkaOutputOperator node = dag.addOperator("KafkaMessageProducer", POJOKafkaOutputOperator.class); + + Properties props = new Properties(); + props.setProperty("serializer.class", "kafka.serializer.StringEncoder"); + props.setProperty("producer.type", "async"); + props.setProperty("queue.buffering.max.ms", "200"); + props.setProperty("queue.buffering.max.messages", "10"); + + node.setConfigProperties(props); + node.setTopic("topic1"); + node.setBrokerList("localhost:9092"); + node.setBatchSize(5); + + // Connect ports + dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL); + + Configuration conf = new Configuration(false); + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // Wait on the latch, giving up after a 20 second timeout + latch.await(20, TimeUnit.SECONDS); + lc.shutdown(); + + // Check values sent vs received + Assert.assertEquals("Number of emitted tuples", maxTuple, listener.holdingBuffer.size()); + logger.debug(String.format("Number of emitted tuples: %d", listener.holdingBuffer.size())); + Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek())); + + listener.close(); + } }
