This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 1cf83e2167fbd2ebbde2477b32a538b48ab90c54 Author: haibo.duan <[email protected]> AuthorDate: Tue Nov 8 11:54:57 2022 +0800 [INLONG-6327][Agent] Support collect data from SQLServer by Debezium (#6338) --- .../inlong/agent/constant/SqlServerConstants.java | 39 +++ .../apache/inlong/agent/pojo/JobProfileDto.java | 39 +++ .../org/apache/inlong/agent/pojo/SqlServerJob.java | 76 +++++ inlong-agent/agent-plugins/pom.xml | 5 + .../agent/plugin/sources/SQLServerSource.java | 34 +- .../plugin/sources/reader/SQLServerReader.java | 344 ++++++++++++++------- .../sources/snapshot/SqlServerSnapshotBase.java | 52 ++++ .../agent/plugin/sources/TestSQLServerConnect.java | 48 +-- .../agent/plugin/sources/TestSQLServerReader.java | 156 +++++----- .../agent/plugin/sources/TestSQLServerSource.java | 13 - pom.xml | 6 + 11 files changed, 556 insertions(+), 256 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java new file mode 100644 index 000000000..ffb4d7ae8 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SqlServerConstants.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.constant; + +public class SqlServerConstants { + + /** + * Takes a snapshot of structure and data of captured tables; + * useful if topics should be populated with a complete representation of the data from the captured tables. + */ + public static final String INITIAL = "initial"; + + /** + * Takes a snapshot of structure and data like initial + * but instead does not transition into streaming changes once the snapshot has completed. + */ + public static final String INITIAL_ONLY = "initial_only"; + + /** + * Takes a snapshot of the structure of captured tables only; + * useful if only changes happening from now onwards should be propagated to topics. + */ + public static final String SCHEMA_ONLY = "schema_only"; +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java index d57b2c341..810a42d2b 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java @@ -58,6 +58,10 @@ public class JobProfileDto { * mqtt source */ public static final String MQTT_SOURCE = "org.apache.inlong.agent.plugin.sources.MqttSource"; + /** + * sqlserver source + */ + public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SqlServerSource"; private static final Gson GSON = new Gson(); @@ -226,6 +230,34 @@ public class JobProfileDto { return mongoJob; } + private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) { + SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), + SqlServerJob.SqlserverJobConfig.class); + SqlServerJob oracleJob = new SqlServerJob(); + oracleJob.setUser(config.getUser()); + oracleJob.setHostname(config.getHostname()); + oracleJob.setPassword(config.getPassword()); + oracleJob.setPort(config.getPort()); + oracleJob.setServerName(config.getServerName()); + oracleJob.setDbname(config.getDbname()); + + SqlServerJob.Offset offset = new SqlServerJob.Offset(); + offset.setFilename(config.getOffsetFilename()); + offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); + offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); + oracleJob.setOffset(offset); + + SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot(); + snapshot.setMode(config.getSnapshotMode()); + oracleJob.setSnapshot(snapshot); + + SqlServerJob.History history = new SqlServerJob.History(); + history.setFilename(config.getHistoryFilename()); + oracleJob.setHistory(history); + + return oracleJob; + } + public static MqttJob getMqttJob(DataConfig dataConfigs) { MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), MqttJob.MqttJobConfig.class); @@ -309,6 +341,12 @@ public class JobProfileDto { job.setSource(KAFKA_SOURCE); profileDto.setJob(job); break; + case SQLSERVER: + SqlServerJob sqlserverJob = getSqlServerJob(dataConfig); + job.setSqlserverJob(sqlserverJob); + job.setSource(SQLSERVER_SOURCE); + profileDto.setJob(job); + break; case MONGODB: MongoJob mongoJob = getMongoJob(dataConfig); job.setMongoJob(mongoJob); @@ -349,6 +387,7 @@ public class JobProfileDto { private KafkaJob kafkaJob; private MongoJob mongoJob; private MqttJob mqttJob; + private SqlServerJob sqlserverJob; } @Data diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java new file mode 100644 index 000000000..735c745bb --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerJob.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.pojo; + +import lombok.Data; + +@Data +public class SqlServerJob { + + private String hostname; + private String user; + private String password; + private String port; + private String serverName; + private String dbname; + + private SqlServerJob.Snapshot snapshot; + private SqlServerJob.Offset offset; + private SqlServerJob.History history; + + @Data + public static class Offset { + + private String intervalMs; + private String filename; + private String specificOffsetFile; + private String specificOffsetPos; + } + + @Data + public static class Snapshot { + + private String mode; + } + + @Data + public static class History { + + private String filename; + } + + @Data + public static class SqlserverJobConfig { + + private String hostname; + private String user; + private String password; + private String port; + private String dbname; + private String serverName; + + private String snapshotMode; + private String intervalMs; + private String offsetFilename; + private String historyFilename; + + private String specificOffsetFile; + private String specificOffsetPos; + } + +} diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml index 141f7ff9f..7bc8eb0f6 100644 --- a/inlong-agent/agent-plugins/pom.xml +++ b/inlong-agent/agent-plugins/pom.xml @@ -85,6 +85,11 @@ </exclusions> </dependency> + <dependency> + <groupId>io.debezium</groupId> + <artifactId>debezium-connector-sqlserver</artifactId> + </dependency> + <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-mongodb</artifactId> diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java index af6d04a93..a65006152 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java @@ -17,56 +17,32 @@ package org.apache.inlong.agent.plugin.sources; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.plugin.Reader; import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader; -import org.apache.inlong.agent.utils.AgentDbUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.Objects; /** - * SQLServer SQL source, split SQLServer SQL source job into multi readers + * SQLServer source */ public class SQLServerSource extends AbstractSource { private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class); - public static final String JOB_DATABASE_SQL = "job.sql.command"; - public SQLServerSource() { } - private List<Reader> splitSqlJob(String sqlPattern) { - final List<Reader> result = new ArrayList<>(); - String[] sqlList = AgentDbUtils.replaceDynamicSeq(sqlPattern); - if (Objects.nonNull(sqlList)) { - Arrays.stream(sqlList).forEach(sql -> { - result.add(new SQLServerReader(sql)); - }); - } - return result; - } - @Override public List<Reader> split(JobProfile conf) { super.init(conf); - String sqlPattern = conf.get(JOB_DATABASE_SQL, StringUtils.EMPTY).toLowerCase(); - List<Reader> readerList = null; - if (StringUtils.isNotEmpty(sqlPattern)) { - readerList = splitSqlJob(sqlPattern); - } - if (CollectionUtils.isNotEmpty(readerList)) { - sourceMetric.sourceSuccessCount.incrementAndGet(); - } else { - sourceMetric.sourceFailCount.incrementAndGet(); - } + Reader sqlServerReader = new SQLServerReader(); + List<Reader> readerList = new ArrayList<>(); + readerList.add(sqlServerReader); + sourceMetric.sourceSuccessCount.incrementAndGet(); return readerList; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java index c4daa8560..079d9af07 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java @@ -17,114 +17,129 @@ package org.apache.inlong.agent.plugin.sources.reader; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang3.CharUtils; +import com.google.common.base.Preconditions; +import com.google.gson.Gson; +import io.debezium.connector.sqlserver.SqlServerConnector; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.relational.history.FileDatabaseHistory; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.SnapshotModeConstants; +import org.apache.inlong.agent.constant.SqlServerConstants; import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.utils.AgentDbUtils; +import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase; +import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory; +import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; +import org.apache.inlong.agent.pojo.DebeziumFormat; +import org.apache.inlong.agent.pojo.DebeziumOffset; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; +import org.apache.inlong.agent.utils.GsonUtil; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; -import static java.sql.Types.BINARY; -import static java.sql.Types.BLOB; -import static java.sql.Types.LONGVARBINARY; -import static java.sql.Types.VARBINARY; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; /** - * Read data from SQLServer database by SQL + * Read data from SQLServer database by Debezium */ public class SQLServerReader extends AbstractReader { public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric"; - public static final String JOB_DATABASE_USER = "job.sqlserverJob.user"; - public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password"; public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname"; public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port"; + public static final String JOB_DATABASE_USER = "job.sqlserverJob.user"; + public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password"; public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname"; - public static final String JOB_DATABASE_BATCH_SIZE = "job.sqlserverJob.batchSize"; - public static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000; - public static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass"; - public static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; - public static final String STD_FIELD_SEPARATOR_SHORT = "\001"; - public static final String JOB_DATABASE_SEPARATOR = "job.sql.separator"; - // pre-set sql lines, commands like "set xxx=xx;" - public static final String JOB_DATABASE_TYPE = "job.database.type"; - public static final String SQLSERVER = "sqlserver"; + public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode"; + public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize"; + public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets"; + public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile"; + public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos"; + + public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName"; + + public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs"; + public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename"; + private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class); - private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR), - String.valueOf(CharUtils.LF)}; - private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY}; + private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); - private final String sql; + private static final Gson GSON = new Gson(); - private PreparedStatement preparedStatement; - private Connection conn; - private ResultSet resultSet; - private int columnCount; + private String databaseStoreHistoryName; + private String instanceId; + private String dbName; + private String serverName; + private String userName; + private String password; + private String hostName; + private String port; + private String offsetFlushIntervalMs; + private String offsetStoreFileName; + private String snapshotMode; + private String offset; + private String specificOffsetFile; + private String specificOffsetPos; - // column types - private String[] columnTypeNames; - private int[] columnTypeCodes; + private ExecutorService executor; + private SqlServerSnapshotBase sqlServerSnapshot; private boolean finished = false; - private String separator; - public SQLServerReader(String sql) { - this.sql = sql; + private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue; + private JobProfile jobProfile; + private boolean destroyed = false; + + public SQLServerReader() { + } @Override public Message read() { - try { - if (!resultSet.next()) { - finished = true; - return null; - } - final List<String> lineColumns = new ArrayList<>(); - for (int i = 1; i <= columnCount; i++) { - final String dataValue; - /* handle special blob value, encode with base64, BLOB=2004 */ - final int typeCode = columnTypeCodes[i - 1]; - final String typeName = columnTypeNames[i - 1]; - - // binary type - if (typeCode == BLOB || typeCode == BINARY || typeCode == VARBINARY - || typeCode == LONGVARBINARY || typeName.contains("BLOB")) { - final byte[] data = resultSet.getBytes(i); - dataValue = new String(Base64.encodeBase64(data, false), StandardCharsets.UTF_8); - } else { - // non-binary type - dataValue = StringUtils.replaceEachRepeatedly(resultSet.getString(i), - NEW_LINE_CHARS, EMPTY_CHARS); - } - lineColumns.add(dataValue); - } - long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum(); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis(), 1, dataSize); - readerMetric.pluginReadSuccessCount.incrementAndGet(); - readerMetric.pluginReadCount.incrementAndGet(); - return generateMessage(lineColumns); - } catch (Exception ex) { - LOGGER.error("error while reading data", ex); - readerMetric.pluginReadFailCount.incrementAndGet(); - readerMetric.pluginReadCount.incrementAndGet(); - throw new RuntimeException(ex); + if (!sqlServerMessageQueue.isEmpty()) { + return getSqlServerMessage(); + } else { + return null; + } + } + + /** + * poll message from buffer pool + * + * @return org.apache.inlong.agent.plugin.Message + */ + private DefaultMessage getSqlServerMessage() { + // Retrieves and removes the head of this queue, + // or returns null if this queue is empty. + Pair<String, String> message = sqlServerMessageQueue.poll(); + if (Objects.isNull(message)) { + return null; } + Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY); + header.put(PROXY_KEY_DATA, message.getKey()); + return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header); } - private Message generateMessage(List<String> lineColumns) { - return new DefaultMessage(StringUtils.join(lineColumns, separator).getBytes(StandardCharsets.UTF_8)); + public boolean isDestroyed() { + return destroyed; } @Override @@ -134,7 +149,7 @@ public class SQLServerReader extends AbstractReader { @Override public String getReadSource() { - return sql; + return instanceId; } @Override @@ -149,12 +164,16 @@ public class SQLServerReader extends AbstractReader { @Override public String getSnapshot() { - return StringUtils.EMPTY; + if (sqlServerSnapshot != null) { + return sqlServerSnapshot.getSnapshot(); + } else { + return StringUtils.EMPTY; + } } @Override public void finishRead() { - destroy(); + this.finished = true; } @Override @@ -162,59 +181,146 @@ public class SQLServerReader extends AbstractReader { return true; } + private String tryToInitAndGetHistoryPath() { + String historyPath = agentConf.get( + AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH); + String parentPath = agentConf.get( + AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath(); + } + @Override public void init(JobProfile jobConf) { super.init(jobConf); - int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE); - String userName = jobConf.get(JOB_DATABASE_USER); - String password = jobConf.get(JOB_DATABASE_PASSWORD); - String hostName = jobConf.get(JOB_DATABASE_HOSTNAME); - String dbname = jobConf.get(JOB_DATABASE_DBNAME); - int port = jobConf.getInt(JOB_DATABASE_PORT); - - String driverClass = jobConf.get(JOB_DATABASE_DRIVER_CLASS, - DEFAULT_JOB_DATABASE_DRIVER_CLASS); - separator = jobConf.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT); + jobProfile = jobConf; + LOGGER.info("init SqlServer reader with jobConf {}", jobConf.toJsonStr()); + userName = jobConf.get(JOB_DATABASE_USER); + password = jobConf.get(JOB_DATABASE_PASSWORD); + hostName = jobConf.get(JOB_DATABASE_HOSTNAME); + port = jobConf.get(JOB_DATABASE_PORT); + dbName = jobConf.get(JOB_DATABASE_DBNAME); + serverName = jobConf.get(JOB_DATABASE_SERVER_NAME); + instanceId = jobConf.getInstanceId(); + offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000"); + offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME, + tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId; + snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL); + sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000)); finished = false; + + databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME, + tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId(); + offset = jobConf.get(JOB_DATABASE_OFFSETS, ""); + specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, ""); + specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1"); + + sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName); + sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile()); + + Properties props = getEngineProps(); + + DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create( + io.debezium.engine.format.Json.class) + .using(props) + .notifying((records, committer) -> { + try { + for (ChangeEvent<String, String> record : records) { + DebeziumFormat debeziumFormat = GSON + .fromJson(record.value(), DebeziumFormat.class); + sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value())); + committer.markProcessed(record); + } + committer.markBatchFinished(); + long dataSize = records.stream().mapToLong(c -> c.value().length()).sum(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, + System.currentTimeMillis(), records.size(), dataSize); + readerMetric.pluginReadSuccessCount.addAndGet(records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); + } catch (Exception e) { + readerMetric.pluginReadFailCount.addAndGet(records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); + LOGGER.error("parse SqlServer message error", e); + } + }) + .using((success, message, error) -> { + if (!success) { + LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error); + } + }).build(); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(engine); + + LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot()); + } + + private String serializeOffset() { + Map<String, Object> sourceOffset = new HashMap<>(); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null"); + sourceOffset.put("file", specificOffsetFile); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null"); + sourceOffset.put("pos", specificOffsetPos); + DebeziumOffset specificOffset = new DebeziumOffset(); + specificOffset.setSourceOffset(sourceOffset); + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put("server", instanceId); + specificOffset.setSourcePartition(sourcePartition); + byte[] serializedOffset = new byte[0]; try { - String databaseType = jobConf.get(JOB_DATABASE_TYPE, SQLSERVER); - String url = String.format("jdbc:%s://%s:%d;databaseName=%s;", databaseType, hostName, port, dbname); - conn = AgentDbUtils.getConnectionFailover(driverClass, url, userName, password); - preparedStatement = conn.prepareStatement(sql); - preparedStatement.setFetchSize(batchSize); - resultSet = preparedStatement.executeQuery(); - - initColumnMeta(); - } catch (Exception ex) { - LOGGER.error("error create statement", ex); - destroy(); - throw new RuntimeException(ex); + serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); + } catch (IOException e) { + LOGGER.error("serialize offset message error", e); } + return new String(serializedOffset, StandardCharsets.UTF_8); } - /** - * Init column meta data. - * - * @throws Exception - sql exception - */ - private void initColumnMeta() throws Exception { - columnCount = resultSet.getMetaData().getColumnCount(); - columnTypeNames = new String[columnCount]; - columnTypeCodes = new int[columnCount]; - for (int i = 0; i < columnCount; i++) { - columnTypeCodes[i] = resultSet.getMetaData().getColumnType(i + 1); - String t = resultSet.getMetaData().getColumnTypeName(i + 1); - if (t != null) { - columnTypeNames[i] = t.toUpperCase(); - } + private Properties getEngineProps() { + Properties props = new Properties(); + props.setProperty("name", "engine" + instanceId); + props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName()); + props.setProperty("database.hostname", hostName); + props.setProperty("database.port", port); + props.setProperty("database.user", userName); + props.setProperty("database.password", password); + props.setProperty("database.dbname", dbName); + props.setProperty("database.server.name", serverName); + props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs); + props.setProperty("database.snapshot.mode", snapshotMode); + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + props.setProperty("snapshot.mode", snapshotMode); + props.setProperty("offset.storage.file.filename", offsetStoreFileName); + props.setProperty("database.history.file.filename", databaseStoreHistoryName); + if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) { + props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); + props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName()); + } else { + props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); + props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName()); } + props.setProperty("tombstones.on.delete", "false"); + props.setProperty("converters", "datetime"); + props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter"); + props.setProperty("datetime.format.date", "yyyy-MM-dd"); + props.setProperty("datetime.format.time", "HH:mm:ss"); + props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss"); + props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss"); + + LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), props); + return props; } @Override public void destroy() { - finished = true; - AgentUtils.finallyClose(resultSet); - AgentUtils.finallyClose(preparedStatement); - AgentUtils.finallyClose(conn); + synchronized (this) { + if (!destroyed) { + this.executor.shutdownNow(); + this.sqlServerSnapshot.close(); + this.destroyed = true; + } + } } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java new file mode 100644 index 000000000..62ee38618 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/SqlServerSnapshotBase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources.snapshot; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +/** + * SqlServer Snapshot + */ +public class SqlServerSnapshotBase extends AbstractSnapshot { + + private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotBase.class); + private final File file; + + public SqlServerSnapshotBase(String filePath) { + file = new File(filePath); + } + + @Override + public String getSnapshot() { + byte[] offset = this.load(this.file); + return ENCODER.encodeToString(offset); + } + + @Override + public void close() { + + } + + public File getFile() { + return file; + } + +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java index b278abb73..58f83e2f4 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerConnect.java @@ -18,43 +18,47 @@ package org.apache.inlong.agent.plugin.sources; import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.JobConstants; import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.Reader; import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader; import org.junit.Ignore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Objects; +import java.util.UUID; -import static org.junit.Assert.assertNotNull; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; /** * Test cases for {@link SQLServerReader}. */ public class TestSQLServerConnect { + private static final Logger LOGGER = LoggerFactory.getLogger(TestSQLServerConnect.class); + /** * Just using in local test. */ + @Ignore - public void testSQLServerReader() { - JobProfile jobProfile = JobProfile.parseJsonStr("{}"); - jobProfile.set(SQLServerReader.JOB_DATABASE_USER, "sa"); - jobProfile.set(SQLServerReader.JOB_DATABASE_PASSWORD, "123456"); - jobProfile.set(SQLServerReader.JOB_DATABASE_HOSTNAME, "127.0.0.1"); - jobProfile.set(SQLServerReader.JOB_DATABASE_PORT, "1434"); - jobProfile.set(SQLServerReader.JOB_DATABASE_DBNAME, "inlong"); - final String sql = "select * from dbo.test01"; - jobProfile.set(SQLServerSource.JOB_DATABASE_SQL, sql); - final SQLServerSource source = new SQLServerSource(); - List<Reader> readers = source.split(jobProfile); - for (Reader reader : readers) { - reader.init(jobProfile); - while (!reader.isFinished()) { - Message message = reader.read(); - if (Objects.nonNull(message)) { - assertNotNull(message.getBody()); - } + public void testSqlServer() { + JobProfile jobProfile = new JobProfile(); + jobProfile.set("job.sqlserverJob.hostname", "localhost"); + jobProfile.set("job.sqlserverJob.port", "1434"); + jobProfile.set("job.sqlserverJob.user", "sa"); + jobProfile.set("job.sqlserverJob.password", "123456"); + jobProfile.set("job.sqlserverJob.dbname", "inlong"); + jobProfile.set("job.sqlserverJob.serverName", "fullfillment"); + jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString()); + jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString()); + jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString()); + SQLServerReader sqlServerReader = new SQLServerReader(); + sqlServerReader.init(jobProfile); + while (true) { + Message message = sqlServerReader.read(); + if (message != null) { + LOGGER.info("event content: {}", message); } } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java index 23f2fc416..33e291fbe 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java @@ -17,15 +17,17 @@ package org.apache.inlong.agent.plugin.sources; -import org.apache.commons.lang3.StringUtils; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import org.apache.commons.lang3.tuple.Pair; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.SqlServerConstants; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; -import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader; -import org.apache.inlong.agent.utils.AgentDbUtils; +import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase; import org.apache.inlong.common.metric.MetricRegister; import org.junit.Before; import org.junit.Test; @@ -35,11 +37,10 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.Types; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertEquals; @@ -49,7 +50,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.field; import static org.powermock.api.mockito.PowerMockito.mockStatic; @@ -60,7 +60,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew; * Test cases for {@link SQLServerReader}. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({AgentDbUtils.class, MetricRegister.class, AuditUtils.class, SQLServerReader.class}) +@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, SQLServerReader.class}) @PowerMockIgnore({"javax.management.*"}) public class TestSQLServerReader { @@ -70,28 +70,31 @@ public class TestSQLServerReader { private JobProfile jobProfile; @Mock - private Connection conn; + private AgentMetricItemSet agentMetricItemSet; @Mock - private PreparedStatement preparedStatement; + private AgentMetricItem agentMetricItem; @Mock - private ResultSet resultSet; + private SqlServerSnapshotBase sqlServerSnapshot; @Mock - private ResultSetMetaData metaData; + private DebeziumEngine.Builder builder; @Mock - private AgentMetricItemSet agentMetricItemSet; + private ExecutorService executorService; @Mock - private AgentMetricItem agentMetricItem; + private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue; + + @Mock + private DebeziumEngine<ChangeEvent<String, String>> engine; private AtomicLong atomicLong; private AtomicLong atomicCountLong; - private String sql; + private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d"; @Before public void setUp() throws Exception { @@ -99,44 +102,60 @@ public class TestSQLServerReader { final String password = "123456"; final String hostname = "127.0.0.1"; final String port = "1434"; - final String dbname = "inlong"; - final String typeName1 = "int"; - final String typeName2 = "varchar"; final String groupId = "group01"; final String streamId = "stream01"; + final String dbName = "inlong"; + final String serverName = "server1"; + final String offsetFlushIntervalMs = "1000"; + final String offsetStoreFileName = "/opt/offset.dat"; + final String snapshotMode = SqlServerConstants.INITIAL; + final int queueSize = 1000; + final String databaseStoreHistoryName = "/opt/history.dat"; + final String offset = "111"; + final String specificOffsetFile = ""; + final String specificOffsetPos = "-1"; + atomicLong = new AtomicLong(0L); atomicCountLong = new AtomicLong(0L); - sql = "select * from dbo.test01"; - + when(jobProfile.getInstanceId()).thenReturn(instanceId); when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId); when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId); when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_USER))).thenReturn(username); when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PASSWORD))).thenReturn(password); when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname); when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_PORT))).thenReturn(port); - when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbname); - when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DRIVER_CLASS), anyString())).thenReturn( - SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS); - when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_BATCH_SIZE), anyInt())).thenReturn( - SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE); - when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_TYPE), anyString())).thenReturn( - SQLServerReader.SQLSERVER); - when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SEPARATOR), anyString())).thenReturn( - SQLServerReader.STD_FIELD_SEPARATOR_SHORT); - mockStatic(AgentDbUtils.class); - when(AgentDbUtils.getConnectionFailover(eq(SQLServerReader.DEFAULT_JOB_DATABASE_DRIVER_CLASS), anyString(), - eq(username), eq(password))).thenReturn(conn); - when(conn.prepareStatement(anyString())).thenReturn(preparedStatement); - when(preparedStatement.executeQuery()).thenReturn(resultSet); - when(resultSet.getMetaData()).thenReturn(metaData); - when(metaData.getColumnCount()).thenReturn(2); - when(metaData.getColumnName(1)).thenReturn("id"); - when(metaData.getColumnName(2)).thenReturn("cell"); - when(metaData.getColumnType(1)).thenReturn(Types.INTEGER); - when(metaData.getColumnType(2)).thenReturn(Types.VARCHAR); - when(metaData.getColumnTypeName(1)).thenReturn(typeName1); - when(metaData.getColumnTypeName(2)).thenReturn(typeName2); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_DBNAME))).thenReturn(dbName); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn( + offsetFlushIntervalMs); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn( + offsetStoreFileName); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode); + when(jobProfile.getInt(eq(SQLServerReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn( + databaseStoreHistoryName); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn( + specificOffsetFile); + when(jobProfile.get(eq(SQLServerReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn( + specificOffsetPos); + whenNew(SqlServerSnapshotBase.class).withAnyArguments().thenReturn(sqlServerSnapshot); + + //mock sqlServerMessageQueue + whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(sqlServerMessageQueue); + + //mock DebeziumEngine + mockStatic(DebeziumEngine.class); + when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder); + when(builder.using(any(Properties.class))).thenReturn(builder); + when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder); + when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder); + when(builder.build()).thenReturn(engine); + + //mock executorService + mockStatic(Executors.class); + when(Executors.newSingleThreadExecutor()).thenReturn(executorService); //mock metrics whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet); @@ -146,7 +165,7 @@ public class TestSQLServerReader { //init method mockStatic(MetricRegister.class); - (reader = new SQLServerReader(sql)).init(jobProfile); + (reader = new SQLServerReader()).init(jobProfile); } /** @@ -154,23 +173,16 @@ public class TestSQLServerReader { */ @Test public void testRead() throws Exception { - final String v11 = "11"; - final String v12 = "12"; - final String v21 = "aa"; - final String v22 = "bb"; - - final String msg1 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v11, v12); - final String msg2 = String.join(SQLServerReader.STD_FIELD_SEPARATOR_SHORT, v21, v22); - - when(resultSet.next()).thenReturn(true, true, false); - when(resultSet.getString(1)).thenReturn(v11, v21); - when(resultSet.getString(2)).thenReturn(v12, v22); - Message message1 = reader.read(); - assertEquals(msg1, message1.toString()); - verify(preparedStatement, times(1)).setFetchSize(SQLServerReader.DEFAULT_JOB_DATABASE_BATCH_SIZE); - Message message2 = reader.read(); - assertEquals(msg2, message2.toString()); - assertEquals(2L, atomicLong.get()); + final String right = "value"; + final String left = "key"; + final String dataKey = "dataKey"; + when(sqlServerMessageQueue.isEmpty()).thenReturn(true); + assertEquals(null, reader.read()); + when(sqlServerMessageQueue.isEmpty()).thenReturn(false); + when(sqlServerMessageQueue.poll()).thenReturn(Pair.of(left, right)); + Message result = reader.read(); + assertEquals(String.join(right, "\"", "\""), result.toString()); + assertEquals(left, result.getHeader().get(dataKey)); } /** @@ -178,12 +190,11 @@ public class TestSQLServerReader { */ @Test public void testDestroy() throws Exception { - assertFalse(reader.isFinished()); + assertFalse(reader.isDestroyed()); reader.destroy(); - verify(resultSet).close(); - verify(preparedStatement).close(); - verify(conn).close(); - assertTrue(reader.isFinished()); + verify(executorService).shutdownNow(); + verify(sqlServerSnapshot).close(); + assertTrue(reader.isDestroyed()); } /** @@ -192,10 +203,7 @@ public class TestSQLServerReader { @Test public void testFinishRead() throws Exception { assertFalse(reader.isFinished()); - reader.destroy(); - verify(resultSet).close(); - verify(preparedStatement).close(); - verify(conn).close(); + reader.finishRead(); assertTrue(reader.isFinished()); } @@ -212,7 +220,9 @@ public class TestSQLServerReader { */ @Test public void testGetSnapshot() { - assertEquals(StringUtils.EMPTY, reader.getSnapshot()); + final String snapShort = "snapShort"; + when(sqlServerSnapshot.getSnapshot()).thenReturn(snapShort); + assertEquals(snapShort, reader.getSnapshot()); } /** @@ -220,6 +230,6 @@ public class TestSQLServerReader { */ @Test public void testGetReadSource() { - assertEquals(sql, reader.getReadSource()); + assertEquals(instanceId, reader.getReadSource()); } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java index 97646ccbf..90b71df9a 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java @@ -17,9 +17,7 @@ package org.apache.inlong.agent.plugin.sources; -import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.constant.CommonConstants; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; import org.apache.inlong.common.metric.MetricItem; @@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.powermock.api.mockito.PowerMockito.when; import static org.powermock.api.mockito.PowerMockito.whenNew; import static org.powermock.api.support.membermodification.MemberMatcher.field; @@ -84,20 +81,10 @@ public class TestSQLServerSource { */ @Test public void testSplit() { - final String sql1 = "select * from dbo.test01"; - final String sql2 = "select * from dbo.test${01,99}"; // build mock - when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn("test_group"); - when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn("test_stream"); - when(jobProfile.get(eq(SQLServerSource.JOB_DATABASE_SQL), eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY, - sql1, sql2); - final SQLServerSource source = new SQLServerSource(); - // assert - assertEquals(null, source.split(jobProfile)); assertEquals(1, source.split(jobProfile).size()); - assertEquals(99, source.split(jobProfile).size()); } } diff --git a/pom.xml b/pom.xml index eadd23c91..7b5c185f7 100644 --- a/pom.xml +++ b/pom.xml @@ -606,6 +606,12 @@ <version>${debezium.version}</version> </dependency> + <dependency> + <groupId>io.debezium</groupId> + <artifactId>debezium-connector-sqlserver</artifactId> + <version>${debezium.version}</version> + </dependency> + <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-mongodb</artifactId>
