Repository: apex-malhar Updated Branches: refs/heads/master 4cbbb7507 -> 623b803f5
fixing all checkstyle violations, deleting maxAllowedViolations config in pom Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/623b803f Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/623b803f Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/623b803f Branch: refs/heads/master Commit: 623b803f5b960b3e33054e2b60e8c156b7b0150a Parents: 4cbbb75 Author: Apex Dev <[email protected]> Authored: Wed Jan 18 17:14:32 2017 -0800 Committer: Oliver W <[email protected]> Committed: Fri Jan 20 11:48:58 2017 -0800 ---------------------------------------------------------------------- kafka/pom.xml | 8 --- .../kafka/AbstractKafkaInputOperator.java | 48 +++++++-------- .../kafka/AbstractKafkaOutputOperator.java | 4 +- .../malhar/kafka/AbstractKafkaPartitioner.java | 32 +++++----- .../apex/malhar/kafka/KafkaConsumerWrapper.java | 13 ++--- .../apex/malhar/kafka/KafkaPartition.java | 4 -- ...afkaSinglePortExactlyOnceOutputOperator.java | 61 ++++++++++---------- .../kafka/KafkaSinglePortOutputOperator.java | 11 ++-- .../apex/malhar/kafka/OneToManyPartitioner.java | 6 +- .../apex/malhar/kafka/OneToOnePartitioner.java | 3 +- .../apex/malhar/kafka/PartitionStrategy.java | 3 +- .../apache/apex/malhar/kafka/EmbeddedKafka.java | 7 ++- .../kafka/KafkaConsumerPropertiesTest.java | 10 ++-- .../apache/apex/malhar/kafka/KafkaHelper.java | 3 +- .../malhar/kafka/KafkaInputOperatorTest.java | 50 ++++++++-------- .../malhar/kafka/KafkaOperatorTestBase.java | 25 ++++---- .../malhar/kafka/KafkaOutputOperatorTest.java | 35 ++++++----- .../apex/malhar/kafka/KafkaTestPartitioner.java | 8 ++- .../apex/malhar/kafka/KafkaTestProducer.java | 20 ++++--- 19 files changed, 182 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/kafka/pom.xml b/kafka/pom.xml index 590fa09..654c62c 100755 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -177,14 +177,6 @@ </execution> </executions> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <configuration> - <maxAllowedViolations>35</maxAllowedViolations> - <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole> - </configuration> - </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 6fc7693..1d05580 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -65,7 +65,7 @@ import com.datatorrent.netlet.util.DTThrowable; * * <ol> * <li>Out-of-box One-to-one and one-to-many partition strategy support plus customizable partition strategy - * refer to AbstractKafkaPartitioner </li> + * refer to AbstractKafkaPartitioner </li> * <li>Fault-tolerant when the input operator goes down, it redeploys on other node</li> * <li>At-least-once semantics for operator failure (no 
matter which operator fails)</li> * <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li> @@ -77,7 +77,9 @@ import com.datatorrent.netlet.util.DTThrowable; * @since 3.3.0 */ @InterfaceStability.Evolving -public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback +public abstract class AbstractKafkaInputOperator implements InputOperator, + Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener, + Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback { private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class); @@ -92,7 +94,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera { EARLIEST, // consume from beginning of the partition every time when application restart LATEST, // consume from latest of the partition every time when application restart - APPLICATION_OR_EARLIEST, // consume from committed position from last run or earliest if there is no committed offset(s) + // consume from committed position from last run or earliest if there is no committed offset(s) + APPLICATION_OR_EARLIEST, APPLICATION_OR_LATEST // consume from committed position from last run or latest if there is no committed offset(s) } @@ -103,7 +106,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera private String[] topics; /** - * offset track for checkpoint + * offset track for checkpoint */ private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>(); @@ -148,6 +151,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera /** * By default the strategy is one to one + * * @see PartitionStrategy */ private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE; @@ -161,7 +165,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera /** * store offsets with window id, only keep offsets with windows that have not been committed */ - private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>(); + private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = + new LinkedList<>(); /** * Application name is used as group.id for kafka consumer @@ -212,7 +217,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera return; } //ask kafka consumer wrapper to store the committed offsets - for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) { + for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = + offsetHistory.iterator(); iter.hasNext(); ) { Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next(); if (item.getLeft() <= windowId) { if (item.getLeft() == windowId) { @@ -302,8 +308,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera } } - - @Override public void setup(Context.OperatorContext context) { @@ -314,7 +318,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera operatorId = context.getId(); } - @Override public void teardown() { @@ -382,8 +385,8 @@ public abstract class AbstractKafkaInputOperator 
implements InputOperator, Opera } /** - * * A callback from consumer after it commits the offset + * * @param map * @param e */ @@ -435,9 +438,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera } /** - * Same setting as bootstrap.servers property to KafkaConsumer - * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs - * To support multi cluster, you can have multiple bootstrap.servers separated by ";" + * Same setting as bootstrap.servers property to KafkaConsumer + * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs + * To support multi cluster, you can have multiple bootstrap.servers separated by ";" */ public String getClusters() { @@ -474,13 +477,13 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera } /** - * Initial offset, it should be one of the following - * <ul> - * <li>earliest</li> - * <li>latest</li> - * <li>application_or_earliest</li> - * <li>application_or_latest</li> - * </ul> + * Initial offset, it should be one of the following + * <ul> + * <li>earliest</li> + * <li>latest</li> + * <li>application_or_earliest</li> + * <li>application_or_latest</li> + * </ul> */ public String getInitialOffset() { @@ -512,7 +515,6 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera * <li>key.deserializer</li> * <li>value.deserializer</li> * </ul> - * */ public Properties getConsumerProps() { @@ -534,7 +536,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera /** * @see <a href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"> - * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a> + * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a> */ public long getConsumerTimeout() { @@ -610,8 +612,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera } /** - * @omitFromUI * @return current checkpointed offsets + * @omitFromUI */ public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack() { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java index f38ead9..0e16fe1 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java @@ -20,10 +20,12 @@ package org.apache.apex.malhar.kafka; import java.util.Properties; + import javax.validation.constraints.NotNull; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; + import com.datatorrent.api.Context; import com.datatorrent.api.Operator; @@ -99,7 +101,7 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator */ public void setProperty(Object key, Object val) { - properties.put(key,val); + properties.put(key, val); } public String getTopic() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index c9b40be..791972f 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -47,8 +47,6 @@ import com.datatorrent.api.Partitioner; import com.datatorrent.api.StatsListener; import com.datatorrent.lib.util.KryoCloneUtils; -import kafka.common.AuthorizationException; - /** * Abstract partitioner used to manage the partitions of kafka input operator. * It use a number of kafka consumers(one for each cluster) to get the latest partition metadata for topics that @@ -74,8 +72,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients; - - private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>(); // prevent null + // prevent null + private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>(); public AbstractKafkaPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator) { @@ -84,12 +82,11 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa this.prototypeOperator = prototypeOperator; } - abstract List<Set<PartitionMeta>> assign(Map<String, Map<String,List<PartitionInfo>>> metadata); - - + abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata); @Override - public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext) + public Collection<Partition<AbstractKafkaInputOperator>> definePartitions( + Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext) { initMetadataClients(); @@ -127,7 +124,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa } //end while if (tryTime == 0) { - throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic); + throw new RuntimeException( + "Get partition info for topic completely failed. Please check the log file. 
topic name: " + topic); } } } @@ -143,7 +141,6 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa e.printStackTrace(); } - if (currentPartitions == parts || currentPartitions.equals(parts)) { logger.debug("No partition change found"); return collection; @@ -153,7 +150,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa currentPartitions.addAll(parts); int i = 0; List<Partition<AbstractKafkaInputOperator>> result = new LinkedList<>(); - for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext();) { + for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext(); ) { Partition<AbstractKafkaInputOperator> nextPartition = iter.next(); if (parts.remove(nextPartition.getPartitionedInstance().assignment())) { if (logger.isInfoEnabled()) { @@ -186,7 +183,6 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa metadataRefreshClients = null; } - @Override public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map) { @@ -201,12 +197,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa return response; } - protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment) + protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition( + Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment) { - Partitioner.Partition<AbstractKafkaInputOperator> p = new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator)); + Partitioner.Partition<AbstractKafkaInputOperator> p = + new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator)); p.getPartitionedInstance().assign(partitionAssignment); return p; } + /** * */ @@ -243,12 +242,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa * @param prop * @return String */ - private String getPropertyAsString(Properties prop) { + private String getPropertyAsString(Properties prop) + { StringWriter writer = new StringWriter(); try { prop.store(writer, ""); } catch (IOException e) { - logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage() ); + logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage()); } return writer.getBuffer().toString(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java index fa4856e..2b3c762 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java @@ -133,7 +133,8 @@ public class KafkaConsumerWrapper implements Closeable try { kc.position(tp); } catch (NoOffsetForPartitionException e) { - //the poll() method of a consumer will throw exception if any of subscribed consumers not initialized with position + //the poll() method of a consumer will throw exception + // if any of subscribed consumers not initialized with position handleNoOffsetForPartitionException(e, kc); } kc.pause(tp); @@ -145,7 +146,7 @@ public class KafkaConsumerWrapper 
implements Closeable while (windowCount > 0) { try { ConsumerRecords<byte[], byte[]> records = kc.poll(ownerOperator.getConsumerTimeout()); - for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0;) { + for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0; ) { ownerOperator.emitTuple(meta.getCluster(), cri.next()); windowCount--; } @@ -194,7 +195,6 @@ public class KafkaConsumerWrapper implements Closeable { try { - while (wrapper.isAlive.get()) { if (wrapper.waitForReplay) { Thread.sleep(100); @@ -229,7 +229,8 @@ public class KafkaConsumerWrapper implements Closeable } } - protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, KafkaConsumer<byte[], byte[]> consumer) + protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, + KafkaConsumer<byte[], byte[]> consumer) { // if initialOffset is set to EARLIST or LATEST // and the application is run as first time @@ -260,7 +261,6 @@ public class KafkaConsumerWrapper implements Closeable } } - /** * This method is called in the activate method of the operator */ @@ -289,7 +289,6 @@ public class KafkaConsumerWrapper implements Closeable Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = ownerOperator.getOffsetTrack(); - // create one thread for each cluster // each thread use one KafkaConsumer to consume from 1+ partition(s) of 1+ topic(s) for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) { @@ -334,7 +333,6 @@ public class KafkaConsumerWrapper implements Closeable kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this)); } - } /** @@ -375,7 +373,6 @@ public class KafkaConsumerWrapper implements Closeable holdingBuffer.put(msg); } - @Override public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java index 1646ffe..a07fe33 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java @@ -18,7 +18,6 @@ */ package org.apache.apex.malhar.kafka; - import java.io.Serializable; import org.apache.hadoop.classification.InterfaceStability; @@ -54,7 +53,6 @@ public class KafkaPartition implements Serializable */ private static final long serialVersionUID = 7556802229202221546L; - private String clusterId; private int partitionId; @@ -141,6 +139,4 @@ public class KafkaPartition implements Serializable return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + partitionId + ", topic=" + topic + "]"; } - - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java index ff16610..a8e333f 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java +++ 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java @@ -52,47 +52,47 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE /** * Kafka output operator with exactly once processing semantics. - *<br> + * <br> * - * <p> + * <p> * <b>Requirements</b> * <li>In the Kafka message, only Value will be available for users</li> * <li>Users need to provide Value deserializers for Kafka message as it is used during recovery</li> - * <li>Value type should have well defined Equals & HashCodes, as during messages are stored in HashMaps for comparison.</li> + * <li>Value type should have well defined equals() and hashCode() methods, + * as messages are stored in HashMaps for comparison during recovery.</li> * <p> * <b>Recovery handling</b> * <li> Offsets of the Kafka partitions are stored in the WindowDataManager at the endWindow</li> * <li> During recovery, * <ul> - * <li>Partially written Streaming Window before the crash is constructed. ( Explained below ) </li> - * <li>Tuples from the completed Streaming Window's are skipped </li> - * <li>Tuples coming for the partially written Streaming Window are skipped. - * (No assumption is made on the order and the uniqueness of the tuples) </li> - * </ul> - * </li> - *</p> + * <li>The Streaming Window that was partially written before the crash is reconstructed. ( Explained below ) </li> + * <li>Tuples from the completed Streaming Windows are skipped </li> + * <li>Tuples coming for the partially written Streaming Window are skipped. + * (No assumption is made on the order and the uniqueness of the tuples) </li> + * </ul> + * </li> + * </p> * * <p> * <b>Partial Window Construction</b> * <li> Operator uses the Key in the Kafka message, which is not available for use by the operator users.</li> * <li> Key is used to uniquely identify the message written by the particular instance of this operator.</li> - * This allows multiple writers to same Kafka partitions. Format of the key is "APPLICATTION_ID#OPERATOR_ID". + * This allows multiple writers to the same Kafka partitions. Format of the key is "APPLICATION_ID#OPERATOR_ID". * <li>During recovery Kafka partitions are read between the latest offset and the last written offsets.</li> - *<li>All the tuples written by the particular instance is kept in the Map</li> - *</p> + * <li>All the tuples written by the particular instance are kept in the Map</li> + * </p> * * <p> * <b>Limitations</b> * <li> Key in the Kafka message is reserved for Operator's use </li> - * <li> During recovery, operator needs to read tuples between 2 offsets, if there are lot of data to be read, Operator may - * appear to be blocked to the Stram and can kill the operator. </li> - *</p> + * <li> During recovery, the operator needs to read tuples between 2 offsets; + * if there is a lot of data to be read, the operator may + * appear blocked to the STRAM, which can kill the operator.
</li> + * </p> * * @displayName Kafka Single Port Exactly Once Output(0.9.0) * @category Messaging * @tags output operator - * - * * @since 3.5.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving @@ -128,7 +128,8 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) { - throw new IllegalArgumentException("Value deserializer needs to be set for the operator, as it is used during recovery."); + throw new IllegalArgumentException( + "Value deserializer needs to be set for the operator, as it is used during recovery."); } super.setup(context); @@ -242,19 +243,19 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu return false; } - private Map<Integer,Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException + private Map<Integer, Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException { List<PartitionInfo> partitionInfoList = consumer.partitionsFor(getTopic()); List<TopicPartition> topicPartitionList = new java.util.ArrayList<>(); - for ( PartitionInfo partitionInfo: partitionInfoList) { - topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()) ); + for (PartitionInfo partitionInfo : partitionInfoList) { + topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition())); } - Map<Integer,Long> parttionsAndOffset = new HashMap<>(); + Map<Integer, Long> parttionsAndOffset = new HashMap<>(); consumer.assign(topicPartitionList); - for (PartitionInfo partitionInfo: partitionInfoList) { + for (PartitionInfo partitionInfo : partitionInfoList) { try { TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition()); if (latest) { @@ -275,11 +276,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu { logger.info("Rebuild the partial window after " + windowDataManager.getLargestCompletedWindow()); - Map<Integer,Long> storedOffsets; - Map<Integer,Long> currentOffsets; + Map<Integer, Long> storedOffsets; + Map<Integer, Long> currentOffsets; try { - storedOffsets = (Map<Integer,Long>)this.windowDataManager.retrieve(windowId); + storedOffsets = (Map<Integer, Long>)this.windowDataManager.retrieve(windowId); currentOffsets = getPartitionsAndOffsets(true); } catch (IOException | ExecutionException | InterruptedException e) { throw new RuntimeException(e); @@ -303,13 +304,13 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu List<TopicPartition> topicPartitions = new ArrayList<>(); - for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) { + for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) { topicPartitions.add(new TopicPartition(getTopic(), entry.getKey())); } consumer.assign(topicPartitions); - for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) { + for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) { Long storedOffset = 0L; Integer currentPartition = entry.getKey(); Long currentOffset = entry.getValue(); @@ -390,7 +391,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu return; } - getProducer().send(new ProducerRecord<>(getTopic(), key, tuple),new Callback() + getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) 
{ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java index 500602c..c47cf3d 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java @@ -20,6 +20,7 @@ package org.apache.apex.malhar.kafka; import org.apache.kafka.clients.producer.ProducerRecord; + import com.datatorrent.api.DefaultInputPort; /** @@ -29,17 +30,17 @@ import com.datatorrent.api.DefaultInputPort; * @since 3.5.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public class KafkaSinglePortOutputOperator<K,V> extends AbstractKafkaOutputOperator +public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator { - /** - * This input port receives tuples that will be written out to Kafka. - */ + /** + * This input port receives tuples that will be written out to Kafka. + */ public final transient DefaultInputPort<V> inputPort = new DefaultInputPort<V>() { @Override public void process(V tuple) { - getProducer().send(new ProducerRecord<K,V>(getTopic(),tuple)); + getProducer().send(new ProducerRecord<K, V>(getTopic(), tuple)); } }; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java index 3b4d3f3..eb0cc40 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java @@ -50,7 +50,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner } int partitionCount = prototypeOperator.getInitialPartitionCount(); - ArrayList<Set<PartitionMeta>> eachPartitionAssignment = new ArrayList<>(prototypeOperator.getInitialPartitionCount()); + ArrayList<Set<PartitionMeta>> eachPartitionAssignment = + new ArrayList<>(prototypeOperator.getInitialPartitionCount()); int i = 0; for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) { for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) { @@ -59,7 +60,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner if (index >= eachPartitionAssignment.size()) { eachPartitionAssignment.add(new HashSet<PartitionMeta>()); } - eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition())); + eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), + topicPartition.getKey(), pif.partition())); } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java index 570bdea..05faab6 100644 --- 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java @@ -50,7 +50,8 @@ public class OneToOnePartitioner extends AbstractKafkaPartitioner for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) { for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) { for (PartitionInfo pif : topicPartition.getValue()) { - currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()))); + currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(), + topicPartition.getKey(), pif.partition()))); } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java index 7c142c5..feafa3b 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java @@ -31,7 +31,8 @@ public enum PartitionStrategy */ ONE_TO_ONE, /** - * Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s + * Each operator consumes from several kafka partitions, with the overall input rate kept under + * a certain hard limit in msgs/s or bytes/s * For now it <b>only</b> supports the <b>simple kafka consumer</b> */ ONE_TO_MANY, http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java index 5ddcb18..e9fcc36 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java @@ -118,7 +118,7 @@ public class EmbeddedKafka { Properties producerProps = new Properties(); producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); + producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) { @@ -144,9 +144,10 @@ public class EmbeddedKafka consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); consumerProps.setProperty("group.id", "group0"); consumerProps.setProperty("client.id", "consumer0"); - consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - consumerProps.put("auto.offset.reset", earliest ?
"earliest" : "latest"); // to make sure the consumer starts from the beginning of the topic + // to make sure the consumer starts from the beginning of the topic + consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest"); KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Arrays.asList(topic)); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java index 83e0de6..81d7ade 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java @@ -41,6 +41,7 @@ public class KafkaConsumerPropertiesTest public class Watcher extends TestWatcher { Context.OperatorContext context; + @Override protected void starting(Description description) { @@ -50,8 +51,8 @@ public class KafkaConsumerPropertiesTest kafkaInput.setTopics("apexTest"); kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); Properties prop = new Properties(); - prop.setProperty("security.protocol","SASL_PLAINTEXT"); - prop.setProperty("sasl.kerberos.service.name","kafka"); + prop.setProperty("security.protocol", "SASL_PLAINTEXT"); + prop.setProperty("sasl.kerberos.service.name", "kafka"); kafkaInput.setConsumerProps(prop); } @@ -71,8 +72,9 @@ public class KafkaConsumerPropertiesTest kafkaInput.definePartitions(null, null); } catch (KafkaException e) { //Ensures the properties of the consumer are set/not reset. 
- Assert.assertEquals("java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " + - "secure mode.", e.getCause().getMessage()); + Assert.assertEquals( + "java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " + + "secure mode.", e.getCause().getMessage()); } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java index c550032..abf3a5b 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java @@ -25,7 +25,8 @@ import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>, Deserializer<KafkaOutputOperatorTest.Person> +public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>, + Deserializer<KafkaOutputOperatorTest.Person> { @Override public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index 4e97d72..f16c8f4 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -21,14 +21,10 @@ package org.apache.apex.malhar.kafka; import java.io.File; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -54,7 +50,8 @@ import com.datatorrent.common.util.BaseOperator; import com.datatorrent.stram.StramLocalCluster; /** - * A bunch of test to verify the input operator will be automatically partitioned per kafka partition This test is launching its + * A bunch of tests to verify that the input operator is automatically partitioned + * per kafka partition. This test launches its * own Kafka cluster.
*/ @RunWith(Parameterized.class) @@ -90,7 +87,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase @Rule public final KafkaTestInfo testInfo = new KafkaTestInfo(); - @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}") public static Collection<Object[]> testScenario() { @@ -106,8 +102,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase }); } - - @Before public void before() { @@ -196,7 +190,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase endTuples = 0; } - public void processTuple(byte[] bt) { String tuple = new String(bt); @@ -220,7 +213,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase String key = operatorId + "," + currentWindowId; List<String> msgsInWin = tupleCollectedInWindow.get(key); if (msgsInWin != null) { - Assert.assertEquals("replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector); + Assert.assertEquals( + "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector); } else { List<String> newList = Lists.newArrayList(); newList.addAll(windowTupleCollector); @@ -235,10 +229,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase int countDownTupleSize = countDownAll ? tupleSize : endTuples; if (latch != null) { - Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize); + Assert.assertTrue( + "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize); while (countDownTupleSize > 0) { - latch.countDown(); - --countDownTupleSize; + latch.countDown(); + --countDownTupleSize; } if (latch.getCount() == 0) { /** @@ -265,7 +260,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase } - /** * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform. @@ -283,7 +277,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase testInputOperator(false, false); } - @Test public void testInputOperatorWithFailure() throws Exception { @@ -303,7 +296,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase // each broker should get a END_TUPLE message latch = new CountDownLatch(countDownAll ? 
totalCount + totalBrokers : totalBrokers); - logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", + logger.info( + "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" + + " hasMultiPartition: {}, partition: {}", testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition); // Start producer @@ -319,7 +314,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase DAG dag = lma.getDAG(); // Create KafkaSinglePortStringInputOperator - KafkaSinglePortInputOperator node = dag.addOperator("Kafka input" + testName, KafkaSinglePortInputOperator.class); + KafkaSinglePortInputOperator node = dag.addOperator( + "Kafka input" + testName, KafkaSinglePortInputOperator.class); node.setInitialPartitionCount(1); // set topic node.setTopics(testName); @@ -330,13 +326,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase node.setWindowDataManager(new FSWindowDataManager()); } - // Create Test tuple collector CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class); collector.isIdempotentTest = idempotent; // Connect ports - dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); + dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort) + .setLocality(Locality.CONTAINER_LOCAL); if (hasFailure) { setupHasFailureTest(node, dag); @@ -347,7 +343,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase lc.setHeartbeatMonitoringEnabled(false); //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(), - //but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated. + //but Controller.runAsync() don't expose the thread which run it, + //so we don't know when the thread will be terminated. //create this thread and then call join() to make sure the Controller shutdown completely. monitorThread = new Thread((StramLocalCluster)lc, "master"); monitorThread.start(); @@ -370,21 +367,23 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(), expectedReceiveCount, testName, tupleCollection); } - Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout); + Assert.assertTrue("TIMEOUT. 
testName: " + this.testName + "; Collected data: " + + tupleCollection, notTimeout); // Check results - Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection, + Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection, expectedReceiveCount == tupleCollection.size()); logger.info("End of test case: {}", testName); } - private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag) { operator.setHoldingBufferSize(5000); dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1); - //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration())); + //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent( + // APPLICATION_PATH + "failureck", new Configuration())); operator.setMaxTuplesPerWindow(tuplesPerWindow); } @@ -394,8 +393,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase return l + TEST_KAFKA_BROKER_PORT[0][0] + (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + - (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); + (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java index a05fd9b..3910546 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java @@ -72,9 +72,9 @@ public class KafkaOperatorTestBase } TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]}; - TEST_KAFKA_BROKER_PORT = new int[][] { - new int[] {p[2], p[3]}, - new int[] {p[4], p[5]} + TEST_KAFKA_BROKER_PORT = new int[][]{ + new int[]{p[2], p[3]}, + new int[]{p[4], p[5]} }; } @@ -93,8 +93,10 @@ public class KafkaOperatorTestBase private static final String zkBaseDir = "zookeeper-server-data"; private static final String kafkaBaseDir = "kafka-server-data"; - private static final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" }; - private static final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } }; + private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"}; + private static final String[][] kafkadir = new String[][]{ + new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"}, + new String[]{"kafka-server-data/2/1", "kafka-server-data/2/2"}}; protected boolean hasMultiPartition = false; protected boolean hasMultiCluster = false; @@ -132,8 +134,8 @@ public class KafkaOperatorTestBase zkf.shutdown(); } } - zkServer = new ZooKeeperServer[2]; - zkFactory = new ServerCnxnFactory[2]; + zkServer = new ZooKeeperServer[2]; + zkFactory = new ServerCnxnFactory[2]; } public static void startKafkaServer(int clusterid, int 
brokerid) @@ -156,7 +158,8 @@ public class KafkaOperatorTestBase { FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir)); - //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } }; + //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, + // new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } }; startKafkaServer(0, 0); startKafkaServer(0, 1); startKafkaServer(1, 0); @@ -261,13 +264,15 @@ public class KafkaOperatorTestBase // TODO Auto-generated constructor stub } - public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException + public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) + throws IOException { super(txnLogFactory, tickTime, treeBuilder); // TODO Auto-generated constructor stub } - public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb) + public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, + int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb) { super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb); // TODO Auto-generated constructor stub http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java index 58d69f6..7abf0f8 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java @@ -31,7 +31,6 @@ import org.junit.Before; import org.junit.Test; import org.apache.apex.malhar.lib.wal.FSWindowDataManager; - import org.apache.commons.io.FileUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -145,7 +144,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase Assert.assertTrue("No failure", fromKafka.size() > toKafka.size()); } - private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException + private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure, + boolean differentTuplesAfterRecovery) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER); @@ -159,7 +159,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp"); attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH); - OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap); + OperatorContextTestHelper.TestIdOperatorContext operatorContext = + new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap); cleanUp(operatorContext); @@ -167,11 +168,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase DefaultInputPort<Person> inputPort; if (exactlyOnce) { - 
KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); + KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = + ResetKafkaOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } else { - KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); + KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = + ResetKafkaSimpleOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } @@ -192,11 +195,13 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase if (hasFailure) { if (exactlyOnce) { - KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); + KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp = + ResetKafkaOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } else { - KafkaSinglePortOutputOperator<String,Person> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); + KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp = + ResetKafkaSimpleOutput(testName, props, operatorContext); inputPort = kafkaOutputTemp.inputPort; kafkaOutput = kafkaOutputTemp; } @@ -225,7 +230,8 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase cleanUp(operatorContext); } - private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext) + private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput( + String testName, Properties props, Context.OperatorContext operatorContext) { KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>(); kafkaOutput.setTopic(testName); @@ -235,9 +241,10 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase return kafkaOutput; } - private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext) + private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput( + String testName, Properties props, Context.OperatorContext operatorContext) { - KafkaSinglePortOutputOperator<String,Person> kafkaOutput = new KafkaSinglePortOutputOperator<>(); + KafkaSinglePortOutputOperator<String, Person> kafkaOutput = new KafkaSinglePortOutputOperator<>(); kafkaOutput.setTopic(testName); kafkaOutput.setProperties(props); kafkaOutput.setup(operatorContext); @@ -263,7 +270,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase } for (int i = 0; i < fromKafka.size(); ++i) { - if ( !fromKafka.get(i).equals(toKafka.get(i))) { + if (!fromKafka.get(i).equals(toKafka.get(i))) { return false; } } @@ -275,9 +282,9 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase { String l = "localhost:"; return l + TEST_KAFKA_BROKER_PORT[0][0] + - (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + - (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + - (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); + (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + + (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + + (hasMultiCluster && hasMultiPartition ? 
"," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); } private List<Person> GenerateList() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java index 21f8977..6098bde 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java @@ -33,11 +33,13 @@ import kafka.utils.VerifiableProperties; */ public class KafkaTestPartitioner implements Partitioner { - public KafkaTestPartitioner(VerifiableProperties props) { + public KafkaTestPartitioner(VerifiableProperties props) + { } - public KafkaTestPartitioner() { + public KafkaTestPartitioner() + { } @@ -45,7 +47,7 @@ public class KafkaTestPartitioner implements Partitioner public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { int num_partitions = cluster.partitionsForTopic(topic).size(); - return Integer.parseInt((String)key)%num_partitions; + return Integer.parseInt((String)key) % num_partitions; } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/623b803f/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java index ca6cc98..0f18666 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java @@ -62,7 +62,8 @@ public class KafkaTestProducer implements Runnable this.sendCount = sendCount; } - public void setMessages(List<String> messages) { + public void setMessages(List<String> messages) + { this.messages = messages; } @@ -72,8 +73,8 @@ public class KafkaTestProducer implements Runnable props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName()); - String brokerList = "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0]; - brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]):""; + String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0]; + brokerList += hasPartition ? 
(",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]) : ""; props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000"); props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType()); @@ -94,14 +95,15 @@ public class KafkaTestProducer implements Runnable this.hasPartition = hasPartition; this.hasMultiCluster = hasMultiCluster; producer = new KafkaProducer<>(createProducerConfig(0)); - if(hasMultiCluster){ + if (hasMultiCluster) { producer1 = new KafkaProducer<>(createProducerConfig(1)); } else { producer1 = null; } } - public KafkaTestProducer(String topic, boolean hasPartition) { + public KafkaTestProducer(String topic, boolean hasPartition) + { this(topic, hasPartition, false); } @@ -115,7 +117,7 @@ public class KafkaTestProducer implements Runnable String messageStr = "_" + messageNo++; int k = rand.nextInt(100); sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr))); - if(hasMultiCluster && messageNo <= sendCount){ + if (hasMultiCluster && messageNo <= sendCount) { messageStr = "_" + messageNo++; sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr))); } @@ -142,7 +144,7 @@ public class KafkaTestProducer implements Runnable } producer.flush(); - if (producer1!=null) { + if (producer1 != null) { producer1.flush(); } @@ -160,7 +162,7 @@ public class KafkaTestProducer implements Runnable public void close() { producer.close(); - if (producer1!=null) { + if (producer1 != null) { producer1.close(); } } @@ -174,4 +176,4 @@ public class KafkaTestProducer implements Runnable { this.ackType = ackType; } -} // End of KafkaTestProducer \ No newline at end of file +} // End of KafkaTestProducer
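----------------------------------------------------------------------

For readers configuring the operators touched by this commit, the javadoc reformatted above boils down to a few setters on the input operator: multiple clusters separated by ";" in setClusters(), an initial offset chosen from the InitialOffset enum, and an initial partition count. The sketch below is illustrative only, not part of this commit: the application class, operator name, topic, and broker addresses are hypothetical, and the wiring follows the same pattern as KafkaInputOperatorTest above.

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

public class KafkaReadSketch implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in =
        dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
    // Brokers within one cluster are comma-separated; a ";" separates clusters
    // (see the getClusters() javadoc in AbstractKafkaInputOperator above).
    // Hosts and ports here are hypothetical.
    in.setClusters("broker1:9092,broker2:9092;broker3:9092");
    in.setTopics("events");
    // One of EARLIEST, LATEST, APPLICATION_OR_EARLIEST, APPLICATION_OR_LATEST
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST.name());
    in.setInitialPartitionCount(1);
    // A downstream operator would normally be attached to in.outputPort here;
    // the exactly-once output sketch below continues this method.
  }
}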
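The exactly-once output operator pairs with this: as its diff above shows, setup() refuses to start without a value deserializer, because recovery re-reads the operator's own output to rebuild the partially written window. A sketch of the output side, continuing populateDAG() above under the same assumptions (java.util.Properties import assumed; topic and broker address are hypothetical, serializer choices illustrative):

    KafkaSinglePortExactlyOnceOutputOperator<String> out =
        dag.addOperator("kafkaOutput", new KafkaSinglePortExactlyOnceOutputOperator<String>());
    out.setTopic("events-copy");
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");
    props.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // Mandatory: setup() throws IllegalArgumentException without this property,
    // because recovery deserializes previously written values (see diff above).
    props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    out.setProperties(props);
    // The key serializer is set by the operator itself; the message key is
    // reserved to tag tuples with "APPLICATION_ID#OPERATOR_ID" for recovery.
    // An upstream port emitting String (a type with proper equals()/hashCode(),
    // per the requirements above) would be connected to out.inputPort via
    // dag.addStream(...).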
