MLHR-1960 #comment Clean up resources that would conflict with the tests before each test runs
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/32b1c67e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/32b1c67e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/32b1c67e Branch: refs/heads/master Commit: 32b1c67eeca155ba17b84a3d61827764babc605a Parents: 216c56a Author: Siyuan Hua <[email protected]> Authored: Mon Jan 4 17:06:56 2016 -0800 Committer: Siyuan Hua <[email protected]> Committed: Mon Jan 4 17:06:56 2016 -0800 ---------------------------------------------------------------------- .../contrib/kafka/KafkaInputOperatorTest.java | 127 +++++++++++-------- .../contrib/kafka/KafkaOperatorTestBase.java | 10 +- 2 files changed, 83 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/32b1c67e/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java index 2a5a38d..9db1355 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java @@ -18,6 +18,26 @@ */ package com.datatorrent.contrib.kafka; +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Before; 
+import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.LoggerFactory; + import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; @@ -32,25 +52,8 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.partitioner.StatelessPartitionerTest; import com.datatorrent.lib.testbench.CollectorTestSink; -import org.apache.commons.io.FileUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; -import org.slf4j.LoggerFactory; +import com.datatorrent.stram.StramLocalCluster; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; public class KafkaInputOperatorTest extends KafkaOperatorTestBase { @@ -230,11 +233,20 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase } @Override - @After - public void afterTest() + @Before + public void beforeTest() { tupleCount.set(0); - super.afterTest(); + File syncCheckPoint = new File("target", "ck"); + File localFiles = new File("target" + StramLocalCluster.class.getName()); + try { + FileUtils.deleteQuietly(syncCheckPoint); + FileUtils.deleteQuietly(localFiles); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + super.beforeTest(); + } } public static class TestMeta extends TestWatcher @@ -251,9 +263,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase String methodName = description.getMethodName(); String className = description.getClassName(); 
baseDir = "target/" + className + "/" + methodName; - recoveryDir = baseDir + "/" + "recovery"; + recoveryDir = "recovery"; try { - FileUtils.deleteDirectory(new File(recoveryDir)); + FileUtils.deleteDirectory(new File(baseDir, "recovery")); } catch (IOException e) { throw new RuntimeException(e); } @@ -276,10 +288,50 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase p.setSendCount(totalCount); new Thread(p).start(); + + KafkaSinglePortStringInputOperator operator = createAndDeployOperator(); + latch.await(4000, TimeUnit.MILLISECONDS); + operator.beginWindow(1); + operator.emitTuples(); + operator.endWindow(); + operator.beginWindow(2); + operator.emitTuples(); + operator.endWindow(); + + //failure and then re-deployment of operator + testMeta.sink.collectedTuples.clear(); + operator.teardown(); + operator.deactivate(); + + operator = createAndDeployOperator(); + Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + + operator.beginWindow(1); + operator.emitTuples(); + operator.endWindow(); + operator.beginWindow(2); + operator.emitTuples(); + operator.endWindow(); + latch.await(3000, TimeUnit.MILLISECONDS); + // Emiting data after all recovery windows are replayed + operator.beginWindow(3); + operator.emitTuples(); + operator.endWindow(); + + Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size()); + testMeta.sink.collectedTuples.clear(); + operator.teardown(); + operator.deactivate(); + } + + private KafkaSinglePortStringInputOperator createAndDeployOperator() + { + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500); attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDir); + testMeta.context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap); testMeta.operator = new KafkaSinglePortStringInputOperator(); @@ 
-304,37 +356,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase testMeta.sink = new CollectorTestSink<Object>(); testMeta.operator.outputPort.setSink(testMeta.sink); operator.outputPort.setSink(testMeta.sink); - operator.setup(testMeta.context); - operator.activate(testMeta.context); - latch.await(4000, TimeUnit.MILLISECONDS); - operator.beginWindow(1); - operator.emitTuples(); - operator.endWindow(); - operator.beginWindow(2); - operator.emitTuples(); - operator.endWindow(); - //failure and then re-deployment of operator - testMeta.sink.collectedTuples.clear(); - operator.teardown(); operator.setup(testMeta.context); + operator.activate(testMeta.context); - Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + return operator; - operator.beginWindow(1); - operator.emitTuples(); - operator.endWindow(); - operator.beginWindow(2); - operator.emitTuples(); - operator.endWindow(); - latch.await(3000, TimeUnit.MILLISECONDS); - // Emiting data after all recovery windows are replayed - operator.beginWindow(3); - operator.emitTuples(); - operator.endWindow(); - - Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size()); - testMeta.sink.collectedTuples.clear(); } @Test http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/32b1c67e/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java index f4f5ef2..64651f4 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java @@ -26,8 +26,8 @@ import java.util.Properties; import kafka.admin.TopicCommand; import 
kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -import kafka.utils.Utils; +import org.apache.commons.io.FileUtils; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZKDatabase; @@ -71,7 +71,8 @@ public class KafkaOperatorTestBase { try { - + //before start, clean the zookeeper files if it exists + FileUtils.deleteQuietly(new File(baseDir, zkBaseDir)); int clientPort = TEST_ZOOKEEPER_PORT[clusterId]; int numConnections = 10; int tickTime = 2000; @@ -96,11 +97,13 @@ public class KafkaOperatorTestBase zkf.shutdown(); } } - Utils.rm(new File(baseDir, zkBaseDir)); } public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions) { + // before start, clean the kafka dir if it exists + FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir)); + Properties props = new Properties(); props.setProperty("broker.id", "" + brokerid); props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString()); @@ -149,7 +152,6 @@ public class KafkaOperatorTestBase } } } - Utils.rm(new File(baseDir, kafkaBaseDir)); } @Before
