This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 590e46ebe3 [INLONG-10289][Agent] Update the SQLServer Source (#10735)
590e46ebe3 is described below
commit 590e46ebe3471a8f72acc199cb8cad120b49fe42
Author: zoy0 <[email protected]>
AuthorDate: Fri Aug 2 09:48:25 2024 +0800
[INLONG-10289][Agent] Update the SQLServer Source (#10735)
---
.../inlong/agent/constant/TaskConstants.java | 11 ++
.../apache/inlong/agent/pojo/SqlServerTask.java | 5 +
.../apache/inlong/agent/pojo/TaskProfileDto.java | 5 +
.../agent/plugin/instance/SQLServerInstance.java | 31 ++++
.../agent/plugin/sources/SQLServerSource.java | 172 +++++++++++++++----
.../inlong/agent/plugin/task/SQLServerTask.java | 111 ++++++++++++
.../agent/plugin/utils/SQLServerTimeConverter.java | 124 ++++++++++++++
.../agent/plugin/sources/TestSQLServerSource.java | 188 +++++++++++++++++----
8 files changed, 585 insertions(+), 62 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 4cd6ac56ed..1f142f839e 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -153,6 +153,17 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_MQTT_AUTOMATIC_RECONNECT =
"task.mqttTask.automaticReconnect";
public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion";
+ // SQLServer task
+ public static final String TASK_SQLSERVER_HOSTNAME =
"task.sqlserverTask.hostname";
+ public static final String TASK_SQLSERVER_PORT = "task.sqlserverTask.port";
+ public static final String TASK_SQLSERVER_USER = "task.sqlserverTask.user";
+ public static final String TASK_SQLSERVER_PASSWORD =
"task.sqlserverTask.password";
+ public static final String TASK_SQLSERVER_DB_NAME =
"task.sqlserverTask.dbname";
+ public static final String TASK_SQLSERVER_SNAPSHOT_MODE =
"task.sqlserverTask.snapshot.mode";
+ public static final String TASK_SQLSERVER_SERVER_NAME =
"task.sqlserverTask.serverName";
+ public static final String TASK_SQLSERVER_SCHEMA_NAME =
"task.sqlserverTask.schemaName";
+ public static final String TASK_SQLSERVER_TABLE_NAME =
"task.sqlserverTask.tableName";
+
public static final String TASK_STATE = "task.state";
public static final String INSTANCE_STATE = "instance.state";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java
index 56e4a9b920..9240e8366a 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java
@@ -28,6 +28,9 @@ public class SqlServerTask {
private String port;
private String serverName;
private String dbname;
+ private String schemaName;
+ private String tableName;
+ private String serverTimezone;
private SqlServerTask.Snapshot snapshot;
private SqlServerTask.Offset offset;
@@ -63,6 +66,8 @@ public class SqlServerTask {
private String port;
private String database;
private String schemaName;
+ private String tableName;
+ private String serverTimezone;
private String snapshotMode;
private String intervalMs;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index cc6cfe8244..039acea32d 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -54,6 +54,7 @@ public class TaskProfileDto {
public static final String DEFAULT_MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
+ public static final String DEFAULT_SQLSERVER_TASK =
"org.apache.inlong.agent.plugin.task.SQLServerTask";
public static final String DEFAULT_CHANNEL =
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -343,6 +344,9 @@ public class TaskProfileDto {
sqlServerTask.setPort(config.getPort());
sqlServerTask.setServerName(config.getSchemaName());
sqlServerTask.setDbname(config.getDatabase());
+ sqlServerTask.setSchemaName(config.getSchemaName());
+ sqlServerTask.setTableName(config.getSchemaName() + "." +
config.getTableName());
+ sqlServerTask.setServerTimezone(config.getServerTimezone());
SqlServerTask.Offset offset = new SqlServerTask.Offset();
offset.setFilename(config.getOffsetFilename());
@@ -495,6 +499,7 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case SQLSERVER:
+ task.setTaskClass(DEFAULT_SQLSERVER_TASK);
SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
task.setSqlserverTask(sqlserverTask);
task.setSource(SQLSERVER_SOURCE);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java
new file mode 100644
index 0000000000..735f416ff9
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+public class SQLServerInstance extends CommonInstance {
+
+ @Override
+ public void setInodeInfo(InstanceProfile profile) throws IOException {
+ profile.set(TaskConstants.INODE_INFO, "");
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index e067833c6b..01e61a99bd 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -17,66 +17,184 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
+import io.debezium.connector.sqlserver.SqlServerConnector;
+import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* SQLServer source
*/
public class SQLServerSource extends AbstractSource {
- private static final Logger logger =
LoggerFactory.getLogger(SQLServerSource.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SQLServerSource.class);
+ private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+ private ExecutorService executor;
+ public InstanceProfile profile;
+ private BlockingQueue<SourceData> debeziumQueue;
+ private final Properties props = new Properties();
+ private String dbName;
+ private String schemaName;
+ private String tableName;
public SQLServerSource() {
}
- @Override
- public List<Reader> split(TaskProfile conf) {
- SQLServerReader sqlServerReader = new SQLServerReader();
- List<Reader> readerList = new ArrayList<>();
- readerList.add(sqlServerReader);
- sourceMetric.sourceSuccessCount.incrementAndGet();
- return readerList;
+ protected void initSource(InstanceProfile profile) {
+ try {
+ LOGGER.info("SQLServerSource init: {}", profile.toJsonStr());
+ debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+
+ dbName = profile.get(TaskConstants.TASK_SQLSERVER_DB_NAME);
+ schemaName = profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME);
+ tableName = profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME);
+
+ props.setProperty("name", "SQLServer-" + instanceId);
+ props.setProperty("connector.class",
SqlServerConnector.class.getName());
+ props.setProperty("offset.storage",
FileOffsetBackingStore.class.getName());
+ String agentPath = AgentConfiguration.getAgentConf()
+ .get(AgentConstants.AGENT_HOME,
AgentConstants.DEFAULT_AGENT_HOME);
+ String offsetPath = agentPath + "/" + getThreadName() +
"/offset.dat";
+ props.setProperty("offset.storage.file.filename", offsetPath);
+ props.setProperty("offset.flush.interval.ms", "10000");
+ props.setProperty("database.history",
FileDatabaseHistory.class.getCanonicalName());
+ props.setProperty("database.history.file.filename", agentPath +
"/" + getThreadName() + "/history.dat");
+ // ignore "schema" and extract data from "payload"
+ props.setProperty("key.converter.schemas.enable", "false");
+ props.setProperty("value.converter.schemas.enable", "false");
+ // ignore ddl
+ props.setProperty("include.schema.changes", "false");
+ // convert time to formatted string
+ props.setProperty("converters", "datetime");
+ props.setProperty("datetime.type",
"org.apache.inlong.agent.plugin.utils.SQLServerTimeConverter");
+ props.setProperty("datetime.format.date", "yyyy-MM-dd");
+ props.setProperty("datetime.format.time", "HH:mm:ss");
+ props.setProperty("datetime.format.datetime", "yyyy-MM-dd
HH:mm:ss");
+ props.setProperty("datetime.format.timestamp", "yyyy-MM-dd
HH:mm:ss");
+
+
props.setProperty(String.valueOf(SqlServerConnectorConfig.HOSTNAME),
+ profile.get(TaskConstants.TASK_SQLSERVER_HOSTNAME));
+ props.setProperty(String.valueOf(SqlServerConnectorConfig.PORT),
+ profile.get(TaskConstants.TASK_SQLSERVER_PORT));
+ props.setProperty(String.valueOf(SqlServerConnectorConfig.USER),
+ profile.get(TaskConstants.TASK_SQLSERVER_USER));
+
props.setProperty(String.valueOf(SqlServerConnectorConfig.PASSWORD),
+ profile.get(TaskConstants.TASK_SQLSERVER_PASSWORD));
+
props.setProperty(String.valueOf(SqlServerConnectorConfig.DATABASE_NAME),
+ profile.get(TaskConstants.TASK_SQLSERVER_DB_NAME));
+
props.setProperty(String.valueOf(SqlServerConnectorConfig.SNAPSHOT_MODE),
+ profile.get(TaskConstants.TASK_SQLSERVER_SNAPSHOT_MODE,
SqlServerConstants.INITIAL));
+
props.setProperty(String.valueOf(SqlServerConnectorConfig.SERVER_NAME),
+ profile.get(TaskConstants.TASK_SQLSERVER_SERVER_NAME));
+
props.setProperty(String.valueOf(SqlServerConnectorConfig.SCHEMA_INCLUDE_LIST),
+ profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME));
+
props.setProperty(String.valueOf(SqlServerConnectorConfig.TABLE_INCLUDE_LIST),
+ profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME));
+
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(startDebeziumEngine());
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + instanceId, ex);
+ }
+ }
+
+ private Runnable startDebeziumEngine() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName() + "debezium");
+ try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine =
DebeziumEngine.create(Json.class)
+ .using(props)
+ .using(OffsetCommitPolicy.always())
+ .notifying(this::handleConsumerEvent)
+ .build()) {
+
+ debeziumEngine.run();
+ } catch (Throwable e) {
+ LOGGER.error("do run error in SQLServer debezium: ", e);
+ }
+ };
+ }
+
+ private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+ DebeziumEngine.RecordCommitter<ChangeEvent<String, String>>
committer) throws InterruptedException {
+ for (ChangeEvent<String, String> record : records) {
+ boolean offerSuc = false;
+ SourceData sourceData = new
SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0");
+ while (isRunnable() && !offerSuc) {
+ offerSuc = debeziumQueue.offer(sourceData, 1,
TimeUnit.SECONDS);
+ }
+ committer.markProcessed(record);
+ }
+ committer.markBatchFinished();
}
@Override
- protected String getThreadName() {
+ public List<Reader> split(TaskProfile conf) {
return null;
}
@Override
- protected void initSource(InstanceProfile profile) {
-
+ protected String getThreadName() {
+ return "SQLServer-source-" + taskId + "-" + instanceId;
}
@Override
protected void printCurrentState() {
-
+ LOGGER.info("sqlserver databases is {} and schema is {} and table is
{}", dbName, schemaName, tableName);
}
@Override
protected boolean doPrepareToRead() {
- return false;
+ return true;
}
@Override
protected List<SourceData> readFromSource() {
- return null;
- }
-
- @Override
- public Message read() {
- return null;
+ List<SourceData> dataList = new ArrayList<>();
+ try {
+ int size = 0;
+ while (size < BATCH_READ_LINE_TOTAL_LEN) {
+ SourceData sourceData = debeziumQueue.poll(1,
TimeUnit.SECONDS);
+ if (sourceData != null) {
+ LOGGER.info("readFromSource: {}", sourceData.getData());
+ size += sourceData.getData().length;
+ dataList.add(sourceData);
+ } else {
+ break;
+ }
+ }
+
+ } catch (InterruptedException e) {
+ LOGGER.error("poll {} data from debezium queue interrupted.",
instanceId);
+ }
+ return dataList;
}
@Override
@@ -86,16 +204,12 @@ public class SQLServerSource extends AbstractSource {
@Override
protected void releaseSource() {
-
- }
-
- @Override
- public boolean sourceFinish() {
- return false;
+ LOGGER.info("release sqlserver source");
+ executor.shutdownNow();
}
@Override
public boolean sourceExist() {
- return false;
+ return true;
}
-}
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
new file mode 100644
index 0000000000..dd1446b301
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SQLServerTask extends AbstractTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SQLServerTask.class);
+ public static final String DEFAULT_SQLSERVER_INSTANCE =
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+ private boolean isAdded = false;
+
+ private String dbName;
+ private String schemaName;
+ private String tableName;
+ private String instanceId;
+
+ private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
+
+ @Override
+ protected int getInstanceLimit() {
+ return DEFAULT_INSTANCE_LIMIT;
+ }
+
+ @Override
+ protected void initTask() {
+ LOGGER.info("SQLServer commonInit: {}", taskProfile.toJsonStr());
+ dbName = taskProfile.get(TaskConstants.TASK_SQLSERVER_DB_NAME);
+ tableName = taskProfile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME);
+ schemaName = taskProfile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME);
+ instanceId = dbName + "-" + tableName;
+ }
+
+ @Override
+ public boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.error("task profile needs all required key");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_HOSTNAME)) {
+ LOGGER.error("task profile needs hostname");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_PORT)) {
+ LOGGER.error("task profile needs port");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_USER)) {
+ LOGGER.error("task profile needs username");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_PASSWORD)) {
+ LOGGER.error("task profile needs password");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_DB_NAME)) {
+ LOGGER.error("task profile needs DB name");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME)) {
+ LOGGER.error("task profile needs schema name");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_TABLE_NAME)) {
+ LOGGER.error("task profile needs table name");
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected List<InstanceProfile> getNewInstanceList() {
+ List<InstanceProfile> list = new ArrayList<>();
+ if (isAdded) {
+ return list;
+ }
+ String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_SQLSERVER_INSTANCE, instanceId,
+ CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ list.add(instanceProfile);
+ this.isAdded = true;
+ return list;
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java
new file mode 100644
index 0000000000..008dbda4ea
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class SQLServerTimeConverter implements CustomConverter<SchemaBuilder,
RelationalColumn> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SQLServerTimeConverter.class);
+
+ private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+ private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+ private DateTimeFormatter datetimeFormatter =
DateTimeFormatter.ISO_DATE_TIME;
+
+ private ZoneOffset defalutZoneOffset =
ZoneOffset.systemDefault().getRules().getOffset(Instant.now());
+
+ @Override
+ public void configure(Properties props) {
+ readProps(props, "format.date", p -> dateFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.time", p -> timeFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.datetime", p -> datetimeFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.timestamp.zone", z -> defalutZoneOffset =
ZoneOffset.of(z));
+ }
+
+ private void readProps(Properties properties, String settingKey,
Consumer<String> callback) {
+ String settingValue = (String) properties.get(settingKey);
+ if (settingValue == null || settingValue.length() == 0) {
+ return;
+ }
+ try {
+ callback.accept(settingValue.trim());
+ } catch (IllegalArgumentException | DateTimeException e) {
+ LOGGER.error("The {} setting is illegal:{}", settingKey,
settingValue);
+ throw e;
+ }
+ }
+
+ @Override
+ public void converterFor(RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
+ String sqlType = column.typeName().toUpperCase();
+ SchemaBuilder schemaBuilder = null;
+ Converter converter = null;
+ if ("DATE".equals(sqlType)) {
+ schemaBuilder =
SchemaBuilder.string().optional().name("org.apache.inlong.agent.date.string");
+ converter = this::convertDate;
+ }
+ if ("TIME".equals(sqlType)) {
+ schemaBuilder =
SchemaBuilder.string().optional().name("org.apache.inlong.agent.time.string");
+ converter = this::convertTime;
+ }
+ if ("DATETIME".equals(sqlType) ||
+ "DATETIME2".equals(sqlType) ||
+ "SMALLDATETIME".equals(sqlType)) {
+ schemaBuilder =
SchemaBuilder.string().optional().name("org.apache.inlong.agent.datetime.string");
+ converter = this::convertDateTime;
+ }
+ if ("DATETIMEOFFSET".equals(sqlType)) {
+ schemaBuilder =
SchemaBuilder.string().optional().name("org.apache.inlong.agent.datetimeoffset.string");
+ converter = this::convertDateTimeOffset;
+ }
+ if (schemaBuilder != null) {
+ registration.register(schemaBuilder, converter);
+ LOGGER.info("register converter for sqlType {} to schema {}",
sqlType, schemaBuilder.name());
+ }
+ }
+
+ private String convertDate(Object input) {
+ if (input instanceof java.sql.Date) {
+ return dateFormatter.format(((java.sql.Date) input).toLocalDate());
+ }
+ return input == null ? null : input.toString();
+ }
+
+ private String convertTime(Object input) {
+ if (input instanceof java.sql.Time) {
+ return timeFormatter.format(((java.sql.Time) input).toLocalTime());
+ } else if (input instanceof java.sql.Timestamp) {
+ return timeFormatter.format(((java.sql.Timestamp)
input).toLocalDateTime().toLocalTime());
+ }
+ return input == null ? null : input.toString();
+ }
+
+ private String convertDateTime(Object input) {
+ if (input instanceof java.sql.Timestamp) {
+ return datetimeFormatter.format(((java.sql.Timestamp)
input).toLocalDateTime());
+ }
+ return input == null ? null : input.toString();
+ }
+
+ private String convertDateTimeOffset(Object input) {
+ if (input instanceof microsoft.sql.DateTimeOffset) {
+ microsoft.sql.DateTimeOffset dateTimeOffset =
(microsoft.sql.DateTimeOffset) input;
+ return datetimeFormatter.format(
+
dateTimeOffset.getOffsetDateTime().withOffsetSameInstant(defalutZoneOffset).toLocalDateTime());
+ }
+ return input == null ? null : input.toString();
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 778c2710c4..0dc8b71be4 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -17,74 +17,196 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.metrics.AgentMetricItem;
-import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.common.metric.MetricItem;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.metric.MetricRegister;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.concurrent.atomic.AtomicLong;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-import static org.powermock.api.support.membermodification.MemberMatcher.field;
/**
* Test cases for {@link SQLServerSource}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({SQLServerSource.class, MetricRegister.class})
+@PrepareForTest({DebeziumEngine.class, Executors.class, SQLServerSource.class,
MetricRegister.class})
@PowerMockIgnore({"javax.management.*"})
public class TestSQLServerSource {
+ private SQLServerSource source;
+
+ private static AgentBaseTestsHelper helper;
+ // task basic store
+ private static Store taskBasicStore;
+ // instance basic store
+ private static Store instanceBasicStore;
+ // offset basic store
+ private static Store offsetBasicStore;
+
+ InstanceProfile instanceProfile;
+
@Mock
- TaskProfile jobProfile;
+ private DebeziumEngine.Builder builder;
@Mock
- private AgentMetricItemSet agentMetricItemSet;
+ private ExecutorService executorService;
@Mock
- private AgentMetricItem agentMetricItem;
+ DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer;
- private AtomicLong sourceSuccessCount;
+ @Mock
+ private DebeziumEngine<ChangeEvent<String, String>> engine;
+
+ private BlockingQueue queue;
- private AtomicLong sourceFailCount;
+ private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
@Before
public void setup() throws Exception {
- sourceSuccessCount = new AtomicLong(0);
- sourceFailCount = new AtomicLong(0);
-
- // mock metrics
-
whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
-
when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
- field(AgentMetricItem.class,
"sourceSuccessCount").set(agentMetricItem, sourceSuccessCount);
- field(AgentMetricItem.class, "sourceFailCount").set(agentMetricItem,
sourceFailCount);
- PowerMockito.mockStatic(MetricRegister.class);
- PowerMockito.doNothing().when(
- MetricRegister.class, "register", any(MetricItem.class));
+
+ helper = new
AgentBaseTestsHelper(TestSQLServerSource.class.getName()).setupAgentHome();
+ taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+ instanceBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+ offsetBasicStore =
+ TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+ OffsetManager.init(taskBasicStore, instanceBasicStore,
offsetBasicStore);
+ // mock DebeziumEngine
+ mockStatic(DebeziumEngine.class);
+
when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+ when(builder.using(any(Properties.class))).thenReturn(builder);
+
when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+
when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+ when(builder.build()).thenReturn(engine);
+
+ doNothing().when(committer).markProcessed(any(ChangeEvent.class));
+ doNothing().when(committer).markBatchFinished();
+
+ // mock executorService
+ mockStatic(Executors.class);
+ when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
+
+ getSource();
+ // init source debeziumQueue
+ Field field = SQLServerSource.class.getDeclaredField("debeziumQueue");
+ field.setAccessible(true);
+ queue = (BlockingQueue) field.get(source);
+ }
+
+ private SQLServerSource getSource() {
+ final String username = "SA";
+ final String password = "123456";
+ final String hostname = "127.0.0.1";
+ final String port = "1434";
+ final String groupId = "group01";
+ final String streamId = "stream01";
+ final String dbName = "inlong";
+ final String schemaName = "dbo";
+ final String tableName = "test_source";
+ final String serverName = "server-01";
+
+ TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L,
TaskStateEnum.RUNNING, "D",
+ "GMT+8:00");
+ instanceProfile = taskProfile.createInstanceProfile("",
+ "", taskProfile.getCycleUnit(), "20240725",
AgentUtils.getCurrentTime());
+ instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
+ instanceProfile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_USER, username);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_PASSWORD, password);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_HOSTNAME, hostname);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_PORT, port);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_DB_NAME, dbName);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME,
schemaName);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_TABLE_NAME,
tableName);
+ instanceProfile.set(TaskConstants.TASK_SQLSERVER_SERVER_NAME,
serverName);
+ instanceProfile.set(TaskConstants.TASK_AUDIT_VERSION, "0");
+ instanceProfile.setInstanceId(instanceId);
+
+ (source = new SQLServerSource()).init(instanceProfile);
+ return source;
}
- /**
- * Test cases for .
- */
@Test
- public void testSplit() {
+ public void testSQLServerSource() throws Exception {
+ testHandleConsumerEvent();
+ TestReadDataFromSource();
+ TestReadEmptyFromSource();
+ }
+
+ // test DebeziumEngine get one recode from SQLServer
+ private void testHandleConsumerEvent() throws Exception {
+ List<ChangeEvent<String, String>> records = new ArrayList<>();
+ records.add(new ChangeEvent<String, String>() {
+
+ @Override
+ public String key() {
+ return "KEY";
+ }
+
+ @Override
+ public String value() {
+ return "VALUE";
+ }
+
+ @Override
+ public String destination() {
+ return null;
+ }
+ });
+ Method handleConsumerEvent =
SQLServerSource.class.getDeclaredMethod("handleConsumerEvent", List.class,
+ DebeziumEngine.RecordCommitter.class);
+ handleConsumerEvent.setAccessible(true);
+ handleConsumerEvent.invoke(source, records, committer);
+ assertEquals(1, queue.size());
+ }
+
+ // test read one source data from queue
+ private void TestReadDataFromSource() throws Exception {
+ Method handleConsumerEvent =
SQLServerSource.class.getDeclaredMethod("readFromSource");
+ handleConsumerEvent.setAccessible(true);
+
+ List result = (List) handleConsumerEvent.invoke(source);
+ assertFalse(result.isEmpty());
+ assertTrue(queue.isEmpty());
+ }
+
+ // test read
+ private void TestReadEmptyFromSource() throws Exception {
+ Method handleConsumerEvent =
SQLServerSource.class.getDeclaredMethod("readFromSource");
+ handleConsumerEvent.setAccessible(true);
- // build mock
- final SQLServerSource source = new SQLServerSource();
- // assert
- // assertEquals(1, source.split(jobProfile).size());
+ queue.clear();
+ List result = (List) handleConsumerEvent.invoke(source);
+ assertTrue(result.isEmpty());
+ assertTrue(queue.isEmpty());
}
}