This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch 2.6.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 37ee3a81603baf7b1d8252736bf94f037140320c Author: hit-lacus <hit_la...@126.com> AuthorDate: Mon Jun 24 15:34:04 2019 +0800 KYLIN-4046 Refine JDBC Source(source.default=8) Currently, ingesting data from an RDBMS (kylin.source.default=8) into Kylin has several problems. This patch: 1. fixes case-sensitivity problems with SQL identifiers 2. improves compatibility across SQL dialects 3. fixes misuse of identifier quote characters --- .../org/apache/kylin/common/SourceDialect.java | 56 +++- .../java/org/apache/kylin/job/JoinedFlatTable.java | 2 +- .../apache/kylin/metadata/model/PartitionDesc.java | 27 +- .../org/apache/kylin/metadata/model/TblColRef.java | 18 ++ .../DefaultPartitionConditionBuilderTest.java | 8 +- pom.xml | 3 + .../org/apache/kylin/source/jdbc/JdbcDialect.java | 26 -- .../org/apache/kylin/source/jdbc/JdbcExplorer.java | 17 +- .../kylin/source/jdbc/JdbcHiveInputBase.java | 344 +++++++++++++++++++-- .../apache/kylin/source/jdbc/JdbcTableReader.java | 15 +- .../java/org/apache/kylin/source/jdbc/SqlUtil.java | 5 +- .../source/jdbc/extensible/JdbcHiveInputBase.java | 2 +- .../source/jdbc/metadata/DefaultJdbcMetadata.java | 6 +- .../kylin/source/jdbc/metadata/IJdbcMetadata.java | 4 + .../source/jdbc/metadata/JdbcMetadataFactory.java | 16 +- .../source/jdbc/metadata/MySQLJdbcMetadata.java | 6 + .../jdbc/metadata/SQLServerJdbcMetadata.java | 6 + .../apache/kylin/source/jdbc/JdbcExplorerTest.java | 4 +- .../kylin/source/jdbc/JdbcHiveInputBaseTest.java | 68 ++++ 19 files changed, 533 insertions(+), 100 deletions(-) diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java b/core-common/src/main/java/org/apache/kylin/common/SourceDialect.java similarity index 52% rename from source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java rename to core-common/src/main/java/org/apache/kylin/common/SourceDialect.java index d9c7425..a87054d 100644 --- 
a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java +++ b/core-common/src/main/java/org/apache/kylin/common/SourceDialect.java @@ -15,21 +15,45 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kylin.source.jdbc.metadata; - -import org.apache.kylin.source.jdbc.JdbcDialect; -import org.junit.Assert; -import org.junit.Test; - -public class JdbcMetadataFactoryTest { - - @Test - public void testGetJdbcMetadata() { - Assert.assertTrue( - JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MSSQL, null) instanceof SQLServerJdbcMetadata); - Assert.assertTrue( - JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MYSQL, null) instanceof MySQLJdbcMetadata); - Assert.assertTrue( - JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_VERTICA, null) instanceof DefaultJdbcMetadata); + +package org.apache.kylin.common; + +/** + * Decide sql pattern according to dialect from differenct data source + */ +public enum SourceDialect { + HIVE("hive"), + + /** + * Support MySQL 5.7 + */ + MYSQL("mysql"), + + /** + * Support Microsoft Sql Server 2017 + */ + SQL_SERVER("mssql"), + + VERTICA("vertica"), + + /** + * Others + */ + UNKNOWN("unknown"); + + String source; + + SourceDialect(String source) { + this.source = source; + } + + public static SourceDialect getDialect(String name) { + + for (SourceDialect dialect : SourceDialect.values()) { + if (dialect.source.equalsIgnoreCase(name)) { + return dialect; + } + } + return UNKNOWN; } } diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 0d1cafb..4281885 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -241,7 +241,7 @@ public class JoinedFlatTable { if (segRange != null && !segRange.isInfinite()) { 
whereBuilder.append(" AND ("); String quotedPartitionCond = quoteIdentifierInSqlExpr(flatDesc, - partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange)); + partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange, null)); whereBuilder.append(quotedPartitionCond); whereBuilder.append(")" + sep); } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java index 56ededb..f93996e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java @@ -20,6 +20,7 @@ package org.apache.kylin.metadata.model; import java.io.Serializable; import java.util.Locale; +import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.ClassUtil; @@ -184,19 +185,26 @@ public class PartitionDesc implements Serializable { // ============================================================================ public static interface IPartitionConditionBuilder { - String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange); + String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> quoteFunc); } public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, Serializable { @Override - public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange) { + public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> quoteFunc) { long startInclusive = (Long) segRange.start.v; long endExclusive = (Long) segRange.end.v; TblColRef partitionDateColumn = partDesc.getPartitionDateColumnRef(); TblColRef 
partitionTimeColumn = partDesc.getPartitionTimeColumnRef(); + if (partitionDateColumn != null) { + partitionDateColumn.setQuotedFunc(quoteFunc); + } + if (partitionTimeColumn != null) { + partitionTimeColumn.setQuotedFunc(quoteFunc); + } + StringBuilder builder = new StringBuilder(); if (partDesc.partitionColumnIsYmdInt()) { @@ -224,7 +232,7 @@ public class PartitionDesc implements Serializable { private static void buildSingleColumnRangeCondAsTimeMillis(StringBuilder builder, TblColRef partitionColumn, long startInclusive, long endExclusive) { - String partitionColumnName = partitionColumn.getIdentity(); + String partitionColumnName = partitionColumn.getQuotedIdentity(); builder.append(partitionColumnName + " >= " + startInclusive); builder.append(" AND "); builder.append(partitionColumnName + " < " + endExclusive); @@ -232,7 +240,7 @@ public class PartitionDesc implements Serializable { private static void buildSingleColumnRangeCondAsYmdInt(StringBuilder builder, TblColRef partitionColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat) { - String partitionColumnName = partitionColumn.getIdentity(); + String partitionColumnName = partitionColumn.getQuotedIdentity(); builder.append(partitionColumnName + " >= " + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat)); builder.append(" AND "); @@ -242,7 +250,7 @@ public class PartitionDesc implements Serializable { private static void buildSingleColumnRangeCondition(StringBuilder builder, TblColRef partitionColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat) { - String partitionColumnName = partitionColumn.getIdentity(); + String partitionColumnName = partitionColumn.getQuotedIdentity(); if (endExclusive <= startInclusive) { builder.append("1=0"); @@ -267,8 +275,8 @@ public class PartitionDesc implements Serializable { private static void buildMultipleColumnRangeCondition(StringBuilder builder, TblColRef partitionDateColumn, TblColRef 
partitionTimeColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat, String partitionColumnTimeFormat, boolean partitionDateColumnIsYmdInt) { - String partitionDateColumnName = partitionDateColumn.getIdentity(); - String partitionTimeColumnName = partitionTimeColumn.getIdentity(); + String partitionDateColumnName = partitionDateColumn.getQuotedIdentity(); + String partitionTimeColumnName = partitionTimeColumn.getQuotedIdentity(); String singleQuotation = partitionDateColumnIsYmdInt ? "" : "'"; builder.append("("); builder.append("("); @@ -308,11 +316,14 @@ public class PartitionDesc implements Serializable { public static class YearMonthDayPartitionConditionBuilder implements IPartitionConditionBuilder { @Override - public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange) { + public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> func) { long startInclusive = (Long) segRange.start.v; long endExclusive = (Long) segRange.end.v; TblColRef partitionColumn = partDesc.getPartitionDateColumnRef(); + if (partitionColumn != null) { + partitionColumn.setQuotedFunc(func); + } String tableAlias = partitionColumn.getTableAlias(); String concatField = String.format(Locale.ROOT, "CONCAT(%s.YEAR,'-',%s.MONTH,'-',%s.DAY)", tableAlias, diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java index 918eedf..0dc08a9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java @@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.Serializable; import java.util.Locale; +import java.util.function.Function; + import org.apache.commons.lang.StringUtils; import 
org.apache.kylin.metadata.datatype.DataType; @@ -120,6 +122,15 @@ public class TblColRef implements Serializable { private String identity; private String parserDescription; + /** + * Function used to get quoted identitier + */ + private transient Function<TblColRef, String> quotedFunc; + + public void setQuotedFunc(Function<TblColRef, String> quotedFunc) { + this.quotedFunc = quotedFunc; + } + TblColRef(ColumnDesc column) { this.column = column; } @@ -238,6 +249,13 @@ public class TblColRef implements Serializable { return identity; } + public String getQuotedIdentity() { + if (quotedFunc == null) + return getIdentity(); + else + return quotedFunc.apply(this); + } + @Override public String toString() { if (isInnerColumn() && parserDescription != null) diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java index b536e29..438fb4a 100644 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java @@ -53,12 +53,12 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC partitionDesc.setPartitionDateColumn(col.getCanonicalName()); partitionDesc.setPartitionDateFormat("yyyy-MM-dd"); TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22"), DateFormat.stringToMillis("2016-02-23")); - String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range); + String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null); Assert.assertEquals("UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'", condition); range = new TSRange(0L, 0L); - condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, 
range); + condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null); Assert.assertEquals("1=0", condition); } @@ -71,7 +71,7 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC partitionDesc.setPartitionTimeFormat("HH"); TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00")); - String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range); + String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null); Assert.assertEquals("UNKNOWN_ALIAS.HOUR_COLUMN >= '00' AND UNKNOWN_ALIAS.HOUR_COLUMN < '01'", condition); } @@ -88,7 +88,7 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC partitionDesc.setPartitionTimeFormat("H"); TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"), DateFormat.stringToMillis("2016-02-23 01:00:00")); - String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range); + String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null); Assert.assertEquals( "((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-22' AND UNKNOWN_ALIAS.HOUR_COLUMN >= '0') OR (UNKNOWN_ALIAS.DATE_COLUMN > '2016-02-22')) AND ((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-23' AND UNKNOWN_ALIAS.HOUR_COLUMN < '1') OR (UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'))", condition); diff --git a/pom.xml b/pom.xml index 84253be..3c691d8 100644 --- a/pom.xml +++ b/pom.xml @@ -1640,6 +1640,9 @@ <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> <configuration> + <!-- Used to print file with unapproved licenses in project to stand output --> + <consoleOutput>true</consoleOutput> + <!-- Exclude files/folders for apache release --> <excludes> <exclude>DEPENDENCIES</exclude> diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java deleted file mode 100644 index 7e5ecee..0000000 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package org.apache.kylin.source.jdbc; - -public class JdbcDialect { - public static final String DIALECT_VERTICA = "vertica"; - public static final String DIALECT_ORACLE = "oracle"; - public static final String DIALECT_MYSQL = "mysql"; - public static final String DIALECT_HIVE = "hive"; - public static final String DIALECT_MSSQL = "mssql"; -} diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java index 7eb4fa9..d728dcf 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; @@ -50,7 +51,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class); private final KylinConfig config; - private final String dialect; + private final SourceDialect dialect; private final DBConnConf dbconf; private final IJdbcMetadata jdbcMetadataDialect; @@ -61,7 +62,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye String jdbcUser = config.getJdbcSourceUser(); String jdbcPass = config.getJdbcSourcePass(); this.dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); - this.dialect = config.getJdbcSourceDialect(); + this.dialect = SourceDialect.getDialect(config.getJdbcSourceDialect()); this.jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf); } @@ -117,7 +118,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye } private String getSqlDataType(String 
javaDataType) { - if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + if (SourceDialect.VERTICA.equals(dialect) || SourceDialect.SQL_SERVER.equals(dialect)) { if (javaDataType.toLowerCase(Locale.ROOT).equals("double")) { return "float"; } @@ -132,9 +133,9 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye } private String generateCreateSchemaSql(String schemaName) { - if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MYSQL.equals(dialect)) { + if (SourceDialect.VERTICA.equals(dialect) || SourceDialect.MYSQL.equals(dialect)) { return String.format(Locale.ROOT, "CREATE schema IF NOT EXISTS %s", schemaName); - } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + } else if (SourceDialect.SQL_SERVER.equals(dialect)) { return String.format(Locale.ROOT, "IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA" + " [%s] AUTHORIZATION [dbo]')", @@ -151,13 +152,13 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye } private String generateLoadDataSql(String tableName, String tableFileDir) { - if (JdbcDialect.DIALECT_VERTICA.equals(dialect)) { + if (SourceDialect.VERTICA.equals(dialect)) { return String.format(Locale.ROOT, "copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName); - } else if (JdbcDialect.DIALECT_MYSQL.equals(dialect)) { + } else if (SourceDialect.MYSQL.equals(dialect)) { return String.format(Locale.ROOT, "LOAD DATA INFILE '%s/%s.csv' INTO %s FIELDS TERMINATED BY ',';", tableFileDir, tableName, tableName); - } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + } else if (SourceDialect.SQL_SERVER.equals(dialect)) { return String.format(Locale.ROOT, "BULK INSERT %s FROM '%s/%s.csv' WITH(FIELDTERMINATOR = ',')", tableName, tableFileDir, tableName); } else { diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java index 20f2dcb..94594f3 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java @@ -18,28 +18,43 @@ package org.apache.kylin.source.jdbc; import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.common.util.SourceConfigurationUtil; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.util.FlatTableSqlQuoteUtils; import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.JoinDesc; +import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.hive.DBConnConf; import org.apache.kylin.source.hive.HiveInputBase; +import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import 
java.util.Set; +import java.util.TreeMap; public class JdbcHiveInputBase extends HiveInputBase { private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class); @@ -47,9 +62,66 @@ public class JdbcHiveInputBase extends HiveInputBase { private static final String DEFAULT_QUEUE = "default"; public static class JdbcBaseBatchCubingInputSide extends BaseBatchCubingInputSide { + private IJdbcMetadata jdbcMetadataDialect; + private DBConnConf dbconf; + private SourceDialect dialect; + private final Map<String, String> metaMap = new TreeMap<>(); public JdbcBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { super(flatDesc); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + String connectionUrl = config.getJdbcSourceConnectionUrl(); + String driverClass = config.getJdbcSourceDriver(); + String jdbcUser = config.getJdbcSourceUser(); + String jdbcPass = config.getJdbcSourcePass(); + dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); + dialect = SourceDialect.getDialect(config.getJdbcSourceDialect()); + jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf); + calCachedJdbcMeta(metaMap, dbconf, jdbcMetadataDialect); + if (logger.isTraceEnabled()) { + StringBuilder dumpInfo = new StringBuilder(); + metaMap.forEach((k, v) -> dumpInfo.append("CachedMetadata: ").append(k).append(" => ").append(v) + .append(System.lineSeparator())); + logger.trace(dumpInfo.toString()); + } + } + + /** + * Fetch and cache metadata from JDBC API, which will help to resolve + * case-sensitive problem of sql identifier + * + * @param metadataMap a Map which mapping upper case identifier to real/original identifier + */ + public static void calCachedJdbcMeta(Map<String, String> metadataMap, DBConnConf dbconf, + IJdbcMetadata jdbcMetadataDialect) { + try (Connection connection = SqlUtil.getConnection(dbconf)) { + DatabaseMetaData databaseMetaData = connection.getMetaData(); + for (String database : 
jdbcMetadataDialect.listDatabases()) { + metadataMap.put(database.toUpperCase(Locale.ROOT), database); + ResultSet tableRs = jdbcMetadataDialect.getTable(databaseMetaData, database, null); + while (tableRs.next()) { + String tableName = tableRs.getString("TABLE_NAME"); + ResultSet colRs = jdbcMetadataDialect.listColumns(databaseMetaData, database, tableName); + while (colRs.next()) { + String colName = colRs.getString("COLUMN_NAME"); + colName = database + "." + tableName + "." + colName; + metadataMap.put(colName.toUpperCase(Locale.ROOT), colName); + } + colRs.close(); + tableName = database + "." + tableName; + metadataMap.put(tableName.toUpperCase(Locale.ROOT), tableName); + } + tableRs.close(); + } + } catch (IllegalStateException e) { + if (SqlUtil.DRIVER_MISS.equalsIgnoreCase(e.getMessage())) { + logger.warn("Ignore JDBC Driver Missing in yarn node.", e); + } else { + throw e; + } + } catch (Exception e) { + throw new IllegalStateException("Error when connect to JDBC source " + dbconf.getUrl(), e); + } } protected KylinConfig getConfig() { @@ -148,22 +220,19 @@ public class JdbcHiveInputBase extends HiveInputBase { partCol = partitionDesc.getPartitionDateColumn();//tablename.colname } - String splitTable; String splitTableAlias; String splitColumn; String splitDatabase; TblColRef splitColRef = determineSplitColumn(); - splitTable = splitColRef.getTableRef().getTableName(); splitTableAlias = splitColRef.getTableAlias(); - splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef); + + splitColumn = getColumnIdentityQuoted(splitColRef, jdbcMetadataDialect, metaMap, true); splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase(); - //using sqoop to extract data from jdbc source and dump them to hive - String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol }); + String selectSql = generateSelectDataStatementRDBMS(flatDesc, true, new String[] { partCol }, + jdbcMetadataDialect, 
metaMap); selectSql = escapeQuotationInSql(selectSql); - - String hiveTable = flatDesc.getTableName(); String connectionUrl = config.getJdbcSourceConnectionUrl(); String driverClass = config.getJdbcSourceDriver(); @@ -175,17 +244,19 @@ public class JdbcHiveInputBase extends HiveInputBase { String filedDelimiter = config.getJdbcSourceFieldDelimiter(); int mapperNum = config.getSqoopMapperNum(); - String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s as %s", splitColumn, - splitColumn, splitDatabase, splitTable, splitTableAlias); + String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s ", splitColumn, splitColumn, + getSchemaQuoted(metaMap, splitDatabase, jdbcMetadataDialect, true), + getTableIdentityQuoted(splitColRef.getTableRef(), metaMap, jdbcMetadataDialect, true)); if (partitionDesc.isPartitioned()) { SegmentRange segRange = flatDesc.getSegRange(); if (segRange != null && !segRange.isInfinite()) { if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias) && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc - .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) { - String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc, - partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc, - flatDesc.getSegment(), segRange)); + .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) { + + String quotedPartCond = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition( + partitionDesc, flatDesc.getSegment(), segRange, + col -> getTableColumnIdentityQuoted(col, jdbcMetadataDialect, metaMap, true)); bquery += " WHERE " + quotedPartCond; } } @@ -195,14 +266,13 @@ public class JdbcHiveInputBase extends HiveInputBase { // escape ` in cmd splitColumn = escapeQuotationInSql(splitColumn); - String cmd = String.format(Locale.ROOT, - "%s/bin/sqoop import" + generateSqoopConfigArgString() - + 
"--connect \"%s\" --driver %s --username %s --password \"%s\" --query \"%s AND \\$CONDITIONS\" " - + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '%s' " - + "--null-non-string '%s' --fields-terminated-by '%s' --num-mappers %d", - sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, - splitColumn, bquery, sqoopNullString, sqoopNullNonString, filedDelimiter, mapperNum); - logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd)); + String cmd = String.format(Locale.ROOT, "%s/bin/sqoop import" + generateSqoopConfigArgString() + + "--connect \"%s\" --driver %s --username %s --password \"%s\" --query \"%s AND \\$CONDITIONS\" " + + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '%s' " + + "--null-non-string '%s' --fields-terminated-by '%s' --num-mappers %d", sqoopHome, connectionUrl, + driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, splitColumn, bquery, + sqoopNullString, sqoopNullNonString, filedDelimiter, mapperNum); + logger.debug("sqoop cmd : {}", cmd); CmdStep step = new CmdStep(); step.setCmd(cmd); step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE); @@ -212,7 +282,7 @@ public class JdbcHiveInputBase extends HiveInputBase { protected String generateSqoopConfigArgString() { KylinConfig kylinConfig = getConfig(); Map<String, String> config = Maps.newHashMap(); - config.put("mapreduce.job.queuename", getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config + config.put(MR_OVERRIDE_QUEUE_KEY, getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config config.putAll(SourceConfigurationUtil.loadSqoopConfiguration()); config.putAll(kylinConfig.getSqoopConfigOverride()); @@ -229,4 +299,232 @@ public class JdbcHiveInputBase extends HiveInputBase { sqlExpr = sqlExpr.replaceAll("`", "\\\\`"); return sqlExpr; } + + private static String generateSelectDataStatementRDBMS(IJoinedFlatTableDesc flatDesc, 
boolean singleLine, + String[] skipAs, IJdbcMetadata metadata, Map<String, String> metaMap) { + SourceDialect dialect = metadata.getDialect(); + final String sep = singleLine ? " " : "\n"; + + final List<String> skipAsList = (skipAs == null) ? new ArrayList<>() : Arrays.asList(skipAs); + + StringBuilder sql = new StringBuilder(); + sql.append("SELECT"); + sql.append(sep); + + for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { + TblColRef col = flatDesc.getAllColumns().get(i); + if (i > 0) { + sql.append(","); + } + String colTotalName = String.format(Locale.ROOT, "%s.%s", col.getTableRef().getTableName(), col.getName()); + if (skipAsList.contains(colTotalName)) { + sql.append(getTableColumnIdentityQuoted(col, metadata, metaMap, true)).append(sep); + } else { + sql.append(getTableColumnIdentityQuoted(col, metadata, metaMap, true)).append(" as ") + .append(quoteIdentifier(JoinedFlatTable.colName(col), dialect)).append(sep); + } + } + appendJoinStatement(flatDesc, sql, singleLine, metadata, metaMap); + appendWhereStatement(flatDesc, sql, singleLine, metadata, metaMap); + return sql.toString(); + } + + private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine, + IJdbcMetadata metadata, Map<String, String> metaMap) { + final String sep = singleLine ? 
" " : "\n"; + Set<TableRef> dimTableCache = new HashSet<>(); + + DataModelDesc model = flatDesc.getDataModel(); + sql.append(" FROM ") + .append(getSchemaQuoted(metaMap, + flatDesc.getDataModel().getRootFactTable().getTableDesc().getDatabase(), metadata, true)) + .append(".") + .append(getTableIdentityQuoted(flatDesc.getDataModel().getRootFactTable(), metaMap, metadata, true)); + + sql.append(" "); + sql.append((getTableIdentityQuoted(flatDesc.getDataModel().getRootFactTable(), metaMap, metadata, true))) + .append(sep); + + for (JoinTableDesc lookupDesc : model.getJoinTables()) { + JoinDesc join = lookupDesc.getJoin(); + if (join != null && !join.getType().equals("")) { + TableRef dimTable = lookupDesc.getTableRef(); + if (!dimTableCache.contains(dimTable)) { + TblColRef[] pk = join.getPrimaryKeyColumns(); + TblColRef[] fk = join.getForeignKeyColumns(); + if (pk.length != fk.length) { + throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); + } + String joinType = join.getType().toUpperCase(Locale.ROOT); + + sql.append(joinType).append(" JOIN ") + .append(getSchemaQuoted(metaMap, dimTable.getTableDesc().getDatabase(), metadata, true)) + .append(".").append(getTableIdentityQuoted(dimTable, metaMap, metadata, true)); + + sql.append(" "); + sql.append(getTableIdentityQuoted(dimTable, metaMap, metadata, true)).append(sep); + sql.append("ON "); + for (int i = 0; i < pk.length; i++) { + if (i > 0) { + sql.append(" AND "); + } + sql.append(getTableColumnIdentityQuoted(fk[i], metadata, metaMap, true)).append(" = ") + .append(getTableColumnIdentityQuoted(pk[i], metadata, metaMap, true)); + } + sql.append(sep); + dimTableCache.add(dimTable); + } + } + } + } + + private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine, + IJdbcMetadata metadata, Map<String, String> metaMap) { + final String sep = singleLine ? 
" " : "\n"; + + StringBuilder whereBuilder = new StringBuilder(); + whereBuilder.append("WHERE 1=1"); + + DataModelDesc model = flatDesc.getDataModel(); + if (StringUtils.isNotEmpty(model.getFilterCondition())) { + whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") "); + } + + if (flatDesc.getSegment() != null) { + PartitionDesc partDesc = model.getPartitionDesc(); + if (partDesc != null && partDesc.getPartitionDateColumn() != null) { + SegmentRange segRange = flatDesc.getSegRange(); + + if (segRange != null && !segRange.isInfinite()) { + whereBuilder.append(" AND ("); + whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, + flatDesc.getSegment(), segRange, + col -> getTableColumnIdentityQuoted(col, metadata, metaMap, true))); + whereBuilder.append(")"); + whereBuilder.append(sep); + } + } + } + sql.append(whereBuilder.toString()); + } + + /** + * @return {TABLE_NAME}.{COLUMN_NAME} + */ + private static String getTableColumnIdentityQuoted(TblColRef col, IJdbcMetadata metadata, + Map<String, String> metaMap, boolean needQuote) { + String tblName = getTableIdentityQuoted(col.getTableRef(), metaMap, metadata, needQuote); + String colName = getColumnIdentityQuoted(col, metadata, metaMap, needQuote); + return tblName + "." 
+ colName; + } + + /** + * @return {SCHEMA_NAME} + */ + static String getSchemaQuoted(Map<String, String> metaMap, String database, IJdbcMetadata metadata, + boolean needQuote) { + String databaseName = fetchValue(database, null, null, metaMap); + if (needQuote) { + return quoteIdentifier(databaseName, metadata.getDialect()); + } else { + return databaseName; + } + } + + /** + * @return {TABLE_NAME} + */ + static String getTableIdentityQuoted(TableRef tableRef, Map<String, String> metaMap, IJdbcMetadata metadata, + boolean needQuote) { + String value = fetchValue(tableRef.getTableDesc().getDatabase(), tableRef.getTableDesc().getName(), null, + metaMap); + String[] res = value.split("\\."); + value = res[res.length - 1]; + if (needQuote) { + return quoteIdentifier(value, metadata.getDialect()); + } else { + return value; + } + } + + /** + * @return {TABLE_NAME} + */ + static String getTableIdentityQuoted(String database, String table, Map<String, String> metaMap, + IJdbcMetadata metadata, boolean needQuote) { + String value = fetchValue(database, table, null, metaMap); + String[] res = value.split("\\."); + value = res[res.length - 1]; + if (needQuote) { + return quoteIdentifier(value, metadata.getDialect()); + } else { + return value; + } + } + + /** + * @return {COLUMN_NAME} + */ + private static String getColumnIdentityQuoted(TblColRef tblColRef, IJdbcMetadata metadata, + Map<String, String> metaMap, boolean needQuote) { + String value = fetchValue(tblColRef.getTableRef().getTableDesc().getDatabase(), + tblColRef.getTableRef().getTableDesc().getName(), tblColRef.getName(), metaMap); + String[] res = value.split("\\."); + value = res[res.length - 1]; + if (needQuote) { + return quoteIdentifier(value, metadata.getDialect()); + } else { + return value; + } + } + + /** + * Quote the identifier acccording to sql dialect, as far as I know, + * MySQL use backtick(`), oracle 11g use double quotation("), sql server 2017 + * use square brackets([ or ]) as quote character. 
+ * + * @param identifier something looks like tableA.columnB + */ + static String quoteIdentifier(String identifier, SourceDialect dialect) { + if (KylinConfig.getInstanceFromEnv().enableHiveDdlQuote()) { + String[] identifierArray = identifier.split("\\."); + String quoted = ""; + for (int i = 0; i < identifierArray.length; i++) { + switch (dialect) { + case SQL_SERVER: + identifierArray[i] = "[" + identifierArray[i] + "]"; + break; + case MYSQL: + case HIVE: + identifierArray[i] = "`" + identifierArray[i] + "`"; + break; + default: + String quote = KylinConfig.getInstanceFromEnv().getQuoteCharacter(); + identifierArray[i] = quote + identifierArray[i] + quote; + } + } + quoted = String.join(".", identifierArray); + return quoted; + } else { + return identifier; + } + } + + static String fetchValue(String database, String table, String column, Map<String, String> metadataMap) { + String key; + if (table == null && column == null) { + key = database; + } else if (column == null) { + key = database + "." + table; + } else { + key = database + "." + table + "." 
+ column; + } + String val = metadataMap.get(key.toUpperCase(Locale.ROOT)); + if (val == null) { + logger.warn("Not find for {} from metadata cache.", key); + return key; + } else { + return val; + } + } } diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java index 3c2b4f9..1c689dd 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java @@ -23,10 +23,15 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.source.IReadableTable.TableReader; import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +66,15 @@ public class JdbcTableReader implements TableReader { String jdbcPass = config.getJdbcSourcePass(); dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); jdbcCon = SqlUtil.getConnection(dbconf); - String sql = String.format(Locale.ROOT, "select * from %s.%s", dbName, tableName); + IJdbcMetadata meta = JdbcMetadataFactory + .getJdbcMetadata(SourceDialect.getDialect(config.getJdbcSourceDialect()), dbconf); + + Map<String, String> metadataCache = new TreeMap<>(); + JdbcHiveInputBase.JdbcBaseBatchCubingInputSide.calCachedJdbcMeta(metadataCache, dbconf, meta); + String database = JdbcHiveInputBase.getSchemaQuoted(metadataCache, dbName, meta, true); + String table = JdbcHiveInputBase.getTableIdentityQuoted(dbName, tableName, metadataCache, meta, true); + + String sql = String.format(Locale.ROOT, "select * from %s.%s", 
database, table); try { statement = jdbcCon.createStatement(); rs = statement.executeQuery(sql); diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java index 5242832..9299d78 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java @@ -23,7 +23,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.Random; - import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.source.hive.DBConnConf; import org.slf4j.Logger; @@ -62,6 +61,7 @@ public class SqlUtil { } public static final int tryTimes = 5; + public static final String DRIVER_MISS = "DRIVER_MISS"; public static Connection getConnection(DBConnConf dbconf) { if (dbconf.getUrl() == null) @@ -70,7 +70,8 @@ public class SqlUtil { try { Class.forName(dbconf.getDriver()); } catch (Exception e) { - logger.error("", e); + logger.error("Miss Driver", e); + throw new IllegalStateException(DRIVER_MISS); } boolean got = false; int times = 0; diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java index 9fd6d30..fcafae2 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java @@ -94,7 +94,7 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) { String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc, partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc, - flatDesc.getSegment(), segRange)); + flatDesc.getSegment(), segRange, null)); bquery += " 
WHERE " + quotedPartCond; } } diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java index 0842199..b9c65fc 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java @@ -23,8 +23,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; - import java.util.Locale; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.source.hive.DBConnConf; import org.apache.kylin.source.jdbc.SqlUtil; import org.slf4j.Logger; @@ -74,4 +74,8 @@ public class DefaultJdbcMetadata implements IJdbcMetadata { public ResultSet listColumns(final DatabaseMetaData dbmd, String schema, String table) throws SQLException { return dbmd.getColumns(null, schema, table, null); } + + public SourceDialect getDialect() { + return SourceDialect.UNKNOWN; + } } diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java index 169fe60..f41c3e8 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java @@ -17,12 +17,16 @@ */ package org.apache.kylin.source.jdbc.metadata; +import org.apache.kylin.common.SourceDialect; + import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; public interface IJdbcMetadata { + SourceDialect getDialect(); + List<String> listDatabases() throws SQLException; List<String> listTables(String database) throws SQLException; diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java index ae4c0ff..498bc09 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java @@ -17,17 +17,19 @@ */ package org.apache.kylin.source.jdbc.metadata; -import java.util.Locale; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.source.hive.DBConnConf; -import org.apache.kylin.source.jdbc.JdbcDialect; -public abstract class JdbcMetadataFactory { - public static IJdbcMetadata getJdbcMetadata(String dialect, final DBConnConf dbConnConf) { - String jdbcDialect = (null == dialect) ? "" : dialect.toLowerCase(Locale.ROOT); +public class JdbcMetadataFactory { + + private JdbcMetadataFactory() { + } + + public static IJdbcMetadata getJdbcMetadata(SourceDialect jdbcDialect, final DBConnConf dbConnConf) { switch (jdbcDialect) { - case (JdbcDialect.DIALECT_MSSQL): + case SQL_SERVER: return new SQLServerJdbcMetadata(dbConnConf); - case (JdbcDialect.DIALECT_MYSQL): + case MYSQL: return new MySQLJdbcMetadata(dbConnConf); default: return new DefaultJdbcMetadata(dbConnConf); diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java index 54c2a03..e3c523c 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java @@ -24,6 +24,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.source.hive.DBConnConf; import org.apache.kylin.source.jdbc.SqlUtil; @@ -64,4 +65,9 @@ public class MySQLJdbcMetadata extends DefaultJdbcMetadata { public ResultSet getTable(final DatabaseMetaData 
dbmd, String catalog, String table) throws SQLException { return dbmd.getTables(catalog, null, table, null); } + + @Override + public SourceDialect getDialect() { + return SourceDialect.MYSQL; + } } diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java index 5373672..696a350 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.source.hive.DBConnConf; import org.apache.kylin.source.jdbc.SqlUtil; @@ -59,4 +60,9 @@ public class SQLServerJdbcMetadata extends DefaultJdbcMetadata { } return new ArrayList<>(ret); } + + @Override + public SourceDialect getDialect() { + return SourceDialect.SQL_SERVER; + } } diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java index a0df4f4..ed3d181 100644 --- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java +++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java @@ -18,7 +18,6 @@ package org.apache.kylin.source.jdbc; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -35,6 +34,7 @@ import java.util.List; import java.util.Locale; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.SourceDialect; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; import 
org.apache.kylin.metadata.model.ColumnDesc; @@ -83,7 +83,7 @@ public class JdbcExplorerTest extends LocalFileMetadataTestCase { PowerMockito.stub(PowerMockito.method(SqlUtil.class, "getConnection")).toReturn(connection); PowerMockito.mockStatic(JdbcMetadataFactory.class); - when(JdbcMetadataFactory.getJdbcMetadata(anyString(), any(DBConnConf.class))).thenReturn(jdbcMetadata); + when(JdbcMetadataFactory.getJdbcMetadata(any(SourceDialect.class), any(DBConnConf.class))).thenReturn(jdbcMetadata); when(connection.getMetaData()).thenReturn(dbmd); jdbcExplorer = spy(JdbcExplorer.class); diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java new file mode 100644 index 0000000..f6415e6 --- /dev/null +++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class JdbcHiveInputBaseTest extends LocalFileMetadataTestCase { // covers JdbcHiveInputBase.fetchValue and quoteIdentifier
+
+    @BeforeClass
+    public static void setupClass() throws SQLException {
+        staticCreateTestMetadata();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.setProperty("kylin.source.hive.quote-enabled", "true"); // quoteIdentifier returns input unchanged unless enableHiveDdlQuote() is true; this key is assumed to back that flag - confirm
+    }
+
+    @Test
+    public void testFetchValue() {
+        Map<String, String> map = new HashMap<>(); // fetchValue looks keys up upper-cased (Locale.ROOT), so cache keys must be upper-case
+        String guess = JdbcHiveInputBase.fetchValue("DB_1", "TB_2", "COL_3", map);
+
+        // not found, return input value
+        assertEquals("DB_1.TB_2.COL_3", guess);
+        map.put("DB_1.TB_2.COL_3", "Db_1.Tb_2.Col_3");
+
+        guess = JdbcHiveInputBase.fetchValue("DB_1", "TB_2", "COL_3", map);
+        // found, return cached value
+        assertEquals("Db_1.Tb_2.Col_3", guess);
+    }
+
+    @Test
+    public void testQuoteIdentifier() {
+        String guess = JdbcHiveInputBase.quoteIdentifier("Tbl1.Col1", SourceDialect.MYSQL); // each dot-separated part is quoted separately
+        assertEquals("`Tbl1`.`Col1`", guess);
+        guess = JdbcHiveInputBase.quoteIdentifier("Tbl1.Col1", SourceDialect.SQL_SERVER);
+        assertEquals("[Tbl1].[Col1]", guess);
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        staticCleanupTestMetadata();
+    }
+}