This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f5f06fa5d [INLONG-7334][Manager] Support stream join dimension table
(#7345)
f5f06fa5d is described below
commit f5f06fa5d281e06af1407bed5c9decc0511c39eb
Author: jiachengjiang <[email protected]>
AuthorDate: Sun Feb 12 12:21:28 2023 +0800
[INLONG-7334][Manager] Support stream join dimension table (#7345)
---
.../inlong/manager/common/enums/TransformType.java | 15 +++
.../manager/pojo/sort/util/ExtractNodeUtils.java | 107 +++++++++++-----
.../manager/pojo/sort/util/FieldRelationUtils.java | 3 +
.../pojo/sort/util/FilterFunctionUtils.java | 6 +
.../manager/pojo/sort/util/NodeRelationUtils.java | 134 ++++++++++++++++++++-
.../manager/pojo/sort/util/StreamParseUtils.java | 51 ++++++--
.../pojo/source/redis/RedisLookupOptions.java | 59 +++++++++
.../manager/pojo/source/redis/RedisSource.java | 70 +++++------
.../manager/pojo/source/redis/RedisSourceDTO.java | 107 ++++++++--------
.../pojo/source/redis/RedisSourceRequest.java | 67 +++++------
.../transform/joiner/IntervalJoinerDefinition.java | 129 ++++++++++++++++++++
.../transform/joiner/LookUpJoinerDefinition.java | 70 +++++++++++
.../transform/joiner/TemporalJoinerDefinition.java | 121 +++++++++++++++++++
.../service/source/RedisSourceServiceTest.java | 6 +-
14 files changed, 768 insertions(+), 177 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
index 6c125ce55..d02052ee4 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
@@ -48,6 +48,21 @@ public enum TransformType {
*/
JOINER("joiner"),
+ /**
+ * A lookup join is typically used to enrich a table with data that is
queried from an external system
+ */
+ LOOKUP_JOINER("lookup_joiner"),
+
+ /**
+ * Temporal joins allow joining against a versioned table
+ */
+ TEMPORAL_JOINER("temporal_joiner"),
+
+ /**
+ * Interval joins return a simple Cartesian product restricted by the join condition and
a time constraint
+ */
+ INTERVAL_JOINER("interval_joiner"),
+
/**
* Encrypt records on given fields
*/
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 72dda505c..a6eabf75a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -33,6 +33,7 @@ import
org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
import org.apache.inlong.manager.pojo.source.redis.RedisSource;
import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
@@ -384,33 +385,71 @@ public class ExtractNodeUtils {
public static RedisExtractNode createExtractNode(RedisSource source) {
List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
Map<String, String> properties =
parseProperties(source.getProperties());
- RedisCommand command = RedisCommand.forName(source.getRedisCommand());
- RedisMode mode = RedisMode.forName(source.getRedisMode());
- LookupOptions lookupOptions = new
LookupOptions(source.getLookupCacheMaxRows(), source.getLookupCacheTtl(),
- source.getLookupMaxRetries(), source.getLookupAsync());
- return new RedisExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- mode,
- command,
- source.getClusterNodes(),
- source.getMasterName(),
- source.getSentinelsInfo(),
- source.getHostname(),
- source.getPort(),
- source.getPassword(),
- source.getAdditionalKey(),
- source.getDatabase(),
- source.getTimeout(),
- source.getSoTimeout(),
- source.getMaxTotal(),
- source.getMaxIdle(),
- source.getMinIdle(),
- lookupOptions);
+ RedisMode redisMode = RedisMode.forName(source.getRedisMode());
+ switch (redisMode) {
+ case STANDALONE:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getHost(),
+ source.getPort(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ case SENTINEL:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getMasterName(),
+ source.getSentinelsInfo(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ case CLUSTER:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getClusterNodes(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported
redis-mode=%s for Inlong", redisMode));
+ }
+
}
/**
@@ -520,4 +559,18 @@ public class ExtractNodeUtils {
.collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
}
+ /**
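+ * Convert the Redis lookup options configured on a source into a LookupOptions instance
+ *
+ * @param options Redis lookup options of the source, may be null
+ * @return the converted LookupOptions, or null if no options were given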
+ */
+ private static LookupOptions parseLookupOptions(RedisLookupOptions
options) {
+ if (options == null) {
+ return null;
+ }
+ return new LookupOptions(options.getLookupCacheMaxRows(),
options.getLookupCacheTtl(),
+ options.getLookupMaxRetries(), options.getLookupAsync());
+ }
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
index 9f017eda5..5837b935a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
@@ -87,6 +87,9 @@ public class FieldRelationUtils {
case FILTER:
return createFieldRelations(fieldList, constantFieldMap);
case JOINER:
+ case LOOKUP_JOINER:
+ case TEMPORAL_JOINER:
+ case INTERVAL_JOINER:
return createJoinerFieldRelations(fieldList, constantFieldMap);
default:
throw new UnsupportedOperationException(
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
index be28d26ed..90741f651 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
@@ -72,6 +72,9 @@ public class FilterFunctionUtils {
case DE_DUPLICATION:
case SPLITTER:
case JOINER:
+ case LOOKUP_JOINER:
+ case TEMPORAL_JOINER:
+ case INTERVAL_JOINER:
case STRING_REPLACER:
case ENCRYPT:
return Lists.newArrayList();
@@ -124,6 +127,9 @@ public class FilterFunctionUtils {
case DE_DUPLICATION:
case SPLITTER:
case JOINER:
+ case LOOKUP_JOINER:
+ case TEMPORAL_JOINER:
+ case INTERVAL_JOINER:
case STRING_REPLACER:
case ENCRYPT:
return null;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
index 5896e2887..929337038 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.enums.TransformType;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -31,8 +32,11 @@ import org.apache.inlong.manager.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.pojo.stream.StreamTransform;
import org.apache.inlong.manager.pojo.transform.TransformDefinition;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
+import
org.apache.inlong.manager.pojo.transform.joiner.IntervalJoinerDefinition;
import org.apache.inlong.manager.pojo.transform.joiner.JoinerDefinition;
import
org.apache.inlong.manager.pojo.transform.joiner.JoinerDefinition.JoinMode;
+import org.apache.inlong.manager.pojo.transform.joiner.LookUpJoinerDefinition;
+import
org.apache.inlong.manager.pojo.transform.joiner.TemporalJoinerDefinition;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
@@ -44,15 +48,20 @@ import
org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import
org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelation;
+import
org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation;
import
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelation;
+import
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import
org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelation;
import
org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelation;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -61,6 +70,17 @@ import java.util.stream.Collectors;
@Slf4j
public class NodeRelationUtils {
+ private static final Set<TransformType> JOIN_NODES = new HashSet<>();
+
+ static {
+
+ JOIN_NODES.add(TransformType.JOINER);
+ JOIN_NODES.add(TransformType.LOOKUP_JOINER);
+ JOIN_NODES.add(TransformType.INTERVAL_JOINER);
+ JOIN_NODES.add(TransformType.TEMPORAL_JOINER);
+
+ }
+
/**
* Create node relation for the given stream
*/
@@ -115,7 +135,7 @@ public class NodeRelationUtils {
.map(node -> (TransformNode) node)
.filter(transformNode -> {
TransformDefinition transformDefinition =
transformTypeMap.get(transformNode.getName());
- return transformDefinition.getTransformType() ==
TransformType.JOINER;
+ return
JOIN_NODES.contains(transformDefinition.getTransformType());
}).collect(Collectors.toMap(TransformNode::getName,
transformNode -> transformNode));
List<NodeRelation> relations = streamInfo.getRelations();
@@ -128,8 +148,34 @@ public class NodeRelationUtils {
String nodeName = outputs.get(0);
if (joinNodes.get(nodeName) != null) {
TransformDefinition transformDefinition =
transformTypeMap.get(nodeName);
- joinRelations.add(getNodeRelation((JoinerDefinition)
transformDefinition, relation));
- shipIterator.remove();
+ TransformType transformType =
transformDefinition.getTransformType();
+ switch (transformType) {
+ case JOINER:
+
joinRelations.add(getNodeRelation((JoinerDefinition) transformDefinition,
relation));
+ shipIterator.remove();
+ break;
+ case LOOKUP_JOINER:
+ assert transformDefinition instanceof
LookUpJoinerDefinition;
+
joinRelations.add(getNodeRelation((LookUpJoinerDefinition) transformDefinition,
relation));
+ shipIterator.remove();
+ break;
+ case INTERVAL_JOINER:
+ if (transformDefinition instanceof
IntervalJoinerDefinition) {
+ joinRelations
+
.add(getNodeRelation((IntervalJoinerDefinition) transformDefinition, relation));
+ }
+ shipIterator.remove();
+ break;
+ case TEMPORAL_JOINER:
+ assert transformDefinition instanceof
TemporalJoinerDefinition;
+ joinRelations
+
.add(getNodeRelation((TemporalJoinerDefinition) transformDefinition, relation));
+ shipIterator.remove();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported transformType
for %s", transformType));
+ }
}
}
}
@@ -169,6 +215,88 @@ public class NodeRelationUtils {
}
}
+ private static NodeRelation getNodeRelation(LookUpJoinerDefinition
joinerDefinition, NodeRelation nodeRelation) {
+ String leftNode = getNodeName(joinerDefinition.getLeftNode());
+ String rightNode = getNodeName(joinerDefinition.getRightNode());
+ List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
+ List<StreamField> leftJoinFields =
joinerDefinition.getLeftJoinFields();
+ List<StreamField> rightJoinFields =
joinerDefinition.getRightJoinFields();
+ List<FilterFunction> filterFunctions = Lists.newArrayList();
+ for (int index = 0; index < leftJoinFields.size(); index++) {
+ StreamField leftField = leftJoinFields.get(index);
+ StreamField rightField = rightJoinFields.get(index);
+ LogicOperator operator;
+ if (index != 0) {
+ operator = AndOperator.getInstance();
+ } else {
+ operator = EmptyOperator.getInstance();
+ }
+ filterFunctions.add(createFilterFunction(leftField, rightField,
operator));
+ }
+ Map<String, List<FilterFunction>> joinConditions = new HashMap<>();
+ joinConditions.put(rightNode, filterFunctions);
+ FieldInfo systemTime = new FieldInfo(MetaField.PROCESS_TIME.name());
+ return new LeftOuterTemporalJoinRelation(preNodes,
nodeRelation.getOutputs(), joinConditions, systemTime);
+ }
+
+ private static NodeRelation getNodeRelation(IntervalJoinerDefinition
joinerDefinition, NodeRelation nodeRelation) {
+ String leftNode = getNodeName(joinerDefinition.getLeftNode());
+ String rightNode = getNodeName(joinerDefinition.getRightNode());
+ List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
+ List<StreamField> leftJoinFields =
joinerDefinition.getLeftJoinFields();
+ List<StreamField> rightJoinFields =
joinerDefinition.getRightJoinFields();
+ List<FilterFunction> filterFunctions = Lists.newArrayList();
+ for (int index = 0; index < leftJoinFields.size(); index++) {
+ StreamField leftField = leftJoinFields.get(index);
+ StreamField rightField = rightJoinFields.get(index);
+ LogicOperator operator;
+ if (index != 0) {
+ operator = AndOperator.getInstance();
+ } else {
+ operator = EmptyOperator.getInstance();
+ }
+ filterFunctions.add(createFilterFunction(leftField, rightField,
operator));
+ }
+ LinkedHashMap<String, List<FilterFunction>> joinConditions = new
LinkedHashMap<>();
+ joinConditions.put(rightNode, filterFunctions);
+
+ return new IntervalJoinRelation(preNodes, nodeRelation.getOutputs(),
joinConditions);
+
+ }
+
+ private static NodeRelation getNodeRelation(TemporalJoinerDefinition
joinerDefinition, NodeRelation nodeRelation) {
+ JoinMode joinMode = joinerDefinition.getJoinMode();
+ String leftNode = getNodeName(joinerDefinition.getLeftNode());
+ String rightNode = getNodeName(joinerDefinition.getRightNode());
+ List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
+ List<StreamField> leftJoinFields =
joinerDefinition.getLeftJoinFields();
+ List<StreamField> rightJoinFields =
joinerDefinition.getRightJoinFields();
+ List<FilterFunction> filterFunctions = Lists.newArrayList();
+ for (int index = 0; index < leftJoinFields.size(); index++) {
+ StreamField leftField = leftJoinFields.get(index);
+ StreamField rightField = rightJoinFields.get(index);
+ LogicOperator operator;
+ if (index != 0) {
+ operator = AndOperator.getInstance();
+ } else {
+ operator = EmptyOperator.getInstance();
+ }
+ filterFunctions.add(createFilterFunction(leftField, rightField,
operator));
+ }
+ Map<String, List<FilterFunction>> joinConditions = new HashMap<>();
+ joinConditions.put(rightNode, filterFunctions);
+ switch (joinMode) {
+ case LEFT_JOIN:
+ return new LeftOuterJoinNodeRelation(preNodes,
nodeRelation.getOutputs(), joinConditions);
+ case INNER_JOIN:
+ return new InnerJoinNodeRelation(preNodes,
nodeRelation.getOutputs(), joinConditions);
+ case RIGHT_JOIN:
+ return new RightOuterJoinNodeRelation(preNodes,
nodeRelation.getOutputs(), joinConditions);
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported
join mode=%s for inlong", joinMode));
+ }
+ }
+
private static SingleValueFilterFunction createFilterFunction(StreamField
leftField, StreamField rightField,
LogicOperator operator) {
FieldInfo sourceField = new FieldInfo(leftField.getOriginFieldName(),
leftField.getOriginNodeName(),
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
index 5fe859296..399bf6b40 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
@@ -31,7 +31,10 @@ import
org.apache.inlong.manager.pojo.transform.TransformDefinition;
import
org.apache.inlong.manager.pojo.transform.deduplication.DeDuplicationDefinition;
import org.apache.inlong.manager.pojo.transform.encrypt.EncryptDefinition;
import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition;
+import
org.apache.inlong.manager.pojo.transform.joiner.IntervalJoinerDefinition;
import org.apache.inlong.manager.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.pojo.transform.joiner.LookUpJoinerDefinition;
+import
org.apache.inlong.manager.pojo.transform.joiner.TemporalJoinerDefinition;
import
org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinition;
import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition;
@@ -53,11 +56,12 @@ public class StreamParseUtils {
public static TransformDefinition parseTransformDefinition(String
transformDefinition,
TransformType transformType) {
+ JsonObject joinerJson = GSON.fromJson(transformDefinition,
JsonObject.class);
switch (transformType) {
case FILTER:
return GSON.fromJson(transformDefinition,
FilterDefinition.class);
case JOINER:
- return parseJoinerDefinition(transformDefinition);
+ return parseJoinerDefinition(transformDefinition, joinerJson);
case SPLITTER:
return GSON.fromJson(transformDefinition,
SplitterDefinition.class);
case DE_DUPLICATION:
@@ -66,19 +70,52 @@ public class StreamParseUtils {
return GSON.fromJson(transformDefinition,
StringReplacerDefinition.class);
case ENCRYPT:
return GSON.fromJson(transformDefinition,
EncryptDefinition.class);
+ case LOOKUP_JOINER:
+ return parseLookupJoinerDefinition(transformDefinition,
joinerJson);
+ case TEMPORAL_JOINER:
+ return parseTemporalJoinerDefinition(transformDefinition,
joinerJson);
+ case INTERVAL_JOINER:
+ return parseIntervalJoinerDefinition(transformDefinition,
joinerJson);
default:
throw new IllegalArgumentException(String.format("Unsupported
transformType for %s", transformType));
}
}
- public static JoinerDefinition parseJoinerDefinition(String
transformDefinition) {
+ public static JoinerDefinition parseJoinerDefinition(String
transformDefinition, JsonObject joinerJson) {
JoinerDefinition joinerDefinition = GSON.fromJson(transformDefinition,
JoinerDefinition.class);
- JsonObject joinerJson = GSON.fromJson(transformDefinition,
JsonObject.class);
- JsonObject leftNode = joinerJson.getAsJsonObject(LEFT_NODE);
- StreamNode leftStreamNode = parseNode(leftNode);
+ StreamNode leftStreamNode =
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
+ joinerDefinition.setLeftNode(leftStreamNode);
+ StreamNode rightStreamNode =
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
+ joinerDefinition.setRightNode(rightStreamNode);
+ return joinerDefinition;
+ }
+
+ public static LookUpJoinerDefinition parseLookupJoinerDefinition(String
transformDefinition,
+ JsonObject joinerJson) {
+ LookUpJoinerDefinition joinerDefinition =
GSON.fromJson(transformDefinition, LookUpJoinerDefinition.class);
+ StreamNode leftStreamNode =
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
+ joinerDefinition.setLeftNode(leftStreamNode);
+ StreamNode rightStreamNode =
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
+ joinerDefinition.setRightNode(rightStreamNode);
+ return joinerDefinition;
+ }
+
+ public static TemporalJoinerDefinition
parseTemporalJoinerDefinition(String transformDefinition,
+ JsonObject joinerJson) {
+ TemporalJoinerDefinition joinerDefinition =
GSON.fromJson(transformDefinition, TemporalJoinerDefinition.class);
+ StreamNode leftStreamNode =
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
+ joinerDefinition.setLeftNode(leftStreamNode);
+ StreamNode rightStreamNode =
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
+ joinerDefinition.setRightNode(rightStreamNode);
+ return joinerDefinition;
+ }
+
+ public static IntervalJoinerDefinition
parseIntervalJoinerDefinition(String transformDefinition,
+ JsonObject joinerJson) {
+ IntervalJoinerDefinition joinerDefinition =
GSON.fromJson(transformDefinition, IntervalJoinerDefinition.class);
+ StreamNode leftStreamNode =
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
joinerDefinition.setLeftNode(leftStreamNode);
- JsonObject rightNode = joinerJson.getAsJsonObject("rightNode");
- StreamNode rightStreamNode = parseNode(rightNode);
+ StreamNode rightStreamNode =
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
joinerDefinition.setRightNode(rightStreamNode);
return joinerDefinition;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisLookupOptions.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisLookupOptions.java
new file mode 100644
index 000000000..e3d7d6c46
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisLookupOptions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * Lookup options of the Redis source: cache max rows, cache ttl, max retries and async mode
+ */
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString(callSuper = true)
+@ApiModel(value = "Redis Lookup options")
+public class RedisLookupOptions {
+
+ /**
+ * Lookup cache max rows
+ */
+ @ApiModelProperty("Lookup cache max rows")
+ private Long lookupCacheMaxRows;
+ /**
+ * Lookup cache ttl
+ */
+ @ApiModelProperty("Lookup cache ttl")
+ private Long lookupCacheTtl;
+ /**
+ * Lookup max retries
+ */
+ @ApiModelProperty("Lookup max retries")
+ private Integer lookupMaxRetries;
+ /**
+ * Lookup async
+ */
+ @ApiModelProperty("Lookup async")
+ private Boolean lookupAsync;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
index cea322b5e..a91c28b85 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.pojo.source.redis;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -43,69 +42,59 @@ import org.apache.inlong.manager.pojo.source.StreamSource;
@JsonTypeDefine(value = SourceType.REDIS)
public class RedisSource extends StreamSource {
- @ApiModelProperty("Username of the redis server")
- private String username;
+ @ApiModelProperty("Redis primaryKey")
+ private String primaryKey;
- @ApiModelProperty("Password of the redis server")
- private String password;
+ @ApiModelProperty("Redis host")
+ private String host;
- @ApiModelProperty("Hostname of the redis server")
- private String hostname;
+ @ApiModelProperty("Redis port")
+ private Integer port;
- @ApiModelProperty("Port of the redis server")
- @Builder.Default
- private Integer port = 6379;
+ @ApiModelProperty("Redis username")
+ private String username;
- @ApiModelProperty("Primary key")
- private String primaryKey;
+ @ApiModelProperty("Redis password")
+ private String password;
- @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank")
- private String redisCommand;
+ @ApiModelProperty("Redis database")
+ private Integer database;
- @ApiModelProperty("Redis deploy mode, supports: standalone, cluster,
sentinel")
+ @ApiModelProperty("Redis deploy Mode(standalone/cluster/sentinel)")
private String redisMode;
- @ApiModelProperty("Cluster node infos only used for redis cluster deploy
mode")
- private String clusterNodes;
-
- @ApiModelProperty("Master name only used for redis sentinel deploy mode")
- private String masterName;
-
- @ApiModelProperty("Sentinels info only used for redis sentinel deploy
mode")
- private String sentinelsInfo;
+ @ApiModelProperty("supportted in
Sort-connector-redis(hget/get/zscore/zrevrank)")
+ private String command;
- @ApiModelProperty("Additional key only used for hash/Sorted-set data type")
+ @ApiModelProperty("The additional key connect to redis only used for
[Hash|Sorted-Set] data type")
private String additionalKey;
- @ApiModelProperty("Database number connect to redis for redis
standalone/sentinel deploy modes")
- private Integer database;
-
- @ApiModelProperty("Timeout value of connect to redis")
+ @ApiModelProperty("The timeout connect to redis")
private Integer timeout;
- @ApiModelProperty("Timeout value of read data from redis")
+ @ApiModelProperty("The soTimeout connect to redis")
private Integer soTimeout;
- @ApiModelProperty("Max connection number to redis")
+ @ApiModelProperty("The maxTotal connect to redis")
private Integer maxTotal;
- @ApiModelProperty("Max free connection number")
+ @ApiModelProperty("The maxIdle connect to redis")
private Integer maxIdle;
- @ApiModelProperty("Min free connection number")
+ @ApiModelProperty("The minIdle connect to redis")
private Integer minIdle;
- @ApiModelProperty("Lookup Async")
- private Boolean lookupAsync;
+ @ApiModelProperty("The lookup options for connector redis")
+ private RedisLookupOptions lookupOptions;
- @ApiModelProperty("Lookup cache max rows")
- private Long lookupCacheMaxRows;
+ @ApiModelProperty("The masterName for connector redis")
+ private String masterName;
- @ApiModelProperty("Lookup cache ttl")
- private Long lookupCacheTtl;
+ @ApiModelProperty("The sentinelsInfo for connector redis")
+ private String sentinelsInfo;
- @ApiModelProperty("Lookup max retry times")
- private Integer lookupMaxRetries;
+ @ApiModelProperty("The clusterNodes for connector redis")
+ private String clusterNodes;
public RedisSource() {
this.setSourceType(SourceType.REDIS);
@@ -115,4 +104,5 @@ public class RedisSource extends StreamSource {
public SourceRequest genSourceRequest() {
return CommonBeanUtils.copyProperties(this, RedisSourceRequest::new);
}
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
index 89facbbd6..d3704ae24 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.pojo.source.redis;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -24,10 +26,8 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.JsonUtils;
import javax.validation.constraints.NotNull;
-import java.util.Map;
/**
* redis source info
@@ -38,108 +38,97 @@ import java.util.Map;
@AllArgsConstructor
public class RedisSourceDTO {
- @ApiModelProperty("Username of the redis server")
- private String username;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- @ApiModelProperty("Password of the redis server")
- private String password;
+ @ApiModelProperty("Redis primaryKey")
+ private String primaryKey;
- @ApiModelProperty("Hostname of the redis server")
- private String hostname;
+ @ApiModelProperty("Redis host")
+ private String host;
- @ApiModelProperty("Port of the redis server")
+ @ApiModelProperty("Redis port")
private Integer port;
- @ApiModelProperty("Primary key")
- private String primaryKey;
-
- @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank")
- private String redisCommand;
+ @ApiModelProperty("Redis username")
+ private String username;
- @ApiModelProperty("Redis deploy mode, supports: standalone, cluster,
sentinel")
- private String redisMode;
+ @ApiModelProperty("Redis password")
+ private String password;
- @ApiModelProperty("Cluster node infos only used for redis cluster deploy
mode")
- private String clusterNodes;
+ @ApiModelProperty("Redis database")
+ private Integer database;
- @ApiModelProperty("Master name only used for redis sentinel deploy mode")
- private String masterName;
+ @ApiModelProperty("Redis deploy Mode(standalone/cluster/sentinel)")
+ private String redisMode;
- @ApiModelProperty("Sentinels info only used for redis sentinel deploy
mode")
- private String sentinelsInfo;
+ @ApiModelProperty("supportted in
Sort-connector-redis(hget/get/zscore/zrevrank)")
+ private String command;
- @ApiModelProperty("Additional key only used for hash/Sorted-set data type")
+ @ApiModelProperty("The additional key connect to redis only used for
[Hash|Sorted-Set] data type")
private String additionalKey;
- @ApiModelProperty("Database number connect to redis for redis
standalone/sentinel deploy modes")
- private Integer database;
-
- @ApiModelProperty("Timeout value of connect to redis")
+ @ApiModelProperty("The timeout connect to redis")
private Integer timeout;
- @ApiModelProperty("Timeout value of read data from redis")
+ @ApiModelProperty("The soTimeout connect to redis")
private Integer soTimeout;
- @ApiModelProperty("Max connection number to redis")
+ @ApiModelProperty("The maxTotal connect to redis")
private Integer maxTotal;
- @ApiModelProperty("Max free connection number")
+ @ApiModelProperty("The maxIdle connect to redis")
private Integer maxIdle;
- @ApiModelProperty("Min free connection number")
+ @ApiModelProperty("The minIdle connect to redis")
private Integer minIdle;
- @ApiModelProperty("Lookup cache max rows")
- private Long lookupCacheMaxRows;
+ @ApiModelProperty("The lookup options for connector redis")
+ private RedisLookupOptions lookupOptions;
- @ApiModelProperty("Lookup cache ttl")
- private Long lookupCacheTtl;
-
- @ApiModelProperty("Lookup max retry times")
- private Integer lookupMaxRetries;
+ @ApiModelProperty("The masterName for connector redis")
+ private String masterName;
- @ApiModelProperty("Lookup Async")
- private Boolean lookupAsync;
+ @ApiModelProperty("The sentinelsInfo for connector redis")
+ private String sentinelsInfo;
- @ApiModelProperty("Properties for redis")
- private Map<String, Object> properties;
+ @ApiModelProperty("The clusterNodes for connector redis")
+ private String clusterNodes;
/**
- * Get the dto instance from request
+ * Get the dto instance from the request
*/
public static RedisSourceDTO getFromRequest(RedisSourceRequest request) {
return RedisSourceDTO.builder()
+ .primaryKey(request.getPrimaryKey())
+ .host(request.getHost())
+ .port(request.getPort())
.username(request.getUsername())
.password(request.getPassword())
- .hostname(request.getHostname())
- .port(request.getPort())
- .primaryKey(request.getPrimaryKey())
- .redisCommand(request.getRedisCommand())
+ .database(request.getDatabase())
.redisMode(request.getRedisMode())
- .clusterNodes(request.getClusterNodes())
- .masterName(request.getMasterName())
- .sentinelsInfo(request.getSentinelsInfo())
+ .command(request.getCommand())
.additionalKey(request.getAdditionalKey())
- .database(request.getDatabase())
.timeout(request.getTimeout())
.soTimeout(request.getSoTimeout())
.maxTotal(request.getMaxTotal())
.maxIdle(request.getMaxIdle())
.minIdle(request.getMinIdle())
- .lookupCacheMaxRows(request.getLookupCacheMaxRows())
- .lookupCacheTtl(request.getLookupCacheTtl())
- .lookupMaxRetries(request.getLookupMaxRetries())
- .lookupAsync(request.getLookupAsync())
- .properties(request.getProperties())
+ .lookupOptions(request.getLookupOptions())
+ .masterName(request.getMasterName())
+ .sentinelsInfo(request.getSentinelsInfo())
+ .clusterNodes(request.getClusterNodes())
.build();
}
+ /**
+ * Get the dto instance from the JSON string
+ */
public static RedisSourceDTO getFromJson(@NotNull String extParams) {
try {
- return JsonUtils.parseObject(extParams, RedisSourceDTO.class);
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, RedisSourceDTO.class);
} catch (Exception e) {
- throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
- String.format("parse extParams of RedisSource failure:
%s", e.getMessage()));
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
index b53c75725..c5af1e78b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
@@ -36,68 +36,59 @@ import org.apache.inlong.manager.pojo.source.SourceRequest;
@JsonTypeDefine(value = SourceType.REDIS)
public class RedisSourceRequest extends SourceRequest {
- @ApiModelProperty("Username of the redis server")
- private String username;
+ @ApiModelProperty("Redis primaryKey")
+ private String primaryKey;
- @ApiModelProperty("Password of the redis server")
- private String password;
+ @ApiModelProperty("Redis host")
+ private String host;
- @ApiModelProperty("Hostname of the redis server")
- private String hostname;
+ @ApiModelProperty("Redis port")
+ private Integer port;
- @ApiModelProperty("Port of the redis server")
- private Integer port = 6379;
+ @ApiModelProperty("Redis username")
+ private String username;
- @ApiModelProperty("Primary key")
- private String primaryKey;
+ @ApiModelProperty("Redis password")
+ private String password;
- @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank")
- private String redisCommand;
+ @ApiModelProperty("Redis database")
+ private Integer database;
- @ApiModelProperty("Redis deploy mode, supports: standalone, cluster,
sentinel")
+ @ApiModelProperty("Redis deploy Mode(standalone/cluster/sentinel)")
private String redisMode;
- @ApiModelProperty("Cluster node infos only used for redis cluster deploy
mode")
- private String clusterNodes;
+ @ApiModelProperty("supportted in
Sort-connector-redis(hget/get/zscore/zrevrank)")
+ private String command;
- @ApiModelProperty("Master name only used for redis sentinel deploy mode")
- private String masterName;
-
- @ApiModelProperty("Sentinels info only used for redis sentinel deploy
mode")
- private String sentinelsInfo;
-
- @ApiModelProperty("Additional key only used for hash/Sorted-set data type")
+ @ApiModelProperty("The additional key connect to redis only used for
[Hash|Sorted-Set] data type")
private String additionalKey;
- @ApiModelProperty("Database number connect to redis for redis
standalone/sentinel deploy modes")
- private Integer database;
-
- @ApiModelProperty("Timeout value of connect to redis")
+ @ApiModelProperty("The timeout connect to redis")
private Integer timeout;
- @ApiModelProperty("Timeout value of read data from redis")
+ @ApiModelProperty("The soTimeout connect to redis")
private Integer soTimeout;
- @ApiModelProperty("Max connection number to redis")
+ @ApiModelProperty("The maxTotal connect to redis")
private Integer maxTotal;
- @ApiModelProperty("Max free connection number")
+ @ApiModelProperty("The maxIdle connect to redis")
private Integer maxIdle;
- @ApiModelProperty("Min free connection number")
+ @ApiModelProperty("The minIdle connect to redis")
private Integer minIdle;
- @ApiModelProperty("Lookup Async")
- private Boolean lookupAsync;
+ @ApiModelProperty("The lookup options for connector redis")
+ private RedisLookupOptions lookupOptions;
- @ApiModelProperty("Lookup cache max rows")
- private Long lookupCacheMaxRows;
+ @ApiModelProperty("The masterName for connector redis")
+ private String masterName;
- @ApiModelProperty("Lookup cache ttl")
- private Long lookupCacheTtl;
+ @ApiModelProperty("The sentinelsInfo for connector redis")
+ private String sentinelsInfo;
- @ApiModelProperty("Lookup max retry times")
- private Integer lookupMaxRetries;
+ @ApiModelProperty("The clusterNodes for connector redis")
+ private String clusterNodes;
public RedisSourceRequest() {
this.setSourceType(SourceType.REDIS);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/IntervalJoinerDefinition.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/IntervalJoinerDefinition.java
new file mode 100644
index 000000000..d38b1f01d
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/IntervalJoinerDefinition.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.transform.joiner;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operations to join two stream nodes into one with the relation defined.
+ * Returns a simple Cartesian product restricted by the join condition and a time constraint.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+public class IntervalJoinerDefinition extends TransformDefinition {
+
+ public IntervalJoinerDefinition(StreamNode leftNode,
+ StreamNode rightNode,
+ List<StreamField> leftJoinFields,
+ List<StreamField> rightJoinFields,
+ String leftTimeColumn,
+ String rightTimeColumn,
+ String forwardInterval,
+ String backwardInterval,
+ TimeUnit forwardIntervalUnit,
+ TimeUnit backwardIntervalUnit) {
+ this.transformType = TransformType.INTERVAL_JOINER;
+ this.leftNode = leftNode;
+ this.rightNode = rightNode;
+ this.leftJoinFields = leftJoinFields;
+ this.rightJoinFields = rightJoinFields;
+ this.leftTimeColumn = leftTimeColumn;
+ this.rightTimeColumn = rightTimeColumn;
+ this.forwardInterval = forwardInterval;
+ this.backwardInterval = backwardInterval;
+ this.forwardIntervalUnit = forwardIntervalUnit;
+ this.backwardIntervalUnit = backwardIntervalUnit;
+ }
+
+ /**
+ * Left node for join
+ */
+ private StreamNode leftNode;
+
+ /**
+ * Right node for join
+ */
+ private StreamNode rightNode;
+
+ /**
+ * Join streamFields from left node
+ */
+ private List<StreamField> leftJoinFields;
+
+ /**
+ * Join streamFields from right node
+ */
+ private List<StreamField> rightJoinFields;
+
+ /**
+ * Left table time column
+ */
+ private String leftTimeColumn;
+
+ /**
+ * Right table time column
+ */
+ private String rightTimeColumn;
+
+ /**
+ * Forward interval time (seconds/minutes/hours)
+ */
+ private String forwardInterval;
+
+ /**
+ * Backward interval time (seconds/minutes/hours)
+ */
+ private String backwardInterval;
+
+ /**
+ * Time unit of the forward interval (SECOND/MINUTE/HOUR)
+ */
+ private TimeUnit forwardIntervalUnit;
+
+ /**
+ * Time unit of the backward interval (SECOND/MINUTE/HOUR)
+ */
+ private TimeUnit backwardIntervalUnit;
+
+ @JsonFormat
+ public enum TimeUnit {
+ /**
+ * Time unit for second
+ */
+ SECOND,
+ /**
+ * Time unit for minute
+ */
+ MINUTE,
+ /**
+ * Time unit for hour
+ */
+ HOUR
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/LookUpJoinerDefinition.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/LookUpJoinerDefinition.java
new file mode 100644
index 000000000..9b52db3d3
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/LookUpJoinerDefinition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.transform.joiner;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operations to join two stream nodes into one with the relation defined.
+ * (A lookup join is typically used to enrich a table with data that is queried from an external system.)
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+public class LookUpJoinerDefinition extends TransformDefinition {
+
+ public LookUpJoinerDefinition(StreamNode leftNode,
+ StreamNode rightNode,
+ List<StreamField> leftJoinFields,
+ List<StreamField> rightJoinFields) {
+ this.transformType = TransformType.LOOKUP_JOINER;
+ this.leftNode = leftNode;
+ this.rightNode = rightNode;
+ this.leftJoinFields = leftJoinFields;
+ this.rightJoinFields = rightJoinFields;
+ }
+
+ /**
+ * Left node for join
+ */
+ private StreamNode leftNode;
+
+ /**
+ * Right node for join
+ */
+ private StreamNode rightNode;
+
+ /**
+ * Join streamFields from left node
+ */
+ private List<StreamField> leftJoinFields;
+
+ /**
+ * Join streamFields from right node
+ */
+ private List<StreamField> rightJoinFields;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/TemporalJoinerDefinition.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/TemporalJoinerDefinition.java
new file mode 100644
index 000000000..86301201d
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/TemporalJoinerDefinition.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.transform.joiner;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operations to join two stream nodes into one with the relation defined.
+ * Temporal joins allow joining against a versioned table
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+public class TemporalJoinerDefinition extends TransformDefinition {
+
+ public TemporalJoinerDefinition(TemporalJoinWay temporalJoinWay,
+ StreamNode leftNode,
+ String leftTimeColumn,
+ String rightTimeColumn,
+ StreamNode rightNode,
+ List<StreamField> leftJoinFields,
+ List<StreamField> rightJoinFields,
+ JoinerDefinition.JoinMode joinMode) {
+ this.transformType = TransformType.TEMPORAL_JOINER;
+ this.temporalJoinWay = temporalJoinWay;
+ this.leftNode = leftNode;
+ this.leftTimeColumn = leftTimeColumn;
+ this.rightTimeColumn = rightTimeColumn;
+ this.rightNode = rightNode;
+ this.leftJoinFields = leftJoinFields;
+ this.rightJoinFields = rightJoinFields;
+ this.joinMode = joinMode;
+ }
+
+ /**
+ * Temporal join time attribute
+ * (Event Time Temporal Join/Processing Time Temporal Join )
+ */
+ private TemporalJoinWay temporalJoinWay;
+
+ /**
+ * Left node for join
+ */
+ private StreamNode leftNode;
+
+ /**
+ * Left table time column
+ */
+ private String leftTimeColumn;
+
+ /**
+ * Right table time column
+ */
+ private String rightTimeColumn;
+
+ /**
+ * Right node for join
+ */
+ private StreamNode rightNode;
+
+ /**
+ * Join streamFields from left node
+ */
+ private List<StreamField> leftJoinFields;
+
+ /**
+ * Join streamFields from right node
+ */
+ private List<StreamField> rightJoinFields;
+
+ @JsonFormat
+ public enum JoinMode {
+ LEFT_JOIN, INNER_JOIN
+ }
+
+ /**
+ * Join mode for join transform
+ */
+ private JoinerDefinition.JoinMode joinMode;
+
+ /**
+ * The time attribute a temporal join is based on (event time or processing time)
+ */
+ public enum TemporalJoinWay {
+ /**
+ * Temporal joins allow joining against a versioned table.
+ * This means a table can be enriched with changing metadata and retrieve its value at a certain point in time.
+ */
+ EVENT,
+ /**
+ * A processing time temporal table join uses a processing-time attribute to
+ * correlate rows to the latest version of a key in an external versioned table.
+ */
+ PROCESSING
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
index 55c5ada3a..380b830dd 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
@@ -36,7 +36,7 @@ public class RedisSourceServiceTest extends ServiceBaseTest {
private static final String hostname = "127.0.0.1";
private static final Integer port = 6379;
private static final String redisMode = "standalone";
- private static final String redisCommand = "get";
+ private static final String command = "get";
private final String sourceName = "stream_source_service_test";
@Autowired
@@ -55,9 +55,9 @@ public class RedisSourceServiceTest extends ServiceBaseTest {
sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
sourceInfo.setSourceName(sourceName);
sourceInfo.setSourceType(SourceType.REDIS);
- sourceInfo.setHostname(hostname);
+ sourceInfo.setHost(hostname);
sourceInfo.setPort(port);
- sourceInfo.setRedisCommand(redisCommand);
+ sourceInfo.setCommand(command);
sourceInfo.setRedisMode(redisMode);
return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
}