This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ec22a1e1cad9f2e0ab75a598eb75727c7a044172 Author: wzhou-code <[email protected]> AuthorDate: Fri Nov 24 20:56:04 2023 -0800 IMPALA-12502: Support Impala to Impala federation This patch adds support for reading Impala tables in an Impala cluster through a JDBC external data source. It also adds a new counter NumExternalDataSourceGetNext in the profile for the total number of calls to ExternalDataSource::GetNext(). Setting query options for Impala will be supported in a following patch. Testing: - Added an end-to-end unit test to read Impala tables from an Impala cluster through a JDBC external data source. Manually ran the unit-test with Impala tables in an Impala cluster on a remote host by setting $INTERNAL_LISTEN_HOST in jdbc.url to the IP address of the remote host on which an Impala cluster is running. - Added an LDAP test for reading a table through a JDBC external data source with LDAP authentication. Manually ran the unit-test with Impala tables in a remote Impala cluster. - Passed core tests. Change-Id: I79ad3273932b658cb85c9c17cc834fa1b5fbd64f Reviewed-on: http://gerrit.cloudera.org:8080/20731 Reviewed-by: Abhishek Rawat <[email protected]> Tested-by: Wenzhe Zhou <[email protected]> --- be/src/exec/data-source-scan-node.cc | 6 + be/src/exec/data-source-scan-node.h | 3 + bin/impala-config.sh | 4 + .../apache/impala/customcluster/LdapHS2Test.java | 153 ++++++++++++- .../impala/extdatasource/jdbc/JdbcDataSource.java | 4 + .../extdatasource/jdbc/conf/DatabaseType.java | 3 +- .../extdatasource/jdbc/conf/JdbcStorageConfig.java | 2 + .../jdbc/dao/DatabaseAccessorFactory.java | 4 + .../jdbc/dao/GenericJdbcDatabaseAccessor.java | 8 +- .../ImpalaDatabaseAccessor.java} | 39 +++- testdata/bin/download-impala-jdbc-driver.sh | 72 +++++++ .../queries/QueryTest/impala-ext-jdbc-tables.test | 237 +++++++++++++++++++++ tests/custom_cluster/test_ext_data_sources.py | 34 +++ 13 files changed, 557 insertions(+), 12 deletions(-) diff --git a/be/src/exec/data-source-scan-node.cc 
b/be/src/exec/data-source-scan-node.cc index f37318184..df2eb7f5a 100644 --- a/be/src/exec/data-source-scan-node.cc +++ b/be/src/exec/data-source-scan-node.cc @@ -49,6 +49,9 @@ DEFINE_int32(data_source_batch_size, 1024, "Batch size for calls to GetNext() on namespace impala { +PROFILE_DEFINE_COUNTER(NumExternalDataSourceGetNext, DEBUG, TUnit::UNIT, + "The total number of calls to ExternalDataSource::GetNext()"); + // $0 = num expected cols, $1 = actual num columns const string ERROR_NUM_COLUMNS = "Data source returned unexpected number of columns. " "Expected $0 but received $1. This likely indicates a problem with the data source " @@ -93,6 +96,8 @@ Status DataSourceScanNode::Prepare(RuntimeState* state) { data_src_node_.init_string)); cols_next_val_idx_.resize(tuple_desc_->slots().size(), 0); + num_ext_data_source_get_next_ = + PROFILE_NumExternalDataSourceGetNext.Instantiate(runtime_profile_); return Status::OK(); } @@ -157,6 +162,7 @@ Status DataSourceScanNode::GetNextInputBatch() { Ubsan::MemSet(cols_next_val_idx_.data(), 0, sizeof(int) * cols_next_val_idx_.size()); TGetNextParams params; params.__set_scan_handle(scan_handle_); + COUNTER_ADD(num_ext_data_source_get_next_, 1); RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get())); RETURN_IF_ERROR(Status(input_batch_->status)); RETURN_IF_ERROR(ValidateRowBatchSize()); diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h index 7b1e33714..452bac153 100644 --- a/be/src/exec/data-source-scan-node.h +++ b/be/src/exec/data-source-scan-node.h @@ -98,6 +98,9 @@ class DataSourceScanNode : public ScanNode { /// the next row batch. std::vector<int> cols_next_val_idx_; + /// The total number of calls to ExternalDataSource::GetNext(). + RuntimeProfile::Counter* num_ext_data_source_get_next_; + /// Materializes the next row (next_row_idx_) into tuple. 'local_tz' is used as the /// local time-zone for materializing 'TYPE_TIMESTAMP' slots. 
Status MaterializeNextRow(const Timezone* local_tz, MemPool* mem_pool, Tuple* tuple); diff --git a/bin/impala-config.sh b/bin/impala-config.sh index 7bc23770b..b6d3547be 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -207,6 +207,10 @@ if [[ $ARCH_NAME == 'aarch64' ]]; then export IMPALA_HADOOP_CLIENT_VERSION=3.3.6 unset IMPALA_HADOOP_CLIENT_URL fi + +# Impala JDBC driver for testing. +export IMPALA_SIMBA_JDBC_DRIVER_VERSION=42-2.6.32.1041 + # Thrift related environment variables. # IMPALA_THRIFT_POM_VERSION is used to populate IMPALA_THRIFT_JAVA_VERSION and # thrift.version in java/pom.xml. diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java index 325f1a148..5b9540e77 100644 --- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java +++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java @@ -40,6 +40,8 @@ import org.apache.thrift.transport.THttpClient; import org.apache.thrift.protocol.TBinaryProtocol; import org.junit.ClassRule; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @CreateDS(name = "myDS", partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") }) @@ -51,6 +53,8 @@ import org.junit.Test; * ldap authentication is being used. 
*/ public class LdapHS2Test { + private static final Logger LOG = LoggerFactory.getLogger(LdapHS2Test.class); + @ClassRule public static CreateLdapServerRule serverRule = new CreateLdapServerRule(); @@ -92,7 +96,9 @@ public class LdapHS2Test { verifySuccess(fetchResp.getStatus()); List<TColumn> columns = fetchResp.getResults().getColumns(); assertEquals(1, columns.size()); - assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0)); + if (expectedResult != null) { + assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0)); + } return execResp.getOperationHandle(); } @@ -704,4 +710,149 @@ public class LdapHS2Test { assertEquals(e.getMessage(), "HTTP Response code: 401"); } } + + /** + * Tests LDAP for reading Impala table through JDBC external data source. + */ + @Test + public void testImpalaExtJdbcTables() throws Exception { + setUp(""); + verifyMetrics(0, 0); + THttpClient transport = new THttpClient("http://localhost:28000"); + Map<String, String> headers = new HashMap<String, String>(); + // Authenticate as 'Test1Ldap' with password '12345' + headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1"); + transport.setCustomHeaders(headers); + transport.open(); + TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport)); + + // Open a session which will get username 'Test1Ldap'. + TOpenSessionReq openReq = new TOpenSessionReq(); + TOpenSessionResp openResp = client.OpenSession(openReq); + TSessionHandle session = openResp.getSessionHandle(); + // One successful authentication. + verifyMetrics(1, 0); + + // Download Impala JDBC driver. + String downloadImpalaJdbcDriver = new File(System.getenv("IMPALA_HOME"), + "testdata/bin/download-impala-jdbc-driver.sh").getPath(); + String[] cmd = { downloadImpalaJdbcDriver }; + RunShellCommand.Run(cmd, /*shouldSucceed*/ true, "", ""); + + // Define queries. 
+ String fileSystemPrefix = System.getenv("FILESYSTEM_PREFIX"); + String internalListenHost = System.getenv("INTERNAL_LISTEN_HOST"); + + String dropDSQuery = "DROP DATA SOURCE IF EXISTS impala_jdbc_test_ds"; + String createDSQuery = String.format("CREATE DATA SOURCE impala_jdbc_test_ds " + + "LOCATION '%s/test-warehouse/data-sources/jdbc-data-source.jar' " + + "CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' " + + "API_VERSION 'V1'", fileSystemPrefix); + String dropTableQuery = "DROP TABLE IF EXISTS %s"; + // Set JDBC authentication mechanisms as LDAP (3) with username/password as + // TEST_USER_1/TEST_PASSWORD_1. + String createTableQuery = String.format("CREATE TABLE impala_jdbc_ext_test_table (" + + "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " + + "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " + + "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " + + "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" + + "'{\"database.type\":\"IMPALA\", " + + "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " + + "\"jdbc.auth\":\"AuthMech=3\", " + + "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " + + "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" + + "ImpalaJDBC42.jar\", " + + "\"dbcp.username\":\"%s\", " + + "\"dbcp.password\":\"%s\", " + + "\"table\":\"alltypes\"}')", + internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1); + // Set JDBC authentication mechanisms as LDAP with wrong password. 
+ String createTableWithWrongPassword = + String.format("CREATE TABLE impala_jdbc_tbl_wrong_password (" + + "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " + + "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " + + "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " + + "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" + + "'{\"database.type\":\"IMPALA\", " + + "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " + + "\"jdbc.auth\":\"AuthMech=3\", " + + "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " + + "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" + + "ImpalaJDBC42.jar\", " + + "\"dbcp.username\":\"%s\", " + + "\"dbcp.password\":\"wrong-password\", " + + "\"table\":\"alltypes\"}')", + internalListenHost, fileSystemPrefix, TEST_USER_1); + // Set JDBC authentication mechanisms as LDAP without AuthMech. + String createTableWithoutAuthMech = + String.format("CREATE TABLE impala_jdbc_tbl_without_auth_mech (" + + "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " + + "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " + + "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " + + "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" + + "'{\"database.type\":\"IMPALA\", " + + "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " + + "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " + + "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" + + "ImpalaJDBC42.jar\", " + + "\"dbcp.username\":\"%s\", " + + "\"dbcp.password\":\"%s\", " + + "\"table\":\"alltypes\"}')", + internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1); + String selectQuery = "select string_col from %s where id=9"; + + // Run queries. + // + // Create data source and tables. 
+ execAndFetch(client, session, dropDSQuery, null); + execAndFetch(client, session, createDSQuery, "Data source has been created."); + execAndFetch(client, session, + String.format(dropTableQuery, "impala_jdbc_ext_test_table"), null); + execAndFetch(client, session, createTableQuery, "Table has been created."); + execAndFetch(client, session, + String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"), null); + execAndFetch(client, session, createTableWithWrongPassword, + "Table has been created."); + execAndFetch(client, session, + String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"), null); + execAndFetch(client, session, createTableWithoutAuthMech, "Table has been created."); + + // Successfully access JDBC data source table with LDAP. + execAndFetch(client, session, + String.format(selectQuery, "impala_jdbc_ext_test_table"), "9"); + // Negative case for JDBC data source table with wrong password. + String expectedError = "Error initialized or created transport for authentication"; + try { + execAndFetch(client, session, + String.format(selectQuery, "impala_jdbc_tbl_wrong_password"), "9"); + fail("Expected error: " + expectedError); + } catch (Exception e) { + assertTrue(e.getMessage().contains(expectedError)); + } + // Negative case for JDBC data source table without AuthMech. + expectedError = "Communication link failure. Failed to connect to server"; + try { + execAndFetch(client, session, + String.format(selectQuery, "impala_jdbc_tbl_without_auth_mech"), "9"); + fail("Expected error: " + expectedError); + } catch (Exception e) { + assertTrue(String.format("Authentication failed with error: %s", e.getMessage()), + e.getMessage().contains(expectedError)); + } + + // Drop data source and tables. 
+ execAndFetch(client, session, dropDSQuery, "Data source has been dropped."); + execAndFetch(client, session, + String.format(dropTableQuery, "impala_jdbc_ext_test_table"), + "Table has been dropped."); + execAndFetch(client, session, + String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"), + "Table has been dropped."); + execAndFetch(client, session, + String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"), + "Table has been dropped."); + + // Two successful authentications for each ExecAndFetch(). + verifyMetrics(31, 0); + } } diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java index 173753d64..10df71dd5 100644 --- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java @@ -229,6 +229,9 @@ public class JdbcDataSource implements ExternalDataSource { initString = initString.substring(CACHE_CLASS_PREFIX.length()); cacheClass_ = true; } + // Replace '\n' with single space character so that one property setting in + // initString can be broken into multiple lines for better readability. 
+ initString = initString.replace('\n', ' '); Map<String, String> config = new ObjectMapper().readValue(initString, typeRef); tableConfig_ = JdbcStorageConfigManager.convertMapToConfiguration(config); } catch (JsonProcessingException e) { @@ -294,6 +297,7 @@ public class JdbcDataSource implements ExternalDataSource { } // Execute query and get iterator tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString()); + LOG.trace("JDBC Query: " + sb.toString()); if (schema_.getColsSize() != 0) { int limit = -1; diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java index 9b30350bc..a01fb4108 100644 --- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java @@ -24,5 +24,6 @@ public enum DatabaseType { ORACLE, POSTGRES, MSSQL, - JETHRO_DATA + JETHRO_DATA, + IMPALA } diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java index 0e1ac5ab3..e48fb07ef 100644 --- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java @@ -26,6 +26,8 @@ public enum JdbcStorageConfig { // JDBC connection string, including the database type, IP address, port number, and // database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional JDBC_URL("jdbc.url", true), + // Authentication mechanisms of JDBC driver. + JDBC_AUTH("jdbc.auth", false), // Class name of JDBC driver. 
For example, "org.postgresql.Driver" JDBC_DRIVER_CLASS("jdbc.driver", true), // Driver URL for downloading the Jar file package that is used to access the external diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java index 5415b5763..decde6797 100644 --- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java @@ -57,6 +57,10 @@ public class DatabaseAccessorFactory { accessor = new DB2DatabaseAccessor(); break; + case IMPALA: + accessor = new ImpalaDatabaseAccessor(); + break; + default: accessor = new GenericJdbcDatabaseAccessor(); break; diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java index b2e820c30..4ab214d2c 100644 --- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java @@ -48,6 +48,7 @@ import org.apache.impala.thrift.TStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Strings; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; @@ -283,7 +284,12 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor { } // essential properties - dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName())); + String jdbcUrl = conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()); + String jdbcAuth = 
conf.get(JdbcStorageConfig.JDBC_AUTH.getPropertyName()); + if (!Strings.isNullOrEmpty(jdbcAuth)) { + jdbcUrl += ";" + jdbcAuth; + } + dbProperties.put("url", jdbcUrl); dbProperties.put("driverClassName", conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName())); dbProperties.put("driverUrl", diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java similarity index 51% copy from java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java copy to java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java index 9b30350bc..361eba845 100644 --- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/ImpalaDatabaseAccessor.java @@ -15,14 +15,35 @@ // specific language governing permissions and limitations // under the License. -package org.apache.impala.extdatasource.jdbc.conf; +package org.apache.impala.extdatasource.jdbc.dao; + +/** + * Impala specific data accessor. 
This is needed because Impala JDBC drivers do not + * support generic LIMIT and OFFSET escape functions + */ +public class ImpalaDatabaseAccessor extends GenericJdbcDatabaseAccessor { + + @Override + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + if (offset == 0) { + return addLimitToQuery(sql, limit); + } else { + if (limit != -1) { + return sql + " LIMIT " + limit + " OFFSET " + offset; + } else { + return sql; + } + } + } + + + @Override + protected String addLimitToQuery(String sql, int limit) { + if (limit != -1) { + return sql + " LIMIT " + limit; + } else { + return sql; + } + } -public enum DatabaseType { - MYSQL, - H2, - DB2, - ORACLE, - POSTGRES, - MSSQL, - JETHRO_DATA } diff --git a/testdata/bin/download-impala-jdbc-driver.sh b/testdata/bin/download-impala-jdbc-driver.sh new file mode 100755 index 000000000..05d445295 --- /dev/null +++ b/testdata/bin/download-impala-jdbc-driver.sh @@ -0,0 +1,72 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# This script download the Impala jdbc driver and copy it to Hadoop FS. + +set -euo pipefail +. $IMPALA_HOME/bin/report_build_error.sh +setup_report_build_error + +. 
${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1 + +EXT_DATA_SOURCES_HDFS_PATH=${FILESYSTEM_PREFIX}/test-warehouse/data-sources +JDBC_DRIVERS_HDFS_PATH=${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-drivers +SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpala_JDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION} +INNER_SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpalaJDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION} +DRIVER_JAR_VERSION=${IMPALA_SIMBA_JDBC_DRIVER_VERSION%-*} +SIMBA_DRIVER_JAR_FILENAME=ImpalaJDBC${DRIVER_JAR_VERSION}.jar + +found=$(hadoop fs -find ${JDBC_DRIVERS_HDFS_PATH} -name ${SIMBA_DRIVER_JAR_FILENAME}) +if [ ! -z "$found" ]; then + echo "JDBC driver jar file already exists" + exit 0 +fi + +hadoop fs -mkdir -p ${JDBC_DRIVERS_HDFS_PATH} +pushd /tmp + +mkdir -p impala_jdbc_driver +cd impala_jdbc_driver + +# Download Impala jdbc driver. +wget "https://downloads.cloudera.com/connectors/${SIMBA_DRIVER_ZIP_FILENAME}.zip" + +# Use Python modules to unzip zip file since 'unzip' command is not available in some +# testing environments. +cat > unzip.py <<__EOT__ +import sys +from zipfile import PyZipFile +pzf = PyZipFile(sys.argv[1]) +pzf.extractall() +__EOT__ + +# Extract driver jar file from zip file. +python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}.zip +python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}/${INNER_SIMBA_DRIVER_ZIP_FILENAME}.zip + +# Copy driver jar file to Hadoop FS. +hadoop fs -put -f /tmp/impala_jdbc_driver/${SIMBA_DRIVER_JAR_FILENAME} \ + ${JDBC_DRIVERS_HDFS_PATH}/${SIMBA_DRIVER_JAR_FILENAME} + +echo "Copied ${SIMBA_DRIVER_JAR_FILENAME} into HDFS ${JDBC_DRIVERS_HDFS_PATH}" + +cd .. 
+rm -rf impala_jdbc_driver +popd + diff --git a/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test new file mode 100644 index 000000000..8001295d4 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test @@ -0,0 +1,237 @@ +==== +---- QUERY +# Create DataSource +DROP DATA SOURCE IF EXISTS TestJdbcDataSource; +CREATE DATA SOURCE TestJdbcDataSource +LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar' +CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' +API_VERSION 'V1'; +---- RESULTS +'Data source has been created.' +==== +---- QUERY +# Show created DataSource +SHOW DATA SOURCES LIKE 'testjdbcdatasource'; +---- LABELS +NAME,LOCATION,CLASS NAME,API VERSION +---- RESULTS +'testjdbcdatasource',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1' +---- TYPES +STRING,STRING,STRING,STRING +==== +---- QUERY +# Create external JDBC DataSource table +DROP TABLE IF EXISTS alltypes_jdbc_datasource; +CREATE TABLE alltypes_jdbc_datasource ( + id INT, + bool_col BOOLEAN, + tinyint_col TINYINT, + smallint_col SMALLINT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + date_string_col STRING, + string_col STRING, + timestamp_col TIMESTAMP) +PRODUCED BY DATA SOURCE TestJdbcDataSource( +'{"database.type":"IMPALA", +"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional", +"jdbc.auth":"AuthMech=0", +"jdbc.driver":"com.cloudera.impala.jdbc.Driver", +"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar", +"dbcp.username":"impala", +"dbcp.password":"cloudera", +"table":"alltypes"}'); +---- RESULTS +'Table has been created.' 
+==== +---- QUERY +# Create external JDBC DataSource table +DROP TABLE IF EXISTS alltypes_jdbc_datasource_2; +CREATE TABLE alltypes_jdbc_datasource_2 ( + id INT, + bool_col BOOLEAN, + tinyint_col TINYINT, + smallint_col SMALLINT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + date_string_col STRING, + string_col STRING, + timestamp_col TIMESTAMP) +PRODUCED BY DATA SOURCE TestJdbcDataSource( +'{"database.type":"IMPALA", +"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional", +"jdbc.auth":"AuthMech=0", +"jdbc.driver":"com.cloudera.impala.jdbc.Driver", +"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar", +"dbcp.username":"impala", +"dbcp.password":"cloudera", +"table":"alltypes"}'); +---- RESULTS +'Table has been created.' +==== +---- QUERY +# Test the jdbc DataSource +# count(*) with a predicate evaluated by Impala +# Binary predicates are pushed to the external jdbc DataSource. +select count(*) from alltypes_jdbc_datasource +where float_col = 0 and string_col is not NULL +---- RESULTS +730 +---- TYPES +BIGINT +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 1 .* +row_regex: .*RowsRead: 730 .* +aggregation(SUM, RowsRead): 730 +==== +---- QUERY +# count(*) with no predicates has no materialized slots +select count(*) from alltypes_jdbc_datasource +---- RESULTS +7300 +---- TYPES +BIGINT +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 1 .* +row_regex: .*RowsRead: 7.30K .* +aggregation(SUM, RowsRead): 7300 +==== +---- QUERY +# Gets all types including a row with a NULL value. The binary predicates are pushed to +# the DataSource, "order by" and "limit" are evaluated locally. 
+select * +from alltypes_jdbc_datasource +where id > 10 and int_col< 5 order by id limit 5 offset 0 +---- RESULTS +11,false,1,1,1,10,1.100000023841858,10.1,'01/02/09','1',2009-01-02 00:11:00.450000000 +12,true,2,2,2,20,2.200000047683716,20.2,'01/02/09','2',2009-01-02 00:12:00.460000000 +13,false,3,3,3,30,3.299999952316284,30.3,'01/02/09','3',2009-01-02 00:13:00.480000000 +14,true,4,4,4,40,4.400000095367432,40.4,'01/02/09','4',2009-01-02 00:14:00.510000000 +20,true,0,0,0,0,0,0,'01/03/09','0',2009-01-03 00:20:00.900000000 +---- TYPES +INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 4 .* +row_regex: .*RowsRead: 3.64K .* +aggregation(SUM, RowsRead): 3644 +==== +---- QUERY +# Gets specified columns. +# The binary predicates are pushed to the DataSource, "order by" and "limit" are +# evaluated locally. +select id, bool_col, smallint_col, float_col, double_col, date_string_col +from alltypes_jdbc_datasource +where id > 10 and int_col< 5 order by id limit 5 offset 0 +---- RESULTS +11,false,1,1.100000023841858,10.1,'01/02/09' +12,true,2,2.200000047683716,20.2,'01/02/09' +13,false,3,3.299999952316284,30.3,'01/02/09' +14,true,4,4.400000095367432,40.4,'01/02/09' +20,true,0,0,0,'01/03/09' +---- TYPES +INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 4 .* +row_regex: .*RowsRead: 3.64K .* +aggregation(SUM, RowsRead): 3644 +==== +---- QUERY +# Gets specified columns from external jdbc table with case sensitive column names +# and table name. +# The binary predicates are pushed to the DataSource, "order by" and "limit" are +# evaluated locally. 
+select id, bool_col, smallint_col, float_col, double_col, date_string_col +from alltypes_jdbc_datasource_2 +where id > 10 and int_col< 5 order by id limit 5 offset 0 +---- RESULTS +11,false,1,1.100000023841858,10.1,'01/02/09' +12,true,2,2.200000047683716,20.2,'01/02/09' +13,false,3,3.299999952316284,30.3,'01/02/09' +14,true,4,4.400000095367432,40.4,'01/02/09' +20,true,0,0,0,'01/03/09' +---- TYPES +INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 4 .* +row_regex: .*RowsRead: 3.64K .* +aggregation(SUM, RowsRead): 3644 +==== +---- QUERY +# Inner join with a non jdbc table +# The binary predicates are pushed to the DataSource, but no predicate defined for +# local table. +select a.id, b.int_col +from alltypes_jdbc_datasource a inner join functional.alltypes b on (a.id = b.id) +where a.id = 1 +---- RESULTS +1,1 +---- TYPES +INT, INT +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 1 .* +row_regex: .*RowsRead: 1 .* +aggregation(SUM, RowsRead): 7301 +==== +---- QUERY +# Inner join with another jdbc table +# The binary predicates are pushed to the two DataSource Nodes. +select a.id, b.int_col +from alltypes_jdbc_datasource a inner join alltypes_jdbc_datasource_2 b on (a.id = b.id) +where a.id < 3 group by a.id, b.int_col +---- RESULTS +0,0 +1,1 +2,2 +---- TYPES +INT, INT +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 1 .* +row_regex: .*RowsRead: 3 .* +aggregation(SUM, RowsRead): 6 +==== +---- QUERY +# Cross join +# The binary predicates are pushed to the two DataSource Nodes. 
+select a.id, b.id +from alltypes_jdbc_datasource a cross join alltypes_jdbc_datasource b +where (a.id < 3 and b.id < 3) +order by a.id, b.id limit 10 +---- RESULTS +0,0 +0,1 +0,2 +1,0 +1,1 +1,2 +2,0 +2,1 +2,2 +---- TYPES +INT, INT +---- RUNTIME_PROFILE +row_regex: .*NumExternalDataSourceGetNext: 1 .* +row_regex: .*RowsRead: 3 .* +aggregation(SUM, RowsRead): 6 +==== +---- QUERY +# Drop table +DROP TABLE alltypes_jdbc_datasource; +---- RESULTS +'Table has been dropped.' +==== +---- QUERY +# Drop table +DROP TABLE alltypes_jdbc_datasource_2; +---- RESULTS +'Table has been dropped.' +==== +---- QUERY +# Drop DataSource +DROP DATA SOURCE TestJdbcDataSource; +---- RESULTS +'Data source has been dropped.' +==== diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py index 78f16dd75..f12fd1b56 100644 --- a/tests/custom_cluster/test_ext_data_sources.py +++ b/tests/custom_cluster/test_ext_data_sources.py @@ -109,3 +109,37 @@ class TestMySqlExtJdbcTables(CustomClusterTestSuite): def test_mysql_ext_jdbc_tables(self, vector, unique_database): """Run tests for external jdbc tables on MySQL""" self.run_test_case('QueryTest/mysql-ext-jdbc-tables', vector, use_db=unique_database) + + +class TestImpalaExtJdbcTables(CustomClusterTestSuite): + """Impala query tests for external jdbc tables in Impala cluster.""" + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def _download_impala_jdbc_driver(cls): + # Download Impala jdbc driver and copy jdbc driver to HDFS. 
+ script = os.path.join( + os.environ['IMPALA_HOME'], 'testdata/bin/download-impala-jdbc-driver.sh') + run_cmd = [script] + try: + subprocess.check_call(run_cmd, close_fds=True) + except subprocess.CalledProcessError: + assert False, "Failed to download Impala JDBC driver" + + @classmethod + def setup_class(cls): + cls._download_impala_jdbc_driver() + super(TestImpalaExtJdbcTables, cls).setup_class() + + @classmethod + def teardown_class(cls): + super(TestImpalaExtJdbcTables, cls).teardown_class() + + @pytest.mark.execute_serially + def test_impala_ext_jdbc_tables(self, vector, unique_database): + """Run tests for external jdbc tables in Impala cluster""" + self.run_test_case( + 'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database)
