This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 5a83890b55770820325d32aa7c86df6b477c37b7 Author: yuchenlichuck <[email protected]> AuthorDate: Mon Jul 29 21:20:48 2019 +0800 Add JdbcSourceTask --- .../org/apache/rocketmq/connect/jdbc/Config.java | 12 +- .../apache/rocketmq/connect/jdbc/Replicator.java | 118 +++++++++++++ .../connect/jdbc/connector/JdbcSourceTask.java | 107 +++++++++++- .../rocketmq/connect/jdbc/schema/Database.java | 100 +++++++++++ .../rocketmq/connect/jdbc/schema/Schema.java | 128 ++++++++++++++ .../apache/rocketmq/connect/jdbc/schema/Table.java | 90 ++++++++++ .../column/BigIntColumnParser.java} | 40 ++++- .../connect/jdbc/schema/column/ColumnParser.java | 104 ++++++++++++ .../jdbc/schema/column/DateTimeColumnParser.java | 53 ++++++ .../column/DefaultColumnParser.java} | 27 ++- .../column/EnumColumnParser.java} | 36 +++- .../jdbc/schema/column/IntColumnParser.java | 66 ++++++++ .../jdbc/schema/column/SetColumnParser.java | 54 ++++++ .../jdbc/schema/column/StringColumnParser.java | 57 +++++++ .../column/TimeColumnParser.java} | 29 +++- .../column/YearColumnParser.java} | 30 +++- .../rocketmq/connect/jdbc/source/Querier.java | 187 +++++++++++++++++++++ 17 files changed, 1181 insertions(+), 57 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java index 217b449..69ff9b0 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java @@ -31,9 +31,9 @@ public class Config { private static final Logger LOG = LoggerFactory.getLogger(Config.class); /* Database Connection Config */ - public String jdbcUrl; - public String jdbcUsername; - public String jdbcPassword; + public String jdbcUrl="localhost:3306"; + public String jdbcUsername="root"; + public String jdbcPassword="199812160"; public String rocketmqTopic; public String jdbcBackoff; public String jdbcAttempts; @@ -54,7 +54,7 @@ public class Config { /*Connector config*/ public 
String tableTypes="table"; - public int pollInterval=5000; + public long pollInterval=5000; public int batchMaxRows=100; public long tablePollInterval=60000; public long timestampDelayInterval=0; @@ -67,7 +67,7 @@ public class Config { add("jdbcUsername"); add("jdbcPassword"); add("mode"); - add("queueName"); + add("rocketmqTopic"); } }; @@ -278,7 +278,7 @@ public class Config { this.tableTypes = tableTypes; } - public int getPollInterval() { + public long getPollInterval() { return pollInterval; } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java new file mode 100644 index 0000000..b24b7e5 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jdbc; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Replicator { + + private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); + + private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger"); + + private Config config; + + private EventProcessor eventProcessor; + + private Object lock = new Object(); + private BinlogPosition nextBinlogPosition; + private long nextQueueOffset; + private long xid; + private BlockingQueue<Transaction> queue = new LinkedBlockingQueue<>(); + + public Replicator(Config config){ + this.config = config; + } + + public void start() { + + try { + + eventProcessor = new EventProcessor(this); + eventProcessor.start(); + + } catch (Exception e) { + LOGGER.error("Start error.", e); + } + } + + public void stop(){ + eventProcessor.stop(); + } + + public void commit(Transaction transaction, boolean isComplete) { + + queue.add(transaction); + for (int i = 0; i < 3; i++) { + try { + if (isComplete) { + long offset = 1; + synchronized (lock) { + xid = transaction.getXid(); + nextBinlogPosition = transaction.getNextBinlogPosition(); + nextQueueOffset = offset; + } + + } else { + } + break; + + } catch (Exception e) { + LOGGER.error("Push error,retry:" + (i + 1) + ",", e); + } + } + } + + public void logPosition() { + + String binlogFilename = null; + long xid = 0L; + long nextPosition = 0L; + long nextOffset = 0L; + + synchronized (lock) { + if (nextBinlogPosition != null) { + xid = this.xid; + binlogFilename = nextBinlogPosition.getBinlogFilename(); + nextPosition = nextBinlogPosition.getPosition(); + nextOffset = nextQueueOffset; + } + } + + if (binlogFilename != null) { + POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}", + xid, binlogFilename, nextPosition, nextOffset); + } + + } + + public Config 
getConfig() { + return config; + } + +// public BinlogPosition getNextBinlogPosition() { +// return nextBinlogPosition; +// } + + public BlockingQueue<Transaction> getQueue() { + return queue; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java index adb1c62..32ea763 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java @@ -18,11 +18,106 @@ */ package org.apache.rocketmq.connect.jdbc.connector; - import io.openmessaging.connector.api.source.SourceTask; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import org.apache.rocketmq.connect.jdbc.Config; +import org.apache.rocketmq.connect.jdbc.Replicator; +import org.apache.rocketmq.connect.jdbc.schema.Table; +import org.apache.rocketmq.connect.jdbc.source.Querier; +import org.apache.rocketmq.connect.jdbc.schema.column.*; -public abstract class JdbcSourceTask extends SourceTask { -/* - * To Be Continued - */ -} \ No newline at end of file +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SourceDataEntry; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.connector.api.data.DataEntryBuilder; +import io.openmessaging.connector.api.data.Field; + +public class JdbcSourceTask extends SourceTask { + + private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class); + + private Replicator replicator; + + private Config config; + + private List<Table> list=new LinkedList<>(); + + Querier querier = new Querier(); + @Override + public 
Collection<SourceDataEntry> poll() { + List<SourceDataEntry> res = new ArrayList<>(); + try { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("nextQuery", "database"); + jsonObject.put("nextPosition", "10"); + //To be Continued + querier.poll(); + System.out.println(querier.getList().size()); + for(Table dataRow : querier.getList()){ + System.out.println(dataRow.getColList().get(0)); + Schema schema = new Schema(); + schema.setDataSource(dataRow.getDatabase()); + schema.setName(dataRow.getName()); + schema.setFields(new ArrayList<>()); + for(int i = 0; i < dataRow.getColList().size(); i++){ + String columnName = dataRow.getColList().get(i); + String rawDataType = dataRow.getRawDataTypeList().get(i); + Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType)); + schema.getFields().add(field); + } + DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); + dataEntryBuilder.timestamp(System.currentTimeMillis()) + .queue(dataRow.getName()) + .entryType(EntryType.CREATE); + for(int i = 0; i < dataRow.getColList().size(); i++){ + Object value=dataRow.getDataList().get(i); + System.out.println(1); + System.out.println(dataRow.getColList().get(i)+"|"+value); + dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value)); + } + SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( + ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")), + ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))); + res.add(sourceDataEntry); + } + } catch (Exception e) { + log.error("JDBC task poll error, current config:" + JSON.toJSONString(config), e); + } + return res; + } + + @Override + public void start(KeyValue props) { + try { + this.config = new Config(); + this.config.load(props); + querier.start(); + } catch (Exception e) { + log.error("JDBC task start failed.", e); + } + } + + @Override + public void stop() { + replicator.stop(); + } + + @Override public void pause() { + + } + + @Override 
public void resume() { + + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java new file mode 100644 index 0000000..15fb77b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jdbc.schema; + +//import io.openmessaging.mysql.binlog.EventProcessor; +import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Database { + private static final String SQL = "select table_name,column_name,data_type,column_type,character_set_name " + + "from information_schema.columns " + + "where table_schema = ? 
order by ORDINAL_POSITION"; + private String name; + private DataSource dataSource; + public Map<String, Table> tableMap = new HashMap<String, Table>(); + public Database(String name, DataSource dataSource) { + this.name = name; + this.dataSource = dataSource; + } + + public void init() throws SQLException { + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + conn = dataSource.getConnection(); + + ps = conn.prepareStatement(SQL); + ps.setString(1, name); + rs = ps.executeQuery(); + + while (rs.next()) { + String tableName = rs.getString(1); + String colName = rs.getString(2); + String dataType = rs.getString(3); + String colType = rs.getString(4); + String charset = rs.getString(5); + + ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset); + + if (!tableMap.containsKey(tableName)) { + addTable(tableName); + } + Table table = tableMap.get(tableName); + table.addCol(colName); + table.addParser(columnParser); + table.addRawDataType(dataType); + } + + } finally { + if (conn != null) { + conn.close(); + } + if (ps != null) { + ps.close(); + } + if (rs != null) { + rs.close(); + } + } + + } + + private void addTable(String tableName) { + + // LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName); + + Table table = new Table(name, tableName); + tableMap.put(tableName, table); + } + + public Table getTable(String tableName) { + + return tableMap.get(tableName); + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java new file mode 100644 index 0000000..6ce6621 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jdbc.schema; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; +//import io.openmessaging.mysql.binlog.EventProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Schema { +// private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class); + + private static final String SQL = "select schema_name from information_schema.schemata"; + //取得数据库 + private static final List<String> IGNORED_DATABASES = new ArrayList<>( + Arrays.asList(new String[] {"information_schema", "mysql", "performance_schema", "sys"}) + ); + //忽略的数据库 + private DataSource dataSource; + + public Map<String, Database> dbMap; + + public Schema(DataSource dataSource) { + this.dataSource = dataSource; + } + + public void load() throws SQLException { + + dbMap = new HashMap<>(); + + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + conn = dataSource.getConnection(); + + ps = conn.prepareStatement(SQL); + rs = ps.executeQuery(); + + while (rs.next()) { + String dbName = rs.getString(1); + if (!IGNORED_DATABASES.contains(dbName)) { + Database database = new 
Database(dbName, dataSource); + dbMap.put(dbName, database); + //dbMap存着各个数据库的名字 + } + } + + } finally { + + if (conn != null) { + conn.close(); + } + if (ps != null) { + ps.close(); + } + if (rs != null) { + rs.close(); + } + } + + for (Database db : dbMap.values()) { + db.init(); + } + + } + + public Table getTable(String dbName, String tableName) { + + if (dbMap == null) { + reload(); + } + + Database database = dbMap.get(dbName); + if (database == null) { + return null; + } + + Table table = database.getTable(tableName); + if (table == null) { + return null; + } + + return table; + } + + private void reload() { + + while (true) { + try { + load(); + break; + } catch (Exception e) { + // LOGGER.error("Reload schema error.", e); + System.out.println("Reload schema error."+e); + } + } + } + + public void reset() { + dbMap = null; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java new file mode 100644 index 0000000..c0d793d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jdbc.schema; + +import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser; +import java.util.LinkedList; +import java.util.List; + +public class Table { + + private String database; + private String name; + private List<String> colList = new LinkedList<>(); + private List<ColumnParser> parserList = new LinkedList<>(); + private List<String> rawDataTypeList = new LinkedList<>(); + private List<Object> dataList =new LinkedList<>(); + + public Table(String database, String table) { + this.database = database; + this.name = table; + } + + public void addCol(String column) { + colList.add(column); + } + + public void setParserList(List<ColumnParser> parserList) { + this.parserList = parserList; + } + + public void setRawDataTypeList(List<String> rawDataTypeList) { + this.rawDataTypeList = rawDataTypeList; + } + + public void addParser(ColumnParser columnParser) { + parserList.add(columnParser); + } + + public void addRawDataType(String rawDataType){ + this.rawDataTypeList.add(rawDataType); + } + + public List<String> getColList() { + return colList; + } + + public List<String> getRawDataTypeList() { + return rawDataTypeList; + } + + public String getDatabase() { + return database; + } + + public String getName() { + return name; + } + + public List<ColumnParser> getParserList() { + return parserList; + } + + public List<Object> getDataList() { + return dataList; + } + + public void setDataList(List<Object> dataList) { + this.dataList = dataList; + } + + public void setColList(List<String> colList) { + this.colList = colList; + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java similarity index 52% copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java copy to 
src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java index adb1c62..610f07d 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java @@ -1,5 +1,3 @@ - - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,12 +15,36 @@ * limitations under the License. */ -package org.apache.rocketmq.connect.jdbc.connector; +package org.apache.rocketmq.connect.jdbc.schema.column; -import io.openmessaging.connector.api.source.SourceTask; +import java.math.BigInteger; -public abstract class JdbcSourceTask extends SourceTask { -/* - * To Be Continued - */ -} \ No newline at end of file +public class BigIntColumnParser extends ColumnParser { + + private static BigInteger max = BigInteger.ONE.shiftLeft(64); + + private boolean signed; + + public BigIntColumnParser(String colType) { + this.signed = !colType.matches(".* unsigned$"); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof BigInteger) { + return value; + } + + Long l = (Long) value; + if (!signed && l < 0) { + return max.add(BigInteger.valueOf(l)); + } else { + return l; + } + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java new file mode 100644 index 0000000..341064e --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jdbc.schema.column; + +import io.openmessaging.connector.api.data.FieldType; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public abstract class ColumnParser { + + public static ColumnParser getColumnParser(String dataType, String colType, String charset) { + + switch (dataType) { + case "tinyint": + case "smallint": + case "mediumint": + case "int": + return new IntColumnParser(dataType, colType); + case "bigint": + return new BigIntColumnParser(colType); + case "tinytext": + case "text": + case "mediumtext": + case "longtext": + case "varchar": + case "char": + return new StringColumnParser(charset); + case "date": + case "datetime": + case "timestamp": + return new DateTimeColumnParser(); + case "time": + return new TimeColumnParser(); + case "year": + return new YearColumnParser(); + case "enum": + return new EnumColumnParser(colType); + case "set": + return new SetColumnParser(colType); + default: + return new DefaultColumnParser(); + } + } + + public static FieldType mapConnectorFieldType(String dataType) { + + switch (dataType) { + case "tinyint": + case "smallint": + case "mediumint": + case "int": + return FieldType.INT32; + case "bigint": + return FieldType.BIG_INTEGER; + case "tinytext": + case "text": + case "mediumtext": + case "longtext": + case "varchar": + case "char": + return FieldType.STRING; + case 
"date": + case "datetime": + case "timestamp": + case "time": + case "year": + return FieldType.DATETIME; + case "enum": + return null; + case "set": + return null; + default: + return FieldType.BYTES; + } + } + + public static String[] extractEnumValues(String colType) { + String[] enumValues = {}; + Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType); + if (matcher.matches()) { + enumValues = matcher.group(2).replace("'", "").split(","); + } + + return enumValues; + } + + public abstract Object getValue(Object value); + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java new file mode 100644 index 0000000..8736280 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jdbc.schema.column; + +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; + +public class DateTimeColumnParser extends ColumnParser { + + private static SimpleDateFormat dateTimeFormat; + private static SimpleDateFormat dateTimeUtcFormat; + + static { + dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Timestamp) { + return dateTimeFormat.format(value); + } + + if (value instanceof Long) { + return dateTimeUtcFormat.format(new Date((Long) value)); + } + + return value; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java similarity index 65% copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java index adb1c62..ee3075a 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java @@ -1,5 +1,3 @@ - - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,12 +15,23 @@ * limitations under the License. 
*/ -package org.apache.rocketmq.connect.jdbc.connector; +package org.apache.rocketmq.connect.jdbc.schema.column; -import io.openmessaging.connector.api.source.SourceTask; +import org.apache.commons.codec.binary.Base64; -public abstract class JdbcSourceTask extends SourceTask { -/* - * To Be Continued - */ -} \ No newline at end of file +public class DefaultColumnParser extends ColumnParser { + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof byte[]) { + return Base64.encodeBase64String((byte[]) value); + } + + return value; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java similarity index 57% copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java index adb1c62..0fd14ba 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java @@ -1,5 +1,3 @@ - - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,12 +15,32 @@ * limitations under the License. 
*/ -package org.apache.rocketmq.connect.jdbc.connector; +package org.apache.rocketmq.connect.jdbc.schema.column; -import io.openmessaging.connector.api.source.SourceTask; +public class EnumColumnParser extends ColumnParser { -public abstract class JdbcSourceTask extends SourceTask { -/* - * To Be Continued - */ -} \ No newline at end of file + private String[] enumValues; + + public EnumColumnParser(String colType) { + enumValues = extractEnumValues(colType); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + Integer i = (Integer) value; + if (i == 0) { + return null; + } else { + return enumValues[i - 1]; + } + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java new file mode 100644 index 0000000..36c6078 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jdbc.schema.column; + +public class IntColumnParser extends ColumnParser { + + private int bits; + private boolean signed; + + public IntColumnParser(String dataType, String colType) { + + switch (dataType) { + case "tinyint": + bits = 8; + break; + case "smallint": + bits = 16; + break; + case "mediumint": + bits = 24; + break; + case "int": + bits = 32; + } + + this.signed = !colType.matches(".* unsigned$"); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Long) { + return value; + } + + if (value instanceof Integer) { + Integer i = (Integer) value; + if (signed || i > 0) { + return i; + } else { + return (1L << bits) + i; + } + } + + return value; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java new file mode 100644 index 0000000..d1e6bbc --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jdbc.schema.column; + +public class SetColumnParser extends ColumnParser { + + private String[] enumValues; + + public SetColumnParser(String colType) { + enumValues = extractEnumValues(colType); + } + + @Override + public Object getValue(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + StringBuilder builder = new StringBuilder(); + long l = (Long) value; + + boolean needSplit = false; + for (int i = 0; i < enumValues.length; i++) { + if (((l >> i) & 1) == 1) { + if (needSplit) + builder.append(","); + + builder.append(enumValues[i]); + needSplit = true; + } + } + + return builder.toString(); + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java new file mode 100644 index 0000000..cd4f04f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jdbc.schema.column; + +import org.apache.commons.codec.Charsets; + +public class StringColumnParser extends ColumnParser { + + private String charset; + + public StringColumnParser(String charset) { + this.charset = charset.toLowerCase(); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + byte[] bytes = (byte[]) value; + + switch (charset) { + case "utf8": + case "utf8mb4": + return new String(bytes, Charsets.UTF_8); + case "latin1": + case "ascii": + return new String(bytes, Charsets.ISO_8859_1); + case "ucs2": + return new String(bytes, Charsets.UTF_16); + default: + return new String(bytes, Charsets.toCharset(charset)); + + } + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java similarity index 65% copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java index adb1c62..9926d81 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java @@ -1,5 +1,3 @@ - - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,12 +15,25 @@ * limitations under the License. 
*/ -package org.apache.rocketmq.connect.jdbc.connector; +package org.apache.rocketmq.connect.jdbc.schema.column; -import io.openmessaging.connector.api.source.SourceTask; +import java.sql.Time; +import java.sql.Timestamp; -public abstract class JdbcSourceTask extends SourceTask { -/* - * To Be Continued - */ -} \ No newline at end of file +public class TimeColumnParser extends ColumnParser { + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Timestamp) { + + return new Time(((Timestamp) value).getTime()); + } + + return value; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java similarity index 61% copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java index adb1c62..14cc798 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java @@ -1,5 +1,3 @@ - - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,12 +15,26 @@ * limitations under the License. 
package org.apache.rocketmq.connect.jdbc.schema.column;

import java.sql.Date;
import java.util.Calendar;

/**
 * Column parser for YEAR columns: reduces a {@link Date} to its calendar year and leaves
 * every other value as it is.
 */
public class YearColumnParser extends ColumnParser {

    /**
     * @param value raw value; may be {@code null}
     * @return the year as an {@link Integer} when {@code value} is a {@link Date}, computed
     *         with {@link Calendar#getInstance()} (JVM default time zone and locale);
     *         otherwise {@code value} itself (including {@code null})
     */
    @Override
    public Object getValue(Object value) {
        if (!(value instanceof Date)) {
            return value;
        }

        Calendar calendar = Calendar.getInstance();
        calendar.setTime((Date) value);
        return calendar.get(Calendar.YEAR);
    }
}

// ---------------------------------------------------------------------------
// src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
// ---------------------------------------------------------------------------

package org.apache.rocketmq.connect.jdbc.source;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.sql.DataSource;

import org.apache.rocketmq.connect.jdbc.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.druid.pool.DruidDataSourceFactory;


import org.apache.rocketmq.connect.jdbc.schema.*;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;

/**
 * Reads every row of every table of the {@code jdbc_db} schema through a pooled data source
 * and collects the rows as {@link Table} objects.
 *
 * <p>Typical use: {@link #start()} once, {@link #poll()} to append the current rows to
 * {@link #getList()}, {@link #close()} when done.
 */
public class Querier {
    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";

    /** Put in front of {@code Config.jdbcUrl}, which holds a bare {@code host:port} value. */
    private static final String JDBC_URL_PREFIX = "jdbc:mysql://";

    /** Name of the database schema whose tables are described and polled. */
    private static final String TARGET_SCHEMA = "jdbc_db";

    private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
    protected String topicPrefix;
    protected String jdbcUrl;

    /** Connections handed out by {@link #getConnection()}, kept so {@link #close()} can close them. */
    private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
    private Config config = new Config();
    private DataSource dataSource;

    /** Rows collected so far; {@link #poll()} only ever appends to it. */
    private List<Table> list = new LinkedList<>();

    private Schema schema;

    private Map<Long, Table> tableMap = new HashMap<>();

    public List<Table> getList() {
        return list;
    }

    public void setList(List<Table> list) {
        this.list = list;
    }

    /**
     * Opens a new, non-pooled connection through {@link DriverManager} using the credentials
     * from the configuration and remembers it for {@link #close()}.
     *
     * @return the newly opened connection
     * @throws SQLException if the connection cannot be established
     */
    public Connection getConnection() throws SQLException {

        // These config names are the same for both source and sink configs ...
        String username = config.jdbcUsername;
        String dbPassword = config.jdbcPassword;
        jdbcUrl = config.jdbcUrl;

        Properties properties = new Properties();
        if (username != null) {
            properties.setProperty("user", username);
        }
        if (dbPassword != null) {
            properties.setProperty("password", dbPassword);
        }

        // DriverManager needs a full JDBC URL; the configured value is only host:port.
        Connection connection = DriverManager.getConnection(withJdbcScheme(jdbcUrl), properties);

        connections.add(connection);
        return connection;
    }

    /**
     * Closes every connection handed out by {@link #getConnection()} and, when the pooled
     * data source is {@link AutoCloseable}, the pool as well. Failures are logged, not thrown.
     */
    public void close() {
        Connection conn;
        while ((conn = connections.poll()) != null) {
            try {
                conn.close();
            } catch (Throwable e) {
                log.warn("Error while closing connection to {}", "jdbc", e);
            }
        }

        if (dataSource instanceof AutoCloseable) {
            try {
                ((AutoCloseable) dataSource).close();
            } catch (Exception e) {
                log.warn("Error while closing data source", e);
            }
        }
    }

    /**
     * Prepares the statement that lists table, column, data type, column type and character
     * set of every column of the target schema, ordered by ordinal position.
     *
     * @param db connection to prepare the statement on
     * @return the prepared statement with the schema name already bound
     * @throws SQLException if the statement cannot be prepared
     */
    protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {

        // The schema name has to be a string value; the former unquoted "jdbc_db" was parsed
        // by the server as a column reference.
        String sql = "select table_name,column_name,data_type,column_type,character_set_name "
            + "from information_schema.columns "
            + "where table_schema = ? order by ORDINAL_POSITION";

        log.trace("Creating a PreparedStatement '{}'", sql);
        PreparedStatement stmt = db.prepareStatement(sql);
        try {
            stmt.setString(1, TARGET_SCHEMA);
        } catch (SQLException e) {
            stmt.close();
            throw e;
        }
        return stmt;
    }

    /**
     * Prepares {@code select * from <string>}.
     *
     * @param db     connection to prepare the statement on
     * @param string table name, concatenated into the SQL text as-is (identifiers cannot be
     *               bound as parameters) - callers must only pass trusted names
     * @return the prepared statement
     * @throws SQLException if the statement cannot be prepared
     */
    protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException {
        String query = "select * from " + string;
        log.trace("Creating a PreparedStatement '{}'", query);
        return db.prepareStatement(query);
    }

    protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException {
        return stmt.executeQuery();
    }

    /** Manual smoke test: loads the schema, polls once and releases all resources. */
    public static void main(String[] args) throws Exception {
        Querier querier = new Querier();
        try {
            querier.start();
            querier.poll();
        } catch (SQLException e) {
            querier.log.error("Querier run failed", e);
        } finally {
            querier.close();
        }
    }

    /**
     * Reads all rows of all tables of every loaded database whose name contains
     * {@code jdbc_db} and appends one {@link Table} per row to {@link #getList()}.
     *
     * <p>A {@link SQLException} aborts the poll and is logged; rows collected before the
     * failure stay in the list.
     *
     * @throws IllegalStateException if {@link #start()} has not been called
     */
    public void poll() {
        if (dataSource == null || schema == null) {
            throw new IllegalStateException("start() must be called before poll()");
        }

        // try-with-resources returns the connection to the pool; leaking it exhausted the
        // two-connection pool after two polls.
        try (Connection conn = dataSource.getConnection()) {
            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
                String db = entry.getKey();
                if (!db.contains(TARGET_SCHEMA)) {
                    continue;
                }
                for (Map.Entry<String, Table> tableEntry : entry.getValue().tableMap.entrySet()) {
                    pollTable(conn, db, tableEntry.getKey(), tableEntry.getValue());
                }
            }
        } catch (SQLException e) {
            log.error("Failed to poll tables of schema {}", TARGET_SCHEMA, e);
        }
    }

    /** Reads every row of {@code db.tb} and appends it, shaped like {@code template}, to the list. */
    private void pollTable(Connection conn, String db, String tb, Table template) throws SQLException {
        List<String> colList = template.getColList();
        List<String> dataTypeList = template.getRawDataTypeList();
        List<ColumnParser> parserList = template.getParserList();

        try (PreparedStatement stmt = conn.prepareStatement("select * from " + db + "." + tb);
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                Table row = new Table(db, tb);
                row.setColList(colList);
                row.setRawDataTypeList(dataTypeList);
                row.setParserList(parserList);

                StringBuilder rendered = log.isDebugEnabled() ? new StringBuilder("|") : null;
                for (String column : colList) {
                    Object cell = rs.getObject(column);
                    row.getDataList().add(cell);
                    if (rendered != null) {
                        rendered.append(column).append(" : ").append(cell).append("|");
                    }
                }
                list.add(row);

                if (rendered != null) {
                    log.debug("{}", rendered);
                }
            }
        }
    }

    /**
     * Creates the pooled data source and loads the schema description through it.
     *
     * @throws Exception if the pool cannot be created or the schema cannot be loaded
     */
    public void start() throws Exception {
        initDataSource();
        schema = new Schema(dataSource);
        schema.load();
    }

    /** Builds the Druid connection pool from the configured host, user and password. */
    private void initDataSource() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("driverClassName", JDBC_DRIVER);
        map.put("url",
            JDBC_URL_PREFIX + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
        map.put("username", config.jdbcUsername);
        map.put("password", config.jdbcPassword);
        map.put("initialSize", "2");
        map.put("maxActive", "2");
        map.put("maxWait", "60000");
        map.put("timeBetweenEvictionRunsMillis", "60000");
        map.put("minEvictableIdleTimeMillis", "300000");
        map.put("validationQuery", "SELECT 1 FROM DUAL");
        map.put("testWhileIdle", "true");
        dataSource = DruidDataSourceFactory.createDataSource(map);
    }

    /** Returns {@code url} unchanged when it is null or already a JDBC URL, else prefixes the MySQL scheme. */
    private static String withJdbcScheme(String url) {
        if (url == null || url.startsWith("jdbc:")) {
            return url;
        }
        return JDBC_URL_PREFIX + url;
    }

}
