This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 42cbfb306735e8246f16b5e9f4be06b65cfad366 Author: chenyi19851209 <[email protected]> AuthorDate: Thu Dec 19 09:22:16 2019 +0800 [ISSUE #485] Support repeat consumption (#486) --- .../apache/rocketmq/connect/jdbc/sink/Updater.java | 74 ++++++++++++++++++++-- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java index 3571852..e30c65f 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java @@ -32,18 +32,35 @@ public class Updater { public boolean push(String dbName, String tableName, Map<Field, Object[]> fieldMap, EntryType entryType) { Boolean isSuccess = false; - int id = 0; + int beforeUpdateId = 0; + int afterUpdateId = 0; switch (entryType) { case CREATE: - isSuccess = updateRow(dbName, tableName, fieldMap, id); + afterUpdateId = queryAfterUpdateRowId(dbName, tableName, fieldMap); + if (afterUpdateId != 0){ + isSuccess = true; + break; + } + isSuccess = updateRow(dbName, tableName, fieldMap, beforeUpdateId); break; case UPDATE: - id = queryRowId(dbName, tableName, fieldMap); - isSuccess = updateRow(dbName, tableName, fieldMap, id); + afterUpdateId = queryAfterUpdateRowId(dbName, tableName, fieldMap); + if (afterUpdateId != 0){ + isSuccess = true; + // 再查原有数据是否存在,存在则删除 + beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap); + if (beforeUpdateId != 0){ + isSuccess = deleteRow(dbName, tableName, beforeUpdateId); + } + break; + } + + beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap); + isSuccess = updateRow(dbName, tableName, fieldMap, beforeUpdateId); break; case DELETE: - id = queryRowId(dbName, tableName, fieldMap); - isSuccess = deleteRow(dbName, tableName, id); + beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap); + isSuccess = deleteRow(dbName, tableName, beforeUpdateId); break; default: log.error("entryType {} is illegal.", entryType.toString()); @@ -85,7 +102,7 @@ public class Updater { return sql; } - private Integer queryRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) { + private Integer queryBeforeUpdateRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) { int count = 0, id = 0; ResultSet rs; PreparedStatement stmt; @@ -128,6 +145,49 @@ public class Updater { return id; } + private Integer queryAfterUpdateRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) { + int count = 0, id = 0; + ResultSet rs; + PreparedStatement stmt; + Boolean finishQuery = false; + String query = "select id from " + dbName + "." + tableName + " where "; + + for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) { + count ++; + String fieldName = entry.getKey().getName(); + FieldType fieldType = entry.getKey().getType(); + Object fieldValue = entry.getValue()[1]; + if ("id".equals(fieldName)) + continue; + if (count != 1) { + query += " and "; + } + if (fieldValue == null) + { + query += fieldName + " is NULL"; + } else { + query = typeParser(fieldType, fieldName, fieldValue, query); + } + } + + try { + while (!connection.isClosed() && !finishQuery){ + stmt = connection.prepareStatement(query); + rs = stmt.executeQuery(); + if (rs != null) { + while (rs.next()) { + id = rs.getInt("id"); + } + finishQuery = true; + rs.close(); + } + } + } catch (SQLException e) { + log.error("query table error,{}", e); + } + return id; + } + private Boolean updateRow(String dbName, String tableName, Map<Field, Object[]> fieldMap, Integer id) { int count = 0; PreparedStatement stmt;
