This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 182fd469bc8e9dc89bb62a23be66549a33e93967
Author: Eason Chen <[email protected]>
AuthorDate: Tue Nov 19 23:22:01 2019 +0800

    [ISSUE #441] Add Jdbc Sink Connector (#442)
---
 .../rocketmq/connect/jdbc/common/DBUtils.java      | 211 +++++++++++++++++++++
 .../rocketmq/connect/jdbc/{ => config}/Config.java | 119 +++++-------
 .../rocketmq/connect/jdbc/config/ConfigUtil.java   |  52 +++++
 .../connect/jdbc/connector/JdbcSinkConnector.java  |  58 ++++++
 .../connect/jdbc/connector/JdbcSinkTask.java       | 133 +++++++++++++
 .../jdbc/connector/JdbcSourceConnector.java        |   2 +-
 .../connect/jdbc/connector/JdbcSourceTask.java     |  62 +++---
 .../rocketmq/connect/jdbc/schema/Database.java     |  43 +++--
 .../rocketmq/connect/jdbc/schema/Schema.java       |  51 +++--
 .../jdbc/schema/column/DateTimeColumnParser.java   |   2 +-
 .../apache/rocketmq/connect/jdbc/sink/Updater.java | 196 +++++++++++++++++++
 .../rocketmq/connect/jdbc/source/JdbcUtils.java    | 198 -------------------
 .../rocketmq/connect/jdbc/source/Querier.java      | 186 ++++++------------
 .../jdbc/source/TimestampIncrementingQuerier.java  |  24 ++-
 14 files changed, 854 insertions(+), 483 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
new file mode 100644
index 0000000..ccee96b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
@@ -0,0 +1,211 @@
+
+/**
+ * Copyright 2015 Confluent Inc.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.rocketmq.connect.jdbc.common;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Date;
+
+/**
+ * Utilties for interacting with a JDBC database.
+ */
+public class DBUtils {
+
+    private static final Logger log = 
LoggerFactory.getLogger(JdbcSourceTask.class);
+
+    /**
+     * The default table types to include when listing tables if none are 
specified. Valid values
+     * are those specified by the @{java.sql.DatabaseMetaData#getTables} 
method's TABLE_TYPE column.
+     * The default only includes standard, user-defined tables.
+     */
+    public static final Set<String> DEFAULT_TABLE_TYPES = 
Collections.unmodifiableSet(
+            new HashSet<String>(Arrays.asList("TABLE"))
+    );
+
+    private static final int GET_TABLES_TYPE_COLUMN = 4;
+    private static final int GET_TABLES_NAME_COLUMN = 3;
+
+    private static final int GET_COLUMNS_COLUMN_NAME = 4;
+    private static final int GET_COLUMNS_IS_NULLABLE = 18;
+    private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23;
+
+
+    private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new 
ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+            sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+            return sdf;
+        }
+    };
+
+    /**
+     * Get a list of tables in the database. This uses the default filters, 
which only include
+     * user-defined tables.
+     * @param conn database connection
+     * @return a list of tables
+     * @throws SQLException
+     */
+    public static List<String> getTables(Connection conn) throws SQLException {
+        return getTables(conn, DEFAULT_TABLE_TYPES);
+    }
+
+    /**
+     * Get a list of table names in the database.
+     * @param conn database connection
+     * @param types a set of table types that should be included in the results
+     * @throws SQLException
+     */
+    public static List<String> getTables(Connection conn, Set<String> types) 
throws SQLException {
+        DatabaseMetaData metadata = conn.getMetaData();
+        ResultSet rs = metadata.getTables(null, null, "%", null);
+        List<String> tableNames = new ArrayList<String>();
+        while (rs.next()) {
+            if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) {
+                String colName = rs.getString(GET_TABLES_NAME_COLUMN);
+                // SQLite JDBC driver does not correctly mark these as system 
tables
+                if (metadata.getDatabaseProductName().equals("SQLite") && 
colName.startsWith("sqlite_")) {
+                    continue;
+                }
+
+                tableNames.add(colName);
+            }
+        }
+        return tableNames;
+    }
+
+    /**
+     * Look up the autoincrement column for the specified table.
+     * @param conn database connection
+     * @param table the table to
+     * @return the name of the column that is an autoincrement column, or null 
if there is no
+     *         autoincrement column or more than one exists
+     * @throws SQLException
+     */
+    public static String getAutoincrementColumn(Connection conn, String table) 
throws SQLException {
+        String result = null;
+        int matches = 0;
+
+        ResultSet rs = conn.getMetaData().getColumns(null, null, table, "%");
+        // Some database drivers (SQLite) don't include all the columns
+        if (rs.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) 
{
+            while (rs.next()) {
+                if (rs.getString(GET_COLUMNS_IS_AUTOINCREMENT).equals("YES")) {
+                    result = rs.getString(GET_COLUMNS_COLUMN_NAME);
+                    matches++;
+                }
+            }
+            return (matches == 1 ? result : null);
+        }
+
+        // Fallback approach is to query for a single row. This unfortunately 
does not work with any
+        // empty table
+        log.trace("Falling back to SELECT detection of auto-increment column 
for {}:{}", conn, table);
+        Statement stmt = conn.createStatement();
+        try {
+            String quoteString = getIdentifierQuoteString(conn);
+            rs = stmt.executeQuery("SELECT * FROM " + quoteString + table + 
quoteString + " LIMIT 1");
+            ResultSetMetaData rsmd = rs.getMetaData();
+            for (int i = 1; i < rsmd.getColumnCount(); i++) {
+                if (rsmd.isAutoIncrement(i)) {
+                    result = rsmd.getColumnName(i);
+                    matches++;
+                }
+            }
+        } finally {
+            rs.close();
+            stmt.close();
+        }
+        return (matches == 1 ? result : null);
+    }
+
+    public static boolean isColumnNullable(Connection conn, String table, 
String column)
+            throws SQLException {
+        ResultSet rs = conn.getMetaData().getColumns(null, null, table, 
column);
+        if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) {
+            // Should only be one match
+            if (!rs.next()) {
+                return false;
+            }
+            String val = rs.getString(GET_COLUMNS_IS_NULLABLE);
+            return rs.getString(GET_COLUMNS_IS_NULLABLE).equals("YES");
+        }
+
+        return false;
+    }
+
+    /**
+     * Format the given Date assuming UTC timezone in a format supported by 
SQL.
+     * @param date the date to convert to a String
+     * @return the formatted string
+     */
+    public static String formatUTC(Date date) {
+        return DATE_FORMATTER.get().format(date);
+    }
+
+    /**
+     * Get the string used for quoting identifiers in this database's SQL 
dialect.
+     * @param connection the database connection
+     * @return the quote string
+     * @throws SQLException
+     */
+    public static String getIdentifierQuoteString(Connection connection) 
throws SQLException {
+        String quoteString = 
connection.getMetaData().getIdentifierQuoteString();
+        quoteString = quoteString == null ? "" : quoteString;
+        return quoteString;
+    }
+
+    /**
+     * Quote the given string.
+     * @param orig the string to quote
+     * @param quote the quote character
+     * @return the quoted string
+     */
+    public static String quoteString(String orig, String quote) {
+        return quote + orig + quote;
+    }
+
+    public static DataSource initDataSource(Config config) throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
+        map.put("url",
+                "jdbc:mysql://" + config.getJdbcUrl() + 
"?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8");
+        map.put("username", config.getJdbcUsername());
+        map.put("password", config.getJdbcPassword());
+        map.put("initialSize", "1");
+        map.put("maxActive", "1");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+        log.info("{} config read successful", map);
+        DataSource dataSource = DruidDataSourceFactory.createDataSource(map);
+        log.info("init data source success");
+        return dataSource;
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
similarity index 65%
rename from src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
rename to src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index f93c4db..5491d43 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -15,13 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.connect.jdbc;
+package org.apache.rocketmq.connect.jdbc.config;
 
-import io.openmessaging.KeyValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -30,35 +28,37 @@ public class Config {
     private static final Logger LOG = LoggerFactory.getLogger(Config.class);
 
     /* Database Connection Config */
-    public String jdbcUrl = "localhost:3306";
-    public String jdbcUsername = "root";
-    public String jdbcPassword = "199812160";
-    public String rocketmqTopic;
-    public String jdbcBackoff;
-    public String jdbcAttempts;
-    public String catalogPattern = null;
-    public List tableWhitelist;
-    public List tableBlacklist;
-    public String schemaPattern = null;
-    public boolean numericPrecisionMapping = false;
-    public String bumericMapping = null;
-    public String dialectName = "";
+    private String jdbcUrl;
+    private String jdbcUsername;
+    private String jdbcPassword;
+    private String rocketmqTopic;
+    private String jdbcBackoff;
+    private String jdbcAttempts;
+    private String catalogPattern;
+    private List tableWhitelist;
+    private List tableBlacklist;
+    private String schemaPattern;
+    private boolean numericPrecisionMapping = false;
+    private String bumericMapping;
+    private String dialectName = "";
+    private String whiteDataBase;
+    private String whiteTable;
 
     /* Mode Config */
-    public String mode = "";
-    public String incrementingColumnName = "";
-    public String query = "";
-    public String timestampColmnName = "";
-    public boolean validateNonNull = true;
+    private String mode = "";
+    private String incrementingColumnName = "";
+    private String query = "";
+    private String timestampColmnName = "";
+    private boolean validateNonNull = true;
 
     /*Connector config*/
-    public String tableTypes = "table";
-    public long pollInterval = 5000;
-    public int batchMaxRows = 100;
-    public long tablePollInterval = 60000;
-    public long timestampDelayInterval = 0;
-    public String dbTimezone = "UTC";
-    public String queueName;
+    private String tableTypes = "table";
+    private long pollInterval = 5000;
+    private int batchMaxRows = 100;
+    private long tablePollInterval = 60000;
+    private long timestampDelayInterval = 0;
+    private String dbTimezone = "UTC";
+    private String queueName;
 
     private Logger log = LoggerFactory.getLogger(Config.class);
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
@@ -71,53 +71,6 @@ public class Config {
         }
     };
 
-
-    public void load(KeyValue props) {
-        log.info("Config.load.start");
-        properties2Object(props, this);
-    }
-
-    private void properties2Object(final KeyValue p, final Object object) {
-        Method[] methods = object.getClass().getMethods();
-        for (Method method : methods) {
-            String mn = method.getName();
-            if (mn.startsWith("set")) {
-                try {
-                    String tmp = mn.substring(4);
-                    String first = mn.substring(3, 4);
-
-                    String key = first.toLowerCase() + tmp;
-                    String property = p.getString(key);
-                    if (property != null) {
-                        Class<?>[] pt = method.getParameterTypes();
-                        if (pt != null && pt.length > 0) {
-                            String cn = pt[0].getSimpleName();
-                            Object arg = null;
-                            if (cn.equals("int") || cn.equals("Integer")) {
-                                arg = Integer.parseInt(property);
-                            } else if (cn.equals("long") || cn.equals("Long")) 
{
-                                arg = Long.parseLong(property);
-                            } else if (cn.equals("double") || 
cn.equals("Double")) {
-                                arg = Double.parseDouble(property);
-                            } else if (cn.equals("boolean") || 
cn.equals("Boolean")) {
-                                arg = Boolean.parseBoolean(property);
-                            } else if (cn.equals("float") || 
cn.equals("Float")) {
-                                arg = Float.parseFloat(property);
-                            } else if (cn.equals("String")) {
-                                arg = property;
-                            } else {
-                                continue;
-                            }
-                            method.invoke(object, arg);
-
-                        }
-                    }
-                } catch (Throwable ignored) {
-                }
-            }
-        }
-    }
-
     public String getQueueName() {
         return queueName;
     }
@@ -317,4 +270,20 @@ public class Config {
     public void setDbTimezone(String dbTimezone) {
         this.dbTimezone = dbTimezone;
     }
+
+    public String getWhiteDataBase() {
+        return whiteDataBase;
+    }
+
+    public void setWhiteDataBase(String whiteDataBase) {
+        this.whiteDataBase = whiteDataBase;
+    }
+
+    public String getWhiteTable() {
+        return whiteTable;
+    }
+
+    public void setWhiteTable(String whiteTable) {
+        this.whiteTable = whiteTable;
+    }
 }
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/config/ConfigUtil.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/config/ConfigUtil.java
new file mode 100644
index 0000000..53563f2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/ConfigUtil.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.connect.jdbc.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    private static <T> void properties2Object(final KeyValue p, final Object 
object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) 
{
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || 
cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || 
cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || 
cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
 
b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
new file mode 100644
index 0000000..e1d1256
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -0,0 +1,58 @@
+package org.apache.rocketmq.connect.jdbc.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.sink.SinkConnector;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class JdbcSinkConnector extends SinkConnector {
+
+    private KeyValue config;
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        this.config = config;
+        return "";
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return JdbcSinkTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        List<KeyValue> config = new ArrayList<>();
+        config.add(this.config);
+        return config;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
new file mode 100644
index 0000000..4b55e5a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
@@ -0,0 +1,133 @@
+package org.apache.rocketmq.connect.jdbc.connector;
+
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.common.QueueMetaData;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.sink.SinkTask;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.common.DBUtils;
+import org.apache.rocketmq.connect.jdbc.config.ConfigUtil;
+import org.apache.rocketmq.connect.jdbc.sink.Updater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcSinkTask extends SinkTask {
+
+    private static final Logger log = 
LoggerFactory.getLogger(JdbcSinkTask.class);
+
+    private Config config;
+    private DataSource dataSource;
+    private Connection connection;
+    private Updater updater;
+    private BlockingQueue<Updater> tableQueue = new 
LinkedBlockingQueue<Updater>();
+
+    public JdbcSinkTask() {
+        this.config = new Config();
+    }
+
+    @Override
+    public void put(Collection<SinkDataEntry> sinkDataEntries) {
+        try {
+            if (tableQueue.size() > 1) {
+                updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+            } else {
+                updater = tableQueue.peek();
+            }
+
+            for (SinkDataEntry record : sinkDataEntries) {
+                Map<Field, Object[]> fieldMap = new HashMap<>();
+                Object[] payloads = record.getPayload();
+                Schema schema = record.getSchema();
+                EntryType entryType = record.getEntryType();
+                String tableName = schema.getName();
+                String dbName = schema.getDataSource();
+                List<Field> fields = schema.getFields();
+                Boolean parseError = false;
+                if (!fields.isEmpty()) {
+                    for (Field field : fields) {
+                        Object fieldValue = payloads[field.getIndex()];
+                        Object[] value = 
JSONObject.parseArray((String)fieldValue).toArray();
+                        if (value.length == 2) {
+                            fieldMap.put(field, value);
+                        } else {
+                            log.error("parseArray error, fieldValue:{}", 
fieldValue);
+                            parseError = true;
+                        }
+                    }
+                }
+                if (!parseError) {
+                    Boolean isSuccess = updater.push(dbName, tableName, 
fieldMap, entryType);
+                    if (!isSuccess) {
+                        log.error("push data error, dbName:{}, tableName:{}, 
entryType:{}, fieldMap:{}", dbName, tableName, fieldMap, entryType);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("put sinkDataEntries error, {}", e);
+        }
+    }
+
+    @Override
+    public void commit(Map<QueueMetaData, Long> map) {
+
+    }
+
+    @Override
+    public void start(KeyValue props) {
+        try {
+            ConfigUtil.load(props, this.config);
+            dataSource = DBUtils.initDataSource(config);
+            connection = dataSource.getConnection();
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Jdbc Sink Task because of configuration 
error{}", e);
+        }
+        String mode = config.getMode();
+        if (mode.equals("bulk")) {
+            Updater updater = new Updater(config, connection);
+            try {
+                updater.start();
+                tableQueue.add(updater);
+            } catch (Exception e) {
+                log.error("fail to start updater{}", e);
+            }
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (connection != null){
+                connection.close();
+            }
+        } catch (Throwable e) {
+            log.warn("sink task stop error while closing connection to {}", 
"jdbc", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
 
b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 4a870c0..796f0e6 100644
--- 
a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ 
b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.rocketmq.connect.jdbc.Config;
+import org.apache.rocketmq.connect.jdbc.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 767c3aa..943d432 100644
--- 
a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ 
b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -21,19 +21,19 @@ package org.apache.rocketmq.connect.jdbc.connector;
 import io.openmessaging.connector.api.source.SourceTask;
 
 import java.nio.ByteBuffer;
-import java.sql.SQLException;
+import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.rocketmq.connect.jdbc.Config;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.common.DBUtils;
+import org.apache.rocketmq.connect.jdbc.config.ConfigUtil;
 import org.apache.rocketmq.connect.jdbc.schema.Table;
 import org.apache.rocketmq.connect.jdbc.source.Querier;
 import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingQuerier;
@@ -53,17 +53,27 @@ import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.connector.api.data.DataEntryBuilder;
 import io.openmessaging.connector.api.data.Field;
 
+import javax.sql.DataSource;
+
 public class JdbcSourceTask extends SourceTask {
 
     private static final Logger log = 
LoggerFactory.getLogger(JdbcSourceTask.class);
 
     private Config config;
 
+    private DataSource dataSource;
+
+    private Connection connection;
+
     BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
     static final String INCREMENTING_FIELD = "incrementing";
     static final String TIMESTAMP_FIELD = "timestamp";
     private Querier querier;
 
+    public JdbcSourceTask() {
+        this.config = new Config();
+    }
+
     @Override
     public Collection<SourceDataEntry> poll() {
         List<SourceDataEntry> res = new ArrayList<>();
@@ -96,42 +106,45 @@ public class JdbcSourceTask extends SourceTask {
                 }
                 DataEntryBuilder dataEntryBuilder = new 
DataEntryBuilder(schema);
                 
dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
-                        .entryType(EntryType.CREATE);
+                        .entryType(EntryType.UPDATE);
                 for (int i = 0; i < dataRow.getColList().size(); i++) {
-                    Object value = dataRow.getDataList().get(i);
-                    // System.out.println(dataRow.getColList().get(i) + "|" + 
value);
-                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), 
value);
+                    Object[] value = new Object[2];
+                    value[0] = value[1] = dataRow.getDataList().get(i);
+                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), 
JSONObject.toJSONString(value));
                 }
 
                 SourceDataEntry sourceDataEntry = 
dataEntryBuilder.buildSourceDataEntry(
-                        ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")),
+                        ByteBuffer.wrap(config.getJdbcUrl().getBytes("UTF-8")),
                         
ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
                 res.add(sourceDataEntry);
-
+                log.debug("sourceDataEntry : {}", 
JSONObject.toJSONString(sourceDataEntry));
             }
         } catch (Exception e) {
             log.error("JDBC task poll error, current config:" + 
JSON.toJSONString(config), e);
         }
-        log.info("dataEntry poll successfully,{}", res);
+        log.debug("dataEntry poll successfully,{}", 
JSONObject.toJSONString(res));
         return res;
     }
 
     @Override
     public void start(KeyValue props) {
-        config = new Config();
-        config.load(props);
-        
+        try {
+            ConfigUtil.load(props, this.config);
+            dataSource = DBUtils.initDataSource(config);
+            connection = dataSource.getConnection();
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Jdbc Source Task because of configuration 
error{}", e);
+        }
         Map<Map<String, String>, Map<String, Object>> offsets = null;
-        String mode = config.mode;
+        String mode = config.getMode();
         if (mode.equals("bulk")) {
-            Querier querier = new Querier();
+            Querier querier = new Querier(config, connection);
             try {
-                querier.setConfig(config);
                 querier.start();
                 tableQueue.add(querier);
             } catch (Exception e) {
-                // TODO Auto-generated catch block
-                log.error("Start unsuccessfully Because of {}",e);
+                log.error("start querier failed in bulk mode{}", e);
             }
         } else {
             TimestampIncrementingQuerier querier = new 
TimestampIncrementingQuerier();
@@ -140,8 +153,7 @@ public class JdbcSourceTask extends SourceTask {
                 querier.start();
                 tableQueue.add(querier);
             } catch (Exception e) {
-                // TODO Auto-generated catch block
-                log.error("Start unsuccessfully Because of {}",e);
+                log.error("fail to start querier{}", e);
             }
 
         }
@@ -150,7 +162,13 @@ public class JdbcSourceTask extends SourceTask {
 
     @Override
     public void stop() {
-        querier.stop();
+        try {
+            if (connection != null){
+                connection.close();
+            }
+        } catch (Throwable e) {
+            log.warn("source task stop error while closing connection to {}", 
"jdbc", e);
+        }
     }
 
     @Override
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
index 657ebb8..49e28cd 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -17,41 +17,41 @@
 
 package org.apache.rocketmq.connect.jdbc.schema;
 
-//import io.openmessaging.mysql.binlog.EventProcessor;
-
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import javax.sql.DataSource;
+import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Database {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Database.class);
+
     private static final String SQL = "select 
table_name,column_name,data_type,column_type,character_set_name " +
         "from information_schema.columns " +
         "where table_schema = ? order by ORDINAL_POSITION";
     private String name;
-    private DataSource dataSource;
-    public Map<String, Table> tableMap = new HashMap<String, Table>();
 
-    public Database(String name, DataSource dataSource) {
+    private Connection connection;
+
+    private Map<String, Table> tableMap = new HashMap<String, Table>();
+
+    public Set<String> tableWhiteList;
+
+    public Database(String name, Connection connection, Set<String> 
tableWhiteList) {
         this.name = name;
-        this.dataSource = dataSource;
+        this.connection = connection;
+        this.tableWhiteList = tableWhiteList;
     }
 
     public void init() throws SQLException {
-        Connection conn = null;
         PreparedStatement ps = null;
         ResultSet rs = null;
 
         try {
-            conn = dataSource.getConnection();
-
-            ps = conn.prepareStatement(SQL);
+            ps = connection.prepareStatement(SQL);
             ps.setString(1, name);
             rs = ps.executeQuery();
 
@@ -63,7 +63,9 @@ public class Database {
                 String charset = rs.getString(5);
 
                 ColumnParser columnParser = 
ColumnParser.getColumnParser(dataType, colType, charset);
-
+                if (!tableWhiteList.contains(tableName)){
+                    continue;
+                }
                 if (!tableMap.containsKey(tableName)) {
                     addTable(tableName);
                 }
@@ -74,21 +76,20 @@ public class Database {
             }
 
         } finally {
-            if (conn != null) {
-                conn.close();
+            if (rs != null) {
+                rs.close();
             }
             if (ps != null) {
                 ps.close();
             }
-            if (rs != null) {
-                rs.close();
-            }
         }
 
     }
 
     private void addTable(String tableName) {
 
+        LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
         Table table = new Table(name, tableName);
         tableMap.put(tableName, table);
     }
@@ -97,4 +98,8 @@ public class Database {
 
         return tableMap.get(tableName);
     }
+
+    public Map<String, Table> getTableMap() {
+        return tableMap;
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
index 93b204f..16d636f 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
@@ -21,65 +21,59 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.sql.DataSource;
+import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Schema {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
 
     private static final String SQL = "select schema_name from 
information_schema.schemata";
-    //acquiring databases
+
     private static final List<String> IGNORED_DATABASES = new ArrayList<>(
         Arrays.asList(new String[] {"information_schema", "mysql", 
"performance_schema", "sys"})
     );
-    //ignored databases (System Databases)
-    private DataSource dataSource;
 
-    public Map<String, Database> dbMap;
+    public Set<String> dataBaseWhiteList;
+
+    public Set<String> tableWhiteList;
+
+    private Connection connection;
+
+    private Map<String, Database> dbMap;
 
-    public Schema(DataSource dataSource) {
-        this.dataSource = dataSource;
+    public Schema(Connection connection) {
+        this.connection = connection;
+        this.dataBaseWhiteList = new HashSet<>();
+        this.tableWhiteList = new HashSet<>();
     }
 
     public void load() throws SQLException {
 
         dbMap = new HashMap<>();
 
-        Connection conn = null;
         PreparedStatement ps = null;
         ResultSet rs = null;
 
         try {
-            conn = dataSource.getConnection();
-
-            ps = conn.prepareStatement(SQL);
+            ps = connection.prepareStatement(SQL);
             rs = ps.executeQuery();
 
             while (rs.next()) {
                 String dbName = rs.getString(1);
-                if (!IGNORED_DATABASES.contains(dbName)) {
-                    Database database = new Database(dbName, dataSource);
+                if (!IGNORED_DATABASES.contains(dbName) && 
dataBaseWhiteList.contains(dbName)) {
+                    Database database = new Database(dbName, connection, 
tableWhiteList);
                     dbMap.put(dbName, database);
-                    //dbMap存着各个数据库的名字
                 }
             }
 
         } finally {
-
-            if (conn != null) {
-                conn.close();
+            if (rs != null) {
+                rs.close();
             }
             if (ps != null) {
                 ps.close();
             }
-            if (rs != null) {
-                rs.close();
-            }
         }
 
         for (Database db : dbMap.values()) {
@@ -114,8 +108,7 @@ public class Schema {
                 load();
                 break;
             } catch (Exception e) {
-                //           LOGGER.error("Reload schema error.", e);
-                System.out.println("Reload schema error." + e);
+                LOGGER.error("Reload schema error.", e);
             }
         }
     }
@@ -123,4 +116,8 @@ public class Schema {
     public void reset() {
         dbMap = null;
     }
+
+    public Map<String, Database> getDbMap() {
+        return dbMap;
+    }
 }
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
index 8736280..c9b39e3 100644
--- 
a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
+++ 
b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
@@ -30,7 +30,7 @@ public class DateTimeColumnParser extends ColumnParser {
     static {
         dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("GMT+8"));
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
new file mode 100644
index 0000000..3571852
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
@@ -0,0 +1,196 @@
+package org.apache.rocketmq.connect.jdbc.sink;
+
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.schema.Schema;
+import org.apache.rocketmq.connect.jdbc.schema.column.DateTimeColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Updater {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Queue<Connection> connections = new 
ConcurrentLinkedQueue<>();
+    private Config config;
+    private Schema schema;
+    private Connection connection;
+
+    public Updater(Config config, Connection connection) {
+        this.config = config;
+        this.connection = connection;
+        this.schema = new Schema(connection);
+    }
+
+    public boolean push(String dbName, String tableName, Map<Field, Object[]> 
fieldMap, EntryType entryType) {
+        Boolean isSuccess = false;
+        int id = 0;
+        switch (entryType) {
+            case CREATE:
+                isSuccess = updateRow(dbName, tableName, fieldMap, id);
+                break;
+            case UPDATE:
+                id = queryRowId(dbName, tableName, fieldMap);
+                isSuccess = updateRow(dbName, tableName, fieldMap, id);
+                break;
+            case DELETE:
+                id = queryRowId(dbName, tableName, fieldMap);
+                isSuccess = deleteRow(dbName, tableName, id);
+                break;
+            default:
+                log.error("entryType {} is illegal.", entryType.toString());
+        }
+        return isSuccess;
+    }
+
+    public void start() throws Exception {
+        schema.load();
+        log.info("schema load success");
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    private String typeParser(FieldType fieldType, String fieldName, Object 
fieldValue, String sql) {
+        switch (fieldType) {
+            case STRING:
+                sql += fieldName + " = " + "'" + fieldValue + "'";
+                break;
+            case DATETIME:
+                sql += fieldName + " = " + "'" + new 
DateTimeColumnParser().getValue(fieldValue) + "'";
+                break;
+            case INT32:
+            case INT64:
+            case FLOAT32:
+            case FLOAT64:
+            case BIG_INTEGER:
+                sql += fieldName + " = " + fieldValue;
+                break;
+            default:
+                log.error("fieldType {} is illegal.", fieldType.toString());
+        }
+        return sql;
+    }
+
+    private Integer queryRowId(String dbName, String tableName, Map<Field, 
Object[]> fieldMap) {
+        int count = 0, id = 0;
+        ResultSet rs;
+        PreparedStatement stmt;
+        Boolean finishQuery = false;
+        String query = "select id from " + dbName + "." + tableName + " where 
";
+
+        for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+            count ++;
+            String fieldName = entry.getKey().getName();
+            FieldType fieldType = entry.getKey().getType();
+            Object fieldValue = entry.getValue()[0];
+            if ("id".equals(fieldName))
+                continue;
+            if (count != 1) {
+                query += " and ";
+            }
+            if (fieldValue == null)
+            {
+                query += fieldName + " is NULL";
+            } else {
+                query = typeParser(fieldType, fieldName, fieldValue, query);
+            }
+        }
+
+        try {
+            while (!connection.isClosed() && !finishQuery){
+                stmt = connection.prepareStatement(query);
+                rs = stmt.executeQuery();
+                if (rs != null) {
+                    while (rs.next()) {
+                        id = rs.getInt("id");
+                    }
+                    finishQuery = true;
+                    rs.close();
+                }
+            }
+        } catch (SQLException e) {
+            log.error("query table error,{}", e);
+        }
+        return id;
+    }
+
+    private Boolean updateRow(String dbName, String tableName, Map<Field, 
Object[]> fieldMap, Integer id) {
+        int count = 0;
+        PreparedStatement stmt;
+        boolean finishUpdate = false;
+        String update = "replace into " + dbName + "." + tableName + " set ";
+
+        for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+            count++;
+            String fieldName = entry.getKey().getName();
+            FieldType fieldType = entry.getKey().getType();
+            Object fieldValue = entry.getValue()[1];
+            if ("id".equals(fieldName)) {
+                if (id == 0)
+                    continue;
+                else
+                    fieldValue = id;
+            }
+            if (count != 1) {
+                update += ", ";
+            }
+            if (fieldValue == null) {
+                update += fieldName + " = NULL";
+            } else {
+                update = typeParser(fieldType, fieldName, fieldValue, update);
+            }
+        }
+
+        try {
+            while (!connection.isClosed() && !finishUpdate){
+                stmt = connection.prepareStatement(update);
+                int result = stmt.executeUpdate();
+                if (result > 0) {
+                    log.info("replace into table success");
+                    return true;
+                }
+                finishUpdate = true;
+                stmt.close();
+            }
+        } catch (SQLException e) {
+            log.error("update table error,{}", e);
+        }
+        return false;
+    }
+
+    private Boolean deleteRow(String dbName, String tableName, Integer id) {
+        PreparedStatement stmt;
+        String delete = "delete from " + dbName + "." + tableName + " where id 
= " + id ;
+        boolean finishDelete = false;
+        try {
+            while (!connection.isClosed() && !finishDelete){
+                stmt = connection.prepareStatement(delete);
+                int result = stmt.executeUpdate();
+                if (result > 0) {
+                    log.info("delete from table success");
+                    return true;
+                }
+                finishDelete = true;
+                stmt.close();
+            }
+        } catch (SQLException e) {
+            log.error("delete from table error,{}", e);
+        }
+        return false;
+    }
+
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java
deleted file mode 100644
index cbcca6a..0000000
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java
+++ /dev/null
@@ -1,198 +0,0 @@
-
-/**
- * Copyright 2015 Confluent Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.rocketmq.connect.jdbc.source;
-import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TimeZone;
-
-/**
- * Utilties for interacting with a JDBC database.
- */
-public class JdbcUtils {
-
-  private static final Logger log = 
LoggerFactory.getLogger(JdbcSourceTask.class);
-
-  /**
-   * The default table types to include when listing tables if none are 
specified. Valid values
-   * are those specified by the @{java.sql.DatabaseMetaData#getTables} 
method's TABLE_TYPE column.
-   * The default only includes standard, user-defined tables.
-   */
-  public static final Set<String> DEFAULT_TABLE_TYPES = 
Collections.unmodifiableSet(
-      new HashSet<String>(Arrays.asList("TABLE"))
-  );
-
-  private static final int GET_TABLES_TYPE_COLUMN = 4;
-  private static final int GET_TABLES_NAME_COLUMN = 3;
-
-  private static final int GET_COLUMNS_COLUMN_NAME = 4;
-  private static final int GET_COLUMNS_IS_NULLABLE = 18;
-  private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23;
-
-
-  private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new 
ThreadLocal<SimpleDateFormat>() {
-    @Override
-    protected SimpleDateFormat initialValue() {
-      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
-      return sdf;
-    }
-  };
-
-  /**
-   * Get a list of tables in the database. This uses the default filters, 
which only include
-   * user-defined tables.
-   * @param conn database connection
-   * @return a list of tables
-   * @throws SQLException
-   */
-  public static List<String> getTables(Connection conn) throws SQLException {
-    return getTables(conn, DEFAULT_TABLE_TYPES);
-  }
-
-  /**
-   * Get a list of table names in the database.
-   * @param conn database connection
-   * @param types a set of table types that should be included in the results
-   * @throws SQLException
-   */
-  public static List<String> getTables(Connection conn, Set<String> types) 
throws SQLException {
-    DatabaseMetaData metadata = conn.getMetaData();
-    ResultSet rs = metadata.getTables(null, null, "%", null);
-    List<String> tableNames = new ArrayList<String>();
-    while (rs.next()) {
-      if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) {
-        String colName = rs.getString(GET_TABLES_NAME_COLUMN);
-        // SQLite JDBC driver does not correctly mark these as system tables
-        if (metadata.getDatabaseProductName().equals("SQLite") && 
colName.startsWith("sqlite_")) {
-          continue;
-        }
-
-        tableNames.add(colName);
-      }
-    }
-    return tableNames;
-  }
-
-  /**
-   * Look up the autoincrement column for the specified table.
-   * @param conn database connection
-   * @param table the table to
-   * @return the name of the column that is an autoincrement column, or null 
if there is no
-   *         autoincrement column or more than one exists
-   * @throws SQLException
-   */
-  public static String getAutoincrementColumn(Connection conn, String table) 
throws SQLException {
-    String result = null;
-    int matches = 0;
-
-    ResultSet rs = conn.getMetaData().getColumns(null, null, table, "%");
-    // Some database drivers (SQLite) don't include all the columns
-    if (rs.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) {
-      while(rs.next()) {
-        if (rs.getString(GET_COLUMNS_IS_AUTOINCREMENT).equals("YES")) {
-          result = rs.getString(GET_COLUMNS_COLUMN_NAME);
-          matches++;
-        }
-      }
-      return (matches == 1 ? result : null);
-    }
-
-    // Fallback approach is to query for a single row. This unfortunately does 
not work with any
-    // empty table
-    log.trace("Falling back to SELECT detection of auto-increment column for 
{}:{}", conn, table);
-    Statement stmt = conn.createStatement();
-    try {
-      String quoteString = getIdentifierQuoteString(conn);
-      rs = stmt.executeQuery("SELECT * FROM " + quoteString + table + 
quoteString + " LIMIT 1");
-      ResultSetMetaData rsmd = rs.getMetaData();
-      for(int i = 1; i < rsmd.getColumnCount(); i++) {
-        if (rsmd.isAutoIncrement(i)) {
-          result = rsmd.getColumnName(i);
-          matches++;
-        }
-      }
-    } finally {
-      rs.close();
-      stmt.close();
-    }
-    return (matches == 1 ? result : null);
-  }
-
-  public static boolean isColumnNullable(Connection conn, String table, String 
column)
-      throws SQLException {
-    ResultSet rs = conn.getMetaData().getColumns(null, null, table, column);
-    if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) {
-      // Should only be one match
-      if (!rs.next()) {
-        return false;
-      }
-      String val = rs.getString(GET_COLUMNS_IS_NULLABLE);
-      return rs.getString(GET_COLUMNS_IS_NULLABLE).equals("YES");
-    }
-
-    return false;
-  }
-
-  /**
-   * Format the given Date assuming UTC timezone in a format supported by SQL.
-   * @param date the date to convert to a String
-   * @return the formatted string
-   */
-  public static String formatUTC(Date date) {
-    return DATE_FORMATTER.get().format(date);
-  }
-
-  /**
-   * Get the string used for quoting identifiers in this database's SQL 
dialect.
-   * @param connection the database connection
-   * @return the quote string
-   * @throws SQLException
-   */
-  public static String getIdentifierQuoteString(Connection connection) throws 
SQLException {
-    String quoteString = connection.getMetaData().getIdentifierQuoteString();
-    quoteString = quoteString == null ? "" : quoteString;
-    return quoteString;
-  }
-
-  /**
-   * Quote the given string.
-   * @param orig the string to quote
-   * @param quote the quote character
-   * @return the quoted string
-   */
-  public static String quoteString(String orig, String quote) {
-    return quote + orig + quote;
-  }
-}
-
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java 
b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 13cbda5..8da3f21 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -1,104 +1,69 @@
 package org.apache.rocketmq.connect.jdbc.source;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.schema.Database;
+import org.apache.rocketmq.connect.jdbc.schema.Schema;
+import org.apache.rocketmq.connect.jdbc.schema.Table;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import javax.sql.DataSource;
-import org.apache.rocketmq.connect.jdbc.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.alibaba.druid.pool.DruidDataSourceFactory;
-import org.apache.rocketmq.connect.jdbc.schema.*;
-import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
 
 public class Querier {
 
-    private final Logger log = LoggerFactory.getLogger(getClass()); // use 
concrete subclass
+    private final Logger log = LoggerFactory.getLogger(Querier.class); // use 
concrete subclass
     protected String topicPrefix;
     protected String jdbcUrl;
     private final Queue<Connection> connections = new 
ConcurrentLinkedQueue<>();
     private Config config;
-
-    /**
-     * @return the config
-     */
-    public Config getConfig() {
-        return config;
-    }
-
-    public void setConfig(Config config) {
-        this.config = config;
-        log.info("config load successfully");
-    }
-
-    private DataSource dataSource;
+    private Connection connection;
     private List<Table> list = new LinkedList<>();
     private String mode;
+    private Schema schema;
 
+    public Querier(){
 
-    public DataSource getDataSource() {
-        return dataSource;
     }
 
-    public void setDataSource(DataSource dataSource) {
-        this.dataSource = dataSource;
+    public Querier(Config config, Connection connection) {
+        this.config = config;
+        this.connection = connection;
+        this.schema = new Schema(connection);
     }
 
-    public String getMode() {
-        return mode;
+    /**
+     * @return the config
+     */
+    public Config getConfig() {
+        return config;
     }
 
-    public void setMode(String mode) {
-        this.mode = mode;
+    public void setConfig(Config config) {
+        this.config = config;
     }
 
+//    public String getMode() {
+//        return mode;
+//    }
+//
+//    public void setMode(String mode) {
+//        this.mode = mode;
+//    }
 
     public List<Table> getList() {
         return list;
     }
 
-    public void setList(List<Table> list) {
-        this.list = list;
-    }
-
-    public Connection getConnection() throws SQLException {
-        // These config names are the same for both source and sink configs ...
-        String username = config.jdbcUsername;
-        String dbPassword = config.jdbcPassword;
-        jdbcUrl = config.jdbcUrl;
-        Properties properties = new Properties();
-        if (username != null) {
-            properties.setProperty("user", username);
-        }
-        if (dbPassword != null) {
-            properties.setProperty("password", dbPassword);
-        }
-        Connection connection = DriverManager.getConnection(jdbcUrl, 
properties);
-
-        connections.add(connection);
-        return connection;
-    }
-
-    public void stop() {
-        Connection conn;
-        while ((conn = connections.poll()) != null) {
-            try {
-                conn.close();
-            } catch (Throwable e) {
-                log.warn("Error while closing connection to {}", "jdbc", e);
-            }
-        }
-    }
+//    public void setList(List<Table> list) {
+//        this.list = list;
+//    }
 
     protected PreparedStatement createDBPreparedStatement(Connection db) 
throws SQLException {
 
@@ -123,32 +88,18 @@ public class Querier {
         return stmt.executeQuery();
     }
 
-    private Schema schema;
-
-    public static void main(String[] args) throws Exception {
-        TimestampIncrementingQuerier querier = new 
TimestampIncrementingQuerier();
-        try {
-            querier.start();
-            querier.poll();
-        } catch (SQLException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-    }
-
-    public void poll() {
+    public void poll()  {
         try {
-
             PreparedStatement stmt;
             String query = "select * from ";
-            Connection conn = dataSource.getConnection();
-            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+            LinkedList<Table> tableLinkedList = new LinkedList<>();
+            for (Map.Entry<String, Database> entry : 
schema.getDbMap().entrySet()) {
                 String db = entry.getKey();
-                Iterator<Map.Entry<String, Table>> iterator = 
entry.getValue().tableMap.entrySet().iterator();
+                Iterator<Map.Entry<String, Table>> iterator = 
entry.getValue().getTableMap().entrySet().iterator();
                 while (iterator.hasNext()) {
                     Map.Entry<String, Table> tableEntry = iterator.next();
                     String tb = tableEntry.getKey();
-                    stmt = conn.prepareStatement(query + db + "." + tb);
+                    stmt = connection.prepareStatement(query + db + "." + tb);
                     ResultSet rs;
                     rs = stmt.executeQuery();
                     List<String> colList = tableEntry.getValue().getColList();
@@ -157,64 +108,45 @@ public class Querier {
 
                     while (rs.next()) {
                         Table table = new Table(db, tb);
-                        System.out.print("|");
+                        //System.out.print("|");
                         table.setColList(colList);
                         table.setRawDataTypeList(DataTypeList);
                         table.setParserList(ParserList);
 
                         for (String string : colList) {
                             table.getDataList().add(rs.getObject(string));
-                            System.out.print(string + " : " + 
rs.getObject(string) + "|");
+                            //System.out.print(string + " : " + 
rs.getObject(string) + "|");
                         }
-                        list.add(table);
-                        System.out.println();
+                        tableLinkedList.add(table);
                     }
+                    rs.close();
+                    stmt.close();
                 }
             }
-
+            list = tableLinkedList;
         } catch (SQLException e) {
-            e.printStackTrace();
+            log.error("fail to poll data, {}", e);
         }
 
     }
 
     public void start() throws Exception {
-        try {
+        String whiteDataBases = config.getWhiteDataBase();
+        String whiteTables = config.getWhiteTable();
 
-            log.info("datasorce success");
-            initDataSource();
-        } catch (Throwable exception) {
-            log.info("error,{}", exception);
+        if (!StringUtils.isEmpty(whiteDataBases)) {
+            
Arrays.asList(whiteDataBases.trim().split(",")).forEach(whiteDataBase -> {
+                Collections.addAll(schema.dataBaseWhiteList, whiteDataBase);
+            });
         }
-        schema = new Schema(dataSource);
-        schema.load();
-        log.info("schema load successful");
-    }
 
-    private void initDataSource() throws Exception {
-        Map<String, String> map = new HashMap<>();
-
-        map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
-        map.put("url",
-                "jdbc:mysql://" + config.jdbcUrl + 
"?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
-        map.put("username", config.jdbcUsername);
-        map.put("password", config.jdbcPassword);
-        map.put("initialSize", "2");
-        map.put("maxActive", "2");
-        map.put("maxWait", "60000");
-        map.put("timeBetweenEvictionRunsMillis", "60000");
-        map.put("minEvictableIdleTimeMillis", "300000");
-        map.put("validationQuery", "SELECT 1 FROM DUAL");
-        map.put("testWhileIdle", "true");
-        log.info("{} config read successful", map);
-        try {
-            dataSource = DruidDataSourceFactory.createDataSource(map);
-        } catch (Exception exception) {
-            log.info("exeception,{}", exception);
-        } catch (Error e) {
-            log.info("error,{},e", e);
+        if (!StringUtils.isEmpty(whiteTables)) {
+            Arrays.asList(whiteTables.trim().split(",")).forEach(whiteTable -> 
{
+                Collections.addAll(schema.tableWhiteList, whiteTable);
+            });
         }
-        log.info("datasorce success");
+        schema.load();
+        log.info("load schema success");
     }
 
 }
diff --git 
a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
 
b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
index 3201456..964322d 100644
--- 
a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
+++ 
b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
@@ -1,7 +1,6 @@
 package org.apache.rocketmq.connect.jdbc.source;
 
 import java.sql.Connection;
-import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -14,13 +13,12 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.TimeZone;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.sql.DataSource;
 
-import org.apache.rocketmq.connect.jdbc.Config;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.common.DBUtils;
 import org.apache.rocketmq.connect.jdbc.schema.Database;
 import org.apache.rocketmq.connect.jdbc.schema.Schema;
 import org.apache.rocketmq.connect.jdbc.schema.Table;
@@ -111,7 +109,7 @@ public class TimestampIncrementingQuerier extends Querier {
     protected void createPreparedStatement(Connection conn) throws 
SQLException {
         // Default when unspecified uses an autoincrementing column
         if (incrementingColumn != null && incrementingColumn.isEmpty()) {
-            incrementingColumn = JdbcUtils.getAutoincrementColumn(conn, name);
+            incrementingColumn = DBUtils.getAutoincrementColumn(conn, name);
         }
 
         String quoteString = conn.getMetaData().getIdentifierQuoteString();
@@ -205,7 +203,7 @@ public class TimestampIncrementingQuerier extends Querier {
         log.info("{}·", stmt);
         log.info("{},{}", incrementingOffset, timestampOffset);
         return stmt.executeQuery();
-    }
+    }                       
 
     public List<Table> getList() {
         return list;
@@ -219,10 +217,10 @@ public class TimestampIncrementingQuerier extends Querier 
{
         try {
             list.clear();
             Connection conn = dataSource.getConnection();
-            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+            for (Map.Entry<String, Database> entry : 
schema.getDbMap().entrySet()) {
                 String db = entry.getKey();
                 log.info("{} database is loading", db);
-                Iterator<Map.Entry<String, Table>> iterator = 
entry.getValue().tableMap.entrySet().iterator();
+                Iterator<Map.Entry<String, Table>> iterator = 
entry.getValue().getTableMap().entrySet().iterator();
                 while (iterator.hasNext()) {
                     Map.Entry<String, Table> tableEntry = iterator.next();
                     String tb = tableEntry.getKey();
@@ -279,20 +277,20 @@ public class TimestampIncrementingQuerier extends Querier 
{
         } catch (Throwable exception) {
             log.info("error,{}", exception);
         }
-        schema = new Schema(dataSource);
+        schema = new Schema(dataSource.getConnection());
         schema.load();
     }
 
-    private void initDataSource() throws Exception {
+    public void initDataSource() throws Exception {
         Map<String, String> map = new HashMap<>();
         config = super.getConfig();
         timestampColumn = config.getTimestampColmnName();
         incrementingColumn = config.getIncrementingColumnName();
         map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
         map.put("url",
-                "jdbc:mysql://" + config.jdbcUrl + 
"?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
-        map.put("username", config.jdbcUsername);
-        map.put("password", config.jdbcPassword);
+                "jdbc:mysql://" + config.getJdbcUrl() + 
"?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+        map.put("username", config.getJdbcUsername());
+        map.put("password", config.getJdbcPassword());
         map.put("initialSize", "2");
         map.put("maxActive", "2");
         map.put("maxWait", "60000");

Reply via email to