Repository: apex-malhar Updated Branches: refs/heads/master 719cf952d -> 389a2d564
APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperatorTest and AbstractKafkaInputOperator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f2b7a856 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f2b7a856 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f2b7a856 Branch: refs/heads/master Commit: f2b7a85677ab9ada29c95ff7821c43459c8468dd Parents: d2f0586 Author: brightchen <[email protected]> Authored: Tue Jun 14 16:30:17 2016 -0700 Committer: brightchen <[email protected]> Committed: Fri Jun 24 09:56:48 2016 -0700 ---------------------------------------------------------------------- .../malhar/kafka/AbstractKafkaPartitioner.java | 39 ++- .../apex/malhar/kafka/KafkaConsumerWrapper.java | 58 ++--- .../malhar/kafka/KafkaInputOperatorTest.java | 237 +++++++++++++------ .../apex/malhar/kafka/KafkaTestProducer.java | 54 +++-- kafka/src/test/resources/log4j.properties | 49 ++++ 5 files changed, 302 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index 7bb8585..c6e47e9 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -45,6 +45,8 @@ import com.datatorrent.api.Partitioner; import com.datatorrent.api.StatsListener; import com.datatorrent.lib.util.KryoCloneUtils; +import kafka.common.AuthorizationException; + /** * Abstract partitioner used to manage the partitions of kafka input operator. * It use a number of kafka consumers(one for each cluster) to get the latest partition metadata for topics that @@ -87,27 +89,50 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa @Override public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext) { - initMetadataClients(); Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>(); - for (int i = 0; i < clusters.length; i++) { metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>()); for (String topic : topics) { - List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic); - if (logger.isDebugEnabled()) { - logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis)); + int tryTime = 3; + while (tryTime-- > 0) { + try { + List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic); + if (logger.isDebugEnabled()) { + logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis)); + } + metadata.get(clusters[i]).put(topic, ptis); + break; + } catch (AuthorizationException ae) { + logger.error("Kafka AuthorizationException."); + throw new RuntimeException("Kafka AuthorizationException.", ae); + } catch (Exception e) { + logger.warn("Got Exception when trying get partition info for topic {}.", topic, e); + try { + Thread.sleep(100); + } catch (Exception e1) { + //ignore + } + } + } + if (tryTime == 0) { + throw new RuntimeException("Get partition info completely failed. Please check the log file"); } - metadata.get(clusters[i]).put(topic, ptis); } metadataRefreshClients.get(i).close(); } metadataRefreshClients = null; - List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = assign(metadata); + List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null; + try { + parts = assign(metadata); + } catch (Exception e) { + logger.error("assign() exception.", e); + e.printStackTrace(); + } if (currentPartitions == parts || currentPartitions.equals(parts)) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java index adc9540..143a5bd 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java @@ -32,6 +32,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,7 @@ public class KafkaConsumerWrapper implements Closeable private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); - private boolean isAlive = false; + private AtomicBoolean isAlive = new AtomicBoolean(false); private final Map<String, KafkaConsumer<byte[], byte[]>> consumers = new HashMap<>(); @@ -129,6 +130,12 @@ public class KafkaConsumerWrapper implements Closeable if (meta.getTopicPartition().equals(tp)) { kc.resume(tp); } else { + try { + kc.position(tp); + } catch (NoOffsetForPartitionException e) { + //the poll() method of a consumer will throw exception if any of subscribed consumers not initialized with position + handleNoOffsetForPartitionException(e, kc); + } kc.pause(tp); } } @@ -188,7 +195,7 @@ public class KafkaConsumerWrapper implements Closeable try { - while (wrapper.isAlive) { + while (wrapper.isAlive.get()) { if (wrapper.waitForReplay) { Thread.sleep(100); continue; @@ -207,19 +214,7 @@ public class KafkaConsumerWrapper implements Closeable wrapper.putMessage(Pair.of(cluster, record)); } } catch (NoOffsetForPartitionException e) { - // if initialOffset is set to EARLIST or LATEST - // and the application is run as first time - // then there is no existing committed offset and this error will be caught - // we need to seek to either beginning or end of the partition - // based on the initial offset setting - AbstractKafkaInputOperator.InitialOffset io = - AbstractKafkaInputOperator.InitialOffset.valueOf(wrapper.ownerOperator.getInitialOffset()); - if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST - || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) { - consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0])); - } else { - consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0])); - } + wrapper.handleNoOffsetForPartitionException(e, consumer); } catch (InterruptedException e) { throw new IllegalStateException("Consumer thread is interrupted unexpectedly", e); } @@ -233,7 +228,24 @@ public class KafkaConsumerWrapper implements Closeable } } } - + + protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, KafkaConsumer<byte[], byte[]> consumer) + { + // if initialOffset is set to EARLIST or LATEST + // and the application is run as first time + // then there is no existing committed offset and this error will be caught + // we need to seek to either beginning or end of the partition + // based on the initial offset setting + AbstractKafkaInputOperator.InitialOffset io = + AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset()); + if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST + || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) { + consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0])); + } else { + consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0])); + } + + } /** * This method is called in setup method of Abstract Kafka Input Operator @@ -255,7 +267,7 @@ public class KafkaConsumerWrapper implements Closeable public void start(boolean waitForReplay) { this.waitForReplay = waitForReplay; - isAlive = true; + isAlive.set(true); // thread to consume the kafka data // create thread pool for consumer threads @@ -330,11 +342,11 @@ public class KafkaConsumerWrapper implements Closeable */ public void stop() { + isAlive.set(false); for (KafkaConsumer<byte[], byte[]> c : consumers.values()) { c.wakeup(); } kafkaConsumerExecutor.shutdownNow(); - isAlive = false; holdingBuffer.clear(); IOUtils.closeQuietly(this); } @@ -347,16 +359,6 @@ public class KafkaConsumerWrapper implements Closeable holdingBuffer.clear(); } - public boolean isAlive() - { - return isAlive; - } - - public void setAlive(boolean isAlive) - { - this.isAlive = isAlive; - } - public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage() { return holdingBuffer.poll(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index 72ecd57..8440615 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -34,14 +34,16 @@ import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestWatcher; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.wal.FSWindowDataManager; -import org.apache.commons.io.FileUtils; + +import com.google.common.collect.Lists; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; @@ -55,7 +57,6 @@ import com.datatorrent.stram.StramLocalCluster; * A bunch of test to verify the input operator will be automatically partitioned per kafka partition This test is launching its * own Kafka cluster. */ -@Ignore @RunWith(Parameterized.class) public class KafkaInputOperatorTest extends KafkaOperatorTestBase { @@ -65,13 +66,36 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase private String partition = null; private String testName = ""; - + public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator; + public class KafkaTestInfo extends TestWatcher + { + public org.junit.runner.Description desc; + + public String getDir() + { + String methodName = desc.getMethodName(); + String className = desc.getClassName(); + return "target/" + className + "/" + methodName + "/" + testName; + } + + @Override + protected void starting(org.junit.runner.Description description) + { + this.desc = description; + } + } + + @Rule + public final KafkaTestInfo testInfo = new KafkaTestInfo(); + + @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}") public static Collection<Object[]> testScenario() { - return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition + return Arrays.asList(new Object[][]{ + {true, false, "one_to_one"},// multi cluster with single partition {true, false, "one_to_many"}, {true, true, "one_to_one"},// multi cluster with multi partitions {true, true, "one_to_many"}, @@ -82,12 +106,17 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase }); } + + @Before public void before() { - FileUtils.deleteQuietly(new File(APPLICATION_PATH)); - tupleCollection.clear(); testName = TEST_TOPIC + testCounter++; + logger.info("before() test case: {}", testName); + tupleCollection.clear(); + //reset count for next new test case + k = 0; + createTopic(0, testName); if (hasMultiCluster) { createTopic(1, testName); @@ -107,12 +136,24 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class); private static List<String> tupleCollection = new LinkedList<>(); - private static Map<String, Set<String>> tupleCollectedInWindow = new HashMap<>(); + + /** + * whether countDown latch count all tuples or just END_TUPLE + */ + private static final boolean countDownAll = false; + private static final int scale = 2; + private static final int totalCount = 10 * scale; + private static final int failureTrigger = 3 * scale; + private static final int tuplesPerWindow = 5 * scale; + private static final int waitTime = 60000 + 300 * scale; + + //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed, + //so, count valid tuple instead. private static CountDownLatch latch; private static boolean hasFailure = false; - private static int failureTrigger = 3000; private static int k = 0; - + private static Thread monitorThread; + /** * Test Operator to collect tuples from KafkaSingleInputStringOperator. * @@ -120,8 +161,14 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase */ public static class CollectorModule extends BaseOperator { - - public final transient CollectorInputPort inputPort = new CollectorInputPort(this); + public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>() + { + @Override + public void process(byte[] bt) + { + processTuple(bt); + } + }; long currentWindowId; @@ -129,8 +176,10 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase boolean isIdempotentTest = false; - transient Set<String> windowTupleCollector = new HashSet<>(); - + transient List<String> windowTupleCollector = Lists.newArrayList(); + private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>(); + private int endTuples = 0; + @Override public void setup(Context.OperatorContext context) { @@ -143,59 +192,80 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase { super.beginWindow(windowId); currentWindowId = windowId; + windowTupleCollector.clear(); + endTuples = 0; } + + public void processTuple(byte[] bt) + { + String tuple = new String(bt); + if (hasFailure && k++ == failureTrigger) { + //you can only kill yourself once + hasFailure = false; + throw new RuntimeException(); + } + if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) { + endTuples++; + } + + windowTupleCollector.add(tuple); + } + @Override public void endWindow() { super.endWindow(); if (isIdempotentTest) { String key = operatorId + "," + currentWindowId; - Set<String> msgsInWin = tupleCollectedInWindow.get(key); - if (msgsInWin!=null) { + List<String> msgsInWin = tupleCollectedInWindow.get(key); + if (msgsInWin != null) { Assert.assertEquals("replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector); } else { - Set<String> newSet = new HashSet<>(); - newSet.addAll(windowTupleCollector); - tupleCollectedInWindow.put(key, newSet); + List<String> newList = Lists.newArrayList(); + newList.addAll(windowTupleCollector); + tupleCollectedInWindow.put(key, newList); } } - windowTupleCollector.clear(); - } - - } - - public static class CollectorInputPort extends DefaultInputPort<byte[]> - { - CollectorModule ownerNode; - - CollectorInputPort(CollectorModule node) { - this.ownerNode = node; - } - @Override - public void process(byte[] bt) - { - String tuple = new String(bt); - if (hasFailure && k++ == failureTrigger) { - //you can only kill yourself once - hasFailure = false; - throw new RuntimeException(); - } - if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) { - if (latch != null) { - latch.countDown(); + //discard the tuples of this window if except happened + int tupleSize = windowTupleCollector.size(); + tupleCollection.addAll(windowTupleCollector); + + int countDownTupleSize = countDownAll ? tupleSize : endTuples; + + if (latch != null) { + Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize); + while (countDownTupleSize > 0) { + latch.countDown(); + --countDownTupleSize; + } + if (latch.getCount() == 0) { + /** + * The time before countDown() and the shutdown() of the application + * will cause fatal error: + * "Catastrophic Error: Invalid State - the operator blocked forever!" + * as the activeQueues could be cleared but alive haven't changed yet. + * throw the ShutdownException to let the engine shutdown; + */ + try { + throw new ShutdownException(); + //lc.shutdown(); + } finally { + /** + * interrupt the engine thread, let it wake from sleep and handle + * the shutdown at this time, all payload should be handled. so it + * should be ok to interrupt + */ + monitorThread.interrupt(); + } } - return; - } - tupleCollection.add(tuple); - if (ownerNode.isIdempotentTest) { - ownerNode.windowTupleCollector.add(tuple); } } } + /** * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform. @@ -230,11 +300,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception { - // each broker should get a END_TUPLE message - latch = new CountDownLatch(totalBrokers); + latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers); - int totalCount = 10000; + logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", + testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition); // Start producer KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster); @@ -242,19 +312,21 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase Thread t = new Thread(p); t.start(); + int expectedReceiveCount = totalCount + totalBrokers; + // Create DAG for testing. LocalMode lma = LocalMode.newInstance(); DAG dag = lma.getDAG(); // Create KafkaSinglePortStringInputOperator - KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class); + KafkaSinglePortInputOperator node = dag.addOperator("Kafka input" + testName, KafkaSinglePortInputOperator.class); node.setInitialPartitionCount(1); // set topic node.setTopics(testName); node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); node.setClusters(getClusterConfig()); node.setStrategy(partition); - if(idempotent) { + if (idempotent) { node.setWindowDataManager(new FSWindowDataManager()); } @@ -264,49 +336,60 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase collector.isIdempotentTest = idempotent; // Connect ports - dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); - + dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); if (hasFailure) { setupHasFailureTest(node, dag); } // Create local cluster - final LocalMode.Controller lc = lma.getController(); + LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(false); - lc.runAsync(); - - // Wait 30s for consumer finish consuming all the messages - boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS); - Collections.sort(tupleCollection, new Comparator<String>() - { - @Override - public int compare(String o1, String o2) - { - return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]); - } - }); - Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout); + //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(), + //but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated. + //create this thread and then call join() to make sure the Controller shutdown completely. + monitorThread = new Thread((StramLocalCluster)lc, "master"); + monitorThread.start(); + + boolean notTimeout = true; + try { + // Wait 60s for consumer finish consuming all the messages + notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS); + lc.shutdown(); + + //wait until control thread finished. + monitorThread.join(); + } catch (Exception e) { + logger.warn(e.getMessage()); + } + + t.join(); + + if (!notTimeout || expectedReceiveCount != tupleCollection.size()) { + logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(), + expectedReceiveCount, testName, tupleCollection); + } + Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout); // Check results - Assert.assertTrue("Collected tuples " + tupleCollection + " Tuple count is not expected", totalCount <=+ tupleCollection.size()); - logger.debug(String.format("Number of emitted tuples: %d", tupleCollection.size())); - - t.join(); - p.close(); - lc.shutdown(); + Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection, + expectedReceiveCount == tupleCollection.size()); + + logger.info("End of test case: {}", testName); } + private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag) { operator.setHoldingBufferSize(5000); dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1); //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration())); - operator.setMaxTuplesPerWindow(500); + operator.setMaxTuplesPerWindow(tuplesPerWindow); } - private String getClusterConfig() { + private String getClusterConfig() + { String l = "localhost:"; return l + TEST_KAFKA_BROKER_PORT[0][0] + (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java index 36130ce..2f24a8a 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java @@ -28,8 +28,11 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; +import com.google.common.collect.Lists; + /** * A kafka producer for testing */ @@ -102,31 +105,27 @@ public class KafkaTestProducer implements Runnable this(topic, hasPartition, false); } + private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList(); + private void generateMessages() { // Create dummy message int messageNo = 1; while (messageNo <= sendCount) { - String messageStr = "Message_" + messageNo; + String messageStr = "_" + messageNo++; int k = rand.nextInt(100); - producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)); - if(hasMultiCluster){ - messageNo++; - producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)); + sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr))); + if(hasMultiCluster && messageNo <= sendCount){ + messageStr = "_" + messageNo++; + sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr))); } - messageNo++; // logger.debug(String.format("Producing %s", messageStr)); } // produce the end tuple to let the test input operator know it's done produce messages - producer.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE)); - if(hasMultiCluster) { - producer1.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE)); - } - if(hasPartition){ - // Send end_tuple to other partition if it exist - producer.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE)); - if(hasMultiCluster) { - producer1.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE)); + for (int i = 0; i < (hasPartition ? 2 : 1); ++i) { + sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE))); + if (hasMultiCluster) { + sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE))); } } } @@ -138,23 +137,32 @@ public class KafkaTestProducer implements Runnable generateMessages(); } else { for (String msg : messages) { - Future f = producer.send(new ProducerRecord<>(topic, "", msg)); - try { - f.get(30, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } + sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg))); } } - producer.close(); + + producer.flush(); if (producer1!=null) { - producer1.close(); + producer1.flush(); } + + try { + for (Future<RecordMetadata> task : sendTasks) { + task.get(30, TimeUnit.SECONDS); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + close(); } public void close() { producer.close(); + if (producer1!=null) { + producer1.close(); + } } public String getAckType() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/kafka/src/test/resources/log4j.properties b/kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..c115950 --- /dev/null +++ b/kafka/src/test/resources/log4j.properties @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=INFO +#log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=WARN + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=INFO + +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=INFO +log4j.logger.org.apache.apex=INFO + +log4j.logger.org.apacke.kafka=WARN +log4j.logger.kafka.consumer=WARN +log4j.logger.kafka=WARN
