This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 1376d8206633c494dab351b0b88723be0a6af102 Author: yuchenlichuck <[email protected]> AuthorDate: Thu Aug 15 21:57:54 2019 +0800 Develop TimestampIncrementingQuerier Mode --- .../org/apache/rocketmq/connect/jdbc/Config.java | 7 +- .../jdbc/connector/JdbcSourceConnector.java | 18 +- .../connect/jdbc/connector/JdbcSourceTask.java | 100 +++++-- .../rocketmq/connect/jdbc/source/JdbcUtils.java | 198 +++++++++++++ .../rocketmq/connect/jdbc/source/Querier.java | 77 +++-- .../jdbc/source/TimestampIncrementingQuerier.java | 315 +++++++++++++++++++++ .../connect/jdbc/connector/JdbcSourceTaskTest.java | 89 +++++- 7 files changed, 727 insertions(+), 77 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java index 533d53b..f93c4db 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java @@ -27,8 +27,6 @@ import java.util.List; import java.util.Set; public class Config { - @SuppressWarnings("serial") - private static final Logger LOG = LoggerFactory.getLogger(Config.class); /* Database Connection Config */ @@ -68,11 +66,12 @@ public class Config { add("jdbcUrl"); add("jdbcUsername"); add("jdbcPassword"); - // add("mode"); - // add("rocketmqTopic"); + add("mode"); + add("rocketmqTopic"); } }; + public void load(KeyValue props) { log.info("Config.load.start"); properties2Object(props, this); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java index bdbeb8b..4a870c0 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java @@ -19,17 +19,15 @@ package org.apache.rocketmq.connect.jdbc.connector; import java.util.ArrayList; import java.util.List; -import java.util.Set; - -import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.Task; -import io.openmessaging.connector.api.source.SourceConnector; import org.apache.rocketmq.connect.jdbc.Config; -import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.source.SourceConnector; + public class JdbcSourceConnector extends SourceConnector { private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class); private KeyValue config; @@ -37,7 +35,7 @@ public class JdbcSourceConnector extends SourceConnector { @Override public String verifyAndSetConfig(KeyValue config) { - log.info("JdbcSourceConnector verifyAndSetConfig enter"); + log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter"); for (String requestKey : Config.REQUEST_CONFIG) { if (!config.containsKey(requestKey)) { @@ -59,11 +57,13 @@ public class JdbcSourceConnector extends SourceConnector { } - @Override public void pause() { + @Override + public void pause() { } - @Override public void resume() { + @Override + public void resume() { } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java index 91659ec..45252bb 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java @@ -1,5 +1,4 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,20 +19,31 @@ package org.apache.rocketmq.connect.jdbc.connector; import io.openmessaging.connector.api.source.SourceTask; + import java.nio.ByteBuffer; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.connect.jdbc.Config; import org.apache.rocketmq.connect.jdbc.schema.Table; import org.apache.rocketmq.connect.jdbc.source.Querier; +import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingQuerier; import org.apache.rocketmq.connect.jdbc.schema.column.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; + import io.openmessaging.KeyValue; import io.openmessaging.connector.api.data.EntryType; import io.openmessaging.connector.api.data.Schema; @@ -49,26 +59,31 @@ public class JdbcSourceTask extends SourceTask { private Config config; - private List<Table> list = new LinkedList<>(); - - Querier querier = new Querier(); + BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>(); + static final String INCREMENTING_FIELD = "incrementing"; + static final String TIMESTAMP_FIELD = "timestamp"; + private Querier querier; @Override public Collection<SourceDataEntry> poll() { List<SourceDataEntry> res = new ArrayList<>(); try { - - JSONObject jsonObject = new JSONObject(); - jsonObject.put("nextQuery", "database"); - jsonObject.put("nextPosition", "10"); - //To be Continued + if (tableQueue.size() > 1) + querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS); + else + querier = tableQueue.peek(); + Timer timer = new java.util.Timer(); + try { + Thread.currentThread(); + Thread.sleep(1000);//毫秒 + } catch (Exception e) { + throw e; + } querier.poll(); - log.info("querier.poll, start"); - int mm = 0; for (Table dataRow : querier.getList()) { - System.out.println(dataRow.getColList().get(0)); - log.info("xunhuankaishi"); - log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("nextQuery", "database"); + jsonObject.put("nextPosition", "table"); Schema schema = new Schema(); schema.setDataSource(dataRow.getDatabase()); schema.setName(dataRow.getName()); @@ -80,37 +95,60 @@ public class JdbcSourceTask extends SourceTask { schema.getFields().add(field); } DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); - dataEntryBuilder.timestamp(System.currentTimeMillis()) - .queue(dataRow.getName()) - .entryType(EntryType.CREATE); + dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName()) + .entryType(EntryType.CREATE); for (int i = 0; i < dataRow.getColList().size(); i++) { Object value = dataRow.getDataList().get(i); - System.out.println(1); - System.out.println(dataRow.getColList().get(i) + "|" + value); - dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value)); + // System.out.println(dataRow.getColList().get(i) + "|" + value); + dataEntryBuilder.putFiled(dataRow.getColList().get(i), value); } + SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( - ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")), - ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))); + ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")), + ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))); res.add(sourceDataEntry); + } } catch (Exception e) { log.error("JDBC task poll error, current config:" + JSON.toJSONString(config), e); } + log.info("dataEntry poll successfully,{}", res); return res; } @Override public void start(KeyValue props) { try { - this.config = new Config(); - this.config.load(props); - log.info("querier.start"); - querier.start(); - + config = new Config(); + config.load(props); } catch (Exception e) { - log.error("JDBC task start failed.", e); + log.error("Cannot start Jdbc Source Task because of configuration error{}", e); + } + Map<Map<String, String>, Map<String, Object>> offsets = null; + String mode = config.mode; + if (mode.equals("bulk")) { + Querier querier = new Querier(); + try { + querier.setConfig(config); + querier.start(); + tableQueue.add(querier); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } else { + TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier(); + try { + querier.setConfig(config); + querier.start(); + tableQueue.add(querier); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } @Override @@ -118,11 +156,13 @@ public class JdbcSourceTask extends SourceTask { querier.stop(); } - @Override public void pause() { + @Override + public void pause() { } - @Override public void resume() { + @Override + public void resume() { } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java new file mode 100644 index 0000000..cbcca6a --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java @@ -0,0 +1,198 @@ + +/** + * Copyright 2015 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.rocketmq.connect.jdbc.source; +import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TimeZone; + +/** + * Utilties for interacting with a JDBC database. + */ +public class JdbcUtils { + + private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class); + + /** + * The default table types to include when listing tables if none are specified. Valid values + * are those specified by the @{java.sql.DatabaseMetaData#getTables} method's TABLE_TYPE column. + * The default only includes standard, user-defined tables. + */ + public static final Set<String> DEFAULT_TABLE_TYPES = Collections.unmodifiableSet( + new HashSet<String>(Arrays.asList("TABLE")) + ); + + private static final int GET_TABLES_TYPE_COLUMN = 4; + private static final int GET_TABLES_NAME_COLUMN = 3; + + private static final int GET_COLUMNS_COLUMN_NAME = 4; + private static final int GET_COLUMNS_IS_NULLABLE = 18; + private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23; + + + private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + return sdf; + } + }; + + /** + * Get a list of tables in the database. This uses the default filters, which only include + * user-defined tables. + * @param conn database connection + * @return a list of tables + * @throws SQLException + */ + public static List<String> getTables(Connection conn) throws SQLException { + return getTables(conn, DEFAULT_TABLE_TYPES); + } + + /** + * Get a list of table names in the database. + * @param conn database connection + * @param types a set of table types that should be included in the results + * @throws SQLException + */ + public static List<String> getTables(Connection conn, Set<String> types) throws SQLException { + DatabaseMetaData metadata = conn.getMetaData(); + ResultSet rs = metadata.getTables(null, null, "%", null); + List<String> tableNames = new ArrayList<String>(); + while (rs.next()) { + if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) { + String colName = rs.getString(GET_TABLES_NAME_COLUMN); + // SQLite JDBC driver does not correctly mark these as system tables + if (metadata.getDatabaseProductName().equals("SQLite") && colName.startsWith("sqlite_")) { + continue; + } + + tableNames.add(colName); + } + } + return tableNames; + } + + /** + * Look up the autoincrement column for the specified table. + * @param conn database connection + * @param table the table to + * @return the name of the column that is an autoincrement column, or null if there is no + * autoincrement column or more than one exists + * @throws SQLException + */ + public static String getAutoincrementColumn(Connection conn, String table) throws SQLException { + String result = null; + int matches = 0; + + ResultSet rs = conn.getMetaData().getColumns(null, null, table, "%"); + // Some database drivers (SQLite) don't include all the columns + if (rs.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) { + while(rs.next()) { + if (rs.getString(GET_COLUMNS_IS_AUTOINCREMENT).equals("YES")) { + result = rs.getString(GET_COLUMNS_COLUMN_NAME); + matches++; + } + } + return (matches == 1 ? result : null); + } + + // Fallback approach is to query for a single row. This unfortunately does not work with any + // empty table + log.trace("Falling back to SELECT detection of auto-increment column for {}:{}", conn, table); + Statement stmt = conn.createStatement(); + try { + String quoteString = getIdentifierQuoteString(conn); + rs = stmt.executeQuery("SELECT * FROM " + quoteString + table + quoteString + " LIMIT 1"); + ResultSetMetaData rsmd = rs.getMetaData(); + for(int i = 1; i < rsmd.getColumnCount(); i++) { + if (rsmd.isAutoIncrement(i)) { + result = rsmd.getColumnName(i); + matches++; + } + } + } finally { + rs.close(); + stmt.close(); + } + return (matches == 1 ? result : null); + } + + public static boolean isColumnNullable(Connection conn, String table, String column) + throws SQLException { + ResultSet rs = conn.getMetaData().getColumns(null, null, table, column); + if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) { + // Should only be one match + if (!rs.next()) { + return false; + } + String val = rs.getString(GET_COLUMNS_IS_NULLABLE); + return rs.getString(GET_COLUMNS_IS_NULLABLE).equals("YES"); + } + + return false; + } + + /** + * Format the given Date assuming UTC timezone in a format supported by SQL. + * @param date the date to convert to a String + * @return the formatted string + */ + public static String formatUTC(Date date) { + return DATE_FORMATTER.get().format(date); + } + + /** + * Get the string used for quoting identifiers in this database's SQL dialect. + * @param connection the database connection + * @return the quote string + * @throws SQLException + */ + public static String getIdentifierQuoteString(Connection connection) throws SQLException { + String quoteString = connection.getMetaData().getIdentifierQuoteString(); + quoteString = quoteString == null ? "" : quoteString; + return quoteString; + } + + /** + * Quote the given string. + * @param orig the string to quote + * @param quote the quote character + * @return the quoted string + */ + public static String quoteString(String orig, String quote) { + return quote + orig + quote; + } +} + diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java index 073a896..0907d40 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java @@ -13,27 +13,55 @@ import java.util.Map; import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; - import javax.sql.DataSource; - import org.apache.rocketmq.connect.jdbc.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.alibaba.druid.pool.DruidDataSourceFactory; - import org.apache.rocketmq.connect.jdbc.schema.*; import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; public class Querier { - static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; + private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass protected String topicPrefix; protected String jdbcUrl; private final Queue<Connection> connections = new ConcurrentLinkedQueue<>(); - private Config config = new Config(); + private Config config; + + /** + * @return the config + */ + public Config getConfig() { + return config; + } + + public void setConfig(Config config) { + this.config = config; + log.info("config load successfully"); + } + private DataSource dataSource; private List<Table> list = new LinkedList<>(); + private String mode; + + + public DataSource getDataSource() { + return dataSource; + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } + + public String getMode() { + return mode; + } + + public void setMode(String mode) { + this.mode = mode; + } + public List<Table> getList() { return list; @@ -44,7 +72,6 @@ public class Querier { } public Connection getConnection() throws SQLException { - // These config names are the same for both source and sink configs ... String username = config.jdbcUsername; String dbPassword = config.jdbcPassword; @@ -76,7 +103,7 @@ public class Querier { protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException { String SQL = "select table_name,column_name,data_type,column_type,character_set_name " - + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION"; + + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION"; log.trace("Creating a PreparedStatement '{}'", SQL); PreparedStatement stmt = db.prepareStatement(SQL); @@ -96,30 +123,29 @@ public class Querier { return stmt.executeQuery(); } + private Schema schema; + public static void main(String[] args) throws Exception { - Querier querier = new Querier(); + TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier(); try { querier.start(); querier.poll(); - } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } - } - private Schema schema; - public void poll() { try { PreparedStatement stmt; String query = "select * from "; Connection conn = dataSource.getConnection(); - for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { String db = entry.getKey(); + if (!db.contains("jdbc_db")) + continue; Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Table> tableEntry = iterator.next(); @@ -155,7 +181,13 @@ public class Querier { } public void start() throws Exception { - initDataSource(); + try { + + log.info("datasorce success"); + initDataSource(); + } catch (Throwable exception) { + log.info("error,{}", exception); + } schema = new Schema(dataSource); schema.load(); log.info("schema load successful"); @@ -163,9 +195,10 @@ public class Querier { private void initDataSource() throws Exception { Map<String, String> map = new HashMap<>(); + map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); map.put("url", - "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); + "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); map.put("username", config.jdbcUsername); map.put("password", config.jdbcPassword); map.put("initialSize", "2"); @@ -175,9 +208,15 @@ public class Querier { map.put("minEvictableIdleTimeMillis", "300000"); map.put("validationQuery", "SELECT 1 FROM DUAL"); map.put("testWhileIdle", "true"); - log.info("{},config read successful", map); - dataSource = DruidDataSourceFactory.createDataSource(map); - + log.info("{} config read successful", map); + try { + dataSource = DruidDataSourceFactory.createDataSource(map); + } catch (Exception exception) { + log.info("exeception,{}", exception); + } catch (Error e) { + log.info("error,{},e", e); + } + log.info("datasorce success"); } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java new file mode 100644 index 0000000..1dadc4f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java @@ -0,0 +1,315 @@ +package org.apache.rocketmq.connect.jdbc.source; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.sql.DataSource; + +import org.apache.rocketmq.connect.jdbc.Config; +import org.apache.rocketmq.connect.jdbc.schema.Database; +import org.apache.rocketmq.connect.jdbc.schema.Schema; +import org.apache.rocketmq.connect.jdbc.schema.Table; +import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.alibaba.druid.pool.DruidDataSourceFactory; + + +public class TimestampIncrementingQuerier extends Querier { + protected PreparedStatement stmt; + static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; + protected String jdbcUrl; + + private Config config; + private DataSource dataSource; + private static final Logger log = LoggerFactory.getLogger(TimestampIncrementingQuerier.class); + private List<Table> list = new LinkedList<>(); + private HashMap<String, Long> incrementingMap; + private HashMap<String, Timestamp> timestampMap; + private static final Calendar UTC_CALENDAR = new GregorianCalendar(TimeZone.getTimeZone("UTC+8")); + private String timestampColumn = ""; + static final String INCREMENTING_FIELD = "incrementing"; + static final String TIMESTAMP_FIELD = "timestamp"; + private Map<String, Long> offset; + private Long timestampOffset; + private String incrementingColumn = ""; + private Map<String, String> partition; + private Schema schema; + + public String getTimestampColumn() { + return timestampColumn; + } + + public void setTimestampColumn(String timestampColumn) { + this.timestampColumn = timestampColumn; + } + + public String getIncrementingColumn() { + return incrementingColumn; + } + + public void setIncrementingColumn(String incrementingColumn) { + this.incrementingColumn = incrementingColumn; + } + + private Long incrementingOffset = null; + private String name; + + public void extractRecord(String name) throws SQLException { + if (incrementingColumn != null) { + log.info("{}", name); + incrementingMap.put(name, incrementingOffset); + } + if (timestampColumn != null) { + timestampMap.put(name, new Timestamp(timestampOffset)); + } + } + + @SuppressWarnings("deprecation") + public void storeRecord(String name) throws SQLException { + offset = new HashMap<>(); + if (incrementingColumn != null) { + Long id = 0L; + + if (incrementingMap.containsKey(name)) { + id = incrementingMap.get(name); + System.out.println("read incrementingMap" + id); + } + assert (incrementingOffset == null || id > incrementingOffset) || timestampColumn != null; + incrementingOffset = id; + offset.put(INCREMENTING_FIELD, id); + } + if (timestampColumn != null) { + Timestamp timestamp = new Timestamp(0); + if (timestampMap.containsKey(name)) + timestamp = timestampMap.get(name); + + System.out.println("read timestampColumn" + timestamp.toString()); + timestampOffset = timestamp.getTimezoneOffset() + timestamp.getTime(); + System.out.println("read" + new Timestamp(timestampOffset)); + offset.put(TIMESTAMP_FIELD, timestampOffset); + } + log.info("{}store", new Timestamp(timestampOffset)); + partition = Collections.singletonMap("table", name); + } + + protected void createPreparedStatement(Connection conn) throws SQLException { + // Default when unspecified uses an autoincrementing column + if (incrementingColumn != null && incrementingColumn.isEmpty()) { + incrementingColumn = JdbcUtils.getAutoincrementColumn(conn, name); + } + + String quoteString = conn.getMetaData().getIdentifierQuoteString(); + StringBuilder builder = new StringBuilder(); + builder.append("SELECT * FROM "); + builder.append(name); + + quoteString = quoteString == null ? "" : quoteString; + + if (incrementingColumn != null && timestampColumn != null) { + // This version combines two possible conditions. The first checks timestamp == + // last + // timestamp and incrementing > last incrementing. The timestamp alone would + // include + // duplicates, but adding the incrementing condition ensures no duplicates, e.g. + // you would + // get only the row with id = 23: + // timestamp 1234, id 22 <- last + // timestamp 1234, id 23 + // The second check only uses the timestamp >= last timestamp. This covers + // everything new, + // even if it is an update of the existing row. If we previously had: + // timestamp 1234, id 22 <- last + // and then these rows were written: + // timestamp 1235, id 22 + // timestamp 1236, id 23 + // We should capture both id = 22 (an update) and id = 23 (a new row) + String timeString = quoteString + timestampColumn + quoteString; + String incrString = quoteString + incrementingColumn + quoteString; + builder.append(" WHERE "); + builder.append(timeString); + builder.append(" < CURRENT_TIMESTAMP AND (("); + builder.append(timeString); + builder.append(" = ? AND "); + builder.append(incrString); + builder.append(" > ?"); + builder.append(") OR "); + builder.append(timeString); + builder.append(" > ?)"); + builder.append(" ORDER BY "); + builder.append(timeString); + builder.append(","); + builder.append(incrString); + builder.append(" ASC"); + + } else if (incrementingColumn != null) { + String incrString = quoteString + incrementingColumn + quoteString; + builder.append(" WHERE "); + builder.append(incrString); + builder.append(" > ?"); + builder.append(" ORDER BY "); + builder.append(incrString); + builder.append(" ASC"); + } else if (timestampColumn != null) { + String timeString = quoteString + timestampColumn + quoteString; + builder.append(" WHERE "); + builder.append(timeString); + builder.append(" > ? AND "); + builder.append(timeString); + builder.append(" < CURRENT_TIMESTAMP ORDER BY "); + builder.append(timeString); + builder.append(" ASC"); + } + String queryString = builder.toString(); + stmt = conn.prepareStatement(queryString); + log.info(queryString); + } + + public static void main(String[] args) throws Exception { + TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier(); + try { + querier.start(); + querier.poll(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + protected ResultSet executeQuery() throws SQLException { + if (incrementingColumn != null && timestampColumn != null) { + Timestamp ts = new Timestamp(timestampOffset == null ? 0 : timestampOffset); + stmt.setTimestamp(1, ts); + stmt.setLong(2, (incrementingOffset == null ? -1 : incrementingOffset)); + stmt.setTimestamp(3, ts); + } else if (incrementingColumn != null) { + stmt.setLong(1, (incrementingOffset == null ? -1 : incrementingOffset)); + } else if (timestampColumn != null) { + Timestamp ts = new Timestamp(timestampOffset == null ? 0 : timestampOffset); + stmt.setTimestamp(1, ts); + } + log.info("{}·", stmt); + log.info("{},{}", incrementingOffset, timestampOffset); + return stmt.executeQuery(); + } + + public List<Table> getList() { + return list; + } + + public void setList(List<Table> list) { + this.list = list; + } + + public void poll() { + try { + list.clear(); + Connection conn = dataSource.getConnection(); + for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { + String db = entry.getKey(); + if (!db.contains("time_db")) + continue; + log.info("{} database is loading", db); + Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Table> tableEntry = iterator.next(); + String tb = tableEntry.getKey(); + log.info("{} table is loading", tb); + name = db + "." + tb; + storeRecord(name); + createPreparedStatement(conn); + ResultSet rs; + rs = executeQuery(); + List<String> colList = tableEntry.getValue().getColList(); + List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList(); + List<ColumnParser> ParserList = tableEntry.getValue().getParserList(); + + while (rs.next()) { + Table table = new Table(db, tb); + System.out.print("|"); + table.setColList(colList); + table.setRawDataTypeList(DataTypeList); + table.setParserList(ParserList); + for (String string : colList) { + table.getDataList().add(rs.getObject(string)); + System.out.print(string + " : " + rs.getObject(string) + "|"); + } + incrementingOffset = incrementingOffset > rs.getInt(incrementingColumn) ? incrementingOffset + : rs.getInt(incrementingColumn); + timestampOffset = timestampOffset > rs.getTimestamp(timestampColumn).getTime() ? timestampOffset + : rs.getTimestamp(timestampColumn).getTime(); + System.out.println(timestampOffset); + list.add(table); + System.out.println(); + } + extractRecord(name); + incrementingOffset = 0L; + timestampOffset = 0L; + } + } + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + public void start() throws Exception { + try { + initDataSource(); + if (incrementingColumn != null && timestampColumn != null) { + incrementingMap = new HashMap<>(); + timestampMap = new HashMap<>(); + } else if (incrementingColumn != null) { + incrementingMap = new HashMap<>(); + } else if (timestampColumn != null) { + timestampMap = new HashMap<>(); + } + } catch (Throwable exception) { + log.info("error,{}", exception); + } + schema = new Schema(dataSource); + schema.load(); + } + + private void initDataSource() throws Exception { + Map<String, String> map = new HashMap<>(); + config = super.getConfig(); + timestampColumn = config.getTimestampColmnName(); + incrementingColumn = config.getIncrementingColumnName(); + map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); + map.put("url", + "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); + map.put("username", config.jdbcUsername); + map.put("password", config.jdbcPassword); + map.put("initialSize", "2"); + map.put("maxActive", "2"); + map.put("maxWait", "60000"); + map.put("timeBetweenEvictionRunsMillis", "60000"); + map.put("minEvictableIdleTimeMillis", "300000"); + map.put("validationQuery", "SELECT 1 FROM DUAL"); + map.put("testWhileIdle", "true"); + log.info("{}config read successfully", map); + try { + dataSource = DruidDataSourceFactory.createDataSource(map); + } catch (Exception exception) { + log.info("exeception,{}", exception); + } catch (Error error) { + log.info("error,{}", error); + } + log.info("datasorce success"); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java index f9c8c6f..429494b 100644 --- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java +++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java @@ -16,29 +16,88 @@ */ package org.apache.rocketmq.connect.jdbc.connector; + import java.util.Collection; -import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + +import javax.sql.DataSource; +import org.junit.Test; +import java.sql.*; +import com.alibaba.druid.pool.DruidDataSourceFactory; import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.internal.DefaultKeyValue; public class JdbcSourceTaskTest { + KeyValue kv; + DataSource dataSource; + + @Test + public void testBulk() throws InterruptedException { + KeyValue kv = new DefaultKeyValue(); + kv.put("jdbcUrl", "localhost:3306"); + kv.put("jdbcUsername", "root"); + kv.put("jdbcPassword", "199812160"); + kv.put("mode", "bulk"); + kv.put("rocketmqTopic", "JdbcTopic"); + JdbcSourceTask task = new JdbcSourceTask(); + task.start(kv); + Collection<SourceDataEntry> sourceDataEntry = task.poll(); + System.out.println(sourceDataEntry); + } + + @Test + public void testTimestampIncrementing() throws InterruptedException, SQLException { + kv = new DefaultKeyValue(); + kv.put("jdbcUrl", "localhost:3306"); + kv.put("jdbcUsername", "root"); + kv.put("jdbcPassword", "199812160"); + kv.put("incrementingColumnName", "id"); + kv.put("timestampColmnName", "timestamp"); + kv.put("mode", "incrementing+timestamp"); + kv.put("rocketmqTopic", "JdbcTopic"); + JdbcSourceTask task = new JdbcSourceTask(); + task.start(kv); + Collection<SourceDataEntry> sourceDataEntry = task.poll(); + System.out.println(sourceDataEntry); + Map<String, String> map = new HashMap<>(); + map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); + map.put("url", "jdbc:mysql://" + kv.getString("jdbcUrl") + + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); + map.put("username", kv.getString("jdbcUsername")); + map.put("password", kv.getString("jdbcPassword")); + map.put("initialSize", "2"); + map.put("maxActive", "2"); + map.put("maxWait", "60000"); + map.put("timeBetweenEvictionRunsMillis", "60000"); + map.put("minEvictableIdleTimeMillis", "300000"); + map.put("validationQuery", "SELECT 1 FROM DUAL"); + map.put("testWhileIdle", "true"); + try { + dataSource = DruidDataSourceFactory.createDataSource(map); + } catch (Exception e) { + e.printStackTrace(); + } + Connection connection= dataSource.getConnection(); + PreparedStatement statement; + String s="insert into time_db.timestamp_tb (name) values(\"test\")"; + statement=connection.prepareStatement(s); + statement.executeUpdate(); - @Test - public void test() throws InterruptedException { - KeyValue kv = new DefaultKeyValue(); - kv.put("jdbcUrl","localhost:3306"); - kv.put("jdbcUsername","root"); - kv.put("jdbcPassword","199812160"); - kv.put("mode","bulk"); - kv.put("rocketmqTopic","JdbcTopic"); - JdbcSourceTask task = new JdbcSourceTask(); - task.start(kv); - Collection<SourceDataEntry> sourceDataEntry = task.poll(); - System.out.println(sourceDataEntry); - - } + sourceDataEntry = task.poll(); + System.out.println(sourceDataEntry); + s="update time_db.timestamp_tb set name=\"liu\" where id < 2"; + statement=connection.prepareStatement(s); + statement.executeUpdate(); + sourceDataEntry = task.poll(); + System.out.println(sourceDataEntry); + task.stop(); + + connection.close(); + } }
