This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 77338950f [INLONG-7242][Manager] Support register and manage the
resource of Redis sink (#7243)
77338950f is described below
commit 77338950f861d947086da231f876f191a6fbb6e1
Author: feat <[email protected]>
AuthorDate: Thu Feb 2 16:00:27 2023 +0800
[INLONG-7242][Manager] Support register and manage the resource of Redis
sink (#7243)
---
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../inlong/manager/common/consts/SinkType.java | 1 +
.../inlong/manager/common/enums/ErrorCodeEnum.java | 3 +-
.../inlong/manager/common/util/Preconditions.java | 13 ++
.../plugin/flink/enums/ConnectorJarType.java | 1 +
.../manager/pojo/node/redis/RedisDataNodeDTO.java | 63 ++++++++
.../manager/pojo/node/redis/RedisDataNodeInfo.java | 47 ++++++
.../pojo/node/redis/RedisDataNodeRequest.java} | 30 ++--
.../manager/pojo/sink/redis/RedisClusterMode.java} | 32 +++--
.../manager/pojo/sink/redis/RedisDataType.java} | 34 +++--
.../pojo/sink/redis/RedisSchemaMapMode.java} | 19 +--
.../inlong/manager/pojo/sink/redis/RedisSink.java | 124 ++++++++++++++++
.../manager/pojo/sink/redis/RedisSinkDTO.java | 129 +++++++++++++++++
.../manager/pojo/sink/redis/RedisSinkRequest.java | 108 ++++++++++++++
.../manager/pojo/sort/util/LoadNodeUtils.java | 118 +++++++++++++++
.../service/node/redis/RedisDataNodeOperator.java | 84 +++++++++++
.../resource/sink/redis/RedisResourceOperator.java | 48 +++++++
.../service/sink/redis/RedisSinkOperator.java | 158 +++++++++++++++++++++
.../manager/service/sink/RedisSinkServiceTest.java | 95 +++++++++++++
19 files changed, 1061 insertions(+), 47 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index a32adac37..757561d5b 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -30,5 +30,6 @@ public class DataNodeType {
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String MYSQL = "MYSQL";
public static final String STARROCKS = "STARROCKS";
+ public static final String REDIS = "REDIS";
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index a2da09886..c7e293ed6 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -38,4 +38,5 @@ public class SinkType {
public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
public static final String DORIS = "DORIS";
public static final String STARROCKS = "STARROCKS";
+ public static final String REDIS = "REDIS";
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 9c532dddc..d690e3f5d 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -40,6 +40,8 @@ public enum ErrorCodeEnum {
RECORD_NOT_FOUND(109, "The record does not exist"),
USER_IS_NOT_MANAGER(110, "%s is not the manager, please contact %s"),
RECORD_IN_USED(111, "The record is in use"),
+ IP_EMPTY(112, "The IP is is empty"),
+ PORT_EMPTY(113, "The PORT is is empty"),
GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation
authority"),
GROUP_DUPLICATE(1002, "Inlong group already exists"),
@@ -143,7 +145,6 @@ public enum ErrorCodeEnum {
CONSUME_PERMISSION_DENIED(3005, "No permission to access this inlong
consume"),
AUDIT_ID_TYPE_NOT_SUPPORTED(4001, "Audit id type '%s' not supported"),
-
;
private final int code;
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
index b0326b18c..ce2ae4485 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
@@ -90,6 +90,14 @@ public class Preconditions {
expectTrue(collection != null && !collection.isEmpty(), errMsg);
}
+ public static void expectNotEmpty(String[] collection, String errMsg) {
+ expectTrue(collection != null && collection.length != 0, errMsg);
+ }
+
+ public static void expectNotEmpty(String[] collection, Supplier<String>
errMsg) {
+ expectTrue(collection != null && collection.length != 0, errMsg);
+ }
+
public static void expectNotEmpty(Map<?, ?> map, String errMsg) {
expectTrue(map != null && !map.isEmpty(), errMsg);
}
@@ -127,6 +135,11 @@ public class Preconditions {
throw new IllegalArgumentException(errMsg);
}
}
+ public static void expectNotBlank(String obj, String errMsg) {
+ if (StringUtils.isBlank(obj)) {
+ throw new IllegalArgumentException(errMsg);
+ }
+ }
public static void expectNotBlank(String obj, ErrorCodeEnum errorCodeEnum)
{
if (StringUtils.isBlank(obj)) {
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
index de2259f67..52a581a50 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -72,6 +72,7 @@ public enum ConnectorJarType {
HUDI_SINK("hudiLoad", "hudi"),
HDFS_SINK("fileSystemLoad", ""),
+ REDIS_SINK("redisLoad", "redis"),
;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java
new file mode 100644
index 000000000..1c6b64cd8
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Redis data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@ApiModel("Redis data node info")
+public class RedisDataNodeDTO {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RedisDataNodeDTO.class);
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static RedisDataNodeDTO getFromRequest(RedisDataNodeRequest
request) throws Exception {
+ return RedisDataNodeDTO.builder()
+ .build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static RedisDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, RedisDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+ String.format("Failed to parse extParams for Redis node:
%s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java
new file mode 100644
index 000000000..3772f67d7
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Redis data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.REDIS)
+@ApiModel("Redis data node info")
+public class RedisDataNodeInfo extends DataNodeInfo {
+
+ public RedisDataNodeInfo() {
+ this.setType(DataNodeType.REDIS);
+ }
+
+ @Override
+ public RedisDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, RedisDataNodeRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java
similarity index 54%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java
index a32adac37..5d782c6f0 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java
@@ -15,20 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
/**
- * Constants of data node.
+ * Redis data node request
*/
-public class DataNodeType {
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.REDIS)
+@ApiModel("Redis data node request")
+public class RedisDataNodeRequest extends DataNodeRequest {
- public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
- public static final String HUDI = "HUDI";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String MYSQL = "MYSQL";
- public static final String STARROCKS = "STARROCKS";
+ public RedisDataNodeRequest() {
+ this.setType(DataNodeType.REDIS);
+ }
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java
similarity index 57%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java
index a32adac37..f9d4cd413 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java
@@ -15,20 +15,30 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.redis;
/**
- * Constants of data node.
+ * The cluster mode of Redis.
*/
-public class DataNodeType {
+public enum RedisClusterMode {
- public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
- public static final String HUDI = "HUDI";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String MYSQL = "MYSQL";
- public static final String STARROCKS = "STARROCKS";
+ STANDALONE("standalone"),
+ CLUSTER("cluster"),
+ SENTINEL("sentinel"),
+ ;
+ private final String key;
+
+ private RedisClusterMode(String key) {
+ this.key = key;
+ }
+
+ public static RedisClusterMode of(String key) {
+ for (RedisClusterMode redisClusterMode : RedisClusterMode.values()) {
+ if (key != null && redisClusterMode.key.equals(key.toLowerCase()))
{
+ return redisClusterMode;
+ }
+ }
+ return null;
+ }
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java
similarity index 53%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java
index a32adac37..f74c92b56 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java
@@ -15,20 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import java.util.Arrays;
+import java.util.HashSet;
/**
- * Constants of data node.
+ * The data type of Redis.
*/
-public class DataNodeType {
+public enum RedisDataType {
+
+ HASH(
+ RedisSchemaMapMode.DYNAMIC,
+ RedisSchemaMapMode.STATIC_KV_PAIR,
+ RedisSchemaMapMode.STATIC_PREFIX_MATCH),
+ BITMAP(
+ RedisSchemaMapMode.DYNAMIC),
+ PLAIN(
+ RedisSchemaMapMode.STATIC_PREFIX_MATCH);
+
+ private final HashSet<RedisSchemaMapMode> mapModes;
- public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
- public static final String HUDI = "HUDI";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String MYSQL = "MYSQL";
- public static final String STARROCKS = "STARROCKS";
+ private RedisDataType(RedisSchemaMapMode... modes) {
+ this.mapModes = new HashSet<>(Arrays.asList(modes));
+ }
+ public HashSet<RedisSchemaMapMode> getMapModes() {
+ return mapModes;
+ }
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java
similarity index 60%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java
index a32adac37..b449fd4f7 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java
@@ -15,20 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.redis;
/**
- * Constants of data node.
+ * The mapMode between SQL column and Redis data-type.
*/
-public class DataNodeType {
-
- public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
- public static final String HUDI = "HUDI";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String MYSQL = "MYSQL";
- public static final String STARROCKS = "STARROCKS";
-
+public enum RedisSchemaMapMode {
+ DYNAMIC,
+ STATIC_PREFIX_MATCH,
+ STATIC_KV_PAIR;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java
new file mode 100644
index 000000000..ff19f870f
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Redis sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Redis sink info")
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisSink extends StreamSink {
+
+ @ApiModelProperty("Redis cluster mode")
+ private String clusterMode;
+
+ @ApiModelProperty("Redis database id")
+ private Integer database;
+
+ @ApiModelProperty("Redis data type")
+ private String dataType;
+
+ @ApiModelProperty("Redis schema mapping mode")
+ private String schemaMapMode;
+
+ @ApiModelProperty("Password for Redis accessing")
+ private String password;
+
+ @ApiModelProperty("Database name")
+ private String databaseName;
+
+ @ApiModelProperty("Expire time of Redis row")
+ private Integer ttl;
+
+ @ApiModelProperty("The timeout of Redis client")
+ private Integer timeout;
+
+ @ApiModelProperty("The socket timeout of redis client")
+ private Integer soTimeout;
+
+ @ApiModelProperty("The max total of sink client")
+ private Integer maxTotal;
+
+ @ApiModelProperty("The max idle of sink client")
+ private Integer maxIdle;
+
+ @ApiModelProperty("The min idle of sink client")
+ private Integer minIdle;
+
+ @ApiModelProperty("The max retry time")
+ private Integer maxRetries;
+
+ @ApiModelProperty("The host of Redis server")
+ private String host;
+
+ @ApiModelProperty("The port of Redis server")
+ private Integer port;
+
+ @ApiModelProperty("The master name of Redis sentinel cluster")
+ private String sentinelMasterName;
+
+ @ApiModelProperty("The sentinels info of Redis sentinel cluster")
+ private String sentinelsInfo;
+
+ /**
+ * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+ * If server is not cluster mode, server address format eg: 127.0.0.1:8080
.
+ */
+ @ApiModelProperty("The cluster nodes of Redis cluster")
+ private String clusterNodes;
+
+ @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH
data-type")
+ private String formatDataEncoding;
+
+ @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type")
+ private String formatDataType;
+
+ @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH
data-type")
+ private Boolean formatIgnoreParseError;
+
+ @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH
data-type")
+ private String formatDataSeparator;
+
+ public RedisSink() {
+ this.setSinkType(SinkType.REDIS);
+ }
+
+ @Override
+ public SinkRequest genSinkRequest() {
+ return CommonBeanUtils.copyProperties(this, RedisSinkRequest::new);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
new file mode 100644
index 000000000..22888179d
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+import java.util.Map;
+
+/**
+ * Sink info of Redis
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class RedisSinkDTO {
+
+ @ApiModelProperty("Redis cluster mode")
+ private String clusterMode;
+
+ @ApiModelProperty("Redis database id")
+ private Integer database;
+
+ @ApiModelProperty("Redis data type")
+ private String dataType;
+
+ @ApiModelProperty("Redis schema mapping mode")
+ private String schemaMapMode;
+
+ @ApiModelProperty("Password for Redis accessing")
+ private String password;
+
+ @ApiModelProperty("Database name")
+ private String databaseName;
+
+ @ApiModelProperty("Expire time of Redis row")
+ private Integer ttl;
+
+ @ApiModelProperty("The timeout of Redis client")
+ private Integer timeout;
+
+ @ApiModelProperty("The socket timeout of redis client")
+ private Integer soTimeout;
+
+ @ApiModelProperty("The max total of sink client")
+ private Integer maxTotal;
+
+ @ApiModelProperty("The max idle of sink client")
+ private Integer maxIdle;
+
+ @ApiModelProperty("The min idle of sink client")
+ private Integer minIdle;
+
+ @ApiModelProperty("The max retry time")
+ private Integer maxRetries;
+
+ @ApiModelProperty("The host of Redis server")
+ private String host;
+
+ @ApiModelProperty("The port of Redis server")
+ private Integer port;
+
+ @ApiModelProperty("The master name of Redis sentinel cluster")
+ private String sentinelMasterName;
+
+ private String sentinelsInfo;
+
+ /**
+ * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+ * If server is not cluster mode, server address format eg: 127.0.0.1:8080
.
+ */
+ @ApiModelProperty("The cluster nodes of Redis cluster")
+ private String clusterNodes;
+
+ @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH
data-type")
+ private String formatDataEncoding;
+
+ @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type")
+ private String formatDataType;
+
+ @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH
data-type")
+ private Boolean formatIgnoreParseError;
+
+ @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH
data-type")
+ private String formatDataSeparator;
+ @ApiModelProperty("Properties for Redis")
+ private Map<String, Object> properties;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static RedisSinkDTO getFromRequest(RedisSinkRequest request) throws
Exception {
+ return CommonBeanUtils.copyProperties(request, RedisSinkDTO::new);
+ }
+
+ public static RedisSinkDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, RedisSinkDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Redis SinkDTO failure:
%s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java
new file mode 100644
index 000000000..0ea2e84c9
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Redis sink request.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Redis sink request")
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisSinkRequest extends SinkRequest {
+
+ @ApiModelProperty("Redis cluster mode")
+ private String clusterMode;
+
+ @ApiModelProperty("Redis database id")
+ private Integer database;
+
+ @ApiModelProperty("Redis data type")
+ private String dataType;
+
+ @ApiModelProperty("Redis schema mapping mode")
+ private String schemaMapMode;
+
+ @ApiModelProperty("Password for Redis accessing")
+ private String password;
+
+ @ApiModelProperty("Database name")
+ private String databaseName;
+
+ @ApiModelProperty("Expire time of Redis row")
+ private Integer ttl;
+
+ @ApiModelProperty("The timeout of Redis client")
+ private Integer timeout;
+
+ @ApiModelProperty("The socket timeout of redis client")
+ private Integer soTimeout;
+
+ @ApiModelProperty("The max total of sink client")
+ private Integer maxTotal;
+
+ @ApiModelProperty("The max idle of sink client")
+ private Integer maxIdle;
+
+ @ApiModelProperty("The min idle of sink client")
+ private Integer minIdle;
+
+ @ApiModelProperty("The max retry time")
+ private Integer maxRetries;
+
+ @ApiModelProperty("The host of Redis server")
+ private String host;
+
+ @ApiModelProperty("The port of Redis server")
+ private Integer port;
+
+ @ApiModelProperty("The master name of Redis sentinel cluster")
+ private String sentinelMasterName;
+
+ @ApiModelProperty("The sentinels info of Redis sentinel cluster")
+ private String sentinelsInfo;
+
+ /**
+ * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+ * If server is not cluster mode, server address format eg: 127.0.0.1:8080
.
+ */
+ @ApiModelProperty("The cluster nodes of Redis cluster")
+ private String clusterNodes;
+
+ @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH
data-type")
+ private String formatDataEncoding;
+
+ @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type")
+ private String formatDataType;
+
+ @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH
data-type")
+ private Boolean formatIgnoreParseError;
+
+ @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH
data-type")
+ private String formatDataSeparator;
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 1eb3e1398..2c49597c6 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -41,6 +41,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
@@ -55,6 +56,7 @@ import
org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
@@ -70,6 +72,7 @@ import
org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -145,6 +148,8 @@ public class LoadNodeUtils {
return createLoadNode((DorisSink) streamSink, fieldInfos,
fieldRelations, properties);
case SinkType.STARROCKS:
return createLoadNode((StarRocksSink) streamSink, fieldInfos,
fieldRelations, properties);
+ case SinkType.REDIS:
+ return createLoadNode((RedisSink) streamSink, fieldInfos,
fieldRelations, properties);
default:
throw new BusinessException(String.format("Unsupported
sinkType=%s to create load node", sinkType));
}
@@ -380,6 +385,64 @@ public class LoadNodeUtils {
starRocksSink.getTablePattern());
}
+ private static LoadNode createLoadNode(
+ RedisSink redisSink,
+ List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations,
+ Map<String, String> properties) {
+ String clusterMode = redisSink.getClusterMode();
+ String dataType = redisSink.getDataType();
+ String schemaMapMode = redisSink.getSchemaMapMode();
+ String host = redisSink.getHost();
+ Integer port = redisSink.getPort();
+ String clusterNodes = redisSink.getClusterNodes();
+ String sentinelMasterName = redisSink.getSentinelMasterName();
+ String sentinelsInfo = redisSink.getSentinelsInfo();
+ Integer database = redisSink.getDatabase();
+ String password = redisSink.getPassword();
+ Integer ttl = redisSink.getTtl();
+ Integer timeout = redisSink.getTimeout();
+ Integer soTimeout = redisSink.getSoTimeout();
+ Integer maxTotal = redisSink.getMaxTotal();
+ Integer maxIdle = redisSink.getMaxIdle();
+ Integer minIdle = redisSink.getMinIdle();
+ Integer maxRetries = redisSink.getMaxRetries();
+
+ Format format = parsingFormat(
+ redisSink.getFormatDataType(),
+ false,
+ redisSink.getFormatDataSeparator(),
+ false);
+
+ return new RedisLoadNode(
+ redisSink.getSinkName(),
+ redisSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ clusterMode,
+ dataType,
+ schemaMapMode,
+ host,
+ port,
+ clusterNodes,
+ sentinelMasterName,
+ sentinelsInfo,
+ database,
+ password,
+ ttl,
+ format,
+ timeout,
+ soTimeout,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ maxRetries);
+ }
+
/**
* Create load node of Iceberg.
*/
@@ -651,4 +714,59 @@ public class LoadNodeUtils {
}
}
+ /**
+ * Parse format
+ *
+ * @param formatName data serialization, support: csv, json, canal,
avro, etc
+ * @param wrapWithInlongMsg whether wrap content with {@link
InLongMsgFormat}
+ * @param separatorStr the separator of data content
+ * @param ignoreParseErrors whether ignore deserialization error data
+ * @return the format for serialized content
+ */
+ private static Format parsingFormat(
+ String formatName,
+ boolean wrapWithInlongMsg,
+ String separatorStr,
+ boolean ignoreParseErrors) {
+ Format format;
+ DataTypeEnum dataType = DataTypeEnum.forType(formatName);
+ switch (dataType) {
+ case CSV:
+ if (StringUtils.isNumeric(separatorStr)) {
+ char dataSeparator = (char) Integer.parseInt(separatorStr);
+ separatorStr = Character.toString(dataSeparator);
+ }
+ CsvFormat csvFormat = new CsvFormat(separatorStr);
+ csvFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = csvFormat;
+ break;
+ case AVRO:
+ format = new AvroFormat();
+ break;
+ case JSON:
+ JsonFormat jsonFormat = new JsonFormat();
+ jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = jsonFormat;
+ break;
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ DebeziumJsonFormat debeziumJsonFormat = new
DebeziumJsonFormat();
+ debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = debeziumJsonFormat;
+ break;
+ case RAW:
+ format = new RawFormat();
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported
dataType=%s", dataType));
+ }
+ if (wrapWithInlongMsg) {
+ Format innerFormat = format;
+ format = new InLongMsgFormat(innerFormat, false);
+ }
+ return format;
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java
new file mode 100644
index 000000000..94930b4cd
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.redis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class RedisDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RedisDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.REDIS;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+
+ RedisDataNodeInfo redisDataNodeInfo = new RedisDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, redisDataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ RedisDataNodeDTO dto =
RedisDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, redisDataNodeInfo);
+ }
+ return redisDataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
+ RedisDataNodeRequest redisDataNodeRequest = (RedisDataNodeRequest)
request;
+ CommonBeanUtils.copyProperties(redisDataNodeRequest, targetEntity,
true);
+ try {
+ RedisDataNodeDTO dto =
RedisDataNodeDTO.getFromRequest(redisDataNodeRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("Failed to build extParams for Redis node:
%s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java
new file mode 100644
index 000000000..264d8bf1d
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.redis;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Redis resource operator
+ */
+@Service
+public class RedisResourceOperator implements SinkResourceOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RedisResourceOperator.class);
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.REDIS.equals(sinkType);
+ }
+
+ /**
+ * Create Redis table according to the sink config
+ */
+ @Override
+ public void createSinkResource(SinkInfo sinkInfo) {
+ LOGGER.info("It is not need to create redis table!");
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
new file mode 100644
index 000000000..884fc64fd
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static
org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static
org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static
org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static
org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static
org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RedisSinkOperator.class);
+ private static final int PORT_MAX_VALUE = 65535;
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.REDIS.equals(sinkType);
+ }
+
+ @Override
+ protected String getSinkType() {
+ return SinkType.REDIS;
+ }
+
+ @Override
+ protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
+
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+ }
+
+ RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+ String clusterMode = sinkRequest.getClusterMode();
+ RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+ expectNotNull(redisClusterMode,
+ "Redis ClusterMode must in one of " +
Arrays.toString(RedisClusterMode.values()) + " !");
+
+ switch (redisClusterMode) {
+ case CLUSTER:
+ String clusterNodes = sinkRequest.getClusterNodes();
+ checkClusterNodes(clusterNodes);
+ break;
+ case SENTINEL:
+ String sentinelMasterName =
sinkRequest.getSentinelMasterName();
+ expectNotEmpty(sentinelMasterName, "Redis MasterName of
Sentinel cluster must not null!");
+ String sentinelsInfo = sinkRequest.getSentinelsInfo();
+ expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel
cluster must not null!");
+ break;
+ case STANDALONE:
+ String host = sinkRequest.getHost();
+ Integer port = sinkRequest.getPort();
+
+ expectNotEmpty(host, "Redis server host must not null!");
+ expectTrue(
+ port != null && port > 1 && port < PORT_MAX_VALUE,
+ "The port of the redis server must be greater than 0
and less than 65535!");
+ break;
+ }
+ RedisDataType dataType =
RedisDataType.valueOf(sinkRequest.getDataType());
+ expectNotNull(dataType, "Redis DataType must not null");
+
+ RedisSchemaMapMode mapMode =
RedisSchemaMapMode.valueOf(sinkRequest.getSchemaMapMode());
+ expectTrue(dataType.getMapModes().contains(mapMode),
+ "Redis schemaMapMode '" + mapMode + "' is not supported in '"
+ dataType + "'");
+
+ try {
+ RedisSinkDTO dto = RedisSinkDTO.getFromRequest(sinkRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(SINK_SAVE_FAILED,
+ String.format("serialize extParams of Redis SinkDTO
failure: %s", e.getMessage()));
+ }
+ }
+
+ private void checkClusterNodes(String clusterNodes) {
+ expectNotBlank(clusterNodes, "the nodes of Redis cluster must not
null");
+ String[] nodeArray = clusterNodes.split(",");
+ expectNotEmpty(nodeArray, "the nodes of Redis cluster must not null");
+
+ for (String node : nodeArray) {
+ expectNotBlank(node, "Redis server host must not null!");
+ String[] ipPort = node.split(":");
+ expectTrue(ipPort.length == 2, "The ip and port of Redis server
must be in form: ip:port");
+ expectNotBlank(ipPort[0], IP_EMPTY);
+ expectNotBlank(ipPort[1], PORT_EMPTY);
+ }
+ }
+
+ @Override
+ public StreamSink getFromEntity(StreamSinkEntity entity) {
+ RedisSink sink = new RedisSink();
+ if (entity == null) {
+ return sink;
+ }
+
+ RedisSinkDTO dto = RedisSinkDTO.getFromJson(entity.getExtParams());
+
+ CommonBeanUtils.copyProperties(entity, sink, true);
+ CommonBeanUtils.copyProperties(dto, sink, true);
+ List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+ sink.setSinkFieldList(sinkFields);
+ return sink;
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java
new file mode 100644
index 000000000..9f16c7370
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Redis stream sink service test.
+ */
+public class RedisSinkServiceTest extends ServiceBaseTest {
+
+ private final String globalGroupId = "b_group1";
+ private final String globalStreamId = "stream1_hudi";
+ private final String globalOperator = "admin";
+
+ @Autowired
+ private StreamSinkService sinkService;
+ @Autowired
+ private InlongStreamServiceTest streamServiceTest;
+
+ /**
+ * Save sink info.
+ */
+ public Integer saveSink(String sinkName) {
+ streamServiceTest.saveInlongStream(globalGroupId, globalStreamId,
globalOperator);
+ RedisSinkRequest sinkRequest = new RedisSinkRequest();
+ sinkRequest.setInlongGroupId(globalGroupId);
+ sinkRequest.setInlongStreamId(globalStreamId);
+ sinkRequest.setSinkType(SinkType.REDIS);
+ sinkRequest.setClusterMode(RedisClusterMode.STANDALONE.name());
+ sinkRequest.setHost("demo-host");
+ sinkRequest.setPort(6300);
+ sinkRequest.setDataType("HASH");
+ sinkRequest.setSchemaMapMode("DYNAMIC");
+ sinkRequest.setSinkName(sinkName);
+ sinkRequest.setId((int) (Math.random() * 100000 + 1));
+ return sinkService.save(sinkRequest, globalOperator);
+ }
+
+ @Test
+ public void testSaveAndDelete() {
+ Integer id = this.saveSink("default1");
+ Assertions.assertNotNull(id);
+ boolean result = sinkService.delete(id, false, globalOperator);
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testListByIdentifier() {
+ Integer id = this.saveSink("default2");
+ StreamSink sink = sinkService.get(id);
+ Assertions.assertEquals(globalGroupId, sink.getInlongGroupId());
+ sinkService.delete(id, false, globalOperator);
+ }
+
+ @Test
+ public void testGetAndUpdate() {
+ Integer sinkId = this.saveSink("default3");
+ StreamSink streamSink = sinkService.get(sinkId);
+ Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+
+ RedisSink sink = (RedisSink) streamSink;
+ SinkRequest request = sink.genSinkRequest();
+ boolean result = sinkService.update(request, globalOperator);
+ Assertions.assertTrue(result);
+
+ sinkService.delete(sinkId, false, globalOperator);
+ }
+
+}