This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 182fd469bc8e9dc89bb62a23be66549a33e93967 Author: Eason Chen <[email protected]> AuthorDate: Tue Nov 19 23:22:01 2019 +0800 [ISSUE #441] Add Jdbc Sink Connector (#442) --- .../rocketmq/connect/jdbc/common/DBUtils.java | 211 +++++++++++++++++++++ .../rocketmq/connect/jdbc/{ => config}/Config.java | 119 +++++------- .../rocketmq/connect/jdbc/config/ConfigUtil.java | 52 +++++ .../connect/jdbc/connector/JdbcSinkConnector.java | 58 ++++++ .../connect/jdbc/connector/JdbcSinkTask.java | 133 +++++++++++++ .../jdbc/connector/JdbcSourceConnector.java | 2 +- .../connect/jdbc/connector/JdbcSourceTask.java | 62 +++--- .../rocketmq/connect/jdbc/schema/Database.java | 43 +++-- .../rocketmq/connect/jdbc/schema/Schema.java | 51 +++-- .../jdbc/schema/column/DateTimeColumnParser.java | 2 +- .../apache/rocketmq/connect/jdbc/sink/Updater.java | 196 +++++++++++++++++++ .../rocketmq/connect/jdbc/source/JdbcUtils.java | 198 ------------------- .../rocketmq/connect/jdbc/source/Querier.java | 186 ++++++------------ .../jdbc/source/TimestampIncrementingQuerier.java | 24 ++- 14 files changed, 854 insertions(+), 483 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java new file mode 100644 index 0000000..ccee96b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java @@ -0,0 +1,211 @@ + +/** + * Copyright 2015 Confluent Inc. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.rocketmq.connect.jdbc.common; + +import com.alibaba.druid.pool.DruidDataSourceFactory; +import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.Date; + +/** + * Utilties for interacting with a JDBC database. + */ +public class DBUtils { + + private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class); + + /** + * The default table types to include when listing tables if none are specified. Valid values + * are those specified by the @{java.sql.DatabaseMetaData#getTables} method's TABLE_TYPE column. + * The default only includes standard, user-defined tables. + */ + public static final Set<String> DEFAULT_TABLE_TYPES = Collections.unmodifiableSet( + new HashSet<String>(Arrays.asList("TABLE")) + ); + + private static final int GET_TABLES_TYPE_COLUMN = 4; + private static final int GET_TABLES_NAME_COLUMN = 3; + + private static final int GET_COLUMNS_COLUMN_NAME = 4; + private static final int GET_COLUMNS_IS_NULLABLE = 18; + private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23; + + + private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + return sdf; + } + }; + + /** + * Get a list of tables in the database. This uses the default filters, which only include + * user-defined tables. + * @param conn database connection + * @return a list of tables + * @throws SQLException + */ + public static List<String> getTables(Connection conn) throws SQLException { + return getTables(conn, DEFAULT_TABLE_TYPES); + } + + /** + * Get a list of table names in the database. + * @param conn database connection + * @param types a set of table types that should be included in the results + * @throws SQLException + */ + public static List<String> getTables(Connection conn, Set<String> types) throws SQLException { + DatabaseMetaData metadata = conn.getMetaData(); + ResultSet rs = metadata.getTables(null, null, "%", null); + List<String> tableNames = new ArrayList<String>(); + while (rs.next()) { + if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) { + String colName = rs.getString(GET_TABLES_NAME_COLUMN); + // SQLite JDBC driver does not correctly mark these as system tables + if (metadata.getDatabaseProductName().equals("SQLite") && colName.startsWith("sqlite_")) { + continue; + } + + tableNames.add(colName); + } + } + return tableNames; + } + + /** + * Look up the autoincrement column for the specified table. + * @param conn database connection + * @param table the table to + * @return the name of the column that is an autoincrement column, or null if there is no + * autoincrement column or more than one exists + * @throws SQLException + */ + public static String getAutoincrementColumn(Connection conn, String table) throws SQLException { + String result = null; + int matches = 0; + + ResultSet rs = conn.getMetaData().getColumns(null, null, table, "%"); + // Some database drivers (SQLite) don't include all the columns + if (rs.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) { + while (rs.next()) { + if (rs.getString(GET_COLUMNS_IS_AUTOINCREMENT).equals("YES")) { + result = rs.getString(GET_COLUMNS_COLUMN_NAME); + matches++; + } + } + return (matches == 1 ? result : null); + } + + // Fallback approach is to query for a single row. This unfortunately does not work with any + // empty table + log.trace("Falling back to SELECT detection of auto-increment column for {}:{}", conn, table); + Statement stmt = conn.createStatement(); + try { + String quoteString = getIdentifierQuoteString(conn); + rs = stmt.executeQuery("SELECT * FROM " + quoteString + table + quoteString + " LIMIT 1"); + ResultSetMetaData rsmd = rs.getMetaData(); + for (int i = 1; i < rsmd.getColumnCount(); i++) { + if (rsmd.isAutoIncrement(i)) { + result = rsmd.getColumnName(i); + matches++; + } + } + } finally { + rs.close(); + stmt.close(); + } + return (matches == 1 ? result : null); + } + + public static boolean isColumnNullable(Connection conn, String table, String column) + throws SQLException { + ResultSet rs = conn.getMetaData().getColumns(null, null, table, column); + if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) { + // Should only be one match + if (!rs.next()) { + return false; + } + String val = rs.getString(GET_COLUMNS_IS_NULLABLE); + return rs.getString(GET_COLUMNS_IS_NULLABLE).equals("YES"); + } + + return false; + } + + /** + * Format the given Date assuming UTC timezone in a format supported by SQL. + * @param date the date to convert to a String + * @return the formatted string + */ + public static String formatUTC(Date date) { + return DATE_FORMATTER.get().format(date); + } + + /** + * Get the string used for quoting identifiers in this database's SQL dialect. + * @param connection the database connection + * @return the quote string + * @throws SQLException + */ + public static String getIdentifierQuoteString(Connection connection) throws SQLException { + String quoteString = connection.getMetaData().getIdentifierQuoteString(); + quoteString = quoteString == null ? "" : quoteString; + return quoteString; + } + + /** + * Quote the given string. + * @param orig the string to quote + * @param quote the quote character + * @return the quoted string + */ + public static String quoteString(String orig, String quote) { + return quote + orig + quote; + } + + public static DataSource initDataSource(Config config) throws Exception { + Map<String, String> map = new HashMap<>(); + map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); + map.put("url", + "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8"); + map.put("username", config.getJdbcUsername()); + map.put("password", config.getJdbcPassword()); + map.put("initialSize", "1"); + map.put("maxActive", "1"); + map.put("maxWait", "60000"); + map.put("timeBetweenEvictionRunsMillis", "60000"); + map.put("minEvictableIdleTimeMillis", "300000"); + map.put("validationQuery", "SELECT 1 FROM DUAL"); + map.put("testWhileIdle", "true"); + log.info("{} config read successful", map); + DataSource dataSource = DruidDataSourceFactory.createDataSource(map); + log.info("init data source success"); + return dataSource; + } +} + diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java similarity index 65% rename from src/main/java/org/apache/rocketmq/connect/jdbc/Config.java rename to src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java index f93c4db..5491d43 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java @@ -15,13 +15,11 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.jdbc; +package org.apache.rocketmq.connect.jdbc.config; -import io.openmessaging.KeyValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Method; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -30,35 +28,37 @@ public class Config { private static final Logger LOG = LoggerFactory.getLogger(Config.class); /* Database Connection Config */ - public String jdbcUrl = "localhost:3306"; - public String jdbcUsername = "root"; - public String jdbcPassword = "199812160"; - public String rocketmqTopic; - public String jdbcBackoff; - public String jdbcAttempts; - public String catalogPattern = null; - public List tableWhitelist; - public List tableBlacklist; - public String schemaPattern = null; - public boolean numericPrecisionMapping = false; - public String bumericMapping = null; - public String dialectName = ""; + private String jdbcUrl; + private String jdbcUsername; + private String jdbcPassword; + private String rocketmqTopic; + private String jdbcBackoff; + private String jdbcAttempts; + private String catalogPattern; + private List tableWhitelist; + private List tableBlacklist; + private String schemaPattern; + private boolean numericPrecisionMapping = false; + private String bumericMapping; + private String dialectName = ""; + private String whiteDataBase; + private String whiteTable; /* Mode Config */ - public String mode = ""; - public String incrementingColumnName = ""; - public String query = ""; - public String timestampColmnName = ""; - public boolean validateNonNull = true; + private String mode = ""; + private String incrementingColumnName = ""; + private String query = ""; + private String timestampColmnName = ""; + private boolean validateNonNull = true; /*Connector config*/ - public String tableTypes = "table"; - public long pollInterval = 5000; - public int batchMaxRows = 100; - public long tablePollInterval = 60000; - public long timestampDelayInterval = 0; - public String dbTimezone = "UTC"; - public String queueName; + private String tableTypes = "table"; + private long pollInterval = 5000; + private int batchMaxRows = 100; + private long tablePollInterval = 60000; + private long timestampDelayInterval = 0; + private String dbTimezone = "UTC"; + private String queueName; private Logger log = LoggerFactory.getLogger(Config.class); public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { @@ -71,53 +71,6 @@ public class Config { } }; - - public void load(KeyValue props) { - log.info("Config.load.start"); - properties2Object(props, this); - } - - private void properties2Object(final KeyValue p, final Object object) { - Method[] methods = object.getClass().getMethods(); - for (Method method : methods) { - String mn = method.getName(); - if (mn.startsWith("set")) { - try { - String tmp = mn.substring(4); - String first = mn.substring(3, 4); - - String key = first.toLowerCase() + tmp; - String property = p.getString(key); - if (property != null) { - Class<?>[] pt = method.getParameterTypes(); - if (pt != null && pt.length > 0) { - String cn = pt[0].getSimpleName(); - Object arg = null; - if (cn.equals("int") || cn.equals("Integer")) { - arg = Integer.parseInt(property); - } else if (cn.equals("long") || cn.equals("Long")) { - arg = Long.parseLong(property); - } else if (cn.equals("double") || cn.equals("Double")) { - arg = Double.parseDouble(property); - } else if (cn.equals("boolean") || cn.equals("Boolean")) { - arg = Boolean.parseBoolean(property); - } else if (cn.equals("float") || cn.equals("Float")) { - arg = Float.parseFloat(property); - } else if (cn.equals("String")) { - arg = property; - } else { - continue; - } - method.invoke(object, arg); - - } - } - } catch (Throwable ignored) { - } - } - } - } - public String getQueueName() { return queueName; } @@ -317,4 +270,20 @@ public class Config { public void setDbTimezone(String dbTimezone) { this.dbTimezone = dbTimezone; } + + public String getWhiteDataBase() { + return whiteDataBase; + } + + public void setWhiteDataBase(String whiteDataBase) { + this.whiteDataBase = whiteDataBase; + } + + public String getWhiteTable() { + return whiteTable; + } + + public void setWhiteTable(String whiteTable) { + this.whiteTable = whiteTable; + } } \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/ConfigUtil.java new file mode 100644 index 0000000..53563f2 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/ConfigUtil.java @@ -0,0 +1,52 @@ +package org.apache.rocketmq.connect.jdbc.config; + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; + +public class ConfigUtil { + public static <T> void load(KeyValue props, Object object) { + + properties2Object(props, object); + } + + private static <T> void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java new file mode 100644 index 0000000..e1d1256 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java @@ -0,0 +1,58 @@ +package org.apache.rocketmq.connect.jdbc.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.sink.SinkConnector; +import org.apache.rocketmq.connect.jdbc.config.Config; + +import java.util.ArrayList; +import java.util.List; + + +public class JdbcSinkConnector extends SinkConnector { + + private KeyValue config; + + @Override + public String verifyAndSetConfig(KeyValue config) { + for (String requestKey : Config.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + return "Request config key: " + requestKey; + } + } + this.config = config; + return ""; + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public Class<? extends Task> taskClass() { + return JdbcSinkTask.class; + } + + @Override + public List<KeyValue> taskConfigs() { + List<KeyValue> config = new ArrayList<>(); + config.add(this.config); + return config; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java new file mode 100644 index 0000000..4b55e5a --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java @@ -0,0 +1,133 @@ +package org.apache.rocketmq.connect.jdbc.connector; + + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.common.QueueMetaData; +import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SinkDataEntry; +import io.openmessaging.connector.api.sink.SinkTask; +import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.common.DBUtils; +import org.apache.rocketmq.connect.jdbc.config.ConfigUtil; +import org.apache.rocketmq.connect.jdbc.sink.Updater; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class JdbcSinkTask extends SinkTask { + + private static final Logger log = LoggerFactory.getLogger(JdbcSinkTask.class); + + private Config config; + private DataSource dataSource; + private Connection connection; + private Updater updater; + private BlockingQueue<Updater> tableQueue = new LinkedBlockingQueue<Updater>(); + + public JdbcSinkTask() { + this.config = new Config(); + } + + @Override + public void put(Collection<SinkDataEntry> sinkDataEntries) { + try { + if (tableQueue.size() > 1) { + updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS); + } else { + updater = tableQueue.peek(); + } + + for (SinkDataEntry record : sinkDataEntries) { + Map<Field, Object[]> fieldMap = new HashMap<>(); + Object[] payloads = record.getPayload(); + Schema schema = record.getSchema(); + EntryType entryType = record.getEntryType(); + String tableName = schema.getName(); + String dbName = schema.getDataSource(); + List<Field> fields = schema.getFields(); + Boolean parseError = false; + if (!fields.isEmpty()) { + for (Field field : fields) { + Object fieldValue = payloads[field.getIndex()]; + Object[] value = JSONObject.parseArray((String)fieldValue).toArray(); + if (value.length == 2) { + fieldMap.put(field, value); + } else { + log.error("parseArray error, fieldValue:{}", fieldValue); + parseError = true; + } + } + } + if (!parseError) { + Boolean isSuccess = updater.push(dbName, tableName, fieldMap, entryType); + if (!isSuccess) { + log.error("push data error, dbName:{}, tableName:{}, entryType:{}, fieldMap:{}", dbName, tableName, fieldMap, entryType); + } + } + } + } catch (Exception e) { + log.error("put sinkDataEntries error, {}", e); + } + } + + @Override + public void commit(Map<QueueMetaData, Long> map) { + + } + + @Override + public void start(KeyValue props) { + try { + ConfigUtil.load(props, this.config); + dataSource = DBUtils.initDataSource(config); + connection = dataSource.getConnection(); + log.info("init data source success"); + } catch (Exception e) { + log.error("Cannot start Jdbc Sink Task because of configuration error{}", e); + } + String mode = config.getMode(); + if (mode.equals("bulk")) { + Updater updater = new Updater(config, connection); + try { + updater.start(); + tableQueue.add(updater); + } catch (Exception e) { + log.error("fail to start updater{}", e); + } + } + } + + @Override + public void stop() { + try { + if (connection != null){ + connection.close(); + } + } catch (Throwable e) { + log.warn("sink task stop error while closing connection to {}", "jdbc", e); + } + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java index 4a870c0..796f0e6 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java @@ -20,7 +20,7 @@ package org.apache.rocketmq.connect.jdbc.connector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.jdbc.Config; +import org.apache.rocketmq.connect.jdbc.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java index 767c3aa..943d432 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java @@ -21,19 +21,19 @@ package org.apache.rocketmq.connect.jdbc.connector; import io.openmessaging.connector.api.source.SourceTask; import java.nio.ByteBuffer; -import java.sql.SQLException; +import java.sql.Connection; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.connect.jdbc.Config; +import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.common.DBUtils; +import org.apache.rocketmq.connect.jdbc.config.ConfigUtil; import org.apache.rocketmq.connect.jdbc.schema.Table; import org.apache.rocketmq.connect.jdbc.source.Querier; import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingQuerier; @@ -53,17 +53,27 @@ import com.alibaba.fastjson.JSONObject; import io.openmessaging.connector.api.data.DataEntryBuilder; import io.openmessaging.connector.api.data.Field; +import javax.sql.DataSource; + public class JdbcSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class); private Config config; + private DataSource dataSource; + + private Connection connection; + BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>(); static final String INCREMENTING_FIELD = "incrementing"; static final String TIMESTAMP_FIELD = "timestamp"; private Querier querier; + public JdbcSourceTask() { + this.config = new Config(); + } + @Override public Collection<SourceDataEntry> poll() { List<SourceDataEntry> res = new ArrayList<>(); @@ -96,42 +106,45 @@ public class JdbcSourceTask extends SourceTask { } DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName()) - .entryType(EntryType.CREATE); + .entryType(EntryType.UPDATE); for (int i = 0; i < dataRow.getColList().size(); i++) { - Object value = dataRow.getDataList().get(i); - // System.out.println(dataRow.getColList().get(i) + "|" + value); - dataEntryBuilder.putFiled(dataRow.getColList().get(i), value); + Object[] value = new Object[2]; + value[0] = value[1] = dataRow.getDataList().get(i); + dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value)); } SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( - ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")), + ByteBuffer.wrap(config.getJdbcUrl().getBytes("UTF-8")), ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))); res.add(sourceDataEntry); - + log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry)); } } catch (Exception e) { log.error("JDBC task poll error, current config:" + JSON.toJSONString(config), e); } - log.info("dataEntry poll successfully,{}", res); + log.debug("dataEntry poll successfully,{}", JSONObject.toJSONString(res)); return res; } @Override public void start(KeyValue props) { - config = new Config(); - config.load(props); - + try { + ConfigUtil.load(props, this.config); + dataSource = DBUtils.initDataSource(config); + connection = dataSource.getConnection(); + log.info("init data source success"); + } catch (Exception e) { + log.error("Cannot start Jdbc Source Task because of configuration error{}", e); + } Map<Map<String, String>, Map<String, Object>> offsets = null; - String mode = config.mode; + String mode = config.getMode(); if (mode.equals("bulk")) { - Querier querier = new Querier(); + Querier querier = new Querier(config, connection); try { - querier.setConfig(config); querier.start(); tableQueue.add(querier); } catch (Exception e) { - // TODO Auto-generated catch block - log.error("Start unsuccessfully Because of {}",e); + log.error("start querier failed in bulk mode{}", e); } } else { TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier(); @@ -140,8 +153,7 @@ public class JdbcSourceTask extends SourceTask { querier.start(); tableQueue.add(querier); } catch (Exception e) { - // TODO Auto-generated catch block - log.error("Start unsuccessfully Because of {}",e); + log.error("fail to start querier{}", e); } } @@ -150,7 +162,13 @@ public class JdbcSourceTask extends SourceTask { @Override public void stop() { - querier.stop(); + try { + if (connection != null){ + connection.close(); + } + } catch (Throwable e) { + log.warn("source task stop error while closing connection to {}", "jdbc", e); + } } @Override diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java index 657ebb8..49e28cd 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java @@ -17,41 +17,41 @@ package org.apache.rocketmq.connect.jdbc.schema; -//import io.openmessaging.mysql.binlog.EventProcessor; - import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import javax.sql.DataSource; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Database { + private static final Logger LOGGER = LoggerFactory.getLogger(Database.class); + private static final String SQL = "select table_name,column_name,data_type,column_type,character_set_name " + "from information_schema.columns " + "where table_schema = ? order by ORDINAL_POSITION"; private String name; - private DataSource dataSource; - public Map<String, Table> tableMap = new HashMap<String, Table>(); - public Database(String name, DataSource dataSource) { + private Connection connection; + + private Map<String, Table> tableMap = new HashMap<String, Table>(); + + public Set<String> tableWhiteList; + + public Database(String name, Connection connection, Set<String> tableWhiteList) { this.name = name; - this.dataSource = dataSource; + this.connection = connection; + this.tableWhiteList = tableWhiteList; } public void init() throws SQLException { - Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { - conn = dataSource.getConnection(); - - ps = conn.prepareStatement(SQL); + ps = connection.prepareStatement(SQL); ps.setString(1, name); rs = ps.executeQuery(); @@ -63,7 +63,9 @@ public class Database { String charset = rs.getString(5); ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset); - + if (!tableWhiteList.contains(tableName)){ + continue; + } if (!tableMap.containsKey(tableName)) { addTable(tableName); } @@ -74,21 +76,20 @@ public class Database { } } finally { - if (conn != null) { - conn.close(); + if (rs != null) { + rs.close(); } if (ps != null) { ps.close(); } - if (rs != null) { - rs.close(); - } } } private void addTable(String tableName) { + LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName); + Table table = new Table(name, tableName); tableMap.put(tableName, table); } @@ -97,4 +98,8 @@ public class Database { return tableMap.get(tableName); } + + public Map<String, Table> getTableMap() { + return tableMap; + } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java index 93b204f..16d636f 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java @@ -21,65 +21,59 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.sql.DataSource; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Schema { + private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class); private static final String SQL = "select schema_name from information_schema.schemata"; - //acquiring databases + private static final List<String> IGNORED_DATABASES = new ArrayList<>( Arrays.asList(new String[] {"information_schema", "mysql", "performance_schema", "sys"}) ); - //ignored databases (System Databases) - private DataSource dataSource; - public Map<String, Database> dbMap; + public Set<String> dataBaseWhiteList; + + public Set<String> tableWhiteList; + + private Connection connection; + + private Map<String, Database> dbMap; - public Schema(DataSource dataSource) { - this.dataSource = dataSource; + public Schema(Connection connection) { + this.connection = connection; + this.dataBaseWhiteList = new HashSet<>(); + this.tableWhiteList = new HashSet<>(); } public void load() throws SQLException { dbMap = new HashMap<>(); - Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { - conn = dataSource.getConnection(); - - ps = conn.prepareStatement(SQL); + ps = connection.prepareStatement(SQL); rs = ps.executeQuery(); while (rs.next()) { String dbName = rs.getString(1); - if (!IGNORED_DATABASES.contains(dbName)) { - Database database = new Database(dbName, dataSource); + if (!IGNORED_DATABASES.contains(dbName) && dataBaseWhiteList.contains(dbName)) { + Database database = new Database(dbName, connection, tableWhiteList); dbMap.put(dbName, database); - //dbMap存着各个数据库的名字 } } } finally { - - if (conn != null) { - conn.close(); + if (rs != null) { + rs.close(); } if (ps != null) { ps.close(); } - if (rs != null) { - rs.close(); - } } for (Database db : dbMap.values()) { @@ -114,8 +108,7 @@ public class Schema { load(); break; } catch (Exception e) { - // LOGGER.error("Reload schema error.", e); - System.out.println("Reload schema error." + e); + LOGGER.error("Reload schema error.", e); } } } @@ -123,4 +116,8 @@ public class Schema { public void reset() { dbMap = null; } + + public Map<String, Database> getDbMap() { + return dbMap; + } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java index 8736280..c9b39e3 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java @@ -30,7 +30,7 @@ public class DateTimeColumnParser extends ColumnParser { static { dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("GMT+8")); } @Override diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java new file mode 100644 index 0000000..3571852 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java @@ -0,0 +1,196 @@ +package org.apache.rocketmq.connect.jdbc.sink; + +import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.FieldType; +import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.schema.Schema; +import org.apache.rocketmq.connect.jdbc.schema.column.DateTimeColumnParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class Updater { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final Queue<Connection> connections = new ConcurrentLinkedQueue<>(); + private Config config; + private Schema schema; + private Connection connection; + + public Updater(Config config, Connection connection) { + this.config = config; + this.connection = connection; + this.schema = new Schema(connection); + } + + public boolean push(String dbName, String tableName, Map<Field, Object[]> fieldMap, EntryType entryType) { + Boolean isSuccess = false; + int id = 0; + switch (entryType) { + case CREATE: + isSuccess = updateRow(dbName, tableName, fieldMap, id); + break; + case UPDATE: + id = queryRowId(dbName, tableName, fieldMap); + isSuccess = updateRow(dbName, tableName, fieldMap, id); + break; + case DELETE: + id = queryRowId(dbName, tableName, fieldMap); + isSuccess = deleteRow(dbName, tableName, id); + break; + default: + log.error("entryType {} is illegal.", entryType.toString()); + } + return isSuccess; + } + + public void start() throws Exception { + schema.load(); + log.info("schema load success"); + } + + public Config getConfig() { + return config; + } + + public void setConfig(Config config) { + this.config = config; + } + + private String typeParser(FieldType fieldType, String fieldName, Object fieldValue, String sql) { + switch (fieldType) { + case STRING: + sql += fieldName + " = " + "'" + fieldValue + "'"; + break; + case DATETIME: + sql += fieldName + " = " + "'" + new DateTimeColumnParser().getValue(fieldValue) + "'"; + break; + case INT32: + case INT64: + case FLOAT32: + case FLOAT64: + case BIG_INTEGER: + sql += fieldName + " = " + fieldValue; + break; + default: + log.error("fieldType {} is illegal.", fieldType.toString()); + } + return sql; + } + + private Integer queryRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) { + int count = 0, id = 0; + ResultSet rs; + PreparedStatement stmt; + Boolean finishQuery = false; + String query = "select id from " + dbName + "." + tableName + " where "; + + for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) { + count ++; + String fieldName = entry.getKey().getName(); + FieldType fieldType = entry.getKey().getType(); + Object fieldValue = entry.getValue()[0]; + if ("id".equals(fieldName)) + continue; + if (count != 1) { + query += " and "; + } + if (fieldValue == null) + { + query += fieldName + " is NULL"; + } else { + query = typeParser(fieldType, fieldName, fieldValue, query); + } + } + + try { + while (!connection.isClosed() && !finishQuery){ + stmt = connection.prepareStatement(query); + rs = stmt.executeQuery(); + if (rs != null) { + while (rs.next()) { + id = rs.getInt("id"); + } + finishQuery = true; + rs.close(); + } + } + } catch (SQLException e) { + log.error("query table error,{}", e); + } + return id; + } + + private Boolean updateRow(String dbName, String tableName, Map<Field, Object[]> fieldMap, Integer id) { + int count = 0; + PreparedStatement stmt; + boolean finishUpdate = false; + String update = "replace into " + dbName + "." + tableName + " set "; + + for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) { + count++; + String fieldName = entry.getKey().getName(); + FieldType fieldType = entry.getKey().getType(); + Object fieldValue = entry.getValue()[1]; + if ("id".equals(fieldName)) { + if (id == 0) + continue; + else + fieldValue = id; + } + if (count != 1) { + update += ", "; + } + if (fieldValue == null) { + update += fieldName + " = NULL"; + } else { + update = typeParser(fieldType, fieldName, fieldValue, update); + } + } + + try { + while (!connection.isClosed() && !finishUpdate){ + stmt = connection.prepareStatement(update); + int result = stmt.executeUpdate(); + if (result > 0) { + log.info("replace into table success"); + return true; + } + finishUpdate = true; + stmt.close(); + } + } catch (SQLException e) { + log.error("update table error,{}", e); + } + return false; + } + + private Boolean deleteRow(String dbName, String tableName, Integer id) { + PreparedStatement stmt; + String delete = "delete from " + dbName + "." + tableName + " where id = " + id ; + boolean finishDelete = false; + try { + while (!connection.isClosed() && !finishDelete){ + stmt = connection.prepareStatement(delete); + int result = stmt.executeUpdate(); + if (result > 0) { + log.info("delete from table success"); + return true; + } + finishDelete = true; + stmt.close(); + } + } catch (SQLException e) { + log.error("delete from table error,{}", e); + } + return false; + } + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java deleted file mode 100644 index cbcca6a..0000000 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java +++ /dev/null @@ -1,198 +0,0 @@ - -/** - * Copyright 2015 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.rocketmq.connect.jdbc.source; -import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.TimeZone; - -/** - * Utilties for interacting with a JDBC database. - */ -public class JdbcUtils { - - private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class); - - /** - * The default table types to include when listing tables if none are specified. Valid values - * are those specified by the @{java.sql.DatabaseMetaData#getTables} method's TABLE_TYPE column. - * The default only includes standard, user-defined tables. - */ - public static final Set<String> DEFAULT_TABLE_TYPES = Collections.unmodifiableSet( - new HashSet<String>(Arrays.asList("TABLE")) - ); - - private static final int GET_TABLES_TYPE_COLUMN = 4; - private static final int GET_TABLES_NAME_COLUMN = 3; - - private static final int GET_COLUMNS_COLUMN_NAME = 4; - private static final int GET_COLUMNS_IS_NULLABLE = 18; - private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23; - - - private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new ThreadLocal<SimpleDateFormat>() { - @Override - protected SimpleDateFormat initialValue() { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - sdf.setTimeZone(TimeZone.getTimeZone("UTC")); - return sdf; - } - }; - - /** - * Get a list of tables in the database. This uses the default filters, which only include - * user-defined tables. - * @param conn database connection - * @return a list of tables - * @throws SQLException - */ - public static List<String> getTables(Connection conn) throws SQLException { - return getTables(conn, DEFAULT_TABLE_TYPES); - } - - /** - * Get a list of table names in the database. - * @param conn database connection - * @param types a set of table types that should be included in the results - * @throws SQLException - */ - public static List<String> getTables(Connection conn, Set<String> types) throws SQLException { - DatabaseMetaData metadata = conn.getMetaData(); - ResultSet rs = metadata.getTables(null, null, "%", null); - List<String> tableNames = new ArrayList<String>(); - while (rs.next()) { - if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) { - String colName = rs.getString(GET_TABLES_NAME_COLUMN); - // SQLite JDBC driver does not correctly mark these as system tables - if (metadata.getDatabaseProductName().equals("SQLite") && colName.startsWith("sqlite_")) { - continue; - } - - tableNames.add(colName); - } - } - return tableNames; - } - - /** - * Look up the autoincrement column for the specified table. - * @param conn database connection - * @param table the table to - * @return the name of the column that is an autoincrement column, or null if there is no - * autoincrement column or more than one exists - * @throws SQLException - */ - public static String getAutoincrementColumn(Connection conn, String table) throws SQLException { - String result = null; - int matches = 0; - - ResultSet rs = conn.getMetaData().getColumns(null, null, table, "%"); - // Some database drivers (SQLite) don't include all the columns - if (rs.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) { - while(rs.next()) { - if (rs.getString(GET_COLUMNS_IS_AUTOINCREMENT).equals("YES")) { - result = rs.getString(GET_COLUMNS_COLUMN_NAME); - matches++; - } - } - return (matches == 1 ? result : null); - } - - // Fallback approach is to query for a single row. This unfortunately does not work with any - // empty table - log.trace("Falling back to SELECT detection of auto-increment column for {}:{}", conn, table); - Statement stmt = conn.createStatement(); - try { - String quoteString = getIdentifierQuoteString(conn); - rs = stmt.executeQuery("SELECT * FROM " + quoteString + table + quoteString + " LIMIT 1"); - ResultSetMetaData rsmd = rs.getMetaData(); - for(int i = 1; i < rsmd.getColumnCount(); i++) { - if (rsmd.isAutoIncrement(i)) { - result = rsmd.getColumnName(i); - matches++; - } - } - } finally { - rs.close(); - stmt.close(); - } - return (matches == 1 ? result : null); - } - - public static boolean isColumnNullable(Connection conn, String table, String column) - throws SQLException { - ResultSet rs = conn.getMetaData().getColumns(null, null, table, column); - if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) { - // Should only be one match - if (!rs.next()) { - return false; - } - String val = rs.getString(GET_COLUMNS_IS_NULLABLE); - return rs.getString(GET_COLUMNS_IS_NULLABLE).equals("YES"); - } - - return false; - } - - /** - * Format the given Date assuming UTC timezone in a format supported by SQL. - * @param date the date to convert to a String - * @return the formatted string - */ - public static String formatUTC(Date date) { - return DATE_FORMATTER.get().format(date); - } - - /** - * Get the string used for quoting identifiers in this database's SQL dialect. - * @param connection the database connection - * @return the quote string - * @throws SQLException - */ - public static String getIdentifierQuoteString(Connection connection) throws SQLException { - String quoteString = connection.getMetaData().getIdentifierQuoteString(); - quoteString = quoteString == null ? "" : quoteString; - return quoteString; - } - - /** - * Quote the given string. - * @param orig the string to quote - * @param quote the quote character - * @return the quoted string - */ - public static String quoteString(String orig, String quote) { - return quote + orig + quote; - } -} - diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java index 13cbda5..8da3f21 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java @@ -1,104 +1,69 @@ package org.apache.rocketmq.connect.jdbc.source; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.schema.Database; +import org.apache.rocketmq.connect.jdbc.schema.Schema; +import org.apache.rocketmq.connect.jdbc.schema.Table; +import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; -import javax.sql.DataSource; -import org.apache.rocketmq.connect.jdbc.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.alibaba.druid.pool.DruidDataSourceFactory; -import org.apache.rocketmq.connect.jdbc.schema.*; -import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; public class Querier { - private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass + private final Logger log = LoggerFactory.getLogger(Querier.class); // use concrete subclass protected String topicPrefix; protected String jdbcUrl; private final Queue<Connection> connections = new ConcurrentLinkedQueue<>(); private Config config; - - /** - * @return the config - */ - public Config getConfig() { - return config; - } - - public void setConfig(Config config) { - this.config = config; - log.info("config load successfully"); - } - - private DataSource dataSource; + private Connection connection; private List<Table> list = new LinkedList<>(); private String mode; + private Schema schema; + public Querier(){ - public DataSource getDataSource() { - return dataSource; } - public void setDataSource(DataSource dataSource) { - this.dataSource = dataSource; + public Querier(Config config, Connection connection) { + this.config = config; + this.connection = connection; + this.schema = new Schema(connection); } - public String getMode() { - return mode; + /** + * @return the config + */ + public Config getConfig() { + return config; } - public void setMode(String mode) { - this.mode = mode; + public void setConfig(Config config) { + this.config = config; } +// public String getMode() { +// return mode; +// } +// +// public void setMode(String mode) { +// this.mode = mode; +// } public List<Table> getList() { return list; } - public void setList(List<Table> list) { - this.list = list; - } - - public Connection getConnection() throws SQLException { - // These config names are the same for both source and sink configs ... - String username = config.jdbcUsername; - String dbPassword = config.jdbcPassword; - jdbcUrl = config.jdbcUrl; - Properties properties = new Properties(); - if (username != null) { - properties.setProperty("user", username); - } - if (dbPassword != null) { - properties.setProperty("password", dbPassword); - } - Connection connection = DriverManager.getConnection(jdbcUrl, properties); - - connections.add(connection); - return connection; - } - - public void stop() { - Connection conn; - while ((conn = connections.poll()) != null) { - try { - conn.close(); - } catch (Throwable e) { - log.warn("Error while closing connection to {}", "jdbc", e); - } - } - } +// public void setList(List<Table> list) { +// this.list = list; +// } protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException { @@ -123,32 +88,18 @@ public class Querier { return stmt.executeQuery(); } - private Schema schema; - - public static void main(String[] args) throws Exception { - TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier(); - try { - querier.start(); - querier.poll(); - } catch (SQLException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - public void poll() { + public void poll() { try { - PreparedStatement stmt; String query = "select * from "; - Connection conn = dataSource.getConnection(); - for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { + LinkedList<Table> tableLinkedList = new LinkedList<>(); + for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) { String db = entry.getKey(); - Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); + Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Table> tableEntry = iterator.next(); String tb = tableEntry.getKey(); - stmt = conn.prepareStatement(query + db + "." + tb); + stmt = connection.prepareStatement(query + db + "." + tb); ResultSet rs; rs = stmt.executeQuery(); List<String> colList = tableEntry.getValue().getColList(); @@ -157,64 +108,45 @@ public class Querier { while (rs.next()) { Table table = new Table(db, tb); - System.out.print("|"); + //System.out.print("|"); table.setColList(colList); table.setRawDataTypeList(DataTypeList); table.setParserList(ParserList); for (String string : colList) { table.getDataList().add(rs.getObject(string)); - System.out.print(string + " : " + rs.getObject(string) + "|"); + //System.out.print(string + " : " + rs.getObject(string) + "|"); } - list.add(table); - System.out.println(); + tableLinkedList.add(table); } + rs.close(); + stmt.close(); } } - + list = tableLinkedList; } catch (SQLException e) { - e.printStackTrace(); + log.error("fail to poll data, {}", e); } } public void start() throws Exception { - try { + String whiteDataBases = config.getWhiteDataBase(); + String whiteTables = config.getWhiteTable(); - log.info("datasorce success"); - initDataSource(); - } catch (Throwable exception) { - log.info("error,{}", exception); + if (!StringUtils.isEmpty(whiteDataBases)) { + Arrays.asList(whiteDataBases.trim().split(",")).forEach(whiteDataBase -> { + Collections.addAll(schema.dataBaseWhiteList, whiteDataBase); + }); } - schema = new Schema(dataSource); - schema.load(); - log.info("schema load successful"); - } - private void initDataSource() throws Exception { - Map<String, String> map = new HashMap<>(); - - map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); - map.put("url", - "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); - map.put("username", config.jdbcUsername); - map.put("password", config.jdbcPassword); - map.put("initialSize", "2"); - map.put("maxActive", "2"); - map.put("maxWait", "60000"); - map.put("timeBetweenEvictionRunsMillis", "60000"); - map.put("minEvictableIdleTimeMillis", "300000"); - map.put("validationQuery", "SELECT 1 FROM DUAL"); - map.put("testWhileIdle", "true"); - log.info("{} config read successful", map); - try { - dataSource = DruidDataSourceFactory.createDataSource(map); - } catch (Exception exception) { - log.info("exeception,{}", exception); - } catch (Error e) { - log.info("error,{},e", e); + if (!StringUtils.isEmpty(whiteTables)) { + Arrays.asList(whiteTables.trim().split(",")).forEach(whiteTable -> { + Collections.addAll(schema.tableWhiteList, whiteTable); + }); } - log.info("datasorce success"); + schema.load(); + log.info("load schema success"); } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java index 3201456..964322d 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java @@ -1,7 +1,6 @@ package org.apache.rocketmq.connect.jdbc.source; import java.sql.Connection; -import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -14,13 +13,12 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.TimeZone; -import java.util.concurrent.ConcurrentLinkedQueue; import javax.sql.DataSource; -import org.apache.rocketmq.connect.jdbc.Config; +import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.common.DBUtils; import org.apache.rocketmq.connect.jdbc.schema.Database; import org.apache.rocketmq.connect.jdbc.schema.Schema; import org.apache.rocketmq.connect.jdbc.schema.Table; @@ -111,7 +109,7 @@ public class TimestampIncrementingQuerier extends Querier { protected void createPreparedStatement(Connection conn) throws SQLException { // Default when unspecified uses an autoincrementing column if (incrementingColumn != null && incrementingColumn.isEmpty()) { - incrementingColumn = JdbcUtils.getAutoincrementColumn(conn, name); + incrementingColumn = DBUtils.getAutoincrementColumn(conn, name); } String quoteString = conn.getMetaData().getIdentifierQuoteString(); @@ -205,7 +203,7 @@ public class TimestampIncrementingQuerier extends Querier { log.info("{}·", stmt); log.info("{},{}", incrementingOffset, timestampOffset); return stmt.executeQuery(); - } + } public List<Table> getList() { return list; @@ -219,10 +217,10 @@ public class TimestampIncrementingQuerier extends Querier { try { list.clear(); Connection conn = dataSource.getConnection(); - for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { + for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) { String db = entry.getKey(); log.info("{} database is loading", db); - Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); + Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Table> tableEntry = iterator.next(); String tb = tableEntry.getKey(); @@ -279,20 +277,20 @@ public class TimestampIncrementingQuerier extends Querier { } catch (Throwable exception) { log.info("error,{}", exception); } - schema = new Schema(dataSource); + schema = new Schema(dataSource.getConnection()); schema.load(); } - private void initDataSource() throws Exception { + public void initDataSource() throws Exception { Map<String, String> map = new HashMap<>(); config = super.getConfig(); timestampColumn = config.getTimestampColmnName(); incrementingColumn = config.getIncrementingColumnName(); map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); map.put("url", - "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); - map.put("username", config.jdbcUsername); - map.put("password", config.jdbcPassword); + "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); + map.put("username", config.getJdbcUsername()); + map.put("password", config.getJdbcPassword()); map.put("initialSize", "2"); map.put("maxActive", "2"); map.put("maxWait", "60000");
