Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 8f8205118 -> 185708672
SQOOP-2322: Sqoop2: Kafka topic should vary on a per method basis (Abraham Elmahrek via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/18570867 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/18570867 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/18570867 Branch: refs/heads/sqoop2 Commit: 1857086722ef5900bc2ecea1475a009a043a6ba2 Parents: 8f82051 Author: Jarek Jarcec Cecho <[email protected]> Authored: Sun Apr 26 12:09:29 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sun Apr 26 12:09:29 2015 -0700 ---------------------------------------------------------------------- .../test/testcases/KafkaConnectorTestCase.java | 17 ++++++++--------- .../connector/kafka/FromHDFSToKafkaTest.java | 2 ++ .../connector/kafka/FromRDBMSToKafkaTest.java | 2 ++ 3 files changed, 12 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/18570867/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java index 9aa69ed..f15c07e 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java @@ -18,14 +18,13 @@ package org.apache.sqoop.test.testcases; import kafka.message.MessageAndMetadata; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.testng.annotations.AfterClass; import org.testng.Assert; -import org.testng.annotations.BeforeClass; import 
org.apache.sqoop.common.test.kafka.TestUtil; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -39,16 +38,16 @@ import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText; public class KafkaConnectorTestCase extends ConnectorTestCase { private static TestUtil testUtil = TestUtil.getInstance(); - private static final String TOPIC = "mytopic"; + protected String topic; @BeforeClass(alwaysRun = true) - public static void startKafka() throws Exception { + public void startKafka() throws Exception { // starts Kafka server and its dependent zookeeper testUtil.prepare(); } @AfterClass(alwaysRun = true) - public static void stopKafka() throws IOException { + public void stopKafka() throws IOException { testUtil.tearDown(); } @@ -61,9 +60,9 @@ public class KafkaConnectorTestCase extends ConnectorTestCase { protected void fillKafkaToConfig(MJob job){ MConfigList toConfig = job.getToJobConfig(); - toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC); + toConfig.getStringInput("toJobConfig.topic").setValue(topic); List<String> topics = new ArrayList<String>(1); - topics.add(TOPIC); + topics.add(topic); testUtil.initTopicList(topics); } @@ -79,7 +78,7 @@ public class KafkaConnectorTestCase extends ConnectorTestCase { for(String str: content) { MessageAndMetadata<byte[],byte[]> fetchedMsg = - testUtil.getNextMessageFromConsumer(TOPIC); + testUtil.getNextMessageFromConsumer(topic); outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8"))); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/18570867/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java 
b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java index 88db2f2..9ec4e8f 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java @@ -34,6 +34,8 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase { }; @Test public void testBasic() throws Exception { + topic = getTestName(); + createFromFile("input-0001",input); // Create Kafka link http://git-wip-us.apache.org/repos/asf/sqoop/blob/18570867/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java index 92a52b8..dc1a80f 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java @@ -37,6 +37,8 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { @Test public void testBasic() throws Exception { + topic = getTestName(); + createAndLoadTableCities(); // Kafka link
