Repository: incubator-apex-malhar Updated Branches: refs/heads/master b4fd6a60d -> 802240045
Add idempotent support for 0.9 kafka input operator Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/37f81309 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/37f81309 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/37f81309 Branch: refs/heads/master Commit: 37f8130920f0d7214bec00b7ac36448e58977f49 Parents: a23cc5b Author: Siyuan Hua <[email protected]> Authored: Mon Apr 4 13:56:36 2016 -0700 Committer: Siyuan Hua <[email protected]> Committed: Mon Apr 4 13:56:36 2016 -0700 ---------------------------------------------------------------------- .../kafka/AbstractKafkaInputOperator.java | 80 ++++++++++- .../apex/malhar/kafka/KafkaConsumerWrapper.java | 62 ++++++++- .../malhar/kafka/KafkaInputOperatorTest.java | 139 ++++++++++++++++--- .../malhar/kafka/KafkaOperatorTestBase.java | 79 ++++------- 4 files changed, 281 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 89104a3..06cd470 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -18,6 +18,7 @@ */ package org.apache.apex.malhar.kafka; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -33,6 +34,7 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceStability; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -50,6 +52,8 @@ import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.Partitioner; import com.datatorrent.api.StatsListener; +import com.datatorrent.lib.util.WindowDataManager; +import com.datatorrent.netlet.util.DTThrowable; /** * The abstract kafka input operator using the kafka 0.9.0 new consumer API @@ -94,6 +98,10 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera */ private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>(); + private final transient Map<AbstractKafkaPartitioner.PartitionMeta, Long> windowStartOffset = new HashMap<>(); + + private transient int operatorId; + private int initialPartitionCount = 1; private long repartitionInterval = 30000L; @@ -127,7 +135,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera * Wrapper consumer object * It wraps KafkaConsumer, maintains the consumer thread and stores messages in a queue */ - private transient final KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper(); + private final transient KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper(); /** * By default the strategy is one to one @@ -144,7 +152,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera /** * store offsets with window id, 
only keep offsets with windows that have not been committed */ - private transient final List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>(); + private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>(); /** * Application name is used as group.id for kafka consumer @@ -162,10 +170,12 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera @AutoMetric private transient KafkaMetrics metrics; + private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager(); + @Override public void activate(Context.OperatorContext context) { - consumerWrapper.start(); + consumerWrapper.start(isIdempotent()); } @Override @@ -183,8 +193,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera @Override public void committed(long windowId) { - if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) + if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) { return; + } //ask kafka consumer wrapper to store the committed offsets for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) { Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next(); @@ -195,6 +206,13 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera iter.remove(); } } + if (isIdempotent()) { + try { + windowDataManager.deleteUpTo(operatorId, windowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } } @Override @@ -211,6 +229,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(), msg.topic(), msg.partition()); offsetTrack.put(pm, msg.offset() + 1); + if (isIdempotent() && !windowStartOffset.containsKey(pm)) { + windowStartOffset.put(pm, msg.offset()); + } } emitCount += count; } @@ -222,6 +243,23 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera { emitCount = 0; currentWindowId = wid; + windowStartOffset.clear(); + if (isIdempotent() && wid <= windowDataManager.getLargestRecoveryWindow()) { + replay(wid); + } else { + consumerWrapper.afterReplay(); + } + } + + private void replay(long windowId) + { + try { + Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = + (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.load(operatorId, windowId); + consumerWrapper.emitImmediately(windowData); + } catch (IOException e) { + DTThrowable.rethrow(e); + } } @Override @@ -233,6 +271,19 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera //update metrics metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics()); + + //update the windowDataManager + if (isIdempotent()) { + try { + Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = new HashMap<>(); + for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : windowStartOffset.entrySet()) { + windowData.put(e.getKey(), new MutablePair<>(e.getValue(), offsetTrack.get(e.getKey()) - e.getValue())); + } + windowDataManager.save(windowData, operatorId, currentWindowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } } @@ -243,13 +294,15 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera 
applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME); consumerWrapper.create(this); metrics = new KafkaMetrics(metricsRefreshInterval); + windowDataManager.setup(context); + operatorId = context.getId(); } @Override public void teardown() { - + windowDataManager.teardown(); } private void initPartitioner() @@ -325,7 +378,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera } if (e != null) { logger.warn("Exceptions in committing offsets {} : {} ", - Joiner.on(';').withKeyValueSeparator("=").join(map), e); + Joiner.on(';').withKeyValueSeparator("=").join(map), e); } } @@ -339,6 +392,11 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera return assignment; } + private boolean isIdempotent() + { + return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager); + } + //---------------------------------------------setters and getters---------------------------------------- public void setInitialPartitionCount(int partitionCount) { @@ -525,6 +583,16 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera return repartitionInterval; } + public void setWindowDataManager(WindowDataManager windowDataManager) + { + this.windowDataManager = windowDataManager; + } + + public WindowDataManager getWindowDataManager() + { + return windowDataManager; + } + /** * @omitFromUI * @return current checkpointed offsets http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java index 7a1211a..adc9540 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java @@ -22,6 +22,7 @@ package org.apache.apex.malhar.kafka; import java.io.Closeable; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -51,8 +52,11 @@ import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.datatorrent.netlet.util.DTThrowable; + /** * This is the wrapper class for the new Kafka consumer API * @@ -83,6 +87,8 @@ public class KafkaConsumerWrapper implements Closeable private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<>(); + private boolean waitForReplay = false; + /** * * Only put the offsets that need to be committed in the ConsumerThread.offsetToCommit map @@ -109,6 +115,52 @@ public class KafkaConsumerWrapper implements Closeable } + public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData) + { + for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowEntry : windowData.entrySet()) { + AbstractKafkaPartitioner.PartitionMeta meta = windowEntry.getKey(); + Pair<Long, Long> replayOffsetSize = windowEntry.getValue(); + KafkaConsumer<byte[], byte[]> kc = consumers.get(meta.getCluster()); + if (kc == null || 
!kc.assignment().contains(windowEntry.getKey().getTopicPartition())) { + throw new RuntimeException("Couldn't find consumer to replay the message PartitionMeta : " + meta); + } + //pause other partitions + for (TopicPartition tp : kc.assignment()) { + if (meta.getTopicPartition().equals(tp)) { + kc.resume(tp); + } else { + kc.pause(tp); + } + } + // set the offset to window start offset + kc.seek(meta.getTopicPartition(), replayOffsetSize.getLeft()); + long windowCount = replayOffsetSize.getRight(); + while (windowCount > 0) { + try { + ConsumerRecords<byte[], byte[]> records = kc.poll(ownerOperator.getConsumerTimeout()); + for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0;) { + ownerOperator.emitTuple(meta.getCluster(), cri.next()); + windowCount--; + } + } catch (NoOffsetForPartitionException e) { + throw new RuntimeException("Couldn't replay the offset", e); + } + } + // set the offset after window + kc.seek(meta.getTopicPartition(), replayOffsetSize.getLeft() + replayOffsetSize.getRight()); + } + + // resume all partitions + for (KafkaConsumer<byte[], byte[]> kc : consumers.values()) { + kc.resume(Iterables.toArray(kc.assignment(), TopicPartition.class)); + } + + } + + public void afterReplay() + { + waitForReplay = false; + } static final class ConsumerThread implements Runnable { @@ -137,6 +189,10 @@ public class KafkaConsumerWrapper implements Closeable while (wrapper.isAlive) { + if (wrapper.waitForReplay) { + Thread.sleep(100); + continue; + } if (!this.offsetToCommit.isEmpty()) { // in each fetch cycle commit the offset if needed if (logger.isDebugEnabled()) { @@ -170,6 +226,8 @@ public class KafkaConsumerWrapper implements Closeable } } catch (WakeupException we) { logger.info("The consumer is being stopped"); + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } finally { consumer.close(); } @@ -194,11 +252,11 @@ public class KafkaConsumerWrapper implements Closeable /** * This method is called in the activate method of the operator */ - public void start() + public void start(boolean waitForReplay) { + this.waitForReplay = waitForReplay; isAlive = true; - // thread to consume the kafka data // create thread pool for consumer threads kafkaConsumerExecutor = Executors.newCachedThreadPool( http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index d055555..9c5d5dc 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -18,30 +18,44 @@ */ package org.apache.apex.malhar.kafka; +import java.io.File; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.LoggerFactory; +import 
org.apache.commons.io.FileUtils; + import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.LocalMode; import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.WindowDataManager; +import com.datatorrent.stram.StramLocalCluster; /** * A bunch of tests to verify that the input operator is automatically partitioned per Kafka partition. This test is launching its * own Kafka cluster. */ +@Ignore @RunWith(Parameterized.class) public class KafkaInputOperatorTest extends KafkaOperatorTestBase { @@ -50,6 +64,10 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase private String partition = null; + private String testName = ""; + + public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator; + @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}") public static Collection<Object[]> testScenario() { @@ -64,6 +82,19 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase }); } + @Before + public void before() + { + FileUtils.deleteQuietly(new File(APPLICATION_PATH)); + tupleCollection.clear(); + testName = TEST_TOPIC + testCounter++; + createTopic(0, testName); + if (hasMultiCluster) { + createTopic(1, testName); + } + + } + public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition) { // This class wants to initialize several Kafka brokers for multiple partitions @@ -76,6 +107,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class); private static List<String> tupleCollection = new LinkedList<>(); + private static Map<String, Set<String>> tupleCollectedInWindow = new HashMap<>(); private static CountDownLatch latch; private static boolean hasFailure = false; private static int failureTrigger = 3000; @@ -88,11 +120,58 @@ */ public static class CollectorModule extends BaseOperator { - public final transient CollectorInputPort inputPort = new CollectorInputPort(); + + public final transient CollectorInputPort inputPort = new CollectorInputPort(this); + + long currentWindowId; + + long operatorId; + + boolean isIdempotentTest = false; + + transient Set<String> windowTupleCollector = new HashSet<>(); + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + operatorId = context.getId(); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + currentWindowId = windowId; + } + + @Override + public void endWindow() + { + super.endWindow(); + if (isIdempotentTest) { + String key = operatorId + "," + currentWindowId; + Set<String> msgsInWin = tupleCollectedInWindow.get(key); + if (msgsInWin != null) { + Assert.assertEquals("replayed messages should be exactly the same as in the previous window", msgsInWin, windowTupleCollector); + } else { + Set<String> newSet = new HashSet<>(); + newSet.addAll(windowTupleCollector); + tupleCollectedInWindow.put(key, newSet); + } + } + windowTupleCollector.clear(); + } + } public static class CollectorInputPort extends DefaultInputPort<byte[]> { + CollectorModule ownerNode; + + CollectorInputPort(CollectorModule node) { + this.ownerNode = node; + } @Override public void process(byte[] bt) @@ -110,15 +189,11 @@ public class 
KafkaInputOperatorTest extends KafkaOperatorTestBase return; } tupleCollection.add(tuple); - } - - @Override - public void setConnected(boolean flag) - { - if (flag) { - tupleCollection.clear(); + if (ownerNode.isIdempotentTest) { + ownerNode.windowTupleCollector.add(tuple); } } + } /** * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives @@ -132,21 +207,28 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase * @throws Exception */ @Test - public void testPartitionableInputOperator() throws Exception + public void testInputOperator() throws Exception { hasFailure = false; - testInputOperator(false); + testInputOperator(false, false); } @Test - public void testPartitionableInputOperatorWithFailure() throws Exception + public void testInputOperatorWithFailure() throws Exception + { + hasFailure = true; + testInputOperator(true, false); + } + + @Test + public void testIdempotentInputOperatorWithFailure() throws Exception { hasFailure = true; - testInputOperator(true); + testInputOperator(true, true); } - public void testInputOperator(boolean hasFailure) throws Exception + public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception { // each broker should get an END_TUPLE message @@ -155,7 +237,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase int totalCount = 10000; // Start producer - KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, hasMultiPartition, hasMultiCluster); + KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster); p.setSendCount(totalCount); Thread t = new Thread(p); t.start(); @@ -168,32 +250,47 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class); node.setInitialPartitionCount(1); // set topic - node.setTopics(TEST_TOPIC); + node.setTopics(testName); node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); node.setClusters(getClusterConfig()); node.setStrategy(partition); + if (idempotent) { + node.setWindowDataManager(new WindowDataManager.FSWindowDataManager()); + } + // Create Test tuple collector - CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule()); + CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class); + collector.isIdempotentTest = idempotent; // Connect ports dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); - // Create local cluster - final LocalMode.Controller lc = lma.getController(); - lc.setHeartbeatMonitoringEnabled(false); if (hasFailure) { setupHasFailureTest(node, dag); } + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + lc.runAsync(); // Wait up to 40s for the consumer to finish consuming all the messages boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS); + Collections.sort(tupleCollection, new Comparator<String>() + { + @Override + public int compare(String o1, String o2) + { + return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]); + } + }); Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout); // Check results - Assert.assertEquals("Tuple count", totalCount, tupleCollection.size()); + Assert.assertTrue("Collected tuples " + tupleCollection + " Tuple count is not expected", totalCount <= tupleCollection.size()); logger.debug(String.format("Number of emitted tuples: %d", tupleCollection.size())); t.join(); @@ 
-205,7 +302,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase { operator.setHoldingBufferSize(5000); dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1); - //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent("target/ck", new Configuration())); + //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration())); operator.setMaxTuplesPerWindow(500); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java index 7085348..a05fd9b 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java @@ -24,8 +24,8 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.Properties; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; @@ -51,6 +51,7 @@ public class KafkaOperatorTestBase public static final int[] TEST_ZOOKEEPER_PORT; public static final int[][] TEST_KAFKA_BROKER_PORT; public static final String TEST_TOPIC = "testtopic"; + public static int testCounter = 0; // get available ports static { @@ -81,27 +82,27 @@ public class KafkaOperatorTestBase // since Kafka 0.8 use KafkaServerStartable instead of KafkaServer // multiple brokers in multiple clusters - private KafkaServerStartable[][] broker = new KafkaServerStartable[2][2]; + private static KafkaServerStartable[][] broker = new KafkaServerStartable[2][2]; // multiple clusters - private ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2]; + private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2]; - private ZooKeeperServer[] zkServer = new ZooKeeperServer[2]; + private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2]; - public String baseDir = "target"; + public static String baseDir = "target"; - private final String zkBaseDir = "zookeeper-server-data"; - private final String kafkaBaseDir = "kafka-server-data"; - private final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" }; - private final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } }; + private static final String zkBaseDir = "zookeeper-server-data"; + private static final String kafkaBaseDir = "kafka-server-data"; + private static final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" }; + private static final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } }; protected boolean hasMultiPartition = false; protected boolean hasMultiCluster = false; - public void startZookeeper(final int clusterId) + public static void startZookeeper(final int clusterId) { try { - int numConnections = 10; + int numConnections = 100; int tickTime = 2000; File dir = new File(baseDir, zkdir[clusterId]); @@ -117,7 +118,7 @@ public class 
KafkaOperatorTestBase } } - public void stopZookeeper() + public static void stopZookeeper() { for (ZooKeeperServer zs : zkServer) { if (zs != null) { @@ -135,44 +136,37 @@ public class KafkaOperatorTestBase zkFactory = new ServerCnxnFactory[2]; } - public void startKafkaServer(int clusterid, int brokerid) + public static void startKafkaServer(int clusterid, int brokerid) { Properties props = new Properties(); - props.setProperty("broker.id", "" + brokerid); + props.setProperty("broker.id", "" + (clusterid * 10 + brokerid)); props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString()); props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]); props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]); props.setProperty("default.replication.factor", "1"); // set this to 50000 to boost the performance so most test data are in memory before being flushed to disk props.setProperty("log.flush.interval.messages", "50000"); - if (hasMultiPartition) { - props.setProperty("num.partitions", "2"); - } else { - props.setProperty("num.partitions", "1"); - } broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props)); broker[clusterid][brokerid].startup(); } - public void startKafkaServer() + public static void startKafkaServer() { FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir)); - boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } }; - for (int i = 0; i < startable.length; i++) { - for (int j = 0; j < startable[i].length; j++) { - if (startable[i][j]) - startKafkaServer(i, j); - } - } + //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } }; + startKafkaServer(0, 0); + startKafkaServer(0, 1); + startKafkaServer(1, 0); + startKafkaServer(1, 1); // startup is an async operation; wait 2 sec for the server to start up } - public void stopKafkaServer() + public static void stopKafkaServer() { for (int i = 0; i < broker.length; i++) { for (int j = 0; j < broker[i].length; j++) { @@ -185,28 +179,22 @@ public class KafkaOperatorTestBase } } - @Before - public void beforeTest() + @BeforeClass + public static void beforeTest() { try { startZookeeper(); startKafkaServer(); - createTopic(0, TEST_TOPIC); - if (hasMultiCluster) { - createTopic(1, TEST_TOPIC); - } } catch (java.nio.channels.CancelledKeyException ex) { logger.debug("LSHIL {}", ex.getLocalizedMessage()); } } - public void startZookeeper() + public static void startZookeeper() { FileUtils.deleteQuietly(new File(baseDir, zkBaseDir)); startZookeeper(0); - if (hasMultiCluster) { - startZookeeper(1); - } + startZookeeper(1); } public void createTopic(int clusterid, String topicName) @@ -229,19 +217,10 @@ public class KafkaOperatorTestBase ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false); TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args)); - // Right now, there is no programmatic synchronized way to create the topic. have to wait 2 sec to make sure the - // topic is created - // So the tests will not hit any bizarre failure - try { - Thread.sleep(5000); - zu.close(); - } catch (InterruptedException e) { - e.printStackTrace(); - } } - @After - public void afterTest() + @AfterClass + public static void afterTest() { try { stopKafkaServer();
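----------------------------------------------------------------------
Usage: a non-noop WindowDataManager is what switches the operator into idempotent mode (that is exactly what the new isIdempotent() checks before saving and replaying per-window offsets), so enabling idempotent replay in an application comes down to one setter. A minimal sketch modeled on the idempotent test path above; the class, operator, and stream names, the topic, and the broker address are illustrative placeholders, and ConsoleOutputOperator stands in for any real downstream operator:

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.util.WindowDataManager;

public class IdempotentKafkaApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
    in.setTopics("testtopic");         // placeholder topic
    in.setClusters("localhost:9092");  // placeholder broker list
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    // Any WindowDataManager other than NoopWindowDataManager enables idempotent replay;
    // FSWindowDataManager persists the per-window offset ranges to the filesystem.
    in.setWindowDataManager(new WindowDataManager.FSWindowDataManager());

    ConsoleOutputOperator out = dag.addOperator("console", ConsoleOutputOperator.class);
    dag.addStream("kafkaData", in.outputPort, out.input);
  }
}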

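The per-window recovery state that endWindow() hands to the WindowDataManager, and that replay()/emitImmediately() consume after a failure, is a map from partition to a pair of (offset of the first message emitted in the window, number of messages emitted in the window). A self-contained sketch of that bookkeeping, with plain String keys and hypothetical offsets standing in for AbstractKafkaPartitioner.PartitionMeta and real Kafka positions:

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class WindowDataSketch
{
  public static void main(String[] args)
  {
    // "cluster|topic|partition" stands in for AbstractKafkaPartitioner.PartitionMeta
    Map<String, Long> windowStartOffset = new HashMap<>();  // first offset seen in this window
    Map<String, Long> offsetTrack = new HashMap<>();        // next offset to consume (msg.offset() + 1)
    windowStartOffset.put("localhost:9092|testtopic|0", 1000L);
    offsetTrack.put("localhost:9092|testtopic|0", 1500L);

    // The same computation endWindow() performs before windowDataManager.save(...):
    // value = (window start offset, message count in the window)
    Map<String, Pair<Long, Long>> windowData = new HashMap<>();
    for (Map.Entry<String, Long> e : windowStartOffset.entrySet()) {
      windowData.put(e.getKey(), new MutablePair<>(e.getValue(), offsetTrack.get(e.getKey()) - e.getValue()));
    }
    System.out.println(windowData);  // prints {localhost:9092|testtopic|0=(1000,500)}
    // On replay, emitImmediately() seeks to 1000, re-emits exactly 500 messages,
    // then seeks to 1000 + 500 so normal consumption resumes after the recovered window.
  }
}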