This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 488223c3aa3d15ce4676e6442d894cb99b06275e Author: yuchenlichuck <[email protected]> AuthorDate: Fri Aug 9 12:04:51 2019 +0800 develop the jdbcsource connector --- README.md | 43 ++- pom.xml | 8 + .../org/apache/rocketmq/connect/jdbc/Config.java | 52 ++-- .../jdbc/connector/JdbcSourceConnector.java | 12 +- .../connect/jdbc/connector/JdbcSourceTask.java | 39 +-- .../rocketmq/connect/jdbc/schema/Database.java | 4 +- .../rocketmq/connect/jdbc/schema/Schema.java | 4 +- .../apache/rocketmq/connect/jdbc/schema/Table.java | 36 +-- .../rocketmq/connect/jdbc/source/Querier.java | 314 ++++++++++----------- 9 files changed, 270 insertions(+), 242 deletions(-) diff --git a/README.md b/README.md index 96da884..32da4b5 100644 --- a/README.md +++ b/README.md @@ -149,34 +149,55 @@ httpPort=8081 看到日志目录查看connect_runtime.log -如果看到以下日志说明runttiime启动成功了 - -2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started -2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success. - -2、启动sourceConnector - - 正在做测试(To be continued)已实现Bulk Mode +windows用户可以用CMD到程序根目录下再输入: +``` cd target/distribution/ java -cp .;./conf/;./lib/* org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf +``` +如果看到以下日志说明runttiime启动成功了 +2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started +2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success. + +2、启动sourceConnector -在http中输入Get 请求 +``` +1、git clone https://github.com/apache/rocketmq-externals.git +2、cd rocketmq-externals/rocketmq-connect-jdbc +3、mvn -Dmaven.test.skip=true package -示例 +``` -[http://127.0.0.1:8085/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}](http://127.0.0.1:8085/connectors/testSourceConnector1?config={% [...] +- 复制第三方jar至target +``` +mvn dependency:copy-dependencies +``` +已实现Bulk查询方法,在http中输入Get 请求(目前仅适配过MYSQL) +```http +http://127.0.0.1:8081/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"} +``` +看到一下日志说明Jdbc source connector启动成功了 +2019-08-09 11:33:22 INFO RebalanceService - JdbcSourceConnector verifyAndSetConfig enter +2019-08-09 11:33:23 INFO pool-9-thread-1 - Config.load.start +2019-08-09 11:33:23 INFO pool-9-thread-1 - querier.start +2019-08-09 11:33:23 INFO pool-9-thread-1 - {password=199812160, validationQuery=SELECT 1 FROM DUAL, testWhileIdle=true, timeBetweenEvictionRunsMillis=60000, minEvictableIdleTimeMillis=300000, initialSize=2, driverClassName=com.mysql.cj.jdbc.Driver, maxWait=60000, url=jdbc:mysql://localhost:3306?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8, username=root, maxActive=2},config read successful +2019-08-09 11:33:24 INFO RebalanceService - JdbcSourceConnector verifyAndSetConfig enter +2019-08-09 11:33:25 INFO pool-9-thread-1 - {dataSource-1} inited +2019-08-09 11:33:27 INFO pool-9-thread-1 - schema load successful +2019-08-09 11:33:27 INFO pool-9-thread-1 - querier.poll +3、启动sinkConnector +To Be Continued. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 830f9ed..1d708f3 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,14 @@ <artifactId>clirr-maven-plugin</artifactId> <version>2.7</version> </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <excludeTransitive>false</excludeTransitive> + <stripVersion>true</stripVersion> + </configuration> + </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java index 4f7456b..533d53b 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java @@ -25,56 +25,56 @@ import java.lang.reflect.Method; import java.util.HashSet; import java.util.List; import java.util.Set; + public class Config { @SuppressWarnings("serial") private static final Logger LOG = LoggerFactory.getLogger(Config.class); /* Database Connection Config */ - public String jdbcUrl="localhost:3306"; - public String jdbcUsername="root"; - public String jdbcPassword="199812160"; + public String jdbcUrl = "localhost:3306"; + public String jdbcUsername = "root"; + public String jdbcPassword = "199812160"; public String rocketmqTopic; public String jdbcBackoff; public String jdbcAttempts; - public String catalogPattern=null; + public String catalogPattern = null; public List tableWhitelist; public List tableBlacklist; - public String schemaPattern=null; - public boolean numericPrecisionMapping=false; - public String bumericMapping=null; - public String dialectName=""; + public String schemaPattern = null; + public boolean numericPrecisionMapping = false; + public String bumericMapping = null; + public String dialectName = ""; /* Mode Config */ - public String mode=""; - public String incrementingColumnName= ""; - public String query=""; - public String timestampColmnName=""; - public boolean validateNonNull=true; + public String mode = ""; + public String incrementingColumnName = ""; + public String query = ""; + public String timestampColmnName = ""; + public boolean validateNonNull = true; /*Connector config*/ - public String tableTypes="table"; - public long pollInterval=5000; - public int batchMaxRows=100; - public long tablePollInterval=60000; - public long timestampDelayInterval=0; - public String dbTimezone="UTC"; + public String tableTypes = "table"; + public long pollInterval = 5000; + public int batchMaxRows = 100; + public long tablePollInterval = 60000; + public long timestampDelayInterval = 0; + public String dbTimezone = "UTC"; public String queueName; private Logger log = LoggerFactory.getLogger(Config.class); public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { { - // add("jdbcUrl"); - // add("jdbcUsername"); - // add("jdbcPassword"); - // add("mode"); - // add("rocketmqTopic"); + add("jdbcUrl"); + add("jdbcUsername"); + add("jdbcPassword"); + // add("mode"); + // add("rocketmqTopic"); } }; - public void load(KeyValue props) { - log.info("Config.load.start"); + log.info("Config.load.start"); properties2Object(props, this); } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java index 8c30a62..bdbeb8b 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java @@ -37,7 +37,7 @@ public class JdbcSourceConnector extends SourceConnector { @Override public String verifyAndSetConfig(KeyValue config) { - log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter"); + log.info("JdbcSourceConnector verifyAndSetConfig enter"); for (String requestKey : Config.REQUEST_CONFIG) { if (!config.containsKey(requestKey)) { @@ -45,7 +45,7 @@ public class JdbcSourceConnector extends SourceConnector { } } this.config = config; - + return ""; } @@ -68,13 +68,13 @@ public class JdbcSourceConnector extends SourceConnector { } @Override - public Class<? extends Task> taskClass(){ - return JdbcSourceTask.class; - } + public Class<? extends Task> taskClass() { + return JdbcSourceTask.class; + } @Override public List<KeyValue> taskConfigs() { - log.info("List.start"); + log.info("List.start"); List<KeyValue> config = new ArrayList<>(); config.add(this.config); return config; diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java index 78f1809..91659ec 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java @@ -18,6 +18,7 @@ */ package org.apache.rocketmq.connect.jdbc.connector; + import io.openmessaging.connector.api.source.SourceTask; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -47,32 +48,32 @@ public class JdbcSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class); private Config config; - - private List<Table> list=new LinkedList<>(); - - Querier querier = new Querier(); + + private List<Table> list = new LinkedList<>(); + + Querier querier = new Querier(); + @Override public Collection<SourceDataEntry> poll() { List<SourceDataEntry> res = new ArrayList<>(); try { - + JSONObject jsonObject = new JSONObject(); jsonObject.put("nextQuery", "database"); jsonObject.put("nextPosition", "10"); - //To be Continued - log.info("querier.poll"); + //To be Continued querier.poll(); - log.info("1216connector.start"); - int mm=0; - for(Table dataRow : querier.getList()){ - System.out.println(dataRow.getColList().get(0)); - log.info("xunhuankaishi"); + log.info("querier.poll, start"); + int mm = 0; + for (Table dataRow : querier.getList()) { + System.out.println(dataRow.getColList().get(0)); + log.info("xunhuankaishi"); log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++); - Schema schema = new Schema(); + Schema schema = new Schema(); schema.setDataSource(dataRow.getDatabase()); schema.setName(dataRow.getName()); schema.setFields(new ArrayList<>()); - for(int i = 0; i < dataRow.getColList().size(); i++){ + for (int i = 0; i < dataRow.getColList().size(); i++) { String columnName = dataRow.getColList().get(i); String rawDataType = dataRow.getRawDataTypeList().get(i); Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType)); @@ -82,10 +83,10 @@ public class JdbcSourceTask extends SourceTask { dataEntryBuilder.timestamp(System.currentTimeMillis()) .queue(dataRow.getName()) .entryType(EntryType.CREATE); - for(int i = 0; i < dataRow.getColList().size(); i++){ - Object value=dataRow.getDataList().get(i); + for (int i = 0; i < dataRow.getColList().size(); i++) { + Object value = dataRow.getDataList().get(i); System.out.println(1); - System.out.println(dataRow.getColList().get(i)+"|"+value); + System.out.println(dataRow.getColList().get(i) + "|" + value); dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value)); } SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( @@ -104,8 +105,8 @@ public class JdbcSourceTask extends SourceTask { try { this.config = new Config(); this.config.load(props); - log.info("querier.start"); - querier.start(); + log.info("querier.start"); + querier.start(); } catch (Exception e) { log.error("JDBC task start failed.", e); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java index b88661d..657ebb8 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.connect.jdbc.schema; //import io.openmessaging.mysql.binlog.EventProcessor; + import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; import java.sql.Connection; import java.sql.PreparedStatement; @@ -36,6 +37,7 @@ public class Database { private String name; private DataSource dataSource; public Map<String, Table> tableMap = new HashMap<String, Table>(); + public Database(String name, DataSource dataSource) { this.name = name; this.dataSource = dataSource; @@ -59,7 +61,7 @@ public class Database { String dataType = rs.getString(3); String colType = rs.getString(4); String charset = rs.getString(5); - + ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset); if (!tableMap.containsKey(tableName)) { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java index 6ce6621..7434bbc 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java @@ -116,8 +116,8 @@ public class Schema { load(); break; } catch (Exception e) { - // LOGGER.error("Reload schema error.", e); - System.out.println("Reload schema error."+e); + // LOGGER.error("Reload schema error.", e); + System.out.println("Reload schema error." + e); } } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java index c0d793d..8c9a42d 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java @@ -28,7 +28,7 @@ public class Table { private List<String> colList = new LinkedList<>(); private List<ColumnParser> parserList = new LinkedList<>(); private List<String> rawDataTypeList = new LinkedList<>(); - private List<Object> dataList =new LinkedList<>(); + private List<Object> dataList = new LinkedList<>(); public Table(String database, String table) { this.database = database; @@ -40,18 +40,18 @@ public class Table { } public void setParserList(List<ColumnParser> parserList) { - this.parserList = parserList; - } + this.parserList = parserList; + } - public void setRawDataTypeList(List<String> rawDataTypeList) { - this.rawDataTypeList = rawDataTypeList; - } + public void setRawDataTypeList(List<String> rawDataTypeList) { + this.rawDataTypeList = rawDataTypeList; + } - public void addParser(ColumnParser columnParser) { + public void addParser(ColumnParser columnParser) { parserList.add(columnParser); } - public void addRawDataType(String rawDataType){ + public void addRawDataType(String rawDataType) { this.rawDataTypeList.add(rawDataType); } @@ -75,16 +75,16 @@ public class Table { return parserList; } - public List<Object> getDataList() { - return dataList; - } + public List<Object> getDataList() { + return dataList; + } + + public void setDataList(List<Object> dataList) { + this.dataList = dataList; + } - public void setDataList(List<Object> dataList) { - this.dataList = dataList; - } + public void setColList(List<String> colList) { + this.colList = colList; + } - public void setColList(List<String> colList) { - this.colList = colList; - } - } \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java index 1f630ea..3d5b2c6 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java @@ -22,168 +22,164 @@ import org.slf4j.LoggerFactory; import com.alibaba.druid.pool.DruidDataSourceFactory; - import org.apache.rocketmq.connect.jdbc.schema.*; import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; public class Querier { - static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; - private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass - protected String topicPrefix; - protected String jdbcUrl; - private final Queue<Connection> connections = new ConcurrentLinkedQueue<>(); - private Config config = new Config(); - private DataSource dataSource; - private List<Table> list=new LinkedList<>(); - - - - public List<Table> getList() { - return list; - } - - public void setList(List<Table> list) { - this.list = list; - } - - public Connection getConnection() throws SQLException { - - // These config names are the same for both source and sink configs ... - String username = config.jdbcUsername; - String dbPassword = config.jdbcPassword; - jdbcUrl = config.jdbcUrl; - Properties properties = new Properties(); - if (username != null) { - properties.setProperty("user", username); - } - if (dbPassword != null) { - properties.setProperty("password", dbPassword); - } - Connection connection = DriverManager.getConnection(jdbcUrl, properties); - - connections.add(connection); - return connection; - } - - public void stop() { - Connection conn; - while ((conn = connections.poll()) != null) { - try { - conn.close(); - } catch (Throwable e) { - log.warn("Error while closing connection to {}", "jdbc", e); - } - } - } - - protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException { - - String SQL = "select table_name,column_name,data_type,column_type,character_set_name " - + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION"; - - log.trace("Creating a PreparedStatement '{}'", SQL); - PreparedStatement stmt = db.prepareStatement(SQL); - return stmt; - - } - - protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException { - String query = "select * from " + string; - log.trace("Creating a PreparedStatement '{}'", query); - PreparedStatement stmt = db.prepareStatement(query); - return stmt; - - } - - protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException { - return stmt.executeQuery(); - } - - public static void main(String[] args) throws Exception { - Querier querier = new Querier(); - try { - querier.start(); - querier.poll(); - - } catch (SQLException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - - private Schema schema; - - - public void poll() { - try { - - PreparedStatement stmt; - String query = "select * from "; - Connection conn = dataSource.getConnection(); - - for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { - String db = entry.getKey(); - if(!db.contains("jdbc_db")) - continue; - Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, Table> tableEntry = iterator.next(); - String tb=tableEntry.getKey(); - stmt = conn.prepareStatement(query+db + "." +tb); - ResultSet rs; - rs = stmt.executeQuery(); - List<String> colList = tableEntry.getValue().getColList(); - List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList(); - List<ColumnParser> ParserList = tableEntry.getValue().getParserList(); - - while(rs.next()) { - Table table=new Table(db, tb); - System.out.print("|"); - table.setColList(colList); - table.setRawDataTypeList(DataTypeList); - table.setParserList(ParserList); - - for (String string : colList) { - table.getDataList().add(rs.getObject(string)); - System.out.print(string+" : "+rs.getObject(string)+"|"); - } - list.add(table); - System.out.println(); - } - } - } - - } catch (SQLException e) { - e.printStackTrace(); - } - - } - - public void start() throws Exception { - initDataSource(); - schema = new Schema(dataSource); - schema.load(); - log.info("schema load successful"); - } - - private void initDataSource() throws Exception { - Map<String, String> map = new HashMap<>(); - map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); - map.put("url", - "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); - map.put("username", config.jdbcUsername); - map.put("password", config.jdbcPassword); - map.put("initialSize", "2"); - map.put("maxActive", "2"); - map.put("maxWait", "60000"); - map.put("timeBetweenEvictionRunsMillis", "60000"); - map.put("minEvictableIdleTimeMillis", "300000"); - map.put("validationQuery", "SELECT 1 FROM DUAL"); - map.put("testWhileIdle", "true"); - log.info("{},config read successful",map); - dataSource = DruidDataSourceFactory.createDataSource(map); - - } + static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; + private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass + protected String topicPrefix; + protected String jdbcUrl; + private final Queue<Connection> connections = new ConcurrentLinkedQueue<>(); + private Config config = new Config(); + private DataSource dataSource; + private List<Table> list = new LinkedList<>(); + + public List<Table> getList() { + return list; + } + + public void setList(List<Table> list) { + this.list = list; + } + + public Connection getConnection() throws SQLException { + + // These config names are the same for both source and sink configs ... + String username = config.jdbcUsername; + String dbPassword = config.jdbcPassword; + jdbcUrl = config.jdbcUrl; + Properties properties = new Properties(); + if (username != null) { + properties.setProperty("user", username); + } + if (dbPassword != null) { + properties.setProperty("password", dbPassword); + } + Connection connection = DriverManager.getConnection(jdbcUrl, properties); + + connections.add(connection); + return connection; + } + + public void stop() { + Connection conn; + while ((conn = connections.poll()) != null) { + try { + conn.close(); + } catch (Throwable e) { + log.warn("Error while closing connection to {}", "jdbc", e); + } + } + } + + protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException { + + String SQL = "select table_name,column_name,data_type,column_type,character_set_name " + + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION"; + + log.trace("Creating a PreparedStatement '{}'", SQL); + PreparedStatement stmt = db.prepareStatement(SQL); + return stmt; + + } + + protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException { + String query = "select * from " + string; + log.trace("Creating a PreparedStatement '{}'", query); + PreparedStatement stmt = db.prepareStatement(query); + return stmt; + + } + + protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException { + return stmt.executeQuery(); + } + + public static void main(String[] args) throws Exception { + Querier querier = new Querier(); + try { + querier.start(); + querier.poll(); + + } catch (SQLException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + private Schema schema; + + public void poll() { + try { + + PreparedStatement stmt; + String query = "select * from "; + Connection conn = dataSource.getConnection(); + + for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { + String db = entry.getKey(); + if (!db.contains("jdbc_db")) + continue; + Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Table> tableEntry = iterator.next(); + String tb = tableEntry.getKey(); + stmt = conn.prepareStatement(query + db + "." + tb); + ResultSet rs; + rs = stmt.executeQuery(); + List<String> colList = tableEntry.getValue().getColList(); + List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList(); + List<ColumnParser> ParserList = tableEntry.getValue().getParserList(); + + while (rs.next()) { + Table table = new Table(db, tb); + System.out.print("|"); + table.setColList(colList); + table.setRawDataTypeList(DataTypeList); + table.setParserList(ParserList); + + for (String string : colList) { + table.getDataList().add(rs.getObject(string)); + System.out.print(string + " : " + rs.getObject(string) + "|"); + } + list.add(table); + System.out.println(); + } + } + } + + } catch (SQLException e) { + e.printStackTrace(); + } + + } + + public void start() throws Exception { + initDataSource(); + schema = new Schema(dataSource); + schema.load(); + log.info("schema load successful"); + } + + private void initDataSource() throws Exception { + Map<String, String> map = new HashMap<>(); + map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); + map.put("url", + "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); + map.put("username", config.jdbcUsername); + map.put("password", config.jdbcPassword); + map.put("initialSize", "2"); + map.put("maxActive", "2"); + map.put("maxWait", "60000"); + map.put("timeBetweenEvictionRunsMillis", "60000"); + map.put("minEvictableIdleTimeMillis", "300000"); + map.put("validationQuery", "SELECT 1 FROM DUAL"); + map.put("testWhileIdle", "true"); + log.info("{},config read successful", map); + dataSource = DruidDataSourceFactory.createDataSource(map); + + } }
