This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit d06e10c29832f886ca15ecd83a4ca65bfd6d4d85 Author: lrhkobe <[email protected]> AuthorDate: Thu Dec 19 09:22:47 2019 +0800 [ISSUE #487] Jdbc source connector support syncing data with white table column value (#488) * [ISSUE #487] Jdbc source connector support syncing data with white table column value * [ISSUE #487] Jdbc source connector support syncing data with white table column value --- .../rocketmq/connect/jdbc/schema/Database.java | 6 ++- .../rocketmq/connect/jdbc/schema/Schema.java | 16 +++---- .../apache/rocketmq/connect/jdbc/schema/Table.java | 11 +++++ .../rocketmq/connect/jdbc/source/Querier.java | 56 +++++++++++++++------- 4 files changed, 63 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java index 49e28cd..33a9a22 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java @@ -40,9 +40,12 @@ public class Database { public Set<String> tableWhiteList; - public Database(String name, Connection connection, Set<String> tableWhiteList) { + public Map<String, Map<String, String>> tableFilterMap; + + public Database(String name, Connection connection, Set<String> tableWhiteList, Map<String, Map<String, String>> tableFilterMap) { this.name = name; this.connection = connection; + this.tableFilterMap = tableFilterMap; this.tableWhiteList = tableWhiteList; } @@ -73,6 +76,7 @@ public class Database { table.addCol(colName); table.addParser(columnParser); table.addRawDataType(dataType); + table.setFilterMap(tableFilterMap.get(tableName)); } } finally { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java index 16d636f..1cfaf2c 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java +++ 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java @@ -34,18 +34,18 @@ public class Schema { Arrays.asList(new String[] {"information_schema", "mysql", "performance_schema", "sys"}) ); - public Set<String> dataBaseWhiteList; - - public Set<String> tableWhiteList; - private Connection connection; private Map<String, Database> dbMap; + public Map<String, Set<String>> dbTableMap; + + public Map<String, Map<String, String>> tableFilterMap; + public Schema(Connection connection) { this.connection = connection; - this.dataBaseWhiteList = new HashSet<>(); - this.tableWhiteList = new HashSet<>(); + this.dbTableMap = new HashMap<>(); + this.tableFilterMap = new HashMap<>(); } public void load() throws SQLException { @@ -61,8 +61,8 @@ public class Schema { while (rs.next()) { String dbName = rs.getString(1); - if (!IGNORED_DATABASES.contains(dbName) && dataBaseWhiteList.contains(dbName)) { - Database database = new Database(dbName, connection, tableWhiteList); + if (!IGNORED_DATABASES.contains(dbName) && dbTableMap.keySet().contains(dbName)) { + Database database = new Database(dbName, connection, dbTableMap.get(dbName), tableFilterMap); dbMap.put(dbName, database); } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java index 8c9a42d..891fb9a 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java @@ -18,8 +18,11 @@ package org.apache.rocketmq.connect.jdbc.schema; import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; + +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; public class Table { @@ -29,6 +32,7 @@ public class Table { private List<ColumnParser> parserList = new LinkedList<>(); private List<String> rawDataTypeList = new LinkedList<>(); private List<Object> dataList = new LinkedList<>(); + private 
Map<String, String> filterMap = new HashMap<>(); public Table(String database, String table) { this.database = database; @@ -87,4 +91,11 @@ public class Table { this.colList = colList; } + public Map<String, String> getFilterMap() { + return filterMap; + } + + public void setFilterMap(Map<String, String> filterMap) { + this.filterMap = filterMap; + } } \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java index 8da3f21..d2544f9 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java @@ -1,5 +1,6 @@ package org.apache.rocketmq.connect.jdbc.source; +import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.connect.jdbc.config.Config; import org.apache.rocketmq.connect.jdbc.schema.Database; @@ -91,7 +92,7 @@ public class Querier { public void poll() { try { PreparedStatement stmt; - String query = "select * from "; + StringBuilder query = new StringBuilder("select * from "); LinkedList<Table> tableLinkedList = new LinkedList<>(); for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) { String db = entry.getKey(); @@ -99,19 +100,35 @@ public class Querier { while (iterator.hasNext()) { Map.Entry<String, Table> tableEntry = iterator.next(); String tb = tableEntry.getKey(); + query.append(db + "." 
+ tb); + Table t = tableEntry.getValue(); + Map<String, String> tableFilterMap = t.getFilterMap(); + if (tableFilterMap != null && !tableFilterMap.keySet().contains("NO-FILTER")){ + query = query.append(" where "); + int count = 0; + for (String key : tableFilterMap.keySet()){ + count ++; + String value = tableFilterMap.get(key); + if (count != 1){ + query.append(" and "); + } + String condition = key + "=" + "'" + value + "'"; + query.append(condition); + } + } stmt = connection.prepareStatement(query + db + "." + tb); ResultSet rs; rs = stmt.executeQuery(); List<String> colList = tableEntry.getValue().getColList(); - List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList(); - List<ColumnParser> ParserList = tableEntry.getValue().getParserList(); + List<String> dataTypeList = tableEntry.getValue().getRawDataTypeList(); + List<ColumnParser> parserList = tableEntry.getValue().getParserList(); while (rs.next()) { Table table = new Table(db, tb); //System.out.print("|"); table.setColList(colList); - table.setRawDataTypeList(DataTypeList); - table.setParserList(ParserList); + table.setRawDataTypeList(dataTypeList); + table.setParserList(parserList); for (String string : colList) { table.getDataList().add(rs.getObject(string)); @@ -132,18 +149,23 @@ public class Querier { public void start() throws Exception { String whiteDataBases = config.getWhiteDataBase(); - String whiteTables = config.getWhiteTable(); - - if (!StringUtils.isEmpty(whiteDataBases)) { - Arrays.asList(whiteDataBases.trim().split(",")).forEach(whiteDataBase -> { - Collections.addAll(schema.dataBaseWhiteList, whiteDataBase); - }); - } - - if (!StringUtils.isEmpty(whiteTables)) { - Arrays.asList(whiteTables.trim().split(",")).forEach(whiteTable -> { - Collections.addAll(schema.tableWhiteList, whiteTable); - }); + JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteDataBases); + + if (whiteDataBaseObject != null){ + for (String whiteDataBaseName : whiteDataBaseObject.keySet()){ + 
JSONObject whiteTableObject = (JSONObject)whiteDataBaseObject.get(whiteDataBaseName); + HashSet<String> whiteTableSet = new HashSet<>(); + for (String whiteTableName : whiteTableObject.keySet()){ + Collections.addAll(whiteTableSet, whiteTableName); + HashMap<String, String> filterMap = new HashMap<>(); + JSONObject tableFilterObject = (JSONObject)whiteTableObject.get(whiteTableName); + for(String filterKey : tableFilterObject.keySet()){ + filterMap.put(filterKey, tableFilterObject.getString(filterKey)); + } + schema.tableFilterMap.put(whiteTableName, filterMap); + } + schema.dbTableMap.put(whiteDataBaseName, whiteTableSet); + } } schema.load(); log.info("load schema success");
