This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d06e10c29832f886ca15ecd83a4ca65bfd6d4d85
Author: lrhkobe <[email protected]>
AuthorDate: Thu Dec 19 09:22:47 2019 +0800

    [ISSUE #487] Jdbc source connector support syncing data with white ta… 
(#488)
    
    * [ISSUE #487] Jdbc source connector support syncing data with white table 
column value
    
    * [ISSUE #487] Jdbc source connector support syncing data with white table 
column value
---
 .../rocketmq/connect/jdbc/schema/Database.java     |  6 ++-
 .../rocketmq/connect/jdbc/schema/Schema.java       | 16 +++----
 .../apache/rocketmq/connect/jdbc/schema/Table.java | 11 +++++
 .../rocketmq/connect/jdbc/source/Querier.java      | 56 +++++++++++++++-------
 4 files changed, 63 insertions(+), 26 deletions(-)

diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
index 49e28cd..33a9a22 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -40,9 +40,12 @@ public class Database {
 
     public Set<String> tableWhiteList;
 
-    public Database(String name, Connection connection, Set<String> 
tableWhiteList) {
+    public Map<String, Map<String, String>> tableFilterMap;
+
+    public Database(String name, Connection connection, Set<String> 
tableWhiteList, Map<String, Map<String, String>> tableFilterMap) {
         this.name = name;
         this.connection = connection;
+        this.tableFilterMap = tableFilterMap;
         this.tableWhiteList = tableWhiteList;
     }
 
@@ -73,6 +76,7 @@ public class Database {
                 table.addCol(colName);
                 table.addParser(columnParser);
                 table.addRawDataType(dataType);
+                table.setFilterMap(tableFilterMap.get(tableName));
             }
 
         } finally {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
index 16d636f..1cfaf2c 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
@@ -34,18 +34,18 @@ public class Schema {
         Arrays.asList(new String[] {"information_schema", "mysql", 
"performance_schema", "sys"})
     );
 
-    public Set<String> dataBaseWhiteList;
-
-    public Set<String> tableWhiteList;
-
     private Connection connection;
 
     private Map<String, Database> dbMap;
 
+    public Map<String, Set<String>> dbTableMap;
+
+    public Map<String, Map<String, String>> tableFilterMap;
+
     public Schema(Connection connection) {
         this.connection = connection;
-        this.dataBaseWhiteList = new HashSet<>();
-        this.tableWhiteList = new HashSet<>();
+        this.dbTableMap = new HashMap<>();
+        this.tableFilterMap = new HashMap<>();
     }
 
     public void load() throws SQLException {
@@ -61,8 +61,8 @@ public class Schema {
 
             while (rs.next()) {
                 String dbName = rs.getString(1);
-                if (!IGNORED_DATABASES.contains(dbName) && 
dataBaseWhiteList.contains(dbName)) {
-                    Database database = new Database(dbName, connection, 
tableWhiteList);
+                if (!IGNORED_DATABASES.contains(dbName) && 
dbTableMap.keySet().contains(dbName)) {
+                    Database database = new Database(dbName, connection, 
dbTableMap.get(dbName), tableFilterMap);
                     dbMap.put(dbName, database);
                 }
             }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
index 8c9a42d..891fb9a 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
@@ -18,8 +18,11 @@
 package org.apache.rocketmq.connect.jdbc.schema;
 
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 public class Table {
 
@@ -29,6 +32,7 @@ public class Table {
     private List<ColumnParser> parserList = new LinkedList<>();
     private List<String> rawDataTypeList = new LinkedList<>();
     private List<Object> dataList = new LinkedList<>();
+    private Map<String, String> filterMap = new HashMap<>();
 
     public Table(String database, String table) {
         this.database = database;
@@ -87,4 +91,11 @@ public class Table {
         this.colList = colList;
     }
 
+    public Map<String, String> getFilterMap() {
+        return filterMap;
+    }
+
+    public void setFilterMap(Map<String, String> filterMap) {
+        this.filterMap = filterMap;
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 8da3f21..d2544f9 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -1,5 +1,6 @@
 package org.apache.rocketmq.connect.jdbc.source;
 
+import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.jdbc.config.Config;
 import org.apache.rocketmq.connect.jdbc.schema.Database;
@@ -91,7 +92,7 @@ public class Querier {
     public void poll()  {
         try {
             PreparedStatement stmt;
-            String query = "select * from ";
+            StringBuilder query = new StringBuilder("select * from ");
             LinkedList<Table> tableLinkedList = new LinkedList<>();
             for (Map.Entry<String, Database> entry : 
schema.getDbMap().entrySet()) {
                 String db = entry.getKey();
@@ -99,19 +100,35 @@ public class Querier {
                 while (iterator.hasNext()) {
                     Map.Entry<String, Table> tableEntry = iterator.next();
                     String tb = tableEntry.getKey();
+                    query.append(db + "." + tb);
+                    Table t = tableEntry.getValue();
+                    Map<String, String> tableFilterMap = t.getFilterMap();
+                    if (tableFilterMap != null && 
!tableFilterMap.keySet().contains("NO-FILTER")){
+                        query = query.append(" where ");
+                        int count = 0;
+                        for (String key : tableFilterMap.keySet()){
+                            count ++;
+                            String value = tableFilterMap.get(key);
+                            if (count != 1){
+                                query.append(" and ");
+                            }
+                            String condition = key + "=" + "'" + value + "'";
+                            query.append(condition);
+                        }
+                    }
                     stmt = connection.prepareStatement(query + db + "." + tb);
                     ResultSet rs;
                     rs = stmt.executeQuery();
                     List<String> colList = tableEntry.getValue().getColList();
-                    List<String> DataTypeList = 
tableEntry.getValue().getRawDataTypeList();
-                    List<ColumnParser> ParserList = 
tableEntry.getValue().getParserList();
+                    List<String> dataTypeList = 
tableEntry.getValue().getRawDataTypeList();
+                    List<ColumnParser> parserList = 
tableEntry.getValue().getParserList();
 
                     while (rs.next()) {
                         Table table = new Table(db, tb);
                         //System.out.print("|");
                         table.setColList(colList);
-                        table.setRawDataTypeList(DataTypeList);
-                        table.setParserList(ParserList);
+                        table.setRawDataTypeList(dataTypeList);
+                        table.setParserList(parserList);
 
                         for (String string : colList) {
                             table.getDataList().add(rs.getObject(string));
@@ -132,18 +149,23 @@ public class Querier {
 
     public void start() throws Exception {
         String whiteDataBases = config.getWhiteDataBase();
-        String whiteTables = config.getWhiteTable();
-
-        if (!StringUtils.isEmpty(whiteDataBases)) {
-            
Arrays.asList(whiteDataBases.trim().split(",")).forEach(whiteDataBase -> {
-                Collections.addAll(schema.dataBaseWhiteList, whiteDataBase);
-            });
-        }
-
-        if (!StringUtils.isEmpty(whiteTables)) {
-            Arrays.asList(whiteTables.trim().split(",")).forEach(whiteTable -> 
{
-                Collections.addAll(schema.tableWhiteList, whiteTable);
-            });
+        JSONObject whiteDataBaseObject = 
JSONObject.parseObject(whiteDataBases);
+
+        if (whiteDataBaseObject != null){
+            for (String whiteDataBaseName : whiteDataBaseObject.keySet()){
+                JSONObject whiteTableObject = 
(JSONObject)whiteDataBaseObject.get(whiteDataBaseName);
+                HashSet<String> whiteTableSet = new HashSet<>();
+                for (String whiteTableName : whiteTableObject.keySet()){
+                    Collections.addAll(whiteTableSet, whiteTableName);
+                    HashMap<String, String> filterMap = new HashMap<>();
+                    JSONObject tableFilterObject = 
(JSONObject)whiteTableObject.get(whiteTableName);
+                    for(String filterKey : tableFilterObject.keySet()){
+                        filterMap.put(filterKey, 
tableFilterObject.getString(filterKey));
+                    }
+                    schema.tableFilterMap.put(whiteTableName, filterMap);
+                }
+                schema.dbTableMap.put(whiteDataBaseName, whiteTableSet);
+            }
         }
         schema.load();
         log.info("load schema success");

Reply via email to