This is an automated email from the ASF dual-hosted git repository. boaz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 4a0d2d01a702f20df9204da39f55f3da4d6a7143 Author: Sorabh Hamirwasia <[email protected]> AuthorDate: Thu Apr 18 14:50:38 2019 -0700 DRILL-7164: KafkaFilterPushdownTest is sometimes failing to pattern match correctly closes #1760 --- .../exec/store/kafka/KafkaFilterPushdownTest.java | 74 ++++++++++++++-------- .../test/java/org/apache/drill/PlanTestBase.java | 48 +++++++++----- 2 files changed, 79 insertions(+), 43 deletions(-) diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java index 2a5cddd..8e8319b 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java @@ -17,10 +17,10 @@ */ package org.apache.drill.exec.store.kafka; +import org.apache.drill.PlanTestBase; import org.apache.drill.categories.KafkaStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.kafka.common.serialization.StringSerializer; - import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -32,10 +32,8 @@ import static org.junit.Assert.assertTrue; @Category({KafkaStorageTest.class, SlowTest.class}) public class KafkaFilterPushdownTest extends KafkaTestBase { private static final int NUM_PARTITIONS = 5; - private static final String expectedSubStr = " \"kafkaScanSpec\" : {\n" + - " \"topicName\" : \"drill-pushdown-topic\"\n" + - " },\n" + - " \"cost\""; + private static final String EXPECTED_PATTERN = "kafkaScanSpec.*\\n.*\"topicName\" : \"drill-pushdown-topic\"\\n(" + + ".*\\n)?(.*\\n)?(.*\\n)?.*cost\"(.*\\n)(.*\\n).*outputRowCount\" : (%s.0)"; @BeforeClass public static void setup() throws Exception { @@ -63,7 +61,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString, expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -79,7 +78,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString, expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -95,7 +95,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -112,7 +113,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowInPlan)}, null); } /** @@ -128,7 +130,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -144,7 +147,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -161,7 +165,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -178,42 +183,48 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); //"equal" such that value < startOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); //"greater_than" such that value = endOffset-1 queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); //"greater_than_or_equal" such that value = endOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); //"less_than" such that value = startOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); //"less_than_or_equal" such that value < startOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -230,21 +241,24 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9"); runKafkaSQLVerifyCount(queryString, expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); //"greater_than" such that value = endOffset-2 queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); //"greater_than_or_equal" such that value = endOffset-1 queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9"); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -262,7 +276,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -280,7 +295,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowInPlan)}, null); } /** @@ -299,7 +315,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCount)}, null); } /** @@ -319,7 +336,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null); } /** @@ -338,7 +356,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null); } /** @@ -358,7 +377,8 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3); runKafkaSQLVerifyCount(queryString,expectedRowCount); - testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan)); + PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, + new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java index fae593f..a777276 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java @@ -17,18 +17,8 @@ */ package org.apache.drill; -import java.nio.file.Paths; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.List; -import java.util.Stack; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import org.apache.calcite.sql.SqlExplain.Depth; +import org.apache.calcite.sql.SqlExplainLevel; import org.apache.commons.io.FileUtils; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.record.RecordBatchLoader; @@ -37,13 +27,23 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.store.parquet.metadata.Metadata; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.calcite.sql.SqlExplain.Depth; -import org.apache.calcite.sql.SqlExplainLevel; - +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Strings; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.test.QueryTestUtil; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Stack; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class PlanTestBase extends BaseTestQuery { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanTestBase.class); @@ -92,7 +92,23 @@ public class PlanTestBase extends BaseTestQuery { public static void testPlanMatchingPatterns(String query, Pattern[] expectedPatterns, Pattern[] excludedPatterns) throws Exception { - final String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT); + testPlanMatchingPatterns(query, OPTIQ_FORMAT, expectedPatterns, excludedPatterns); + } + + public static void testPlanMatchingPatterns(String query, String planFormat, + String[] expectedPatterns, String[] excludedPatterns) + throws Exception { + Preconditions.checkArgument((planFormat.equals(OPTIQ_FORMAT) || planFormat.equals(JSON_FORMAT)), "Unsupported " + + "plan format %s is provided for explain plan query. Supported formats are: %s, %s", planFormat, OPTIQ_FORMAT, + JSON_FORMAT); + testPlanMatchingPatterns(query, planFormat, stringsToPatterns(expectedPatterns), + stringsToPatterns(excludedPatterns)); + } + + private static void testPlanMatchingPatterns(String query, String planFormat, + Pattern[] expectedPatterns, Pattern[] excludedPatterns) + throws Exception { + final String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), planFormat); // Check and make sure all expected patterns are in the plan if (expectedPatterns != null) {
