This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bd4c3dbecf [INLONG-10703][Manager] Manager Add Oceanbase Support
(#10701)
bd4c3dbecf is described below
commit bd4c3dbecfa75601ad53d68fb767b6afd639d3c5
Author: xxsc0529 <[email protected]>
AuthorDate: Wed Jul 31 20:02:03 2024 +0800
[INLONG-10703][Manager] Manager Add Oceanbase Support (#10701)
---
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../inlong/manager/common/consts/SinkType.java | 3 +
.../inlong/manager/common/consts/SourceType.java | 1 +
.../strategy/OceanBaseFieldTypeStrategy.java | 40 +++
.../resources/oceanbase-field-type-mapping.yaml | 228 +++++++++++++++++
.../pojo/node/oceanbase/OceanBaseDataNodeDTO.java | 85 +++++++
.../pojo/node/oceanbase/OceanBaseDataNodeInfo.java | 56 ++++
.../node/oceanbase/OceanBaseDataNodeRequest.java | 46 ++++
.../pojo/sink/oceanbase/OceanBaseColumnInfo.java | 37 +++
.../manager/pojo/sink/oceanbase/OceanBaseSink.java | 72 ++++++
.../pojo/sink/oceanbase/OceanBaseSinkDTO.java | 224 ++++++++++++++++
.../pojo/sink/oceanbase/OceanBaseSinkRequest.java | 60 +++++
.../pojo/sink/oceanbase/OceanBaseTableInfo.java | 43 ++++
.../pojo/sort/node/provider/OceanBaseProvider.java | 79 ++++++
.../source/oceanbase/OceanBaseBinlogSource.java | 117 +++++++++
.../source/oceanbase/OceanBaseBinlogSourceDTO.java | 146 +++++++++++
.../oceanbase/OceanBaseBinlogSourceRequest.java | 123 +++++++++
.../node/oceanbase/OceanBaseDataNodeOperator.java | 127 ++++++++++
.../sink/oceanbase/OceanBaseJdbcUtils.java | 282 +++++++++++++++++++++
.../sink/oceanbase/OceanBaseResourceOperator.java | 143 +++++++++++
.../sink/oceanbase/OceanBaseSqlBuilder.java | 231 +++++++++++++++++
.../sink/oceanbase/OceanBaseSinkOperator.java | 107 ++++++++
.../source/oceanbase/OceanBaseSourceOperator.java | 122 +++++++++
23 files changed, 2373 insertions(+)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index fda7ee7262..0f1952c938 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -37,6 +37,7 @@ public class DataNodeType {
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
public static final String DORIS = "DORIS";
+ public static final String OCEANBASE = "OCEANBASE";
/**
* Tencent cloud log service
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index 554651ac8c..5d069e33df 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -71,6 +71,9 @@ public class SinkType extends StreamType {
@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String TUBEMQ = "TUBEMQ";
+ @SupportSortType(sortType = SortType.SORT_FLINK)
+ public static final String OCEANBASE = "OCEANBASE";
+
/**
* Tencent cloud log service
* Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a>
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index f40593e421..cb6f1a7f6b 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -36,6 +36,7 @@ public class SourceType extends StreamType {
public static final String MONGODB = "MONGODB";
public static final String REDIS = "REDIS";
public static final String MQTT = "MQTT";
+ public static final String OCEANBASE = "OCEANBASE";
public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new
HashMap<String, TaskTypeEnum>() {
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java
new file mode 100644
index 0000000000..8482899075
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.fieldtype.FieldTypeMappingReader;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * Field type mapping strategy that handles the {@link DataNodeType#OCEANBASE} type.
+ */
+@Service
+public class OceanBaseFieldTypeStrategy extends DefaultFieldTypeStrategy {
+
+    /**
+     * Builds the strategy and assigns a {@link FieldTypeMappingReader} created for
+     * {@link DataNodeType#OCEANBASE} to the reader field.
+     */
+    public OceanBaseFieldTypeStrategy() {
+        this.reader = new FieldTypeMappingReader(DataNodeType.OCEANBASE);
+    }
+
+    /**
+     * Tells whether this strategy applies to the given type.
+     *
+     * @param type the type name to check, may be null
+     * @return true only when {@code type} equals {@link DataNodeType#OCEANBASE}
+     */
+    @Override
+    public Boolean accept(String type) {
+        boolean isOceanBase = DataNodeType.OCEANBASE.equals(type);
+        return isOceanBase;
+    }
+
+}
diff --git
a/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml
b/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml
new file mode 100644
index 0000000000..0d2a0599cf
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source.type.to.target.type.converter:
+
+ - source.type: TINYINT
+ target.type: TINYINT
+
+ - source.type: SMALLINT
+ target.type: SMALLINT
+
+ - source.type: TINYINT UNSIGNED
+ target.type: SMALLINT
+
+ - source.type: TINYINT UNSIGNED ZEROFILL
+ target.type: SMALLINT
+
+ - source.type: INT
+ target.type: INT
+
+ - source.type: INTEGER
+ target.type: INT
+
+ - source.type: YEAR
+ target.type: INT
+
+ - source.type: SHORT
+ target.type: SHORT
+
+ - source.type: MEDIUMINT
+ target.type: INT
+
+ - source.type: SMALLINT UNSIGNED
+ target.type: INT
+
+ - source.type: SMALLINT UNSIGNED ZEROFILL
+ target.type: INT
+
+ - source.type: BIGINT
+ target.type: LONG
+
+ - source.type: INT UNSIGNED
+ target.type: LONG
+
+ - source.type: MEDIUMINT UNSIGNED
+ target.type: LONG
+
+ - source.type: MEDIUMINT UNSIGNED ZEROFILL
+ target.type: LONG
+
+ - source.type: INT UNSIGNED ZEROFILL
+ target.type: LONG
+
+ - source.type: BIGINT UNSIGNED
+ target.type: DECIMAL
+
+ - source.type: BIGINT UNSIGNED ZEROFILL
+ target.type: DECIMAL
+
+ - source.type: SERIAL
+ target.type: DECIMAL
+
+ - source.type: FLOAT
+ target.type: FLOAT
+
+ - source.type: FLOAT UNSIGNED
+ target.type: FLOAT
+
+ - source.type: FLOAT UNSIGNED ZEROFILL
+ target.type: FLOAT
+
+ - source.type: DOUBLE
+ target.type: DOUBLE
+
+ - source.type: DOUBLE UNSIGNED
+ target.type: DOUBLE
+
+ - source.type: DOUBLE UNSIGNED ZEROFILL
+ target.type: DOUBLE
+
+ - source.type: DOUBLE PRECISION
+ target.type: DOUBLE
+
+ - source.type: DOUBLE PRECISION UNSIGNED
+ target.type: DOUBLE
+
+ - source.type: ZEROFILL
+ target.type: DOUBLE
+
+ - source.type: REAL
+ target.type: DOUBLE
+
+ - source.type: REAL UNSIGNED
+ target.type: DOUBLE
+
+ - source.type: REAL UNSIGNED ZEROFILL
+ target.type: DOUBLE
+
+ - source.type: NUMERIC
+ target.type: DECIMAL
+
+ - source.type: NUMERIC UNSIGNED
+ target.type: DECIMAL
+
+ - source.type: NUMERIC UNSIGNED ZEROFILL
+ target.type: DECIMAL
+
+ - source.type: DECIMAL
+ target.type: DECIMAL
+
+ - source.type: DECIMAL UNSIGNED
+ target.type: DECIMAL
+
+ - source.type: DECIMAL UNSIGNED ZEROFILL
+ target.type: DECIMAL
+
+ - source.type: FIXED
+ target.type: DECIMAL
+
+ - source.type: FIXED UNSIGNED
+ target.type: DECIMAL
+
+ - source.type: FIXED UNSIGNED ZEROFILL
+ target.type: DECIMAL
+
+ - source.type: BOOLEAN
+ target.type: BOOLEAN
+
+ - source.type: DATE
+ target.type: DATE
+
+ - source.type: TIME
+ target.type: TIME
+
+ - source.type: DATETIME
+ target.type: TIMESTAMP
+
+ - source.type: TIMESTAMP
+ target.type: TIMESTAMP
+
+ - source.type: CHAR
+ target.type: STRING
+
+ - source.type: JSON
+ target.type: STRING
+
+ - source.type: BIT
+ target.type: STRING
+
+ - source.type: VARCHAR
+ target.type: STRING
+
+ - source.type: TEXT
+ target.type: STRING
+
+ - source.type: BLOB
+ target.type: STRING
+
+ - source.type: TINYBLOB
+ target.type: STRING
+
+ - source.type: TINYTEXT
+ target.type: STRING
+
+ - source.type: MEDIUMBLOB
+ target.type: STRING
+
+ - source.type: MEDIUMTEXT
+ target.type: STRING
+
+ - source.type: LONGBLOB
+ target.type: STRING
+
+ - source.type: LONGTEXT
+ target.type: STRING
+
+ - source.type: VARBINARY
+ target.type: STRING
+
+ - source.type: GEOMETRY
+ target.type: STRING
+
+ - source.type: POINT
+ target.type: STRING
+
+ - source.type: LINESTRING
+ target.type: STRING
+
+ - source.type: POLYGON
+ target.type: STRING
+
+ - source.type: MULTIPOINT
+ target.type: STRING
+
+ - source.type: MULTILINESTRING
+ target.type: STRING
+
+ - source.type: MULTIPOLYGON
+ target.type: STRING
+
+ - source.type: GEOMETRYCOLLECTION
+ target.type: STRING
+
+ - source.type: ENUM
+ target.type: STRING
+
+ - source.type: STRING
+ target.type: STRING
+
+ - source.type: BINARY
+ target.type: BINARY
+
+ - source.type: BYTE
+ target.type: BYTE
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java
new file mode 100644
index 0000000000..2b2f9ff214
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.oceanbase;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * OceanBase data node info, holding the extended parameters of an OceanBase data node.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("OceanBase data node info")
+public class OceanBaseDataNodeDTO {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseDataNodeDTO.class);
+    private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://";
+
+    @ApiModelProperty("URL of backup DB server")
+    private String backupUrl;
+
+    /**
+     * Get the dto instance from the request.
+     *
+     * @param request the data node request whose properties are copied onto the dto
+     * @param extParams previously stored ext params in JSON form; when blank a new dto is used as the copy target
+     * @return the result of copying the request properties onto the dto
+     */
+    public static OceanBaseDataNodeDTO getFromRequest(OceanBaseDataNodeRequest request, String extParams) {
+        OceanBaseDataNodeDTO target;
+        if (StringUtils.isNotBlank(extParams)) {
+            target = OceanBaseDataNodeDTO.getFromJson(extParams);
+        } else {
+            target = new OceanBaseDataNodeDTO();
+        }
+        return CommonBeanUtils.copyProperties(request, target, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     *
+     * @param extParams ext params in JSON form
+     * @return the parsed dto
+     * @throws BusinessException with {@link ErrorCodeEnum#CLUSTER_INFO_INCORRECT} when parsing fails
+     */
+    public static OceanBaseDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, OceanBaseDataNodeDTO.class);
+        } catch (Exception e) {
+            String reason = String.format("Failed to parse extParams for OceanBase node: %s", e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, reason);
+        }
+    }
+
+    /**
+     * Convert ip:port to a JDBC URL by prepending the OceanBase JDBC prefix.
+     *
+     * @param url the address to convert
+     * @return {@code url} unchanged when it is blank or already starts with the prefix,
+     *         otherwise the prefix followed by {@code url}
+     */
+    public static String convertToJdbcurl(String url) {
+        if (StringUtils.isBlank(url) || url.startsWith(OCEANBASE_JDBC_PREFIX)) {
+            return url;
+        }
+        return OCEANBASE_JDBC_PREFIX + url;
+    }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java
new file mode 100644
index 0000000000..2751ab4a5a
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * OceanBase data node info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.OCEANBASE)
+@ApiModel("OceanBase data node info")
+public class OceanBaseDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("URL of backup DB server")
+    private String backupUrl;
+
+    /**
+     * Creates an empty node info whose type is set to {@link DataNodeType#OCEANBASE}.
+     */
+    public OceanBaseDataNodeInfo() {
+        this.setType(DataNodeType.OCEANBASE);
+    }
+
+    /**
+     * Generates a request object from this node info.
+     *
+     * @return a new {@link OceanBaseDataNodeRequest} produced by copying this object's properties
+     */
+    @Override
+    public OceanBaseDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, OceanBaseDataNodeRequest::new);
+    }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java
new file mode 100644
index 0000000000..9eacb13483
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import javax.validation.constraints.Pattern;
+
+/**
+ * Request object for an OceanBase data node, registered for the
+ * {@link DataNodeType#OCEANBASE} type.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.OCEANBASE)
+@ApiModel("OceanBase data node request")
+public class OceanBaseDataNodeRequest extends DataNodeRequest {
+
+    /**
+     * URL of the backup DB server. The validation pattern rejects any value
+     * that contains a whitespace character.
+     */
+    @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
+    @ApiModelProperty("URL of backup DB server")
+    private String backupUrl;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java
new file mode 100644
index 0000000000..c5e848737d
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Plain holder describing one OceanBase column.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class OceanBaseColumnInfo {
+
+    /** Column name. */
+    private String name;
+
+    /** Column type. */
+    private String type;
+
+    /** Column comment. */
+    private String comment;
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java
new file mode 100644
index 0000000000..f78d1c84ba
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * OceanBase sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase sink info")
+@JsonTypeDefine(value = SinkType.OCEANBASE)
+public class OceanBaseSink extends StreamSink {
+
+    @ApiModelProperty("OceanBase JDBC URL, such as jdbc:oceanbase://host:port")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    /**
+     * Creates an empty sink whose sink type is {@link SinkType#OCEANBASE}, matching the
+     * type this class is registered under via {@link JsonTypeDefine}.
+     */
+    public OceanBaseSink() {
+        this.setSinkType(SinkType.OCEANBASE);
+    }
+
+    /**
+     * Generates a sink request from this sink info.
+     *
+     * @return a new {@link OceanBaseSinkRequest} produced by copying this object's properties
+     */
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, OceanBaseSinkRequest::new);
+    }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java
new file mode 100644
index 0000000000..b358cc9f4e
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
+
+import com.google.common.base.Strings;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * OceanBase sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class OceanBaseSinkDTO extends BaseStreamSink {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseSinkDTO.class);
+
+    // URL prefix written by setDbNameToUrl
+    private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://";
+    // MySQL-protocol URL prefix written by setDbNameToUrlWithCdc
+    private static final String OCEANBASE_JDBC_PREFIX_CDC = "jdbc:mysql://";
+
+    // Compiled once and shared by both setDbNameToUrl variants; the port group is optional
+    private static final Pattern OCEANBASE_URL_PATTERN =
+            Pattern.compile("jdbc:oceanbase://(?<host>[a-zA-Z0-9-//.]+):(?<port>[0-9]+)?(?<ext>)");
+
+    @ApiModelProperty("OceanBase JDBC URL, such as jdbc:oceanbase://host:port")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Properties for OceanBase")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request
+     *
+     * @param request OceanBaseSinkRequest
+     * @param extParams previously stored ext params in JSON form; when blank a new dto is used as the copy target
+     * @return {@link OceanBaseSinkDTO}
+     * @apiNote The config here will be saved to the database, so filter sensitive params before saving.
+     */
+    public static OceanBaseSinkDTO getFromRequest(OceanBaseSinkRequest request, String extParams) {
+        OceanBaseSinkDTO dto =
+                StringUtils.isNotBlank(extParams) ? OceanBaseSinkDTO.getFromJson(extParams) : new OceanBaseSinkDTO();
+        CommonBeanUtils.copyProperties(request, dto, true);
+        return dto;
+    }
+
+    /**
+     * Get OceanBase sink info from JSON string
+     *
+     * @param extParams string ext params
+     * @return {@link OceanBaseSinkDTO}
+     * @throws BusinessException with {@link ErrorCodeEnum#SINK_INFO_INCORRECT} when parsing fails
+     */
+    public static OceanBaseSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, OceanBaseSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of OceanBase SinkDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Get OceanBase table info
+     *
+     * @param oceanBaseSink OceanBase sink dto,{@link OceanBaseSinkDTO}
+     * @param columnList OceanBase column info list,{@link OceanBaseColumnInfo}
+     * @return {@link OceanBaseTableInfo}
+     */
+    public static OceanBaseTableInfo getTableInfo(OceanBaseSinkDTO oceanBaseSink,
+            List<OceanBaseColumnInfo> columnList) {
+        OceanBaseTableInfo tableInfo = new OceanBaseTableInfo();
+        tableInfo.setDbName(oceanBaseSink.getDatabaseName());
+        tableInfo.setTableName(oceanBaseSink.getTableName());
+        tableInfo.setPrimaryKey(oceanBaseSink.getPrimaryKey());
+        tableInfo.setColumns(columnList);
+        return tableInfo;
+    }
+
+    /**
+     * Get DbName from jdbcUrl
+     *
+     * <p>The URL is lower-cased before parsing, so the returned name is lower-case as well.
+     *
+     * @param jdbcUrl OceanBase JDBC url, such as jdbc:oceanbase://host:port/database
+     * @return database name, with anything from the first {@link InlongConstants#QUESTION_MARK}
+     *         or {@link InlongConstants#SEMICOLON} onwards removed
+     * @throws IllegalArgumentException when the url is null or empty, does not have the
+     *         {@code jdbc:<protocol>:} form, or contains no database part
+     */
+    private static String getDbNameFromUrl(String jdbcUrl) {
+        String database = null;
+
+        if (Strings.isNullOrEmpty(jdbcUrl)) {
+            throw new IllegalArgumentException("Invalid JDBC url.");
+        }
+
+        jdbcUrl = jdbcUrl.toLowerCase();
+        if (jdbcUrl.startsWith("jdbc:impala")) {
+            jdbcUrl = jdbcUrl.replace(":impala", "");
+        }
+
+        // pos1 is the colon that terminates the protocol part following "jdbc:"
+        int pos1;
+        if (!jdbcUrl.startsWith("jdbc:")
+                || (pos1 = jdbcUrl.indexOf(':', 5)) == -1) {
+            throw new IllegalArgumentException("Invalid JDBC url.");
+        }
+
+        String connUri = jdbcUrl.substring(pos1 + 1);
+        if (connUri.startsWith("//")) {
+            // skip the "//host:port" authority, the database follows the next slash
+            int pos = connUri.indexOf('/', 2);
+            if (pos != -1) {
+                database = connUri.substring(pos + 1);
+            }
+        } else {
+            database = connUri;
+        }
+
+        if (Strings.isNullOrEmpty(database)) {
+            throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl);
+        }
+
+        if (database.contains(InlongConstants.QUESTION_MARK)) {
+            database = database.substring(0, database.indexOf(InlongConstants.QUESTION_MARK));
+        }
+        if (database.contains(InlongConstants.SEMICOLON)) {
+            database = database.substring(0, database.indexOf(InlongConstants.SEMICOLON));
+        }
+        return database;
+    }
+
+    /**
+     * Rebuild an OceanBase JDBC URL so that it points at the given database.
+     *
+     * @param jdbcUrl OceanBase JDBC url, such as jdbc:oceanbase://host:port
+     * @param databaseName database name to place after host:port
+     * @return {@code jdbcUrl} unchanged when it is blank, otherwise
+     *         {@code jdbc:oceanbase://host:port/databaseName} followed by the original
+     *         query string (everything from the first question mark), if any
+     * @throws BusinessException with {@link ErrorCodeEnum#SINK_INFO_INCORRECT} when the url
+     *         does not contain {@code jdbc:oceanbase://host:port}
+     */
+    public static String setDbNameToUrl(String jdbcUrl, String databaseName) {
+        return rebuildUrlWithDbName(jdbcUrl, databaseName, OCEANBASE_JDBC_PREFIX);
+    }
+
+    /**
+     * Same as {@link #setDbNameToUrl(String, String)}, but the resulting URL starts with
+     * {@code jdbc:mysql://} instead of {@code jdbc:oceanbase://}.
+     *
+     * @param jdbcUrl OceanBase JDBC url, such as jdbc:oceanbase://host:port
+     * @param databaseName database name to place after host:port
+     * @return {@code jdbcUrl} unchanged when it is blank, otherwise
+     *         {@code jdbc:mysql://host:port/databaseName} followed by the original query string, if any
+     * @throws BusinessException with {@link ErrorCodeEnum#SINK_INFO_INCORRECT} when the url
+     *         does not contain {@code jdbc:oceanbase://host:port}
+     */
+    public static String setDbNameToUrlWithCdc(String jdbcUrl, String databaseName) {
+        return rebuildUrlWithDbName(jdbcUrl, databaseName, OCEANBASE_JDBC_PREFIX_CDC);
+    }
+
+    /**
+     * Extracts host and port from an OceanBase JDBC URL and reassembles them behind the
+     * given prefix together with the database name and the original query string.
+     */
+    private static String rebuildUrlWithDbName(String jdbcUrl, String databaseName, String targetPrefix) {
+        if (StringUtils.isBlank(jdbcUrl)) {
+            return jdbcUrl;
+        }
+        Matcher dataMatcher = OCEANBASE_URL_PATTERN.matcher(jdbcUrl);
+        // The port group is optional in the pattern: "jdbc:oceanbase://host:" matches with a null
+        // port, which would otherwise be appended as the literal text "null".
+        if (!dataMatcher.find() || dataMatcher.group("port") == null) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    "OceanBase JDBC URL was invalid, it should be like jdbc:oceanbase://host:port");
+        }
+        StringBuilder resultUrl = new StringBuilder().append(targetPrefix)
+                .append(dataMatcher.group("host"))
+                .append(InlongConstants.COLON)
+                .append(dataMatcher.group("port"))
+                .append(InlongConstants.SLASH)
+                .append(databaseName);
+        // keep the connection parameters of the original URL
+        if (jdbcUrl.contains(InlongConstants.QUESTION_MARK)) {
+            resultUrl.append(jdbcUrl.substring(jdbcUrl.indexOf(InlongConstants.QUESTION_MARK)));
+        }
+        return resultUrl.toString();
+    }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java
new file mode 100644
index 0000000000..f7cc6f1efe
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import javax.validation.constraints.Pattern;
+
+/**
+ * OceanBase sink request.
+ *
+ * <p>Extends {@link SinkRequest} with the OceanBase-specific settings: the JDBC URL, the credentials, and
+ * the target database, table and primary key. The class is registered under {@code SinkType.OCEANBASE}
+ * through {@code @JsonTypeDefine}.
+ *
+ * <p>{@code jdbcUrl} is validated with a regular expression that rejects any whitespace character.
+ *
+ * <p>NOTE(review): the Swagger description of {@code jdbcUrl} shows a {@code jdbc:mysql://} example, while
+ * other OceanBase code in this change works with {@code jdbc:oceanbase://} URLs - confirm which form
+ * callers are expected to send.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase sink request")
+@JsonTypeDefine(value = SinkType.OCEANBASE)
+public class OceanBaseSinkRequest extends SinkRequest {
+
+ @ApiModelProperty("OceanBase JDBC URL, such as jdbc:mysql://host:port")
+ @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java
new file mode 100644
index 0000000000..1f1f6a3cbd
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * OceanBase table info.
+ *
+ * <p>Plain data holder describing one table: the database it belongs to, the table name, its comment,
+ * primary key, engine and charset, plus the list of column definitions
+ * ({@link OceanBaseColumnInfo}). It contains no logic of its own.
+ */
+@Data
+public class OceanBaseTableInfo {
+
+ private String dbName;
+
+ private String tableName;
+
+ private String comment;
+
+ private String primaryKey;
+
+ private String engine;
+
+ private String charset;
+
+ private List<OceanBaseColumnInfo> columns;
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java
new file mode 100644
index 0000000000..da117670e0
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
+import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeStrategyFactory;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSink;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.OceanBaseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating OceanBase load nodes.
+ *
+ * <p>{@code accept} returns true only for {@code SinkType.OCEANBASE}. {@code createLoadNode} casts the
+ * given node to {@link OceanBaseSink}, parses its properties, sink field infos and field relations, and
+ * builds an {@link OceanBaseLoadNode} whose id and name are both the sink name. The JDBC URL handed to the
+ * load node is produced by {@code OceanBaseSinkDTO.setDbNameToUrlWithCdc} from the sink's JDBC URL and
+ * database name.
+ *
+ * <p>NOTE(review): the field type mapping strategy is looked up with {@code SinkType.MYSQL}, not
+ * {@code SinkType.OCEANBASE}. This may be intentional or a leftover from the MySQL provider - confirm
+ * whether the OceanBase field type strategy should be used here.
+ */
+@Service
+public class OceanBaseProvider implements LoadNodeProvider {
+
+ @Autowired
+ private FieldTypeStrategyFactory fieldTypeStrategyFactory;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.OCEANBASE.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String,
StreamField> constantFieldMap) {
+ OceanBaseSink oceanBaseSink = (OceanBaseSink) nodeInfo;
+ Map<String, String> properties =
parseProperties(oceanBaseSink.getProperties());
+ FieldTypeMappingStrategy fieldTypeMappingStrategy =
+ fieldTypeStrategyFactory.getInstance(SinkType.MYSQL);
+ List<FieldInfo> fieldInfos =
parseSinkFieldInfos(oceanBaseSink.getSinkFieldList(),
oceanBaseSink.getSinkName(),
+ fieldTypeMappingStrategy);
+ List<FieldRelation> fieldRelations =
parseSinkFields(oceanBaseSink.getSinkFieldList(), constantFieldMap);
+
+ return new OceanBaseLoadNode(
+ oceanBaseSink.getSinkName(),
+ oceanBaseSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ Lists.newArrayList(),
+ null,
+ null,
+ properties,
+
OceanBaseSinkDTO.setDbNameToUrlWithCdc(oceanBaseSink.getJdbcUrl(),
oceanBaseSink.getDatabaseName()),
+ oceanBaseSink.getUsername(),
+ oceanBaseSink.getPassword(),
+ oceanBaseSink.getTableName(),
+ oceanBaseSink.getPrimaryKey());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java
new file mode 100644
index 0000000000..585a5ba1f2
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * OceanBase binlog source info
+ *
+ * <p>Extends {@link StreamSource} with the OceanBase connection settings (username, password, hostname,
+ * port, server id) and the binlog collection options (database/table white lists, snapshot mode, offset
+ * and history files, specific offset file/position, and so on).
+ *
+ * <p>The no-argument constructor sets the source type to {@code SourceType.OCEANBASE}.
+ * {@code genSourceRequest} copies this object's properties into a new
+ * {@link OceanBaseBinlogSourceRequest}.
+ *
+ * <p>NOTE(review): {@code serverId} and {@code timestampFormatStandard} are declared with
+ * {@code @Builder.Default}; presumably those defaults apply to the builder only and not to the
+ * hand-written no-argument constructor - verify against the Lombok version in use.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase binlog source info")
+@JsonTypeDefine(value = SourceType.OCEANBASE)
+public class OceanBaseBinlogSource extends StreamSource {
+
+ @ApiModelProperty("Username of the OceanBase server")
+ private String username;
+
+ @ApiModelProperty("Password of the OceanBase server")
+ private String password;
+
+ @ApiModelProperty("Hostname of the OceanBase server")
+ private String hostname;
+
+ @ApiModelProperty("Port of the OceanBase server")
+ private Integer port;
+
+ @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single
node")
+ @Builder.Default
+ private Integer serverId = 0;
+
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
+ @ApiModelProperty(value = "List of DBs to be collected, seperated by ',',
supporting regular expressions")
+ private String databaseWhiteList;
+
+ @ApiModelProperty(value = "List of tables to be collected, seperated by
',',supporting regular expressions")
+ private String tableWhiteList;
+
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String serverTimezone;
+
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs;
+
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never,
schema_only, schema_only_recovery")
+ private String snapshotMode;
+
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
+ @ApiModelProperty("The file path to store history info")
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
+
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ @Builder.Default
+ private String timestampFormatStandard = "SQL";
+
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration;
+
+ @ApiModelProperty("Only incremental")
+ private boolean onlyIncremental;
+
+ @ApiModelProperty("Primary key must be shared by all tables")
+ private String primaryKey;
+
+ @ApiModelProperty("Directly read binlog from the specified offset
filename")
+ private String specificOffsetFile;
+
+ @ApiModelProperty("Directly read binlog from the specified offset
position")
+ private Integer specificOffsetPos;
+
+ public OceanBaseBinlogSource() {
+ this.setSourceType(SourceType.OCEANBASE);
+ }
+
+ @Override
+ public SourceRequest genSourceRequest() {
+ return CommonBeanUtils.copyProperties(this,
OceanBaseBinlogSourceRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java
new file mode 100644
index 0000000000..7db34fd144
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.oceanbase;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.Map;
+
+/**
+ * OceanBase binlog source DTO.
+ *
+ * <p>Holds the OceanBase binlog source settings in the form that is serialized to and parsed from the
+ * {@code extParams} JSON string: {@code getFromJson} parses such a string into a DTO, and
+ * {@code getFromRequest} starts from the DTO parsed out of the existing {@code extParams} (or an empty
+ * DTO when that string is blank) and copies the request's properties onto it.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class OceanBaseBinlogSourceDTO {
+
+ @ApiModelProperty("Username of the OceanBase server")
+ private String username;
+
+ @ApiModelProperty("Password of the OceanBase server")
+ private String password;
+
+ @ApiModelProperty("Hostname of the OceanBase server")
+ private String hostname;
+
+ @ApiModelProperty("Port of the OceanBase server")
+ private Integer port;
+
+ @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single
node")
+ @Builder.Default
+ private Integer serverId = 0;
+
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
+ @ApiModelProperty(value = "List of DBs to be collected, supporting regular
expressions, "
+ + "seperated by ',', for example: db1,test_db*", notes = "DBs not
in this list are excluded. If not set, all DBs are monitored")
+ private String databaseWhiteList;
+
+ @ApiModelProperty(value = "List of tables to be collected, supporting
regular expressions, "
+ + "seperated by ',', for example: tb1,user*", notes = "Tables not
in this list are excluded. By default, all tables are monitored")
+ private String tableWhiteList;
+
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String serverTimezone;
+
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs;
+
+ /**
+ * <code>initial</code>: Default mode, do a snapshot when no offset is found.
+ * <p/>
+ * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position
+ * has been purged on the DB server.
+ * <p/>
+ * <code>never</code>: Do not snapshot.
+ * <p/>
+ * <code>schema_only</code>: All tables' column names will be taken, but the table data will not be
+ * exported, and the binlog will only be consumed from its end at the time the task is started.
+ * So it is very suitable when historical data does not matter and only recent changes do.
+ * <p/>
+ * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to
+ * recover, which is generally not used.
+ */
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never,
schema_only, schema_only_recovery")
+ private String snapshotMode;
+
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
+ @ApiModelProperty("The file path to store history info")
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
+
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ @Builder.Default
+ private String timestampFormatStandard = "SQL";
+
+ @ApiModelProperty("Whether to migrate all databases")
+ private boolean allMigration;
+
+ @ApiModelProperty("Only incremental")
+ private boolean onlyIncremental;
+
+ @ApiModelProperty("Primary key must be shared by all tables")
+ private String primaryKey;
+
+ @ApiModelProperty("Directly read binlog from the specified offset
filename")
+ private String specificOffsetFile;
+
+ @ApiModelProperty("Directly read binlog from the specified offset
position")
+ private Integer specificOffsetPos;
+
+ @ApiModelProperty("Properties for OceanBase")
+ private Map<String, Object> properties;
+
+ /**
+ * Get the dto instance from the request.
+ * <p/>
+ * When <code>extParams</code> is not blank it is parsed first and used as the base object; otherwise
+ * a new empty DTO is used. The request's properties are then copied onto that base object.
+ */
+ public static OceanBaseBinlogSourceDTO
getFromRequest(OceanBaseBinlogSourceRequest request, String extParams) {
+ OceanBaseBinlogSourceDTO dto = StringUtils.isNotBlank(extParams)
+ ? OceanBaseBinlogSourceDTO.getFromJson(extParams)
+ : new OceanBaseBinlogSourceDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ public static OceanBaseBinlogSourceDTO getFromJson(@NotNull String
extParams) {
+ try {
+ return JsonUtils.parseObject(extParams,
OceanBaseBinlogSourceDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("parse extParams of OceanBaseBinlogSource
failure: %s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java
new file mode 100644
index 0000000000..3be8006e88
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * OceanBase binlog source request
+ *
+ * <p>Extends {@link SourceRequest} with the OceanBase connection settings and binlog collection options.
+ * Field initializers supply the defaults: port 3306, server id 0, interval "500", snapshot mode
+ * "initial", timestamp format "SQL" and {@code allMigration = false}. The constructor sets the source
+ * type to {@code SourceType.OCEANBASE} and the serialization type to the name of
+ * {@code DataFormat.DEBEZIUM_JSON}.
+ *
+ * <p>NOTE(review): the default port 3306 looks inherited from the MySQL request - confirm it is the
+ * intended default for OceanBase deployments.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase binlog source request")
+@JsonTypeDefine(value = SourceType.OCEANBASE)
+public class OceanBaseBinlogSourceRequest extends SourceRequest {
+
+ @ApiModelProperty("Username of the DB server")
+ private String username;
+
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
+ @ApiModelProperty("Hostname of the DB server")
+ private String hostname;
+
+ @ApiModelProperty("Port of the DB server")
+ private Integer port = 3306;
+
+ @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single
node")
+ private Integer serverId = 0;
+
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
+ @ApiModelProperty(value = "List of DBs to be collected, supporting regular
expressions, "
+ + "separate them with ',', for example: db1,test_db*", notes =
"DBs not in this list are excluded. If not set, all DBs are monitored")
+ private String databaseWhiteList;
+
+ @ApiModelProperty(value = "List of tables to be collected, supporting
regular expressions, "
+ + "separate them with ',', for example: tb1,user*", notes =
"Tables not in this list are excluded. By default, all tables are monitored")
+ private String tableWhiteList;
+
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String serverTimezone;
+
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs = "500";
+
+ /**
+ * <code>initial</code>: Default mode, do a snapshot when no offset is found.
+ * <p/>
+ * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position
+ * has been purged on the DB server.
+ * <p/>
+ * <code>never</code>: Do not snapshot.
+ * <p/>
+ * <code>schema_only</code>: All tables' column names will be taken, but the table data will not be
+ * exported, and the binlog will only be consumed from its end at the time the task is started.
+ * So it is very suitable when historical data does not matter and only recent changes do.
+ * <p/>
+ * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to
+ * recover, which is generally not used.
+ */
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never,
schema_only, schema_only_recovery")
+ private String snapshotMode = "initial";
+
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
+ @ApiModelProperty("The file path to store history info")
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
+
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
+
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration = false;
+
+ @ApiModelProperty("Only incremental")
+ private boolean onlyIncremental;
+
+ @ApiModelProperty("Primary key must be shared by all tables")
+ private String primaryKey;
+
+ @ApiModelProperty("Directly read binlog from the specified offset
filename")
+ private String specificOffsetFile;
+
+ @ApiModelProperty("Directly read binlog from the specified offset
position")
+ private Integer specificOffsetPos;
+
+ public OceanBaseBinlogSourceRequest() {
+ this.setSourceType(SourceType.OCEANBASE);
+ this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java
new file mode 100644
index 0000000000..84917e624e
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import
org.apache.inlong.manager.service.resource.sink.oceanbase.OceanBaseJdbcUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.sql.Connection;
+import java.util.Objects;
+
+/**
+ * OceanBase data node operator.
+ *
+ * <p>Converts between {@link DataNodeEntity} rows and the OceanBase node request/info objects, tests
+ * JDBC connectivity for a node, and retries the related stream sources when a node's connection settings
+ * change.
+ */
+@Service
+public class OceanBaseDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.OCEANBASE;
+    }
+
+    /**
+     * Builds the node info from the entity, overlaying the values stored in its extParams JSON (if any).
+     *
+     * @param entity data node entity, must not be null
+     * @return the OceanBase data node info
+     * @throws BusinessException with {@code DATA_NODE_NOT_FOUND} when {@code entity} is null
+     */
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        OceanBaseDataNodeInfo dataNodeInfo = new OceanBaseDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, dataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            OceanBaseDataNodeDTO dto = OceanBaseDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, dataNodeInfo);
+        }
+        return dataNodeInfo;
+    }
+
+    /**
+     * Copies the request onto the target entity and rewrites the entity's extParams as the JSON form of
+     * the DTO built from the request and the entity's current extParams.
+     */
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        OceanBaseDataNodeRequest dataNodeRequest = (OceanBaseDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true);
+        try {
+            OceanBaseDataNodeDTO dto =
+                    OceanBaseDataNodeDTO.getFromRequest(dataNodeRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for OceanBase node: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Opens (and immediately closes) a JDBC connection with the request's URL, username and token.
+     *
+     * <p>The password is deliberately kept out of the log lines and out of the exception message, so it
+     * cannot leak into log files or API error responses.
+     *
+     * @param request data node request carrying url, username and token (used as the password)
+     * @return true when the connection could be opened
+     * @throws BusinessException when the URL is blank or the connection attempt fails
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = OceanBaseDataNodeDTO.convertToJdbcurl(request.getUrl());
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.expectNotBlank(jdbcUrl, ErrorCodeEnum.INVALID_PARAMETER, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = OceanBaseJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            LOGGER.info("OceanBase connection not null - connection success for jdbcUrl={}, username={}",
+                    jdbcUrl, username);
+            return true;
+        } catch (Exception e) {
+            String errMsg =
+                    String.format("OceanBase connection failed for jdbcUrl=%s, username=%s", jdbcUrl, username);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    /**
+     * Retries the OceanBase stream sources bound to this data node when the url, backup url, username or
+     * token in the request differs from what the entity currently stores.
+     */
+    @Override
+    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity dataNodeEntity, String operator) {
+        OceanBaseDataNodeRequest nodeRequest = (OceanBaseDataNodeRequest) request;
+        OceanBaseDataNodeInfo nodeInfo = (OceanBaseDataNodeInfo) this.getFromEntity(dataNodeEntity);
+        boolean changed = !Objects.equals(nodeRequest.getUrl(), nodeInfo.getUrl())
+                || !Objects.equals(nodeRequest.getBackupUrl(), nodeInfo.getBackupUrl())
+                || !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername())
+                || !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken());
+        if (changed) {
+            retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.OCEANBASE, operator);
+        }
+    }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java
new file mode 100644
index 0000000000..35e67a5a57
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.oceanbase;
+
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utils for OceanBase JDBC.
+ * <p>
+ * Static helpers to open a connection, run SQL statements and create or inspect databases, tables
+ * and columns. None of the helpers closes the {@link Connection} passed in; that stays with the caller.
+ */
+public class OceanBaseJdbcUtils {
+
+    private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://";
+    private static final String OCEANBASE_DRIVER_CLASS = "com.oceanbase.jdbc.Driver";
+    private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseJdbcUtils.class);
+
+    /**
+     * Get OceanBase connection from the url and user.
+     *
+     * @param url jdbc url, such as jdbc:oceanbase://host:port/database
+     * @param user Username for JDBC URL
+     * @param password User password
+     * @return a non-null {@link Connection}
+     * @throws IllegalArgumentException if the url, user or password is blank
+     * @throws Exception on get connection error
+     */
+    public static Connection getConnection(String url, String user, String password) throws Exception {
+        // Fail loudly instead of returning null: a null connection would only surface later as a
+        // NullPointerException, or be mistaken for a successful connection test.
+        if (StringUtils.isBlank(url) || StringUtils.isBlank(user) || StringUtils.isBlank(password)) {
+            throw new IllegalArgumentException("OceanBase JDBC URL, username and password cannot be blank");
+        }
+        // Run the URL through the shared verification helper before handing it to the driver.
+        UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, OCEANBASE_JDBC_PREFIX);
+        return establishDatabaseConnection(url, user, password);
+    }
+
+    /**
+     * Establishes a database connection using the provided URL, username, and password.
+     *
+     * @param url The JDBC URL
+     * @param user The username
+     * @param password The user's password
+     * @return A {@link Connection} object representing the database connection
+     * @throws Exception If an error occurs while obtaining the connection
+     */
+    private static Connection establishDatabaseConnection(String url, String user, String password) throws Exception {
+        Connection conn;
+        try {
+            Class.forName(OCEANBASE_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+        } catch (Exception e) {
+            String errorMsg =
+                    "Failed to get OceanBase connection, please check OceanBase JDBC URL, username, or password!";
+            LOGGER.error(errorMsg, e);
+            // Keep the original exception as the cause so its stack trace is not lost.
+            throw new Exception(errorMsg + " Other error message: " + e.getMessage(), e);
+        }
+        if (conn == null) {
+            throw new Exception("get OceanBase connection failed, please contact administrator");
+        }
+        LOGGER.info("get OceanBase connection success for url={}", url);
+        return conn;
+    }
+
+    /**
+     * Execute SQL command on OceanBase.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param sql SQL to be executed
+     * @throws Exception on execute SQL error
+     */
+    public static void executeSql(final Connection conn, final String sql) throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(sql);
+            LOGGER.info("execute sql [{}] success", sql);
+        }
+    }
+
+    /**
+     * Execute a list of SQL statements on OceanBase inside one transaction.
+     * <p>
+     * The statements are committed together; if any of them fails the transaction is rolled back.
+     * The auto-commit mode the connection had on entry is restored afterwards. A null or empty
+     * list is a no-op.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param sqls SQL to be executed
+     * @throws Exception on get execute SQL batch error
+     */
+    public static void executeSqlBatch(final Connection conn, final List<String> sqls) throws Exception {
+        if (sqls == null || sqls.isEmpty()) {
+            return;
+        }
+        // Remember the caller's mode rather than blindly switching auto-commit back on.
+        final boolean previousAutoCommit = conn.getAutoCommit();
+        conn.setAutoCommit(false);
+        try (Statement stmt = conn.createStatement()) {
+            for (String entry : sqls) {
+                stmt.execute(entry);
+            }
+            conn.commit();
+            LOGGER.info("execute sql [{}] success", sqls);
+        } catch (Exception e) {
+            // Undo the partially executed batch; a rollback failure must not hide the original error.
+            try {
+                conn.rollback();
+            } catch (Exception rollbackError) {
+                e.addSuppressed(rollbackError);
+            }
+            throw e;
+        } finally {
+            conn.setAutoCommit(previousAutoCommit);
+        }
+    }
+
+    /**
+     * Create OceanBase database if it does not exist yet.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @throws Exception on create database error
+     */
+    public static void createDb(final Connection conn, final String dbName) throws Exception {
+        if (checkDbExist(conn, dbName)) {
+            LOGGER.info("The database [{}] are exists", dbName);
+            return;
+        }
+        // executeSql already logs the executed statement on success.
+        executeSql(conn, OceanBaseSqlBuilder.buildCreateDbSql(dbName));
+    }
+
+    /**
+     * Check database from the OceanBase information_schema.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @return true if the database exists, otherwise false
+     * @throws Exception on check database exist error
+     */
+    public static boolean checkDbExist(final Connection conn, final String dbName) throws Exception {
+        final boolean exists = hasAnyRow(conn, OceanBaseSqlBuilder.getCheckDatabase(dbName));
+        LOGGER.info("check db exist for db={}, result={}", dbName, exists);
+        return exists;
+    }
+
+    /**
+     * Create OceanBase table by OceanBaseTableInfo if it does not exist yet.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param tableInfo table info {@link OceanBaseTableInfo}
+     * @throws Exception on create table error
+     */
+    public static void createTable(final Connection conn, final OceanBaseTableInfo tableInfo) throws Exception {
+        if (checkTablesExist(conn, tableInfo.getDbName(), tableInfo.getTableName())) {
+            LOGGER.info("The table [{}] are exists", tableInfo.getTableName());
+            return;
+        }
+        // executeSql already logs the executed statement on success.
+        executeSql(conn, OceanBaseSqlBuilder.buildCreateTableSql(tableInfo));
+    }
+
+    /**
+     * Check tables from the OceanBase information_schema.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @return true if table exist, otherwise false
+     * @throws Exception on check table exist error
+     */
+    public static boolean checkTablesExist(final Connection conn, final String dbName, final String tableName)
+            throws Exception {
+        final boolean result = hasAnyRow(conn, OceanBaseSqlBuilder.getCheckTable(dbName, tableName));
+        LOGGER.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
+        return result;
+    }
+
+    /**
+     * Check whether the column exists in the OceanBase table.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @param column table column name
+     * @return true if column exist in the table, otherwise false
+     * @throws Exception on check column exist error
+     */
+    public static boolean checkColumnExist(final Connection conn, final String dbName, final String tableName,
+            final String column) throws Exception {
+        final boolean result = hasAnyRow(conn, OceanBaseSqlBuilder.getCheckColumn(dbName, tableName, column));
+        LOGGER.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column);
+        return result;
+    }
+
+    /**
+     * Query all OceanBase table columns by the given tableName.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @return the columns of the table (name, type, comment); empty if the query returns no rows
+     * @throws Exception on get columns error
+     */
+    public static List<OceanBaseColumnInfo> getColumns(final Connection conn, final String dbName,
+            final String tableName) throws Exception {
+        final String querySql = OceanBaseSqlBuilder.buildDescTableSql(dbName, tableName);
+        final List<OceanBaseColumnInfo> columnList = new ArrayList<>();
+
+        try (Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(querySql)) {
+            if (Objects.nonNull(rs)) {
+                while (rs.next()) {
+                    // Result columns follow the SELECT order of the describe query: name, type, comment.
+                    columnList.add(new OceanBaseColumnInfo(rs.getString(1), rs.getString(2), rs.getString(3)));
+                }
+            }
+        }
+        return columnList;
+    }
+
+    /**
+     * Add columns for OceanBase table, skipping the columns that already exist.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @param columns columns to be added
+     * @throws Exception on add columns error
+     */
+    public static void addColumns(final Connection conn, final String dbName, final String tableName,
+            final List<OceanBaseColumnInfo> columns) throws Exception {
+        final List<OceanBaseColumnInfo> missingColumns = Lists.newArrayList();
+        for (OceanBaseColumnInfo columnInfo : columns) {
+            if (!checkColumnExist(conn, dbName, tableName, columnInfo.getName())) {
+                missingColumns.add(columnInfo);
+            }
+        }
+        if (missingColumns.isEmpty()) {
+            // Every column is already present: nothing to alter, no transaction needed.
+            return;
+        }
+        executeSqlBatch(conn, OceanBaseSqlBuilder.buildAddColumnsSql(dbName, tableName, missingColumns));
+    }
+
+    /**
+     * Runs the given query and reports whether it produced at least one row.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param sql query to execute
+     * @return true if the result set has a first row, otherwise false
+     * @throws Exception on query error
+     */
+    private static boolean hasAnyRow(final Connection conn, final String sql) throws Exception {
+        try (Statement stmt = conn.createStatement();
+                ResultSet resultSet = stmt.executeQuery(sql)) {
+            return Objects.nonNull(resultSet) && resultSet.next();
+        }
+    }
+
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java
new file mode 100644
index 0000000000..1da92bf3a8
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * OceanBase's resource operator.
+ * <p>
+ * Creates the database, table and columns an OceanBase sink writes to, and records the outcome
+ * in the sink status.
+ */
+@Service
+public class OceanBaseResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OceanBaseResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.OCEANBASE.equals(sinkType);
+    }
+
+    /**
+     * Create the OceanBase resources for the sink, unless the sink is already configured
+     * successfully or resource creation is disabled for it.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create OceanBase resources sinkId={}", sinkInfo.getId());
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("OceanBase resource [{}] already success, skip to create", sinkInfo.getId());
+            return;
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOG.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
+            return;
+        }
+        this.createTable(sinkInfo);
+    }
+
+    /**
+     * Create OceanBase table by SinkInfo.
+     * <p>
+     * Creates the database and the table when missing, adds the columns the table does not have
+     * yet, then marks the sink as successfully configured. On any failure the sink is marked as
+     * failed and a {@link WorkflowException} is thrown.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     */
+    private void createTable(SinkInfo sinkInfo) {
+        LOG.info("begin to create OceanBase table for sinkId={}", sinkInfo.getId());
+        List<StreamSinkFieldEntity> fieldList = fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            LOG.warn("no OceanBase fields found, skip to create table for sinkId={}", sinkInfo.getId());
+            // Actually skip: without fields the loop below would fail on a null list, and an empty
+            // list would produce a CREATE TABLE statement with no columns.
+            return;
+        }
+        // set columns
+        List<OceanBaseColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            OceanBaseColumnInfo columnInfo = new OceanBaseColumnInfo(field.getFieldName(), field.getFieldType(),
+                    field.getFieldComment());
+            columnList.add(columnInfo);
+        }
+
+        OceanBaseSinkDTO sinkDTO = this.getOceanBaseInfo(sinkInfo);
+        OceanBaseTableInfo tableInfo = OceanBaseSinkDTO.getTableInfo(sinkDTO, columnList);
+        try (Connection conn = OceanBaseJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
+                sinkDTO.getPassword())) {
+            // 1. create database if not exists
+            OceanBaseJdbcUtils.createDb(conn, tableInfo.getDbName());
+            // 2. table not exists, create it
+            OceanBaseJdbcUtils.createTable(conn, tableInfo);
+            // 3. table exists, add columns - skip the exists columns
+            OceanBaseJdbcUtils.addColumns(conn, tableInfo.getDbName(), tableInfo.getTableName(), columnList);
+
+            // 4. update the sink status to success
+            String info = "success to create OceanBase resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOG.info(info + " for sinkInfo={}", sinkInfo);
+        } catch (Throwable e) {
+            // Catch everything so the sink is always flagged as failed before the workflow is aborted.
+            String errMsg = "create OceanBase table failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        }
+        LOG.info("success create OceanBase table for data sink [{}]", sinkInfo.getId());
+    }
+
+    /**
+     * Build the sink DTO from the sink's ext params. When the ext params carry no JDBC url, the
+     * connection settings are taken from the data node the sink refers to.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     * @return the OceanBase sink DTO with connection settings filled in
+     */
+    private OceanBaseSinkDTO getOceanBaseInfo(SinkInfo sinkInfo) {
+        OceanBaseSinkDTO oceanBaseInfo = OceanBaseSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+        if (StringUtils.isBlank(oceanBaseInfo.getJdbcUrl())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.expectNotBlank(dataNodeName, ErrorCodeEnum.INVALID_PARAMETER,
+                    "OceanBase jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, oceanBaseInfo);
+            // The data node stores a plain url and a token; the sink DTO needs a jdbc url and a password.
+            oceanBaseInfo.setJdbcUrl(OceanBaseDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl()));
+            oceanBaseInfo.setPassword(dataNodeInfo.getToken());
+        }
+        return oceanBaseInfo;
+    }
+
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java
new file mode 100644
index 0000000000..5cbef1557f
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.oceanbase;
+
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder the SQL string for OceanBase
+ * <p>
+ * All methods return plain SQL text. Values placed inside single-quoted string literals are
+ * escaped; database, table and column identifiers are inserted as given.
+ */
+public class OceanBaseSqlBuilder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseSqlBuilder.class);
+
+    /**
+     * Build SQL to check whether the database exists.
+     *
+     * @param dbName OceanBase database name
+     * @return the check database SQL string
+     */
+    public static String getCheckDatabase(String dbName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("SELECT schema_name ")
+                .append(" FROM information_schema.schemata ")
+                .append("WHERE schema_name = '")
+                .append(escapeLiteral(dbName))
+                .append("';");
+        LOGGER.info("check database sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the table exists.
+     *
+     * @param dbName OceanBase database name
+     * @param tableName OceanBase table name
+     * @return the check table SQL string
+     */
+    public static String getCheckTable(String dbName, String tableName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("select table_schema,table_name ")
+                .append(" from information_schema.tables where table_schema = '")
+                .append(escapeLiteral(dbName))
+                .append("' and table_name = '")
+                .append(escapeLiteral(tableName))
+                .append("' ;");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the column exists.
+     *
+     * @param dbName OceanBase database name
+     * @param tableName OceanBase table name
+     * @param columnName OceanBase column name
+     * @return the check column SQL string
+     */
+    public static String getCheckColumn(String dbName, String tableName, String columnName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ")
+                .append(" from information_schema.COLUMNS where table_schema='")
+                .append(escapeLiteral(dbName))
+                .append("' and table_name = '")
+                .append(escapeLiteral(tableName))
+                .append("' and column_name = '")
+                .append(escapeLiteral(columnName))
+                .append("';");
+        LOGGER.info("check column sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build create database SQL.
+     *
+     * @param dbName OceanBase database name
+     * @return the create database SQL string
+     */
+    public static String buildCreateDbSql(String dbName) {
+        final String sql = "CREATE DATABASE " + dbName;
+        LOGGER.info("create db sql: {}", sql);
+        return sql;
+    }
+
+    /**
+     * Build create table SQL by OceanBaseTableInfo.
+     * <p>
+     * The engine defaults to InnoDB when the table info does not name one; the charset clause is
+     * only emitted when a charset is set.
+     *
+     * @param table OceanBase table info {@link OceanBaseTableInfo}
+     * @return the create table SQL String
+     */
+    public static String buildCreateTableSql(OceanBaseTableInfo table) {
+        final StringBuilder sql = new StringBuilder()
+                .append("CREATE TABLE ").append(table.getDbName())
+                .append(".")
+                .append(table.getTableName())
+                .append(buildCreateColumnsSql(table));
+
+        final String engine = StringUtils.isEmpty(table.getEngine()) ? "InnoDB" : table.getEngine();
+        sql.append(" ENGINE=")
+                .append(engine)
+                .append(" ");
+
+        if (!StringUtils.isEmpty(table.getCharset())) {
+            sql.append(" DEFAULT CHARSET=")
+                    .append(table.getCharset())
+                    .append(" ");
+        }
+
+        LOGGER.info("create table sql: {}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build add columns SQL, one ALTER TABLE statement per column.
+     *
+     * @param dbName OceanBase database name
+     * @param tableName OceanBase table name
+     * @param columnList OceanBase column list {@link List}
+     * @return add column SQL string list
+     */
+    public static List<String> buildAddColumnsSql(String dbName, String tableName,
+            List<OceanBaseColumnInfo> columnList) {
+        final List<String> resultList = new ArrayList<>();
+        for (String columnDefinition : getColumnsInfo(columnList)) {
+            resultList.add("ALTER TABLE " + dbName + "." + tableName + " ADD COLUMN " + columnDefinition + ";");
+        }
+        return resultList;
+    }
+
+    /**
+     * Build create column SQL: the parenthesised column list, followed by the primary key clause
+     * when the table info names a primary key.
+     *
+     * @param table OceanBase table info {@link OceanBaseTableInfo}
+     * @return create column SQL string
+     */
+    private static String buildCreateColumnsSql(OceanBaseTableInfo table) {
+        final List<String> columnList = getColumnsInfo(table.getColumns());
+        final StringBuilder sql = new StringBuilder()
+                .append(" (")
+                .append(StringUtils.join(columnList, ","));
+        if (!StringUtils.isEmpty(table.getPrimaryKey())) {
+            sql.append(", PRIMARY KEY (")
+                    .append(table.getPrimaryKey())
+                    .append(")");
+        }
+        sql.append(") ");
+        LOGGER.info("create columns sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build column info by OceanBaseColumnInfo list. Each entry has the form
+     * {@code `name` type [COMMENT 'comment'] } with a trailing space.
+     *
+     * @param columns OceanBase column info {@link OceanBaseColumnInfo} list
+     * @return the SQL list
+     */
+    private static List<String> getColumnsInfo(List<OceanBaseColumnInfo> columns) {
+        final List<String> columnList = new ArrayList<>();
+        for (OceanBaseColumnInfo columnInfo : columns) {
+            final StringBuilder columnBuilder = new StringBuilder()
+                    .append("`")
+                    .append(columnInfo.getName())
+                    .append("`")
+                    .append(" ")
+                    .append(columnInfo.getType());
+            if (!StringUtils.isEmpty(columnInfo.getComment())) {
+                // Comments are free text and routinely contain apostrophes, so they must be escaped.
+                columnBuilder.append(" COMMENT '")
+                        .append(escapeLiteral(columnInfo.getComment()))
+                        .append("'");
+            }
+            columnBuilder.append(" ");
+            columnList.add(columnBuilder.toString());
+        }
+        return columnList;
+    }
+
+    /**
+     * Build query table SQL.
+     *
+     * @param dbName OceanBase database name
+     * @param tableName OceanBase table name
+     * @return desc table SQL string
+     */
+    public static String buildDescTableSql(String dbName, String tableName) {
+        final StringBuilder sql = new StringBuilder()
+                .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ")
+                .append(" from information_schema.COLUMNS where table_schema='")
+                .append(escapeLiteral(dbName))
+                .append("' and table_name = '")
+                .append(escapeLiteral(tableName))
+                .append("';");
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Escape a value for use inside a single-quoted SQL string literal, so that a quote in the
+     * value cannot terminate the literal early.
+     * <p>
+     * NOTE(review): backslashes are doubled on the assumption that the server treats backslash as
+     * an escape character inside string literals (the MySQL default) - confirm for the target
+     * OceanBase SQL mode.
+     *
+     * @param value raw value, may be null
+     * @return the escaped value, or null if the value is null
+     */
+    private static String escapeLiteral(String value) {
+        if (value == null) {
+            return null;
+        }
+        return value.replace("\\", "\\\\").replace("'", "''");
+    }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java
new file mode 100644
index 0000000000..debe791d82
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSink;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * OceanBase sink operator
+ * <p>
+ * Converts between OceanBase sink requests, the persisted sink entity and the sink view object.
+ */
+@Service
+public class OceanBaseSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseSinkOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.OCEANBASE.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.OCEANBASE;
+    }
+
+    /**
+     * Serialize the OceanBase specific part of the request into the entity's ext params.
+     *
+     * @param request the sink request, must be of the OceanBase sink type
+     * @param targetEntity the entity whose ext params are updated
+     */
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+        final String expectedType = this.getSinkType();
+        if (!expectedType.equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+        OceanBaseSinkRequest oceanBaseRequest = (OceanBaseSinkRequest) request;
+        try {
+            OceanBaseSinkDTO sinkDTO = OceanBaseSinkDTO.getFromRequest(oceanBaseRequest, targetEntity.getExtParams());
+            String extParams = objectMapper.writeValueAsString(sinkDTO);
+            targetEntity.setExtParams(extParams);
+        } catch (Exception e) {
+            String failure = "serialize extParams of OceanBase SinkDTO failure: " + e.getMessage();
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, failure);
+        }
+    }
+
+    /**
+     * Build the sink view object from the persisted entity. When the ext params carry no JDBC
+     * url, the connection settings are taken from the data node named by the entity.
+     *
+     * @param entity the persisted sink entity, may be null
+     * @return the OceanBase sink; an empty one if the entity is null
+     */
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        OceanBaseSink result = new OceanBaseSink();
+        if (entity == null) {
+            return result;
+        }
+
+        OceanBaseSinkDTO sinkDTO = OceanBaseSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(sinkDTO.getJdbcUrl())) {
+            fillFromDataNode(entity, sinkDTO);
+        }
+        // Entity properties first, then the DTO on top of them.
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(sinkDTO, result, true);
+        List<SinkField> fields = super.getSinkFields(entity.getId());
+        result.setSinkFieldList(fields);
+        return result;
+    }
+
+    /**
+     * Copy the connection settings of the entity's data node into the DTO.
+     *
+     * @param entity the persisted sink entity naming the data node
+     * @param sinkDTO the DTO to complete
+     */
+    private void fillFromDataNode(StreamSinkEntity entity, OceanBaseSinkDTO sinkDTO) {
+        final String dataNodeName = entity.getDataNodeName();
+        if (StringUtils.isBlank(dataNodeName)) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    "OceanBase jdbc url not specified and data node is blank");
+        }
+        DataNodeInfo nodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+        CommonBeanUtils.copyProperties(nodeInfo, sinkDTO, true);
+        sinkDTO.setJdbcUrl(OceanBaseDataNodeDTO.convertToJdbcurl(nodeInfo.getUrl()));
+        sinkDTO.setPassword(nodeInfo.getToken());
+    }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java
new file mode 100644
index 0000000000..ae55c41262
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeInfo;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSource;
+import
org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSourceDTO;
+import
org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * OceanBase source operator.
+ *
+ * <p>Handles stream sources whose type is {@link SourceType#OCEANBASE}: converts requests into
+ * persisted entities, and entities back into {@link OceanBaseBinlogSource} objects. When the
+ * stored ext params carry no hostname, the connection settings are resolved from the OceanBase
+ * data node referenced by the entity.
+ */
+@Service
+public class OceanBaseSourceOperator extends AbstractSourceOperator {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tells whether this operator handles the given source type.
+     *
+     * @param sourceType the source type to check
+     * @return true when {@code sourceType} equals {@link SourceType#OCEANBASE}
+     */
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.OCEANBASE.equals(sourceType);
+    }
+
+    /**
+     * @return the source type served by this operator, {@link SourceType#OCEANBASE}
+     */
+    @Override
+    protected String getSourceType() {
+        return SourceType.OCEANBASE;
+    }
+
+    /**
+     * Returns the ext params of the entity, completed with data node settings when needed.
+     *
+     * <p>If the ext params cannot be parsed into a DTO, or the DTO already has a hostname, the
+     * stored ext params are returned unchanged. Otherwise the DTO is filled from the data node
+     * referenced by the entity and serialized back to JSON.
+     *
+     * @param sourceEntity the persisted source entity
+     * @return the ext params JSON string
+     * @throws BusinessException if the data node url is not a valid {@code host:port} pair
+     */
+    @Override
+    public String getExtParams(StreamSourceEntity sourceEntity) {
+        OceanBaseBinlogSourceDTO sourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
+                OceanBaseBinlogSourceDTO.class);
+        if (Objects.isNull(sourceDTO) || StringUtils.isNotBlank(sourceDTO.getHostname())) {
+            return sourceEntity.getExtParams();
+        }
+        fillFromDataNode(sourceDTO, sourceEntity.getDataNodeName());
+        return JsonUtils.toJsonString(sourceDTO);
+    }
+
+    /**
+     * Copies the request onto the target entity and stores the serialized DTO as its ext params.
+     *
+     * @param request the source request, expected to be an {@link OceanBaseBinlogSourceRequest}
+     * @param targetEntity the entity to populate
+     * @throws BusinessException if building or serializing the DTO fails
+     */
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        OceanBaseBinlogSourceRequest sourceRequest = (OceanBaseBinlogSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            OceanBaseBinlogSourceDTO dto =
+                    OceanBaseBinlogSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of OceanBaseBinlog SourceDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Builds the OceanBase source view object from a persisted source entity.
+     *
+     * @param entity the persisted source entity, may be null
+     * @return the populated source, or an empty {@link OceanBaseBinlogSource} when {@code entity} is null
+     * @throws BusinessException if neither a hostname nor a data node name is available, or if the
+     *         data node url is not a valid {@code host:port} pair
+     */
+    @Override
+    public StreamSource getFromEntity(StreamSourceEntity entity) {
+        OceanBaseBinlogSource source = new OceanBaseBinlogSource();
+        if (entity == null) {
+            return source;
+        }
+
+        OceanBaseBinlogSourceDTO dto = OceanBaseBinlogSourceDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getHostname())) {
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                        "OceanBase url and data node is blank");
+            }
+            fillFromDataNode(dto, entity.getDataNodeName());
+        }
+        // Entity properties go first so that the DTO values win on overlapping names.
+        CommonBeanUtils.copyProperties(entity, source, true);
+        CommonBeanUtils.copyProperties(dto, source, true);
+
+        List<StreamField> sourceFields = super.getSourceFields(entity.getId());
+        source.setFieldList(sourceFields);
+        return source;
+    }
+
+    /**
+     * Fills the connection settings of the DTO from the named OceanBase data node.
+     *
+     * <p>The non-null node properties are copied onto the DTO, the node token becomes the
+     * password, and the node url is split on {@link InlongConstants#COLON} into hostname
+     * (first part) and port (second part). The url is validated so that a malformed value is
+     * reported as a {@link BusinessException} rather than a raw index or number format error.
+     *
+     * @param dto the DTO to complete
+     * @param dataNodeName the name of the data node to read from
+     * @throws BusinessException if the node url is blank, has no port part, or the port is not numeric
+     */
+    private void fillFromDataNode(OceanBaseBinlogSourceDTO dto, String dataNodeName) {
+        // dataNodeService is not declared in this class; presumably inherited from AbstractSourceOperator.
+        OceanBaseDataNodeInfo dataNodeInfo = (OceanBaseDataNodeInfo) dataNodeService.get(
+                dataNodeName, DataNodeType.OCEANBASE);
+        CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+        dto.setPassword(dataNodeInfo.getToken());
+
+        String url = dataNodeInfo.getUrl();
+        String[] hostAndPort = StringUtils.isBlank(url) ? new String[0] : url.split(InlongConstants.COLON);
+        if (hostAndPort.length < 2) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("OceanBase data node [%s] url [%s] is not in host:port format",
+                            dataNodeName, url));
+        }
+
+        Integer port;
+        try {
+            port = Integer.valueOf(hostAndPort[1]);
+        } catch (NumberFormatException e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("OceanBase data node [%s] url [%s] has an invalid port", dataNodeName, url));
+        }
+        dto.setHostname(hostAndPort[0]);
+        dto.setPort(port);
+    }
+
+}