Repository: apex-malhar Updated Branches: refs/heads/master c4a11299b -> cc9d50366
APEXMALHAR-2076 #resolve #comment add AbstractTupleUniqueExactlyOnceKafkaOutputOperator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/33a5c2ec Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/33a5c2ec Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/33a5c2ec Branch: refs/heads/master Commit: 33a5c2ec95c9ee6f33023ea4ae82d156a140cb25 Parents: 72de840 Author: brightchen <[email protected]> Authored: Wed May 25 13:01:26 2016 -0700 Committer: brightchen <[email protected]> Committed: Thu Jun 2 11:18:21 2016 -0700 ---------------------------------------------------------------------- .../kafka/AbstractKafkaOutputOperator.java | 2 +- ...pleUniqueExactlyOnceKafkaOutputOperator.java | 610 +++++++++++++++++++ .../contrib/kafka/KafkaMetadataUtil.java | 121 +++- .../datatorrent/contrib/kafka/KafkaUtil.java | 358 +++++++++++ ...upleUniqueExactlyOnceOutputOperatorTest.java | 512 ++++++++++++++++ .../contrib/kafka/KafkaUtilTester.java | 128 ++++ contrib/src/test/resources/log4j.properties | 1 + 7 files changed, 1713 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java index f0835c4..8003669 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java @@ -100,7 +100,7 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator return new ProducerConfig(configProperties); } - + public Producer<K, V> getProducer() { return producer; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java new file mode 100644 index 0000000..15fea37 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java @@ -0,0 +1,610 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.kafka; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.Pair; + +import kafka.api.OffsetRequest; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.serializer.StringDecoder; + +/** + * Assumptions: - assume the value of incoming tuples are not duplicate(at least + * in one window) among all operator partitions. - assume one Kafka partition + * can be written by multiple operator partitions at the same time - assume the + * the Kafka partition was decided by tuple value itself( not depended on + * operator partition) + * + * Notes: - the order of data could be changed when replay. - the data could go + * to the other partition when replay. For example if the upstream operator + * failed. + * + * Implementation: for each Kafka partition, load minimum last window and the + * minimum offset of the last window of all operator partitions. And then load + * the tuples from Kafka based on this minimum offset. When processing tuple, if + * the window id is less than the minimum last window, just ignore the tuple. If + * window id equals loaded minimum window id, and tuple equals any of loaded + * tuple, ignore it. Else, send to Kafka + * + * @displayName Abstract Tuple Unique Exactly Once Kafka Output + * @category Messaging + * @tags output operator + */ [email protected] +public abstract class AbstractTupleUniqueExactlyOnceKafkaOutputOperator<T, K, V> + extends AbstractKafkaOutputOperator<K, V> +{ + public static final String DEFAULT_CONTROL_TOPIC = "ControlTopic"; + protected transient int partitionNum = 1; + + /** + * allow client set the partitioner as partitioner may need some attributes + */ + protected kafka.producer.Partitioner partitioner; + + protected transient int operatorPartitionId; + + protected String controlTopic = DEFAULT_CONTROL_TOPIC; + + //The control info includes the time, use this time to track the head of control info we care. + protected int controlInfoTrackBackTime = 120000; + + /** + * max number of offset need to check + */ + protected int maxNumOffsetsOfControl = 1000; + + protected String controlProducerProperties; + protected Set<String> brokerSet; + + protected transient long currentWindowId; + + /** + * the map from Kafka partition id to the control offset. this one is + * checkpointed and as the start offset to load the recovery control + * information Note: this only keep the information of this operator + * partition. + */ + protected transient Map<Integer, Long> partitionToLastControlOffset = Maps.newHashMap(); + + /** + * keep the minimal last window id for recovery. If only one partition + * crashed, it is ok just use the last window id of this operator partition as + * the recovery window id If all operator partitions crashed, should use the + * minimal last window id as the recovery window id, as the data may go to the + * other partitions. 
But as the operator can't distinguish which is the case. + * use the most general one. + */ + protected transient long minRecoveryWindowId = -2; + protected transient long maxRecoveryWindowId = -2; + + /** + * A map from Kafka partition id to lastMessages writtten to this kafka + * partition. This information was loaded depends on the + * RecoveryControlInfo.kafkaPartitionIdToOffset + */ + protected transient Map<Integer, List<Pair<byte[], byte[]>>> partitionToLastMsgs = Maps.newHashMap(); + + /** + * The messages are assume to written to the kafka partition decided by + * tupleToKeyValue(T tuple) and partitioner. But it also depended on the + * system. for example, it could be only one partition when create topic. + * Don't distinguish kafka partitions if partition is not reliable. + */ + protected transient Set<Pair<byte[], byte[]>> totalLastMsgs = Sets.newHashSet(); + + protected transient RecoveryControlInfo controlInfo = new RecoveryControlInfo(); + protected transient Producer<String, String> controlDataProducer; + protected transient StringDecoder controlInfoDecoder; + + @Override + public void setup(OperatorContext context) + { + getBrokerSet(); + + super.setup(context); + controlInfoDecoder = new StringDecoder(null); + + operatorPartitionId = context.getId(); + + controlDataProducer = new Producer<String, String>(createKafkaControlProducerConfig()); + + if (partitioner == null) { + createDefaultPartitioner(); + } + + loadControlData(); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + } + + /** + * Implement Operator Interface. + */ + @Override + public void endWindow() + { + //we'd better flush the cached tuples, but Kafka 0.8.1 doesn't support flush. + //keep the control information of this operator partition to control topic + saveControlData(); + } + + protected void createDefaultPartitioner() + { + try { + String className = (String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER); + if (className != null) { + partitioner = (kafka.producer.Partitioner)Class.forName(className).newInstance(); + } + } catch (Exception e) { + throw new RuntimeException("Failed to initialize partitioner", e); + } + } + + /** + * load control data OUTPUT: lastMsgs and partitionToMinLastWindowId + */ + protected void loadControlData() + { + long loadDataTime = System.currentTimeMillis(); + + final String clientNamePrefix = getClientNamePrefix(); + Map<Integer, SimpleConsumer> consumers = KafkaUtil.createSimpleConsumers(clientNamePrefix, brokerSet, controlTopic); + if (consumers == null || consumers.size() != 1) { + logger.error("The consumer for recovery information was not expected. 
{}", consumers); + return; + } + final SimpleConsumer consumer = consumers.get(0); + if (consumer == null) { + logger.error("No consumer for recovery information."); + return; + } + + long latestOffset = KafkaMetadataUtil.getLastOffset(consumer, controlTopic, 0, OffsetRequest.LatestTime(), + KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0)); + logger.debug("latestOffsets: {}", latestOffset); + if (latestOffset <= 0) { + return; + } + + int batchMessageSize = 100; + List<Pair<byte[], byte[]>> messages = Lists.newArrayList(); + + boolean isControlMessageEnough = false; + Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo = Maps.newHashMap(); + + while (latestOffset > 0 && !isControlMessageEnough) { + long startOffset = latestOffset - batchMessageSize + 1; + if (startOffset < 0) { + startOffset = 0; + } + + //read offsets as batch and handle them. + messages.clear(); + KafkaUtil.readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0), + controlTopic, 0, startOffset, latestOffset - 1, messages, 3); + for (Pair<byte[], byte[]> message : messages) { + //handle the message; we have to handle all the message. + RecoveryControlInfo rci = RecoveryControlInfo.fromString((String)controlInfoDecoder.fromBytes(message.second)); + isControlMessageEnough = (loadControlInfoIntermedia(rci, loadDataTime, + operatorPartitionIdToLastControlInfo) == 0); + + if (isControlMessageEnough) { + break; + } + } + + latestOffset = startOffset - 1; + } + + loadRecoveryWindowId(operatorPartitionIdToLastControlInfo); + loadLastMessages(operatorPartitionIdToLastControlInfo); + } + + /** + * load the recovery window id. right now use the minimal window id as the + * recovery window id Different Operator partitions maybe crashed at different + * window. use the minimal window of all operator partitions as the window for + * recovery. 
+ * + * @param operatorPartitionIdToLastWindowId + */ + protected void loadRecoveryWindowId(Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo) + { + for (RecoveryControlInfo rci : operatorPartitionIdToLastControlInfo.values()) { + if (minRecoveryWindowId < 0 || rci.windowId < minRecoveryWindowId) { + minRecoveryWindowId = rci.windowId; + } + if (maxRecoveryWindowId < 0 || rci.windowId > maxRecoveryWindowId) { + maxRecoveryWindowId = rci.windowId; + } + } + } + + /** + * load control information from intermedia to + * + * @param operatorPartitionIdToLastWindowId + * @param operatorToKafkaToOffset + */ + protected void loadLastMessages(Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo) + { + partitionToLastControlOffset.clear(); + + for (Map.Entry<Integer, RecoveryControlInfo> entry : operatorPartitionIdToLastControlInfo.entrySet()) { + RecoveryControlInfo rci = entry.getValue(); + if (rci.windowId == this.minRecoveryWindowId) { + //get the minimal offset + for (Map.Entry<Integer, Long> kafkaPartitionEntry : rci.kafkaPartitionIdToOffset.entrySet()) { + Long offset = partitionToLastControlOffset.get(kafkaPartitionEntry.getKey()); + if (offset == null || offset > kafkaPartitionEntry.getValue()) { + partitionToLastControlOffset.put(kafkaPartitionEntry.getKey(), kafkaPartitionEntry.getValue()); + } + } + } + } + + partitionToLastMsgs.clear(); + + KafkaUtil.readMessagesAfterOffsetTo(getClientNamePrefix(), brokerSet, getTopic(), partitionToLastControlOffset, + partitionToLastMsgs); + + loadTotalLastMsgs(); + } + + /** + * load Total Last Messages from partitionToLastMsgs; + */ + protected void loadTotalLastMsgs() + { + totalLastMsgs.clear(); + if (partitionToLastMsgs == null || partitionToLastMsgs.isEmpty()) { + return; + } + for (List<Pair<byte[], byte[]>> msgs : partitionToLastMsgs.values()) { + totalLastMsgs.addAll(msgs); + } + } + + protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime, + Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo) + { + if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) { + return 0; + } + + //The record should be in ascent order, so the later should override the previous + operatorPartitionIdToLastControlInfo.put(controlInfo.partitionIdOfOperator, controlInfo); + + return 1; + } + + /** + * Current implementation we can get the number of operator partitions. So we + * we use the controlInfoTrackBackTime to control the trace back of control + * information. + * + * @param controlInfo + * @param loadDataTime + * @param operatorPartitionIdToLastWindowId + * @param operatorToKafkaToOffset + * @return 0 if control information is enough and don't need to load any more + */ + protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime, + Map<Integer, Long> operatorPartitionIdToLastWindowId, Map<Integer, Map<Integer, Long>> operatorToKafkaToOffset) + { + if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) { + return 0; + } + + //The record should be in ascent order, so the later should override the previous + operatorPartitionIdToLastWindowId.put(controlInfo.partitionIdOfOperator, controlInfo.windowId); + operatorToKafkaToOffset.put(controlInfo.partitionIdOfOperator, controlInfo.kafkaPartitionIdToOffset); + + return 1; + } + + /** + * save the control data. 
each operator partition only save its control data + */ + protected void saveControlData() + { + controlInfo.generateTime = System.currentTimeMillis(); + controlInfo.partitionIdOfOperator = operatorPartitionId; + controlInfo.windowId = this.currentWindowId; + if (controlInfo.kafkaPartitionIdToOffset == null) { + controlInfo.kafkaPartitionIdToOffset = Maps.newHashMap(); + } else { + controlInfo.kafkaPartitionIdToOffset.clear(); + } + KafkaMetadataUtil.getLastOffsetsTo(getClientNamePrefix(), brokerSet, getTopic(), + controlInfo.kafkaPartitionIdToOffset); + + //send to control topic + controlDataProducer.send(new KeyedMessage<String, String>(getControlTopic(), null, 0, controlInfo.toString())); + } + + protected String getClientNamePrefix() + { + return getClass().getName().replace('$', '.'); + } + + + protected Set<String> getBrokerSet() + { + if (brokerSet == null) { + brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)); + } + return brokerSet; + } + + /** + * This input port receives tuples that will be written out to Kafka. + */ + public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() + { + @Override + public void process(T tuple) + { + processTuple(tuple); + } + }; + + /** + * separate to a method to give sub-class chance to override + * + * @param tuple + */ + protected void processTuple(T tuple) + { + Pair<K, V> keyValue = tupleToKeyValue(tuple); + final int pid = getPartitionKey(keyValue.first); + + if (!skipTuple(pid, keyValue)) { + getProducer().send(new KeyedMessage<K, V>(getTopic(), keyValue.first, pid, keyValue.second)); + sendCount++; + } + } + + protected boolean skipTuple(int partitionId, Pair<K, V> msg) + { + if (currentWindowId <= minRecoveryWindowId) { + return true; + } + if (currentWindowId > maxRecoveryWindowId + 1) { + return false; + } + + return isDuplicateTuple(partitionId, msg); + } + + protected boolean isDuplicateTuple(int partitionId, Pair<K, V> msg) + { + Collection<Pair<byte[], byte[]>> lastMsgs = partitionToLastMsgs.get(partitionId); + + //check depended on the partition only + if (lastMsgs == null || lastMsgs.isEmpty()) { + lastMsgs = totalLastMsgs; + } + + for (Pair<byte[], byte[]> cachedMsg : lastMsgs) { + if (equals(cachedMsg, msg)) { + return true; + } + } + return false; + + } + + protected boolean equals(Pair<byte[], byte[]> cachedMsg, Pair<K, V> msg) + { + if (cachedMsg.first == null ^ msg.first == null) { + return false; + } + if (cachedMsg.second == null ^ msg.second == null) { + return false; + } + + if (cachedMsg.first == null && msg.first == null && cachedMsg.second == null && msg.second == null) { + return true; + } + + if (!equals(cachedMsg.first, msg.first)) { + return false; + } + + return equals(cachedMsg.second, msg.second); + } + + /** + * + * @param bytes + * @param value + * @return + */ + protected abstract <M> boolean equals(byte[] bytes, M value); + + /** + * get the partition key. for 0.8.1, If a partition key is provided it will + * override the key for the purpose of partitioning but will not be stored. 
+ * + * @return + */ + protected int getPartitionKey(K key) + { + if (partitioner != null) { + return partitioner.partition(key, partitionNum); + } + + if (key != null) { + return key.hashCode(); + } + + //stick to the Kafka partition, so can't use round robbin + return 0; + } + + /** + * setup the configuration for control producer + * + * @return + */ + protected ProducerConfig createKafkaControlProducerConfig() + { + if (controlProducerProperties == null || controlProducerProperties.isEmpty()) { + controlProducerProperties = getProducerProperties(); + } + + Properties prop = new Properties(); + for (String propString : controlProducerProperties.split(",")) { + if (!propString.contains("=")) { + continue; + } + String[] keyVal = StringUtils.trim(propString).split("="); + prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1])); + } + + //only support String encoder now, overwrite + prop.setProperty("serializer.class", "kafka.serializer.StringEncoder"); + prop.setProperty("key.serializer.class", "kafka.serializer.StringEncoder"); + + Properties configProperties = this.getConfigProperties(); + configProperties.putAll(prop); + + return new ProducerConfig(configProperties); + } + + /** + * Tell the operator how to convert a input tuple to a kafka key value pair + * + * @param tuple + * @return A kafka key value pair. + */ + protected abstract Pair<K, V> tupleToKeyValue(T tuple); + + public kafka.producer.Partitioner getPartitioner() + { + return partitioner; + } + + public void setPartitioner(kafka.producer.Partitioner partitioner) + { + this.partitioner = partitioner; + } + + public String getControlTopic() + { + return controlTopic; + } + + public void setControlTopic(String controlTopic) + { + this.controlTopic = controlTopic; + } + + public String getControlProducerProperties() + { + return controlProducerProperties; + } + + public void setControlProducerProperties(String controlProducerProperties) + { + this.controlProducerProperties = controlProducerProperties; + } + + private static final Logger logger = LoggerFactory.getLogger(AbstractExactlyOnceKafkaOutputOperator.class); + + /** + * This class used to keep the recovery information + * + */ + protected static class RecoveryControlInfo + { + protected static final String SEPERATOR = "#"; + protected int partitionIdOfOperator; + protected long generateTime; + protected long windowId; + protected Map<Integer, Long> kafkaPartitionIdToOffset; + //( operatorPartitionId => ( lastWindowId, (KafkaPartitionId => offset) ) ) + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append(partitionIdOfOperator).append(SEPERATOR).append(generateTime).append(SEPERATOR).append(windowId); + sb.append(SEPERATOR).append(kafkaPartitionIdToOffset); + return sb.toString(); + } + + public static RecoveryControlInfo fromString(String str) + { + if (str == null || str.isEmpty()) { + throw new IllegalArgumentException("Input parameter is null or empty."); + } + String[] fields = str.split(SEPERATOR); + if (fields == null || fields.length != 4) { + throw new IllegalArgumentException( + "Invalid input String: \"" + str + "\", " + "expected fields seperated by '" + SEPERATOR + "'"); + } + + RecoveryControlInfo rci = new RecoveryControlInfo(); + rci.partitionIdOfOperator = Integer.valueOf(fields[0]); + rci.generateTime = Long.valueOf(fields[1]); + rci.windowId = Long.valueOf(fields[2]); + + String mapString = fields[3].trim(); + if (mapString.startsWith("{") && mapString.endsWith("}")) { + mapString = 
mapString.substring(1, mapString.length() - 1); + } + Map<String, String> idToOffsetAsString = Splitter.on(",").withKeyValueSeparator("=").split(mapString); + rci.kafkaPartitionIdToOffset = Maps.newHashMap(); + for (Map.Entry<String, String> entry : idToOffsetAsString.entrySet()) { + rci.kafkaPartitionIdToOffset.put(Integer.valueOf(entry.getKey()), Long.valueOf(entry.getValue())); + } + return rci; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java index f6057cd..5f4d4c4 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java @@ -28,11 +28,10 @@ import java.util.Map; import java.util.Set; import org.I0Itec.zkclient.ZkClient; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; +import org.apache.commons.lang3.StringUtils; import com.google.common.collect.Maps; import com.google.common.collect.Maps.EntryTransformer; @@ -50,6 +49,8 @@ import kafka.javaapi.consumer.SimpleConsumer; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; +import scala.collection.JavaConversions; + /** * A util class used to retrieve all the metadatas for partitions/topics * Every method in the class creates a temporary simple kafka consumer and @@ -69,7 +70,7 @@ public class KafkaMetadataUtil // A temporary client used to retrieve the metadata of topic/partition etc private static final String mdClientId = "Kafka_Metadata_Lookup_Client"; - private static final int timeout=10000; + private static final int timeout = 10000; //buffer size for MD lookup client is 128k should be enough for most cases private static final int bufferSize = 128 * 1024; @@ -95,20 +96,23 @@ public class KafkaMetadataUtil * @return Get the partition metadata list for the specific topic via the brokers * null if topic is not found */ - public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, final String topic) + public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, + final String topic) { - return Maps.transformEntries(brokers.asMap(), new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>(){ - @Override - public List<PartitionMetadata> transformEntry(String key, Collection<String> bs) - { - return getPartitionsForTopic(new HashSet<String>(bs), topic); - }}); + return Maps.transformEntries(brokers.asMap(), + new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>() + { + @Override + public List<PartitionMetadata> transformEntry(String key, Collection<String> bs) + { + return getPartitionsForTopic(new HashSet<String>(bs), topic); + } + }); } - - public static Set<String> getBrokers(Set<String> zkHost){ - - ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$); + public static Set<String> getBrokers(Set<String> zkHost) + { + ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ','), 30000, 30000, ZKStringSerializer$.MODULE$); Set<String> brokerHosts = new HashSet<String>(); for (Broker b : 
JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) { brokerHosts.add(b.getConnectionString()); @@ -149,7 +153,7 @@ public class KafkaMetadataUtil public static TopicMetadata getTopicMetadata(Set<String> brokerSet, String topic) { SimpleConsumer mdConsumer = null; - if (brokerSet == null || brokerSet == null || brokerSet.size() == 0) { + if (brokerSet == null || brokerSet.size() == 0) { return null; } try { @@ -191,12 +195,12 @@ public class KafkaMetadataUtil * @param partition * @param whichTime * @param clientName - * @return 0 if consumer is null at this time + * @return the last offset, value value should be >=0. Return <0 if consumer is null or error. */ public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { if (consumer == null) { - return 0; + return -1; } TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); @@ -206,11 +210,92 @@ public class KafkaMetadataUtil if (response.hasError()) { logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); - return 0; + return -1; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } + + /** + * this method wrapper kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(OffsetRequest) + * @param consumer + * @param clientName + * @param topic + * @param partitionId + * @param time + * @param maxNumOffsets + * @return + */ + public static long[] getOffsetsBefore(SimpleConsumer consumer, String clientName, String topic, int partitionId, long time, int maxNumOffsets) + { + if (consumer == null) { + throw new IllegalArgumentException("consumer is not suppose to be null."); + } + + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, maxNumOffsets)); + OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + logger.error("Error fetching data Offset Data the Broker. 
Reason: " + response.errorCode(topic, partitionId)); + return null; + } + return response.offsets(topic, partitionId); + } + + + /** + * get the last offset of each partition to the partitionToOffset map + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param time + * @param partitionToOffset + */ + public static void getLastOffsetsTo(String clientNamePrefix, Set<String> brokerSet, String topic, + Map<Integer, Long> partitionToOffset) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + int partitionId = pm.partitionId(); + + String leadBroker = pm.leader().host(); + int port = pm.leader().port(); + final String clientName = getClientName(clientNamePrefix, topic, partitionId); + consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); + + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)); + OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId)); + } + partitionToOffset.put(partitionId, response.offsets(topic, partitionId)[0]); + } finally { + if (consumer != null) { + consumer.close(); + } + } + } + } + + public static String getClientName(String clientNamePrefix, String topic, int partitionId) + { + return clientNamePrefix + "_" + topic + "_" + partitionId; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java new file mode 100644 index 0000000..d49e462 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.kafka; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.common.util.Pair; + +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.Message; +import kafka.message.MessageAndOffset; + +public class KafkaUtil +{ + private static Logger logger = LoggerFactory.getLogger(KafkaUtil.class); + public static final int DEFAULT_TIMEOUT = 200; + public static final int DEFAULT_BUFFER_SIZE = 64 * 10240; + public static final int DEFAULT_FETCH_SIZE = 200; + + /** + * read last message ( the start offset send from partitionToOffset ) of all + * partition to partitionToMessages + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param partitionToStartOffset + * @param partitionToMessages + */ + public static void readMessagesAfterOffsetTo(String clientNamePrefix, Set<String> brokerSet, String topic, + Map<Integer, Long> partitionToStartOffset, Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + List<Pair<byte[], byte[]>> messagesOfPartition = partitionToMessages.get(pm.partitionId()); + if (messagesOfPartition == null) { + messagesOfPartition = Lists.newArrayList(); + partitionToMessages.put(pm.partitionId(), messagesOfPartition); + } + + long startOffset = partitionToStartOffset.get(pm.partitionId()) == null ? 0 + : partitionToStartOffset.get(pm.partitionId()); + final String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumer = createSimpleConsumer(clientName, tm.topic(), pm); + + //the returned lastOffset is the offset which haven't written data to. 
+ long lastOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), + kafka.api.OffsetRequest.LatestTime(), clientName); + logger.debug("lastOffset = {}", lastOffset); + if (lastOffset <= 0) { + continue; + } + + readMessagesBetween(consumer, clientName, topic, pm.partitionId(), startOffset, lastOffset - 1, + messagesOfPartition); + } finally { + if (consumer != null) { + consumer.close(); + } + } + } + } + + public static void readMessagesBetween(String clientNamePrefix, Set<String> brokerSet, String topic, int partitionId, + long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages) + { + Map<Integer, SimpleConsumer> consumers = createSimpleConsumers(clientNamePrefix, brokerSet, topic); + if (consumers == null) { + throw new RuntimeException("Can't find any consumer."); + } + + SimpleConsumer consumer = consumers.get(partitionId); + if (consumer == null) { + throw new IllegalArgumentException("No consumer for partition: " + partitionId); + } + + readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, topic, partitionId), topic, + partitionId, startOffset, endOffset, messages); + } + + /** + * get A map of partition id to SimpleConsumer + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @return A map of partition id to SimpleConsumer + */ + public static Map<Integer, SimpleConsumer> createSimpleConsumers(String clientNamePrefix, Set<String> brokerSet, + String topic) + { + return createSimpleConsumers(clientNamePrefix, brokerSet, topic, DEFAULT_TIMEOUT); + } + + /** + * get A map of partition id to SimpleConsumer + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param timeOut + * @return A map of partition id to SimpleConsumer + */ + public static Map<Integer, SimpleConsumer> createSimpleConsumers(String clientNamePrefix, Set<String> brokerSet, + String topic, int timeOut) + { + if (clientNamePrefix == null || clientNamePrefix.isEmpty() || brokerSet == null || brokerSet.isEmpty() + || topic == null || topic.isEmpty()) { + throw new IllegalArgumentException( + "clientNamePrefix = " + clientNamePrefix + ", brokerSet = " + brokerSet + ", topic = " + topic); + } + + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + Map<Integer, SimpleConsumer> consumers = Maps.newHashMap(); + for (PartitionMetadata pm : tm.partitionsMetadata()) { + String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumers.put(pm.partitionId(), createSimpleConsumer(clientName, tm.topic(), pm)); + } + return consumers; + } + + public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId, + long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages) + { + readMessagesBetween(consumer, clientName, topic, partitionId, startOffset, endOffset, messages, 1); + } + + /** + * read messages of a certain partition into messages + * + * @param consumer + * @param clientNamePrefix + * @param topic + * @param partitionId + * @param startOffset + * inclusive + * @param endOffset + * inclusive + * @param messages + * @param tryTimesOnEmptyMessage + * how many times should to try when response message is empty. <=0 + * means try forever. 
+ */ + public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId, + long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages, int tryTimesOnEmptyMessage) + { + if (startOffset < 0 || endOffset < 0 || endOffset < startOffset) { + throw new IllegalArgumentException( + "Both offset should not less than zero and endOffset should not less than startOffset. startOffset = " + + startOffset + ", endoffset = " + endOffset); + } + + int readSize = 0; + int wantedSize = (int)(endOffset - startOffset + 1); + + int triedTimesOnEmptyMessage = 0; + while (readSize < wantedSize + && (tryTimesOnEmptyMessage <= 0 || triedTimesOnEmptyMessage < tryTimesOnEmptyMessage)) { + logger.debug("startOffset = {}", startOffset); + FetchRequest req = new FetchRequestBuilder().clientId(clientName) + .addFetch(topic, partitionId, startOffset, DEFAULT_FETCH_SIZE).build(); + + FetchResponse fetchResponse = consumer.fetch(req); + if (fetchResponse.hasError()) { + logger.error( + "Error fetching data Offset Data the Broker. Reason: " + fetchResponse.errorCode(topic, partitionId)); + return; + } + + triedTimesOnEmptyMessage++; + ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, partitionId); + for (MessageAndOffset messageAndOffset : messageSet) { + long offset = messageAndOffset.offset(); + logger.debug("offset = " + offset); + + if (offset > endOffset || offset < startOffset) { + continue; + } + triedTimesOnEmptyMessage = 0; + startOffset = offset + 1; + ++readSize; + messages.add(kafkaMessageToPair(messageAndOffset.message())); + } + } + } + + /** + * read last message of each partition into lastMessages + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param lastMessages + */ + public static void readLastMessages(String clientNamePrefix, Set<String> brokerSet, String topic, + Map<Integer, Pair<byte[], byte[]>> lastMessages) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumer = createSimpleConsumer(clientName, tm.topic(), pm); + + long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), + kafka.api.OffsetRequest.LatestTime(), clientName); + + FetchRequest req = new FetchRequestBuilder().clientId(clientName) + .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build(); + + FetchResponse fetchResponse = consumer.fetch(req); + if (fetchResponse.hasError()) { + logger.error("Error fetching data Offset Data the Broker. 
Reason: " + + fetchResponse.errorCode(topic, pm.partitionId())); + return; + } + + for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) { + lastMessages.put(pm.partitionId(), kafkaMessageToPair(messageAndOffset.message())); + } + } finally { + if (consumer != null) { + consumer.close(); + } + } + + } + } + + public static Pair<byte[], byte[]> readLastMessage(String clientNamePrefix, Set<String> brokerSet, String topic, + int partitionId) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + if (pm.partitionId() != partitionId) { + continue; + } + + String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumer = createSimpleConsumer(clientName, topic, pm); + + long readOffset = KafkaMetadataUtil.getLastOffset(consumer, topic, partitionId, + kafka.api.OffsetRequest.LatestTime(), clientName); + + FetchRequest req = new FetchRequestBuilder().clientId(clientName) + .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build(); + + FetchResponse fetchResponse = consumer.fetch(req); + if (fetchResponse.hasError()) { + logger.error("Error fetching data Offset Data the Broker. Reason: " + + fetchResponse.errorCode(topic, pm.partitionId())); + return null; + } + + for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) { + return kafkaMessageToPair(messageAndOffset.message()); + } + } finally { + if (consumer != null) { + consumer.close(); + } + } + } + return null; + } + + /** + * convert Kafka message to pair + * + * @param m + * @return + */ + public static Pair<byte[], byte[]> kafkaMessageToPair(Message m) + { + ByteBuffer payload = m.payload(); + ByteBuffer key = m.key(); + byte[] keyBytes = null; + if (key != null) { + keyBytes = new byte[key.limit()]; + key.get(keyBytes); + } + + byte[] valueBytes = new byte[payload.limit()]; + payload.get(valueBytes); + return new Pair<byte[], byte[]>(keyBytes, valueBytes); + } + + public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm) + { + return createSimpleConsumer(clientName, topic, pm, DEFAULT_TIMEOUT, DEFAULT_BUFFER_SIZE); + } + + public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm, int timeout, + int bufferSize) + { + String leadBroker = pm.leader().host(); + int port = pm.leader().port(); + return new SimpleConsumer(leadBroker, port, timeout, bufferSize, clientName); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java new file mode 100644 index 0000000..abdcb01 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.kafka; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.common.util.Pair; +import com.datatorrent.stram.api.OperatorDeployInfo; + +import kafka.producer.ProducerConfig; +import kafka.serializer.StringDecoder; + +public class KafkaTupleUniqueExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase +{ + public static final int TUPLE_NUM_IN_ONE_WINDOW = 10; + public static final String topic1 = "OperatorTest1"; + public static final String controlTopic1 = "ControlTopic1"; + + public static final String topic2 = "OperatorTest2"; + public static final String controlTopic2 = "ControlTopic2"; + + public static final String topic3 = "OperatorTest3"; + public static final String controlTopic3 = "ControlTopic3"; + + public static class TupleUniqueExactlyOnceKafkaOutputTestOperator + extends AbstractTupleUniqueExactlyOnceKafkaOutputOperator<Integer, String, String> + { + protected transient StringDecoder decoder = null; + + @Override + public void setup(OperatorContext context) + { + decoder = new StringDecoder(null); + super.setup(context); + } + + @Override + protected Pair<String, String> tupleToKeyValue(Integer tuple) + { + return new Pair<>(String.valueOf(tuple % 2), String.valueOf(tuple)); + } + + @Override + protected <T> boolean equals(byte[] bytes, T value) + { + if (bytes == null && value == null) { + return true; + } + if (value == null) { + return false; + } + return value.equals(decoder.fromBytes(bytes)); + } + + } + + protected void createTopic(String topicName) + { + createTopic(0, topicName); + if (hasMultiCluster) { + createTopic(1, topicName); + } + } + + protected ProducerConfig createKafkaControlProducerConfig() + { + return new ProducerConfig(this.getKafkaProperties()); + } + + /** + * This test case there are only one operator partition, and the order of data + * changed when recovery. 
+ */ + @Test + public void testOutOfOrder() + { + OperatorDeployInfo context = new OperatorDeployInfo(); + context.id = 1; + int[] expectedTuple = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 3)]; + int tupleIndex = 0; + long windowId = 0; + { + //create required topics + createTopic(topic1); + createTopic(controlTopic1); + + TupleUniqueExactlyOnceKafkaOutputTestOperator operator = createOperator(topic1, controlTopic1, 1); + + int i = 0; + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator.beginWindow(windowId++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1); ++i) { + operator.processTuple(i); + expectedTuple[tupleIndex++] = i; + } + waitMills(500); + operator.endWindow(); + } + + //last window, the crash window + operator.beginWindow(windowId++); + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5; ++i) { + operator.processTuple(i); + expectedTuple[tupleIndex++] = i; + } + + //crashed now. + } + + //let kafka message send to server + waitMills(1000); + + { + //recovery + TupleUniqueExactlyOnceKafkaOutputTestOperator operator = new TupleUniqueExactlyOnceKafkaOutputTestOperator(); + operator.setTopic(topic1); + operator.setControlTopic(controlTopic1); + operator.setConfigProperties(getKafkaProperties()); + + operator.setup(context); + + //assume replay start with 2nd window, but different order + int i = TUPLE_NUM_IN_ONE_WINDOW; + + windowId = 1; + operator.beginWindow(windowId++); + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) { + operator.processTuple(i); + } + i = TUPLE_NUM_IN_ONE_WINDOW + 1; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) { + operator.processTuple(i); + } + waitMills(500); + operator.endWindow(); + + //3rd window, in different order + operator.beginWindow(windowId++); + i = TUPLE_NUM_IN_ONE_WINDOW * 2; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) { + operator.processTuple(i); + if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) { + expectedTuple[tupleIndex++] = i; + } + } + + i = TUPLE_NUM_IN_ONE_WINDOW * 2 + 1; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) { + operator.processTuple(i); + if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) { + expectedTuple[tupleIndex++] = i; + } + } + } + + int[] actualTuples = readTuplesFromKafka(topic1); + Assert.assertArrayEquals(expectedTuple, actualTuples); + } + + protected TupleUniqueExactlyOnceKafkaOutputTestOperator createOperator(String topic, String controlTopic, int id) + { + TupleUniqueExactlyOnceKafkaOutputTestOperator operator = new TupleUniqueExactlyOnceKafkaOutputTestOperator(); + operator.setTopic(topic); + operator.setControlTopic(controlTopic); + + operator.setConfigProperties(getKafkaProperties()); + OperatorDeployInfo context = new OperatorDeployInfo(); + context.id = id; + operator.setup(context); + + return operator; + } + + /** + * This test case test the case the tuple go to other operator partition when + * recovery. 
+ */ + @Test + public void testDifferentPartition() + { + //hasMultiPartition = true; + + int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)]; + int tupleIndex = 0; + long windowId1 = 0; + long windowId2 = 0; + + //create required topics + createTopic(topic2); + createTopic(controlTopic2); + + { + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1); + TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2); + TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] { + operator1, operator2 }; + + //send as round robin + int i = 0; + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + operator2.beginWindow(windowId2++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + waitMills(500); + operator1.endWindow(); + operator2.endWindow(); + } + + //last window, the crash window + operator1.beginWindow(windowId1++); + operator2.beginWindow(windowId2++); + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; ++i) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + + //crashed now. + } + + //let kafka message send to server + waitMills(1000); + int lastTuple = tupleIndex - 1; + { + //recovery + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1); + TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2); + //tuple go to different partition + TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] { + operator2, operator1 }; + + //assume replay start with 2nd window, but different order + int i = TUPLE_NUM_IN_ONE_WINDOW * 2; + + windowId1 = 1; + windowId2 = 1; + + //window id: 1, 2 + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + operator2.beginWindow(windowId2++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; ++i) { + operators[i % 2].processTuple(i); + if (i > lastTuple) { + expectedTuples[tupleIndex++] = i; + } + } + waitMills(500); + operator1.endWindow(); + operator2.endWindow(); + } + } + + int[] actualTuples = readTuplesFromKafka(topic2); + Arrays.sort(actualTuples); + Arrays.sort(expectedTuples); + + assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples); + } + + /** + * This test case test only one operator partition crash, while the other + * operator partition keep on write data to the same Kafka partition. 
+ */ + @Test + public void testOnePartitionCrash() + { + + int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)]; + int tupleIndex = 0; + long windowId1 = 0; + long windowId2 = 0; + + //create required topics + createTopic(topic3); + createTopic(controlTopic3); + + { + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1); + TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic3, controlTopic3, 2); + TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] { + operator1, operator2 }; + + //send as round robin + int i = 0; + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + operator2.beginWindow(windowId2++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + waitMills(500); + operator1.endWindow(); + operator2.endWindow(); + } + + //operator1 crash, while operator2 alive + operator1.beginWindow(windowId1++); + //operator1 handle even number; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; i += 2) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + + //operator1 crashed now. + + //operator2 still alive, operator2 handle odd number + operator2.beginWindow(windowId2++); + i = TUPLE_NUM_IN_ONE_WINDOW * 4 + 1; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3 * 2; i += 2) { + operator2.processTuple(i); + expectedTuples[tupleIndex++] = i; + } + operator2.endWindow(); + + } + + //let kafka message send to server + waitMills(1000); + + //operator1 recover from second window + int lastTuple = (int)(TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2) - 1; + { + //recovery + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1); + + //assume replay start with 2nd window, same order + int i = TUPLE_NUM_IN_ONE_WINDOW * 2; + + windowId1 = 1; + + //window id: 1, 2 + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; i += 2) { + operator1.processTuple(i); + if (i > lastTuple) { + expectedTuples[tupleIndex++] = i; + } + } + waitMills(500); + operator1.endWindow(); + } + } + + int[] actualTuples = readTuplesFromKafka(topic3); + Arrays.sort(actualTuples); + Arrays.sort(expectedTuples); + + assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples); + } + + /** + * Test the application which using TupleUniqueExactlyOnceKafkaOutputTestOperator is launchalbe in local mode + */ + @Test + public void testLaunchApp() throws Exception + { + Configuration conf = new Configuration(false); + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + TupleGenerateOperator generateOperator = new TupleGenerateOperator(); + dag.addOperator("GenerateOperator", generateOperator); + + TupleUniqueExactlyOnceKafkaOutputTestOperator testOperator = new TupleUniqueExactlyOnceKafkaOutputTestOperator(); + dag.addOperator("TestOperator", testOperator); + + dag.addStream("stream", generateOperator.outputPort, testOperator.inputPort); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(5000); + + lc.shutdown(); + } + + public static void assertArrayEqualsWithDetailInfo(int[] 
expectedTuples, int[] actualTuples) + { + Assert.assertTrue("Length incorrect. expected " + expectedTuples.length + "; actual " + actualTuples.length, + actualTuples.length == expectedTuples.length); + for (int i = 0; i < actualTuples.length; ++i) { + Assert.assertEquals("Not equal. index=" + i + ", expected=" + expectedTuples[i] + ", actual=" + actualTuples[i], + actualTuples[i], expectedTuples[i]); + } + } + + public void waitMills(long millis) + { + try { + Thread.sleep(millis); + } catch (Exception e) { + //ignore + } + } + + public int[] readTuplesFromKafka(String topic) + { + StringDecoder decoder = new StringDecoder(null); + Map<Integer, Long> partitionToStartOffset = Maps.newHashMap(); + partitionToStartOffset.put(0, 0L); + + this.waitMills(1000); + + Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages = Maps.newHashMap(); + KafkaUtil.readMessagesAfterOffsetTo("TestOperator", getBrokerSet(), topic, partitionToStartOffset, + partitionToMessages); + + List<Pair<byte[], byte[]>> msgList = partitionToMessages.get(0); + int[] values = new int[msgList.size()]; + int index = 0; + for (Pair<byte[], byte[]> msg : msgList) { + values[index++] = Integer.valueOf(decoder.fromBytes(msg.second)); + } + return values; + } + + + protected Set<String> getBrokerSet() + { + return Sets.newHashSet((String)getKafkaProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)); + } + + public Properties getKafkaProperties() + { + Properties props = new Properties(); + props.setProperty("serializer.class", "kafka.serializer.StringEncoder"); + //props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + //props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder"); + props.put("metadata.broker.list", "localhost:9092"); + //props.setProperty("producer.type", "sync"); + props.setProperty("producer.type", "async"); + props.setProperty("queue.buffering.max.ms", "100"); + props.setProperty("queue.buffering.max.messages", "5"); + props.setProperty("batch.num.messages", "5"); + return props; + } + + + public static class TupleGenerateOperator extends BaseOperator implements InputOperator + { + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<>(); + protected int value = 0; + + @Override + public void emitTuples() + { + if (!outputPort.isConnected()) { + return; + } + + for (int i = 0; i < 100; ++i) { + outputPort.emit(++value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java new file mode 100644 index 0000000..c27803d --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.kafka; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.common.util.Pair; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +public class KafkaUtilTester extends KafkaOperatorTestBase +{ + public static final String topic = "UtilTestTopic"; + public static final String clientNamePrefix = "UtilTestClient"; + public static final int DATA_SIZE = 50; + + protected Producer<String, String> producer; + private transient Set<String> brokerSet; + + public void beforeTest() + { + //Got exception when using multiple partition. + //java.io.FileNotFoundException: target/kafka-server-data/1/1/replication-offset-checkpoint (No such file or directory) + //hasMultiPartition = true; + + super.beforeTest(); + createTopic(topic); + + producer = new Producer<String, String>(createKafkaProducerConfig()); + getBrokerSet(); + + sendData(); + } + + public void sendData() + { + for (int i = 0; i < DATA_SIZE; ++i) { + producer.send(new KeyedMessage<String, String>(topic, null, "message " + i)); + } + + waitMills(1000); + } + + @Test + public void testReadMessagesAfterOffsetTo() + { + Map<Integer, Long> partitionToStartOffset = Maps.newHashMap(); + partitionToStartOffset.put(1, 0L); + Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages = Maps.newHashMap(); + KafkaUtil.readMessagesAfterOffsetTo(clientNamePrefix, brokerSet, topic, partitionToStartOffset, + partitionToMessages); + final int dataSize = partitionToMessages.entrySet().iterator().next().getValue().size(); + Assert.assertTrue("data size is: " + dataSize, dataSize == DATA_SIZE); + } + + public void waitMills(long millis) + { + try { + Thread.sleep(millis); + } catch (Exception e) { + //ignore + } + } + + protected void createTopic(String topicName) + { + createTopic(0, topicName); + if (hasMultiCluster) { + createTopic(1, topicName); + } + } + + protected Properties getConfigProperties() + { + Properties props = new Properties(); + props.setProperty("serializer.class", "kafka.serializer.StringEncoder"); + //props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + //props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder"); + props.put("metadata.broker.list", "localhost:9092"); + //props.setProperty("producer.type", "sync"); + props.setProperty("producer.type", "async"); + props.setProperty("queue.buffering.max.ms", "10"); + props.setProperty("queue.buffering.max.messages", "10"); + props.setProperty("batch.num.messages", "5"); + + return props; + } + + protected ProducerConfig createKafkaProducerConfig() + { + return new ProducerConfig(getConfigProperties()); + } + + protected Set<String> getBrokerSet() + { + if (brokerSet == null) { + brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)); + } + return brokerSet; + } +} 
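One configuration detail of the new operator that is only implied by the code above: createKafkaControlProducerConfig() parses the controlProducerProperties string as comma-separated key=value pairs, falls back to the regular producer properties string when it is empty, and then forces both serializer.class and key.serializer.class to kafka.serializer.StringEncoder. A minimal sketch of configuring the operator, reusing the test operator added in this patch; the topic names, broker address and producer.type value are illustrative placeholders, not values mandated by the commit:

    import java.util.Properties;

    import com.datatorrent.contrib.kafka.KafkaTupleUniqueExactlyOnceOutputOperatorTest.TupleUniqueExactlyOnceKafkaOutputTestOperator;

    public class ControlProducerConfigExample
    {
      public static TupleUniqueExactlyOnceKafkaOutputTestOperator configure()
      {
        TupleUniqueExactlyOnceKafkaOutputTestOperator op = new TupleUniqueExactlyOnceKafkaOutputTestOperator();
        op.setTopic("OperatorTest1");
        // Defaults to DEFAULT_CONTROL_TOPIC ("ControlTopic") when not set explicitly.
        op.setControlTopic("ControlTopic1");

        // Regular producer configuration, as in the tests in this patch.
        Properties props = new Properties();
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092"); // placeholder broker list
        op.setConfigProperties(props);

        // Parsed by createKafkaControlProducerConfig(): entries are split on ',' and then '=';
        // both serializers are subsequently overwritten with kafka.serializer.StringEncoder.
        op.setControlProducerProperties("metadata.broker.list=localhost:9092,producer.type=sync");
        return op;
      }
    }

The same setters apply to any other concrete subclass; only the comma-separated control-producer string format is specific to this operator.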
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/log4j.properties b/contrib/src/test/resources/log4j.properties index 2fcbe38..cfc50cf 100644 --- a/contrib/src/test/resources/log4j.properties +++ b/contrib/src/test/resources/log4j.properties @@ -39,3 +39,4 @@ log4j.logger.org=info #log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=debug log4j.logger.org.apache.apex=debug +log4j.logger.kafka=info
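To summarize what a user of the new operator has to implement: the two abstract hooks declared in AbstractTupleUniqueExactlyOnceKafkaOutputOperator, tupleToKeyValue(T) to turn an incoming tuple into the Kafka key/value pair, and equals(byte[], M) so the operator can compare an outgoing key or value against the raw messages it reads back from Kafka during recovery. The following is a minimal sketch closely modeled on the test operator in this commit; the String tuple type, the same-package placement and the StringEncoder/StringDecoder pairing are illustrative assumptions, not requirements of the patch.

    package com.datatorrent.contrib.kafka;

    import com.datatorrent.api.Context.OperatorContext;
    import com.datatorrent.common.util.Pair;

    import kafka.serializer.StringDecoder;

    public class StringTupleUniqueExactlyOnceKafkaOutputOperator
        extends AbstractTupleUniqueExactlyOnceKafkaOutputOperator<String, String, String>
    {
      private transient StringDecoder decoder;

      @Override
      public void setup(OperatorContext context)
      {
        // Decoder used to turn cached raw Kafka bytes back into Strings for comparison.
        decoder = new StringDecoder(null);
        super.setup(context);
      }

      @Override
      protected Pair<String, String> tupleToKeyValue(String tuple)
      {
        // The key also drives getPartitionKey(); here the tuple doubles as its own key.
        return new Pair<>(tuple, tuple);
      }

      @Override
      protected <M> boolean equals(byte[] bytes, M value)
      {
        if (bytes == null || value == null) {
          return bytes == null && value == null;
        }
        return value.equals(decoder.fromBytes(bytes));
      }
    }

Recovery then follows the class javadoc: tuples in windows at or before the minimum recovered window id are skipped outright, tuples in windows up to one past the maximum recovered window id are deduplicated against the cached last messages via equals(), and all later windows are written unconditionally.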
