Repository: apex-malhar Updated Branches: refs/heads/master 70154f641 -> 70caa8909
APEXMALHAR-2298 Making the KafkaExactlyOnceOutputOperator take generic object as input. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/70caa890 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/70caa890 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/70caa890 Branch: refs/heads/master Commit: 70caa8909284b6cebd62b3b96ad817432b5e9df7 Parents: 70154f6 Author: Sandesh Hegde <[email protected]> Authored: Tue Nov 8 11:25:16 2016 -0800 Committer: Sandesh Hegde <[email protected]> Committed: Wed Jan 4 09:01:57 2017 -0800 ---------------------------------------------------------------------- ...afkaSinglePortExactlyOnceOutputOperator.java | 54 ++++--- .../apache/apex/malhar/kafka/KafkaHelper.java | 65 ++++++++ .../malhar/kafka/KafkaOutputOperatorTest.java | 159 ++++++++++++------- 3 files changed, 197 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java index 6511cd4..ff16610 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java @@ -32,14 +32,13 @@ import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.apex.malhar.lib.wal.WindowDataManager; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import 
org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; - import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -55,6 +54,11 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE * Kafka output operator with exactly once processing semantics. *<br> * + * <p> + * <b>Requirements</b> + * <li>In the Kafka message, only Value will be available for users</li> + * <li>Users need to provide Value deserializers for Kafka message as it is used during recovery</li> + * <li>Value type should have well-defined equals() and hashCode() implementations, as messages are stored in HashMaps for comparison.</li> * <p> * <b>Recovery handling</b> * <li> Offsets of the Kafka partitions are stored in the WindowDataManager at the endWindow</li> @@ -106,8 +110,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu private final int KAFKA_CONNECT_ATTEMPT = 10; private final String KEY_SEPARATOR = "#"; - private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() { @@ -121,12 +125,19 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu @Override public void setup(Context.OperatorContext context) { + setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + + if
(getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) { + throw new IllegalArgumentException("Value deserializer needs to be set for the operator, as it is used during recovery."); + } + super.setup(context); this.operatorId = context.getId(); this.windowDataManager.setup(context); this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME); this.key = appName + KEY_SEPARATOR + (new Integer(operatorId)); + this.consumer = KafkaConsumerInit(); } @@ -211,7 +222,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu private boolean alreadyInKafka(T message) { - if ( windowId <= windowDataManager.getLargestCompletedWindow() ) { + if (windowId <= windowDataManager.getLargestCompletedWindow()) { return true; } @@ -219,17 +230,15 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu Integer val = partialWindowTuples.get(message); - if ( val == 0 ) { + if (val == 0) { return false; - } else if ( val == 1 ) { + } else if (val == 1) { partialWindowTuples.remove(message); } else { partialWindowTuples.put(message, val - 1); } - return true; } - return false; } @@ -245,10 +254,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu Map<Integer,Long> parttionsAndOffset = new HashMap<>(); consumer.assign(topicPartitionList); - for ( PartitionInfo partitionInfo: partitionInfoList) { - + for (PartitionInfo partitionInfo: partitionInfoList) { try { - TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition()); if (latest) { consumer.seekToEnd(topicPartition); @@ -256,7 +263,6 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu consumer.seekToBeginning(topicPartition); } parttionsAndOffset.put(partitionInfo.partition(), consumer.position(topicPartition)); - } catch (Exception ex) { throw new RuntimeException(ex); } @@ -280,13 +286,13 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends 
AbstractKafkaOu } if (currentOffsets == null) { - logger.debug("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow()); + logger.info("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow()); return; } if (storedOffsets == null) { - logger.debug("Stored offset not available, seeking to the beginning of the Kafka Partition."); + logger.info("Stored offset not available, seeking to the beginning of the Kafka Partition."); try { storedOffsets = getPartitionsAndOffsets(false); @@ -298,14 +304,12 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu List<TopicPartition> topicPartitions = new ArrayList<>(); for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) { - topicPartitions.add(new TopicPartition(getTopic(), entry.getKey())); } consumer.assign(topicPartitions); for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) { - Long storedOffset = 0L; Integer currentPartition = entry.getKey(); Long currentOffset = entry.getValue(); @@ -327,9 +331,9 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu int kafkaAttempt = 0; - while ( true ) { + while (true) { - ConsumerRecords<String, String> consumerRecords = consumer.poll(100); + ConsumerRecords<String, T> consumerRecords = consumer.poll(100); if (consumerRecords.count() == 0) { if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) { @@ -341,15 +345,15 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu boolean crossedBoundary = false; - for (ConsumerRecord consumerRecord : consumerRecords) { + for (ConsumerRecord<String, T> consumerRecord : consumerRecords) { - if (!doesKeyBelongsToThisInstance(operatorId, (String)consumerRecord.key())) { + if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) { continue; } - T value = (T)consumerRecord.value(); + T value = consumerRecord.value(); - if ( partialWindowTuples.containsKey(value)) 
{ + if (partialWindowTuples.containsKey(value)) { Integer count = partialWindowTuples.get(value); partialWindowTuples.put(value, count + 1); } else { @@ -375,14 +379,14 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG)); props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); - props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, getProperties().get(VALUE_DESERIALIZER_CLASS_CONFIG)); return new KafkaConsumer<>(props); } protected void sendTuple(T tuple) { - if ( alreadyInKafka(tuple) ) { + if (alreadyInKafka(tuple)) { return; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java new file mode 100644 index 0000000..c550032 --- /dev/null +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.kafka; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>, Deserializer<KafkaOutputOperatorTest.Person> +{ + @Override + public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes) + { + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + int nameLength = byteBuffer.getInt(); + byte[] name = new byte[nameLength]; + + byteBuffer.get(name, 0, nameLength); + + return new KafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt()); + } + + @Override + public byte[] serialize(String s, KafkaOutputOperatorTest.Person person) + { + byte[] name = person.name.getBytes(); + + ByteBuffer byteBuffer = ByteBuffer.allocate(name.length + 4 + 4); + + byteBuffer.putInt(name.length); + byteBuffer.put(name); + byteBuffer.putInt(person.age); + + return byteBuffer.array(); + } + + @Override + public void configure(Map<String, ?> map, boolean b) + { + } + + @Override + public void close() + { + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70caa890/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java index 5d0e59a..58d69f6 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java @@ -25,6 +25,17 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; +import org.junit.After; +import org.junit.Assert; 
+import org.junit.Before; +import org.junit.Test; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; + +import org.apache.commons.io.FileUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; @@ -33,32 +44,20 @@ import com.datatorrent.api.LocalMode; import com.datatorrent.api.Operator; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.stram.StramLocalCluster; -import org.apache.apex.malhar.lib.wal.FSWindowDataManager; -import org.apache.apex.malhar.lib.wal.WindowDataManager; -import org.apache.commons.io.FileUtils; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -@Ignore public class KafkaOutputOperatorTest extends KafkaOperatorTestBase { String testName; - private static List<String> tupleCollection = new LinkedList<>(); - private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static List<Person> tupleCollection = new LinkedList<>(); + private final String VALUE_DESERIALIZER = 
"org.apache.apex.malhar.kafka.KafkaHelper"; + private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper"; - public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator; + public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator; @Before public void before() @@ -71,14 +70,20 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase } } + @After + public void after() + { + FileUtils.deleteQuietly(new File(APPLICATION_PATH)); + } + @Test public void testExactlyOnceWithFailure() throws Exception { - List<String> toKafka = GenerateList(); + List<Person> toKafka = GenerateList(); sendDataToKafka(true, toKafka, true, false); - List<String> fromKafka = ReadFromKafka(); + List<Person> fromKafka = ReadFromKafka(); Assert.assertTrue("With Failure", compare(fromKafka, toKafka)); } @@ -86,11 +91,11 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase @Test public void testExactlyOnceWithNoFailure() throws Exception { - List<String> toKafka = GenerateList(); + List<Person> toKafka = GenerateList(); sendDataToKafka(true, toKafka, false, false); - List<String> fromKafka = ReadFromKafka(); + List<Person> fromKafka = ReadFromKafka(); Assert.assertTrue("With No Failure", compare(fromKafka, toKafka)); } @@ -98,14 +103,14 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase @Test public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception { - List<String> toKafka = GenerateList(); + List<Person> toKafka = GenerateList(); try { sendDataToKafka(true, toKafka, true, true); } catch (RuntimeException ex) { boolean expectedException = false; - if ( ex.getMessage().contains("Violates")) { + if (ex.getMessage().contains("Violates")) { expectedException = true; } @@ -119,11 +124,11 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase @Test public void testKafkaOutput() throws Exception { - 
List<String> toKafka = GenerateList(); + List<Person> toKafka = GenerateList(); sendDataToKafka(false, toKafka, false, false); - List<String> fromKafka = ReadFromKafka(); + List<Person> fromKafka = ReadFromKafka(); Assert.assertTrue("No failure", compare(fromKafka, toKafka)); } @@ -131,38 +136,42 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase @Test public void testKafkaOutputWithFailure() throws Exception { - List<String> toKafka = GenerateList(); + List<Person> toKafka = GenerateList(); sendDataToKafka(false, toKafka, true, true); - List<String> fromKafka = ReadFromKafka(); + List<Person> fromKafka = ReadFromKafka(); Assert.assertTrue("No failure", fromKafka.size() > toKafka.size()); } - private void sendDataToKafka(boolean exactlyOnce, List<String> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException + private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + if (!exactlyOnce) { + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER); + } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp"); + attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH); OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap); cleanUp(operatorContext); Operator kafkaOutput; - DefaultInputPort<String> inputPort; + DefaultInputPort<Person> inputPort; - 
if ( exactlyOnce ) { - KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); + if (exactlyOnce) { + KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } else { - KafkaSinglePortOutputOperator<String,String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); + KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } @@ -181,14 +190,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase inputPort.getSink().put(toKafka.get(6)); inputPort.getSink().put(toKafka.get(7)); - if ( hasFailure ) { - - if ( exactlyOnce ) { - KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); + if (hasFailure) { + if (exactlyOnce) { + KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } else { - KafkaSinglePortOutputOperator<String,String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); + KafkaSinglePortOutputOperator<String,Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } @@ -217,9 +225,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase cleanUp(operatorContext); } - private KafkaSinglePortExactlyOnceOutputOperator<String> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext) + private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext) { - 
KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>(); + KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>(); kafkaOutput.setTopic(testName); kafkaOutput.setProperties(props); kafkaOutput.setup(operatorContext); @@ -227,9 +235,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase return kafkaOutput; } - private KafkaSinglePortOutputOperator<String,String> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext) + private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext) { - KafkaSinglePortOutputOperator<String,String> kafkaOutput = new KafkaSinglePortOutputOperator<>(); + KafkaSinglePortOutputOperator<String,Person> kafkaOutput = new KafkaSinglePortOutputOperator<>(); kafkaOutput.setTopic(testName); kafkaOutput.setProperties(props); kafkaOutput.setup(operatorContext); @@ -239,7 +247,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase private void cleanUp(Context.OperatorContext operatorContext) { - WindowDataManager windowDataManager = new FSWindowDataManager(); + FSWindowDataManager windowDataManager = new FSWindowDataManager(); windowDataManager.setup(operatorContext); try { windowDataManager.committed(windowDataManager.getLargestCompletedWindow()); @@ -248,7 +256,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase } } - private boolean compare(List<String> fromKafka, List<String> toKafka) + private boolean compare(List<Person> fromKafka, List<Person> toKafka) { if (fromKafka.size() != toKafka.size()) { return false; @@ -272,19 +280,18 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase (hasMultiCluster && hasMultiPartition ? 
"," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); } - private List<String> GenerateList() + private List<Person> GenerateList() { - List<String> strings = new ArrayList<>(); + List<Person> people = new ArrayList<>(); for (Integer i = 0; i < 12; ++i) { - - strings.add(i.toString()); + people.add(new Person(i.toString(), i)); } - return strings; + return people; } - public List<String> ReadFromKafka() + private List<Person> ReadFromKafka() { tupleCollection.clear(); @@ -292,11 +299,10 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase Properties props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); - props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); props.put(GROUP_ID_CONFIG, "KafkaTest"); - LocalMode lma = LocalMode.newInstance(); DAG dag = lma.getDAG(); @@ -316,7 +322,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase // Connect ports dag.addStream("Kafka message", node.outputPort, collector1.inputPort); - // Create local cluster final LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(false); @@ -328,7 +333,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase public static class CollectorModule extends BaseOperator { - public final transient CollectorInputPort inputPort = new CollectorInputPort(this); long currentWindowId; @@ -353,7 +357,6 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase { super.endWindow(); } - } public static class CollectorInputPort extends DefaultInputPort<byte[]> @@ -368,8 +371,52 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase @Override public void process(byte[] bt) { - String tuple = new String(bt); - tupleCollection.add(tuple); + tupleCollection.add(new 
KafkaHelper().deserialize("r", bt)); + } + } + + public static class Person + { + public String name; + public Integer age; + + public Person(String name, Integer age) + { + this.name = name; + this.age = age; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Person person = (Person)o; + + if (name != null ? !name.equals(person.name) : person.name != null) { + return false; + } + + return age != null ? age.equals(person.age) : person.age == null; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (age != null ? age.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return name + age.toString(); } } }
