This is an automated email from the ASF dual-hosted git repository. bohdan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit cb6de5a5c2ff4928ce978c0c505ad1ec1d793162 Author: Anton Gozhiy <[email protected]> AuthorDate: Wed Jul 10 19:59:15 2019 +0300 DRILL-6961: Handle exceptions during queries to information_schema closes #1833 --- .../exec/store/jdbc/TestJdbcPluginWithMySQLIT.java | 6 ++ .../src/test/resources/mysql-test-data.sql | 2 + .../store/kafka/schema/KafkaMessageSchema.java | 11 ++- .../drill/exec/store/kafka/KafkaQueriesTest.java | 6 ++ .../openTSDB/client/services/ServiceImpl.java | 8 +-- .../drill/store/openTSDB/TestOpenTSDBPlugin.java | 80 +++++++++++++++------- .../exec/store/StoragePluginRegistryImpl.java | 6 +- .../store/ischema/InfoSchemaRecordGenerator.java | 3 +- 8 files changed, 86 insertions(+), 36 deletions(-) diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java index cd6b4b8..0ff6894 100644 --- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java @@ -323,4 +323,10 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest { .baselineValuesForSingleColumn(1, 2, 3, 5) .go(); } + + @Test + public void testInformationSchemaViews() throws Exception { + String query = "select * from information_schema.`views`"; + run(query); + } } diff --git a/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql b/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql index 9a180e0..b1af4d1 100644 --- a/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql +++ b/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql @@ -91,3 +91,5 @@ insert into person (first_name, last_name, address, city, state, zip, bigint_fie 'ZZZ'); insert into person (person_id) values (5); + +create view person_view as select * from person; diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java index 6ab826f..034927a 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java @@ -17,12 +17,12 @@ */ package org.apache.drill.exec.store.kafka.schema; +import java.util.Collections; import java.util.Map; import java.util.Set; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.store.AbstractSchema; @@ -30,7 +30,6 @@ import org.apache.drill.exec.store.kafka.KafkaScanSpec; import org.apache.drill.exec.store.kafka.KafkaStoragePlugin; import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +44,7 @@ public class KafkaMessageSchema extends AbstractSchema { private Set<String> tableNames; public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) { - super(ImmutableList.<String> of(), name); + super(ImmutableList.of(), name); this.plugin = plugin; } @@ -76,9 +75,9 @@ public class KafkaMessageSchema extends AbstractSchema { if (tableNames == null) { try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) { tableNames = kafkaConsumer.listTopics().keySet(); - } catch(KafkaException e) { - throw UserException.dataReadError(e).message("Failed to get tables information").addContext(e.getMessage()) - .build(logger); + } catch (Exception e) { + logger.warn("Failure while loading table names for database '{}': {}", getName(), e.getMessage(), e.getCause()); + return Collections.emptySet(); } } return tableNames; diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java index 51bb91d..6dc1b3e 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java @@ -74,6 +74,12 @@ public class KafkaQueriesTest extends KafkaTestBase { .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go(); } + @Test + public void testInformationSchema() throws Exception { + String query = "select * from information_schema.`views`"; + runSQL(query); + } + private Map<TopicPartition, Long> fetchOffsets(int flag) { KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java index 41730bd..40ed682 100644 --- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java @@ -33,6 +33,7 @@ import retrofit2.converter.jackson.JacksonConverterFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -101,10 +102,9 @@ public class ServiceImpl implements Service { try { return client.getAllTablesName().execute().body(); } catch (IOException e) { - throw UserException.connectionError(e) - .message("Cannot connect to the db. " + - "Maybe you have incorrect connection params or db unavailable now") - .build(log); + log.warn("Cannot connect to the db. " + + "Maybe you have incorrect connection params or db unavailable now: {}", e.getMessage(), e.getCause()); + return Collections.emptySet(); } } diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java index bcff4d6..4fe9cc2 100644 --- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java +++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java @@ -18,12 +18,13 @@ package org.apache.drill.store.openTSDB; import com.github.tomakehurst.wiremock.junit.WireMockRule; -import org.apache.drill.PlanTestBase; import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePluginConfig; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; import org.apache.drill.test.QueryTestUtil; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -48,8 +49,9 @@ import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POS import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS; import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS; import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_REQUEST_WITH_TAGS; +import static org.junit.Assert.assertEquals; -public class TestOpenTSDBPlugin extends PlanTestBase { +public class TestOpenTSDBPlugin extends ClusterTest { private static int portNumber; @@ -58,8 +60,9 @@ public class TestOpenTSDBPlugin extends PlanTestBase { @BeforeClass public static void setup() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); portNumber = QueryTestUtil.getFreePortNumber(10_000, 200); - final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage(); + final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); OpenTSDBStoragePluginConfig storagePluginConfig = new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s", portNumber)); storagePluginConfig.setEnabled(true); @@ -109,11 +112,11 @@ public class TestOpenTSDBPlugin extends PlanTestBase { .withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITHOUT_TAGS))); wireMockRule.stubFor(post(urlEqualTo("/api/query")) - .withRequestBody(equalToJson(END_PARAM_REQUEST_WTIHOUT_TAGS)) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/json") - .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS))); + .withRequestBody(equalToJson(END_PARAM_REQUEST_WTIHOUT_TAGS)) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS))); wireMockRule.stubFor(post(urlEqualTo("/api/query")) .withRequestBody(equalToJson(DOWNSAMPLE_REQUEST_WITH_TAGS)) @@ -123,11 +126,11 @@ public class TestOpenTSDBPlugin extends PlanTestBase { .withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITH_TAGS))); wireMockRule.stubFor(post(urlEqualTo("/api/query")) - .withRequestBody(equalToJson(END_PARAM_REQUEST_WITH_TAGS)) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/json") - .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS))); + .withRequestBody(equalToJson(END_PARAM_REQUEST_WITH_TAGS)) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS))); wireMockRule.stubFor(post(urlEqualTo("/api/query")) .withRequestBody(equalToJson(REQUEST_TO_NONEXISTENT_METRIC)) @@ -141,48 +144,77 @@ public class TestOpenTSDBPlugin extends PlanTestBase { public void testBasicQueryFromWithRequiredParams() throws Exception { String query = "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`"; - Assert.assertEquals(18, testSql(query)); + assertEquals(18, runQuery(query)); } @Test public void testBasicQueryGroupBy() throws Exception { String query = "select `timestamp`, sum(`aggregated value`) from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago)` group by `timestamp`"; - Assert.assertEquals(15, testSql(query)); + assertEquals(15, runQuery(query)); } @Test public void testBasicQueryFromWithInterpolationParam() throws Exception { String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, downsample=5y-avg)`"; - Assert.assertEquals(4, testSql(query)); + assertEquals(4, runQuery(query)); } @Test public void testBasicQueryFromWithEndParam() throws Exception { String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, end=1407165403000))`"; - Assert.assertEquals(5, testSql(query)); + assertEquals(5, runQuery(query)); } @Test(expected = UserRemoteException.class) public void testBasicQueryWithoutTableName() throws Exception { - test("select * from openTSDB.``;"); + runQuery("select * from openTSDB.``;"); } @Test(expected = UserRemoteException.class) public void testBasicQueryWithNonExistentTableName() throws Exception { - test("select * from openTSDB.`warp.spee`"); + runQuery("select * from openTSDB.`warp.spee`"); } @Test public void testPhysicalPlanSubmission() throws Exception { String query = "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`"; - testPhysicalPlanExecutionBasedOnQuery(query); + String plan = queryBuilder() + .sql(query) + .explainJson(); + queryBuilder() + .query(QueryType.PHYSICAL, plan) + .run(); } @Test public void testDescribe() throws Exception { - test("use openTSDB"); - test("describe `warp.speed.test`"); - Assert.assertEquals(1, testSql("show tables")); + runQuery("use openTSDB"); + runQuery("describe `warp.speed.test`"); + assertEquals(1, runQuery("show tables")); + } + + @Test + public void testInformationSchemaWrongPluginConfig() throws Exception { + ClusterFixture cluster = ClusterFixture.bareBuilder(dirTestWatcher) + .build(); + int portNumber = QueryTestUtil.getFreePortNumber(10_000, 200); + final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); + OpenTSDBStoragePluginConfig storagePluginConfig = + new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s/", portNumber)); + storagePluginConfig.setEnabled(true); + pluginRegistry.createOrUpdate(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig, true); + String query = "select * from information_schema.`views`"; + cluster.clientFixture() + .queryBuilder() + .sql(query) + .run(); + } + + private long runQuery(String query) throws Exception { + return queryBuilder() + .sql(query) + .run() + .recordCount(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java index 0fbfe4f..2bf6bc5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java @@ -581,7 +581,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { // finally register schemas with the refreshed plugins for (StoragePlugin plugin : enabledPlugins.plugins()) { - plugin.registerSchemas(schemaConfig, parent); + try { + plugin.registerSchemas(schemaConfig, parent); + } catch (Exception e) { + logger.warn("Error during `{}` schema initialization: {}", plugin.getName(), e.getMessage(), e.getCause()); + } } } catch (ExecutionSetupException e) { throw new DrillRuntimeException("Failure while updating storage plugins", e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java index a85a7a7..2197f1a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java @@ -378,8 +378,9 @@ public abstract class InfoSchemaRecordGenerator<S> { @Override public boolean visitTable(String schemaName, String tableName, Table table) { if (table.getJdbcTableType() == TableType.VIEW) { + // View's SQL may not be available for some non-Drill views, for example, JDBC view records.add(new Records.View(IS_CATALOG_NAME, schemaName, tableName, - ((DrillViewInfoProvider) table).getViewSql())); + table instanceof DrillViewInfoProvider ? ((DrillViewInfoProvider) table).getViewSql() : "")); } return false; }
