Repository: lens Updated Branches: refs/heads/master 3576207a1 -> 3563aacf7
LENS-710 : Allow column name mapping for few/all columns in underlying storage tables Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/3563aacf Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/3563aacf Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/3563aacf Branch: refs/heads/master Commit: 3563aacf7c41a257d4c306153555099333ed5a47 Parents: 3576207 Author: Amareshwari Sriramadasu <[email protected]> Authored: Tue Sep 8 21:49:39 2015 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Tue Sep 8 21:49:39 2015 +0530 ---------------------------------------------------------------------- lens-api/src/main/resources/cube-0.1.xsd | 9 + .../lens/driver/jdbc/ColumnarSQLRewriter.java | 179 ++++++++++++++++++----- .../driver/jdbc/TestColumnarSQLRewriter.java | 91 +++++++++- .../lens/server/api/LensConfConstants.java | 5 + 4 files changed, 232 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-api/src/main/resources/cube-0.1.xsd ---------------------------------------------------------------------- diff --git a/lens-api/src/main/resources/cube-0.1.xsd b/lens-api/src/main/resources/cube-0.1.xsd index 0a981dd..58f68f5 100644 --- a/lens-api/src/main/resources/cube-0.1.xsd +++ b/lens-api/src/main/resources/cube-0.1.xsd @@ -811,6 +811,15 @@ <xs:annotation> <xs:documentation> Table properties. + The following properties can be specified for DBStorage tables: + 1. lens.metastore.native.db.name : The underlying database name in DB storage. + 2. lens.metastore.native.table.name : The underlying table name in DB storage. + 3. lens.metastore.native.table.column.mapping : The column mapping for columns of the table whose names are + different in the underlying DB storage. The value is a comma-separated list of map entries, each of the form + columnName=nativeColumnName, where the key is the table's column name and the value is its name in DB storage.
Example value: id=id1,name=name1 + The following properties can be specified for Elastic search tables : + 1. lens.metastore.es.index.name : The underlying ES index name. + 2. lens.metastore.es.type.name : The underlying ES type name. </xs:documentation> </xs:annotation> </xs:element> http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java index 9ceb9f3..295b476 100644 --- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java +++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.antlr.runtime.CommonToken; +import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; /** @@ -147,24 +149,31 @@ public class ColumnarSQLRewriter implements QueryRewriter { private String fromTree; /** The join ast. */ + @Getter private ASTNode joinAST; /** The having ast. */ + @Getter private ASTNode havingAST; /** The select ast. */ + @Getter private ASTNode selectAST; /** The where ast. */ + @Getter private ASTNode whereAST; /** The order by ast. */ + @Getter private ASTNode orderByAST; /** The group by ast. */ + @Getter private ASTNode groupByAST; /** The from ast. 
*/ + @Getter protected ASTNode fromAST; /** @@ -944,7 +953,7 @@ public class ColumnarSQLRewriter implements QueryRewriter { */ public void buildQuery(Configuration conf, HiveConf hconf) throws SemanticException { analyzeInternal(conf, hconf); - replaceWithUnderlyingStorage(hconf, fromAST); + replaceWithUnderlyingStorage(hconf); replaceAliasInAST(); getFilterInJoinCond(fromAST); getAggregateColumns(selectAST, new MutableInt(0)); @@ -1187,67 +1196,143 @@ public class ColumnarSQLRewriter implements QueryRewriter { return queryReplacedUdf; } - // Replace Lens database names with storage's proper DB and table name based - // on table properties. + + @NoArgsConstructor + private static class NativeTableInfo { + private Map<String, String> columnMapping = new HashMap<>(); + NativeTableInfo(Table tbl) { + String columnMappingProp = tbl.getProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING); + if (StringUtils.isNotBlank(columnMappingProp)) { + String[] columnMapArray = StringUtils.split(columnMappingProp, ","); + for (String columnMapEntry : columnMapArray) { + String[] mapEntry = StringUtils.split(columnMapEntry, "="); + columnMapping.put(mapEntry[0].trim(), mapEntry[1].trim()); + } + } + } + String getNativeColumn(String col) { + String retCol = columnMapping.get(col); + return retCol != null ? retCol : col; + } + } + + private Map<String, NativeTableInfo> aliasToNativeTableInfo = new HashMap<>(); /** * Replace with underlying storage. 
* - * @param tree the AST tree + * @param metastoreConf the metastore configuration */ - protected void replaceWithUnderlyingStorage(HiveConf metastoreConf, ASTNode tree) { + protected void replaceWithUnderlyingStorage(HiveConf metastoreConf) { + replaceDBAndTableNames(metastoreConf, fromAST); + if (aliasToNativeTableInfo.isEmpty()) { + return; + } + replaceColumnNames(selectAST); + replaceColumnNames(fromAST); + replaceColumnNames(whereAST); + replaceColumnNames(groupByAST); + replaceColumnNames(orderByAST); + replaceColumnNames(havingAST); + } + // Replace Lens database names with storage's proper DB and table name based + // on table properties. + protected void replaceDBAndTableNames(HiveConf metastoreConf, ASTNode tree) { if (tree == null) { return; } - if (TOK_TABNAME == tree.getToken().getType()) { - // If it has two children, the first one is the DB name and second one is - // table identifier - // Else, we have to add the DB name as the first child - try { - if (tree.getChildCount() == 2) { - ASTNode dbIdentifier = (ASTNode) tree.getChild(0); - ASTNode tableIdentifier = (ASTNode) tree.getChild(1); - String lensTable = dbIdentifier.getText() + "." 
+ tableIdentifier.getText(); - Table tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable); - String table = getUnderlyingTableName(tbl); - String db = getUnderlyingDBName(tbl); - - // Replace both table and db names - if ("default".equalsIgnoreCase(db)) { - // Remove the db name for this case - tree.deleteChild(0); - } else if (StringUtils.isNotBlank(db)) { - dbIdentifier.getToken().setText(db); - } // If db is empty, then leave the tree untouched - - if (StringUtils.isNotBlank(table)) { - tableIdentifier.getToken().setText(table); - } - } else { - ASTNode tableIdentifier = (ASTNode) tree.getChild(0); - String lensTable = tableIdentifier.getText(); - Table tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable); - String table = getUnderlyingTableName(tbl); - // Replace table name - if (StringUtils.isNotBlank(table)) { - tableIdentifier.getToken().setText(table); - } + if (TOK_TABREF == tree.getToken().getType()) { + // TOK_TABREF will have TOK_TABNAME as first child and alias as second child. + String alias; + String tblName = null; + Table tbl = null; + ASTNode tabNameChild = (ASTNode) tree.getChild(0); + if (TOK_TABNAME == tabNameChild.getToken().getType()) { + // If it has two children, the first one is the DB name and second one is + // table identifier + // Else, we have to add the DB name as the first child + try { + if (tabNameChild.getChildCount() == 2) { + ASTNode dbIdentifier = (ASTNode) tabNameChild.getChild(0); + ASTNode tableIdentifier = (ASTNode) tabNameChild.getChild(1); + tblName = tableIdentifier.getText(); + String lensTable = dbIdentifier.getText() + "." 
+ tblName; + tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable); + String table = getUnderlyingTableName(tbl); + String db = getUnderlyingDBName(tbl); + + // Replace both table and db names + if ("default".equalsIgnoreCase(db)) { + // Remove the db name for this case + tabNameChild.deleteChild(0); + } else if (StringUtils.isNotBlank(db)) { + dbIdentifier.getToken().setText(db); + } // If db is empty, then leave the tree untouched + + if (StringUtils.isNotBlank(table)) { + tableIdentifier.getToken().setText(table); + } + } else { + ASTNode tableIdentifier = (ASTNode) tabNameChild.getChild(0); + tblName = tableIdentifier.getText(); + tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(tblName); + String table = getUnderlyingTableName(tbl); + // Replace table name + if (StringUtils.isNotBlank(table)) { + tableIdentifier.getToken().setText(table); + } - // Add db name as a new child - String dbName = getUnderlyingDBName(tbl); - if (StringUtils.isNotBlank(dbName) && !"default".equalsIgnoreCase(dbName)) { - ASTNode dbIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, dbName)); - dbIdentifier.setParent(tree); - tree.insertChild(0, dbIdentifier); + // Add db name as a new child + String dbName = getUnderlyingDBName(tbl); + if (StringUtils.isNotBlank(dbName) && !"default".equalsIgnoreCase(dbName)) { + ASTNode dbIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, dbName)); + dbIdentifier.setParent(tabNameChild); + tabNameChild.insertChild(0, dbIdentifier); + } + } + } catch (HiveException e) { + log.warn("No corresponding table in metastore:", e); + } + } + if (tree.getChildCount() == 2) { + alias = tree.getChild(1).getText(); + } else { + alias = tblName; + } + if (StringUtils.isNotBlank(alias)) { + alias = alias.toLowerCase(); + if (!aliasToNativeTableInfo.containsKey(alias)) { + if (tbl != null) { + aliasToNativeTableInfo.put(alias, new NativeTableInfo(tbl)); } } - } catch (HiveException e) { - log.warn("No 
corresponding table in metastore:", e); } } else { for (int i = 0; i < tree.getChildCount(); i++) { - replaceWithUnderlyingStorage(metastoreConf, (ASTNode) tree.getChild(i)); + replaceDBAndTableNames(metastoreConf, (ASTNode) tree.getChild(i)); + } + } + } + + void replaceColumnNames(ASTNode node) { + if (node == null) { + return; + } + int nodeType = node.getToken().getType(); + if (nodeType == HiveParser.DOT) { + ASTNode tabident = HQLParser.findNodeByPath(node, TOK_TABLE_OR_COL, Identifier); + ASTNode colIdent = (ASTNode) node.getChild(1); + String column = colIdent.getText().toLowerCase(); + String alias = tabident.getText().toLowerCase(); + if (aliasToNativeTableInfo.get(alias) != null) { + colIdent.getToken().setText(aliasToNativeTableInfo.get(alias).getNativeColumn(column)); + } + } else { + // recurse down + for (int i = 0; i < node.getChildCount(); i++) { + ASTNode child = (ASTNode) node.getChild(i); + replaceColumnNames(child); } } } http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java index 3415a1e..db09a4b 100644 --- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java +++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java @@ -886,7 +886,7 @@ public class TestColumnarSQLRewriter { System.out.println(joinTreeBeforeRewrite); // Rewrite - rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST); + rewriter.replaceWithUnderlyingStorage(hconf); String joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST); System.out.println("joinTreeAfterRewrite:" + joinTreeAfterRewrite); @@ -914,7 +914,7 @@ public class 
TestColumnarSQLRewriter { System.out.println(joinTreeBeforeRewrite); // Rewrite - rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST); + rewriter.replaceWithUnderlyingStorage(hconf); joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST); System.out.println(joinTreeAfterRewrite); @@ -933,7 +933,7 @@ public class TestColumnarSQLRewriter { rewriter.query = defaultQuery; rewriter.analyzeInternal(conf, hconf); joinTreeBeforeRewrite = HQLParser.getString(rewriter.fromAST); - rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST); + rewriter.replaceWithUnderlyingStorage(hconf); joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST); assertTrue(joinTreeBeforeRewrite.contains("examples"), joinTreeBeforeRewrite); assertFalse(joinTreeAfterRewrite.contains("examples"), joinTreeAfterRewrite); @@ -949,23 +949,104 @@ public class TestColumnarSQLRewriter { } /** + * Test replace column mapping. + * + * @throws Exception the exception + */ + @Test + public void testReplaceColumnMapping() throws Exception { + SessionState.start(hconf); + String testDB = "testrcm"; + + // Create test table + Database database = new Database(); + database.setName(testDB); + + Hive.get(hconf).createDatabase(database); + try { + SessionState.get().setCurrentDatabase(testDB); + Map<String, String> columnMap = new HashMap<>(); + columnMap.put("id", "id1"); + columnMap.put("name", "name1"); + createTable(hconf, testDB, "mytable", "testDB", "testTable_1", false, columnMap); + columnMap.put("id", "id2"); + columnMap.put("name", "name2"); + createTable(hconf, testDB, "mytable_2", "testDB", "testTable_2", false, columnMap); + columnMap.put("id", "id3"); + columnMap.put("name", "name3"); + createTable(hconf, "default", "mytable_3", "testDB", "testTable_3", false, columnMap); + + String query = "SELECT t1.id, t2.id, t3.id, t1.name, t2.name, t3.name, count(1) FROM " + testDB + + ".mytable t1 JOIN mytable_2 t2 ON t1.t2id = t2.id left outer join default.mytable_3 t3 on t2.t3id = 
t3.id" + + " WHERE t1.id = 100 GROUP BY t2.id HAVING count(t1.id) > 2 ORDER BY t3.id"; + + ColumnarSQLRewriter rewriter = new ColumnarSQLRewriter(); + rewriter.init(conf); + rewriter.ast = HQLParser.parseHQL(query, hconf); + rewriter.query = query; + rewriter.analyzeInternal(conf, hconf); + + // Rewrite + rewriter.replaceWithUnderlyingStorage(hconf); + String fromStringAfterRewrite = HQLParser.getString(rewriter.fromAST); + log.info("fromStringAfterRewrite:{}", fromStringAfterRewrite); + + assertEquals(HQLParser.getString(rewriter.getSelectAST()).trim(), "( t1 . id1 ), ( t2 . id2 ), ( t3 . id3 )," + + " ( t1 . name1 ), ( t2 . name2 ), ( t3 . name3 ), count( 1 )", + "Found :" + HQLParser.getString(rewriter.getSelectAST())); + assertEquals(HQLParser.getString(rewriter.getWhereAST()).trim(), "(( t1 . id1 ) = 100 )", + "Found: " + HQLParser.getString(rewriter.getWhereAST())); + assertEquals(HQLParser.getString(rewriter.getGroupByAST()).trim(), "( t2 . id2 )", + "Found: " + HQLParser.getString(rewriter.getGroupByAST())); + assertEquals(HQLParser.getString(rewriter.getOrderByAST()).trim(), "t3 . id3 asc", + "Found: " + HQLParser.getString(rewriter.getOrderByAST())); + assertEquals(HQLParser.getString(rewriter.getHavingAST()).trim(), "(count(( t1 . id1 )) > 2 )", + "Found: " + HQLParser.getString(rewriter.getHavingAST())); + assertTrue(fromStringAfterRewrite.contains("( t1 . t2id ) = ( t2 . id2 )") + && fromStringAfterRewrite.contains("( t2 . t3id ) = ( t3 . 
id3 )"), fromStringAfterRewrite); + assertFalse(fromStringAfterRewrite.contains(testDB), fromStringAfterRewrite); + assertTrue(fromStringAfterRewrite.contains("testdb"), fromStringAfterRewrite); + assertTrue(fromStringAfterRewrite.contains("testtable_1") && fromStringAfterRewrite.contains("testtable_2") + && fromStringAfterRewrite.contains("testtable_3"), fromStringAfterRewrite); + } finally { + Hive.get().dropTable("default", "mytable_3", true, true); + Hive.get().dropDatabase(testDB, true, true, true); + SessionState.get().setCurrentDatabase("default"); + } + } + + void createTable(HiveConf conf, String db, String table, String udb, String utable) throws Exception { + createTable(conf, db, table, udb, utable, true, null); + } + + /** * Creates the table. * * @param db the db * @param table the table * @param udb the udb * @param utable the utable + * @param setCustomSerde whether to set custom serde or not + * @param columnMapping columnmapping for the table + * * @throws Exception the exception */ - void createTable(HiveConf conf, String db, String table, String udb, String utable) throws Exception { + void createTable(HiveConf conf, String db, String table, String udb, String utable, boolean setCustomSerde, + Map<String, String> columnMapping) throws Exception { Table tbl1 = new Table(db, table); - tbl1.setSerializationLib("DatabaseJarSerde"); + if (setCustomSerde) { + tbl1.setSerializationLib("DatabaseJarSerde"); + } if (StringUtils.isNotBlank(udb)) { tbl1.setProperty(LensConfConstants.NATIVE_DB_NAME, udb); } if (StringUtils.isNotBlank(utable)) { tbl1.setProperty(LensConfConstants.NATIVE_TABLE_NAME, utable); } + if (columnMapping != null && !columnMapping.isEmpty()) { + tbl1.setProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING, StringUtils.join(columnMapping.entrySet(), ",")); + log.info("columnMapping property:{}", tbl1.getProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING)); + } List<FieldSchema> columns = new ArrayList<FieldSchema>(); 
columns.add(new FieldSchema("id", "int", "col1")); http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java index 720825a..fb11f93 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java @@ -403,6 +403,11 @@ public final class LensConfConstants { public static final String NATIVE_TABLE_NAME = METASTORE_PFX + "native.table.name"; /** + * The property name for the column mapping (col=nativeCol,...) used when native table column names differ. + */ + public static final String NATIVE_TABLE_COLUMN_MAPPING = METASTORE_PFX + "native.table.column.mapping"; + + /** * The Constant ES_INDEX_NAME. */ public static final String ES_INDEX_NAME = METASTORE_PFX + "es.index.name";
