This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 399ff45 DRILL-7929: Update Kafka unit tests to use ClusterTest
399ff45 is described below
commit 399ff45b5255828c2511756ba6c325f1fd743ac8
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Sat May 15 14:07:58 2021 +0300
DRILL-7929: Update Kafka unit tests to use ClusterTest
---
.../exec/store/kafka/KafkaFilterPushdownTest.java | 155 +++++++++++++++------
.../drill/exec/store/kafka/KafkaQueriesTest.java | 31 +++--
.../drill/exec/store/kafka/KafkaTestBase.java | 37 +++--
.../java/org/apache/drill/test/QueryBuilder.java | 12 ++
4 files changed, 154 insertions(+), 81 deletions(-)
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
index 4bfd5d4..757f6e3 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.kafka;
-import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -59,8 +58,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
runKafkaSQLVerifyCount(queryString, expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -75,8 +77,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
runKafkaSQLVerifyCount(queryString, expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -91,8 +96,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -108,8 +116,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowInPlan)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowInPlan))
+ .match();
}
/**
@@ -124,8 +135,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -140,8 +154,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -157,8 +174,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -174,48 +194,66 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
//"equal" such that value < startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
//"greater_than" such that value = endOffset-1
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
//"greater_than_or_equal" such that value = endOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
//"less_than" such that value = startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
//"less_than_or_equal" such that value < startOffset
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -231,24 +269,33 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");
runKafkaSQLVerifyCount(queryString, expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
//"greater_than" such that value = endOffset-2
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
//"greater_than_or_equal" such that value = endOffset-1
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -265,8 +312,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -283,8 +333,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowInPlan)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowInPlan))
+ .match();
}
/**
@@ -301,8 +354,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCount))
+ .match();
}
/**
@@ -321,8 +377,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan))
+ .match();
}
/**
@@ -340,8 +399,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan))
+ .match();
}
/**
@@ -360,7 +422,10 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
runKafkaSQLVerifyCount(queryString,expectedRowCount);
- PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
- new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
+ queryBuilder()
+ .sql(queryString)
+ .jsonPlanMatcher()
+ .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan))
+ .match();
}
}
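
[Editor's note] Every hunk in this file applies the same mechanical rewrite; a condensed before/after sketch of that pattern, using the queryString, EXPECTED_PATTERN, and expectedRowCount names from the tests above:

  // Before: static helper inherited from PlanTestBase
  PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
      new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null);

  // After: fluent QueryBuilder API available through ClusterTest
  queryBuilder()
      .sql(queryString)
      .jsonPlanMatcher()
      .include(String.format(EXPECTED_PATTERN, expectedRowCount))
      .match();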
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 62d1b66..cc7649f 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -58,7 +58,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
}
@Test
- public void testResultCount() throws Exception {
+ public void testResultCount() {
String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
}
@@ -94,7 +94,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
@Test
public void testInformationSchema() throws Exception {
String query = "select * from information_schema.`views`";
- runSQL(query);
+ queryBuilder().sql(query).run();
}
private Map<TopicPartition, Long> fetchOffsets(int flag) {
@@ -136,7 +136,8 @@ public class KafkaQueriesTest extends KafkaTestBase {
@Test
public void testPhysicalPlanSubmission() throws Exception {
String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
- testPhysicalPlanExecutionBasedOnQuery(query);
+ String plan = queryBuilder().sql(query).explainJson();
+ queryBuilder().physical(plan).run();
}
@Test
@@ -162,15 +163,15 @@ public class KafkaQueriesTest extends KafkaTestBase {
KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
generator.populateMessages(topicName, "Test");
- alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false);
+ client.alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false);
try {
- test("select * from kafka.`%s`", topicName);
+ queryBuilder().sql("select * from kafka.`%s`", topicName).run();
fail();
} catch (UserException e) {
// expected
}
- alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, true);
+ client.alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, true);
testBuilder()
.sqlQuery("select * from kafka.`%s`", topicName)
.expectsEmptyResultSet();
@@ -185,7 +186,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
.baselineValues(2L)
.go();
} finally {
- resetSessionOption(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
+ client.resetSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
}
}
@@ -197,15 +198,15 @@ public class KafkaQueriesTest extends KafkaTestBase {
KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
generator.populateMessages(topicName, "{\"nan_col\":NaN, \"inf_col\":Infinity}");
- alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, false);
+ client.alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, false);
try {
- test("select nan_col, inf_col from kafka.`%s`", topicName);
+ queryBuilder().sql("select nan_col, inf_col from kafka.`%s`", topicName).run();
fail();
} catch (UserException e) {
// expected
}
- alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, true);
+ client.alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, true);
testBuilder()
.sqlQuery("select nan_col, inf_col from kafka.`%s`", topicName)
.unOrdered()
@@ -213,7 +214,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
.baselineValues(Double.NaN, Double.POSITIVE_INFINITY)
.go();
} finally {
- resetSessionOption(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
+ client.resetSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
}
}
@@ -225,15 +226,15 @@ public class KafkaQueriesTest extends KafkaTestBase {
KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
generator.populateMessages(topicName, "{\"name\": \"AB\\\"\\C\"}");
- alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, false);
+ client.alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, false);
try {
- test("select name from kafka.`%s`", topicName);
+ queryBuilder().sql("select name from kafka.`%s`", topicName).run();
fail();
} catch (UserException e) {
// expected
}
- alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, true);
+ client.alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, true);
testBuilder()
.sqlQuery("select name from kafka.`%s`", topicName)
.unOrdered()
@@ -241,7 +242,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
.baselineValues("AB\"C")
.go();
} finally {
- resetSessionOption(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
+ client.resetSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
}
}
}
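
[Editor's note] The reader-option tests above all follow one shape: flip a session option through the ClusterTest client, expect a UserException under the strict setting, then restore the default. Sketched here with the skip-invalid-records option from the hunks above (the surrounding test method and fail() come from JUnit as in the diff):

  client.alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false);
  try {
    queryBuilder().sql("select * from kafka.`%s`", topicName).run();
    fail();
  } catch (UserException e) {
    // expected: the strict reader rejects the malformed message
  } finally {
    client.resetSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
  }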
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index f3c24e6..56e8138 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -17,15 +17,15 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.util.List;
import java.util.Map;
-import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.drill.exec.store.kafka.decoders.JsonMessageReader;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -34,47 +34,42 @@ import org.junit.BeforeClass;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-public class KafkaTestBase extends PlanTestBase {
+public class KafkaTestBase extends ClusterTest {
protected static KafkaStoragePluginConfig storagePluginConfig;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Make sure this test is only running as part of the suit
Assume.assumeTrue(TestKafkaSuit.isRunningSuite());
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ startCluster(builder);
TestKafkaSuit.initKafka();
initKafkaStoragePlugin(TestKafkaSuit.embeddedKafkaCluster);
}
public static void initKafkaStoragePlugin(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
- final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
Map<String, String> kafkaConsumerProps = Maps.newHashMap();
kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList());
kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "drill-test-consumer");
storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps);
storagePluginConfig.setEnabled(true);
pluginRegistry.put(KafkaStoragePluginConfig.NAME, storagePluginConfig);
- testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER,
- "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"));
- testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 5000));
+ client.alterSession(ExecConstants.KAFKA_RECORD_READER, JsonMessageReader.class.getName());
+ client.alterSession(ExecConstants.KAFKA_POLL_TIMEOUT, 5000);
}
- public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception {
- return testSqlWithResults(sql);
- }
-
- public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
- List<QueryDataBatch> results = runKafkaSQLWithResults(sql);
- logResultAndVerifyRowCount(results, expectedRowCount);
- }
-
- public void logResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount)
- throws SchemaChangeException {
- int rowCount = logResult(results);
+ public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) {
+ long rowCount = queryBuilder().sql(sql).log();
if (expectedRowCount != -1) {
Assert.assertEquals(expectedRowCount, rowCount);
}
}
+ public static long testSql(String sql) {
+ return client.queryBuilder().sql(sql).log();
+ }
+
@AfterClass
public static void tearDownKafkaTestBase() {
if (TestKafkaSuit.isRunningSuite()) {
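
[Editor's note] The base-class migration boils down to: boot a ClusterFixture in @BeforeClass instead of inheriting PlanTestBase statics, and set the Kafka reader options through the fixture's client. A minimal sketch of the resulting setup, using only the classes and values shown in the hunk above:

  public class KafkaTestBase extends ClusterTest {
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
      // Start an embedded Drillbit managed by the test framework
      startCluster(ClusterFixture.builder(dirTestWatcher));
      // Session options formerly set via "alter session" SQL strings
      client.alterSession(ExecConstants.KAFKA_RECORD_READER, JsonMessageReader.class.getName());
      client.alterSession(ExecConstants.KAFKA_POLL_TIMEOUT, 5000);
    }
  }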
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 0be54f3..fb1fe15 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -672,6 +672,18 @@ public class QueryBuilder {
return new PlanMatcher(plan);
}
+ /**
+ * Submits an explain plan statement and creates a plan matcher
+ * instance based on the returned query plan.
+ *
+ * @return plan matcher
+ * @throws Exception if the query fails
+ */
+ public PlanMatcher jsonPlanMatcher() throws Exception {
+ String plan = explainJson();
+ return new PlanMatcher(plan);
+ }
+
private QuerySummary produceSummary(BufferingQueryEventListener listener) throws Exception {
long start = System.currentTimeMillis();
int recordCount = 0;
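
[Editor's note] The new QueryBuilder.jsonPlanMatcher() is the JSON-format counterpart of the existing matcher visible in the context above: it runs the query through explainJson() and wraps the resulting plan text in a PlanMatcher. A usage sketch, where query and pattern are placeholders for a SQL string and an expected plan fragment:

  queryBuilder()
      .sql(query)              // any SQL statement
      .jsonPlanMatcher()       // EXPLAIN with JSON plan output
      .include(pattern)        // fragment expected to appear in the plan
      .match();                // asserts the included fragment is present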