This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f5f06fa5d [INLONG-7334][Manager] Support stream join dimension table 
(#7345)
f5f06fa5d is described below

commit f5f06fa5d281e06af1407bed5c9decc0511c39eb
Author: jiachengjiang <[email protected]>
AuthorDate: Sun Feb 12 12:21:28 2023 +0800

    [INLONG-7334][Manager] Support stream join dimension table (#7345)
---
 .../inlong/manager/common/enums/TransformType.java |  15 +++
 .../manager/pojo/sort/util/ExtractNodeUtils.java   | 107 +++++++++++-----
 .../manager/pojo/sort/util/FieldRelationUtils.java |   3 +
 .../pojo/sort/util/FilterFunctionUtils.java        |   6 +
 .../manager/pojo/sort/util/NodeRelationUtils.java  | 134 ++++++++++++++++++++-
 .../manager/pojo/sort/util/StreamParseUtils.java   |  51 ++++++--
 .../pojo/source/redis/RedisLookupOptions.java      |  59 +++++++++
 .../manager/pojo/source/redis/RedisSource.java     |  70 +++++------
 .../manager/pojo/source/redis/RedisSourceDTO.java  | 107 ++++++++--------
 .../pojo/source/redis/RedisSourceRequest.java      |  67 +++++------
 .../transform/joiner/IntervalJoinerDefinition.java | 129 ++++++++++++++++++++
 .../transform/joiner/LookUpJoinerDefinition.java   |  70 +++++++++++
 .../transform/joiner/TemporalJoinerDefinition.java | 121 +++++++++++++++++++
 .../service/source/RedisSourceServiceTest.java     |   6 +-
 14 files changed, 768 insertions(+), 177 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
index 6c125ce55..d02052ee4 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
@@ -48,6 +48,21 @@ public enum TransformType {
      */
     JOINER("joiner"),
 
+    /**
+     * A lookup join is typically used to enrich a table with data that is 
queried from an external system
+     */
+    LOOKUP_JOINER("lookup_joiner"),
+
+    /**
+     * Temporal joins allow joining against a versioned table
+     */
+    TEMPORAL_JOINER("temporal_joiner"),
+
+    /**
+     * Returns a simple Cartesian product restricted by the join condition and 
a time constraint
+     */
+    INTERVAL_JOINER("interval_joiner"),
+
     /**
      * Encrypt records on given fields
      */
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 72dda505c..a6eabf75a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -33,6 +33,7 @@ import 
org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
 import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
 import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
 import org.apache.inlong.manager.pojo.source.redis.RedisSource;
 import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
 import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
@@ -384,33 +385,71 @@ public class ExtractNodeUtils {
     public static RedisExtractNode createExtractNode(RedisSource source) {
         List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), 
source.getSourceName());
         Map<String, String> properties = 
parseProperties(source.getProperties());
-        RedisCommand command = RedisCommand.forName(source.getRedisCommand());
-        RedisMode mode = RedisMode.forName(source.getRedisMode());
-        LookupOptions lookupOptions = new 
LookupOptions(source.getLookupCacheMaxRows(), source.getLookupCacheTtl(),
-                source.getLookupMaxRetries(), source.getLookupAsync());
-        return new RedisExtractNode(
-                source.getSourceName(),
-                source.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                source.getPrimaryKey(),
-                mode,
-                command,
-                source.getClusterNodes(),
-                source.getMasterName(),
-                source.getSentinelsInfo(),
-                source.getHostname(),
-                source.getPort(),
-                source.getPassword(),
-                source.getAdditionalKey(),
-                source.getDatabase(),
-                source.getTimeout(),
-                source.getSoTimeout(),
-                source.getMaxTotal(),
-                source.getMaxIdle(),
-                source.getMinIdle(),
-                lookupOptions);
+        RedisMode redisMode = RedisMode.forName(source.getRedisMode());
+        switch (redisMode) {
+            case STANDALONE:
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getHost(),
+                        source.getPort(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            case SENTINEL:
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getMasterName(),
+                        source.getSentinelsInfo(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            case CLUSTER:
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getClusterNodes(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported 
redis-mode=%s for Inlong", redisMode));
+        }
+
     }
 
     /**
@@ -520,4 +559,18 @@ public class ExtractNodeUtils {
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
     }
 
+    /**
+     * Parse LookupOptions
+     *
+     * @param options
+     * @return LookupOptions
+     */
+    private static LookupOptions parseLookupOptions(RedisLookupOptions 
options) {
+        if (options == null) {
+            return null;
+        }
+        return new LookupOptions(options.getLookupCacheMaxRows(), 
options.getLookupCacheTtl(),
+                options.getLookupMaxRetries(), options.getLookupAsync());
+    }
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
index 9f017eda5..5837b935a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
@@ -87,6 +87,9 @@ public class FieldRelationUtils {
             case FILTER:
                 return createFieldRelations(fieldList, constantFieldMap);
             case JOINER:
+            case LOOKUP_JOINER:
+            case TEMPORAL_JOINER:
+            case INTERVAL_JOINER:
                 return createJoinerFieldRelations(fieldList, constantFieldMap);
             default:
                 throw new UnsupportedOperationException(
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
index be28d26ed..90741f651 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
@@ -72,6 +72,9 @@ public class FilterFunctionUtils {
             case DE_DUPLICATION:
             case SPLITTER:
             case JOINER:
+            case LOOKUP_JOINER:
+            case TEMPORAL_JOINER:
+            case INTERVAL_JOINER:
             case STRING_REPLACER:
             case ENCRYPT:
                 return Lists.newArrayList();
@@ -124,6 +127,9 @@ public class FilterFunctionUtils {
             case DE_DUPLICATION:
             case SPLITTER:
             case JOINER:
+            case LOOKUP_JOINER:
+            case TEMPORAL_JOINER:
+            case INTERVAL_JOINER:
             case STRING_REPLACER:
             case ENCRYPT:
                 return null;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
index 5896e2887..929337038 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/NodeRelationUtils.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.enums.TransformType;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -31,8 +32,11 @@ import org.apache.inlong.manager.pojo.stream.StreamPipeline;
 import org.apache.inlong.manager.pojo.stream.StreamTransform;
 import org.apache.inlong.manager.pojo.transform.TransformDefinition;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
+import 
org.apache.inlong.manager.pojo.transform.joiner.IntervalJoinerDefinition;
 import org.apache.inlong.manager.pojo.transform.joiner.JoinerDefinition;
 import 
org.apache.inlong.manager.pojo.transform.joiner.JoinerDefinition.JoinMode;
+import org.apache.inlong.manager.pojo.transform.joiner.LookUpJoinerDefinition;
+import 
org.apache.inlong.manager.pojo.transform.joiner.TemporalJoinerDefinition;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
@@ -44,15 +48,20 @@ import 
org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
 import 
org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelation;
+import 
org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation;
 import 
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelation;
+import 
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import 
org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelation;
 import 
org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelation;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -61,6 +70,17 @@ import java.util.stream.Collectors;
 @Slf4j
 public class NodeRelationUtils {
 
+    private static final Set<TransformType> JOIN_NODES = new HashSet<>();
+
+    static {
+
+        JOIN_NODES.add(TransformType.JOINER);
+        JOIN_NODES.add(TransformType.LOOKUP_JOINER);
+        JOIN_NODES.add(TransformType.INTERVAL_JOINER);
+        JOIN_NODES.add(TransformType.TEMPORAL_JOINER);
+
+    }
+
     /**
      * Create node relation for the given stream
      */
@@ -115,7 +135,7 @@ public class NodeRelationUtils {
                 .map(node -> (TransformNode) node)
                 .filter(transformNode -> {
                     TransformDefinition transformDefinition = 
transformTypeMap.get(transformNode.getName());
-                    return transformDefinition.getTransformType() == 
TransformType.JOINER;
+                    return 
JOIN_NODES.contains(transformDefinition.getTransformType());
                 }).collect(Collectors.toMap(TransformNode::getName, 
transformNode -> transformNode));
 
         List<NodeRelation> relations = streamInfo.getRelations();
@@ -128,8 +148,34 @@ public class NodeRelationUtils {
                 String nodeName = outputs.get(0);
                 if (joinNodes.get(nodeName) != null) {
                     TransformDefinition transformDefinition = 
transformTypeMap.get(nodeName);
-                    joinRelations.add(getNodeRelation((JoinerDefinition) 
transformDefinition, relation));
-                    shipIterator.remove();
+                    TransformType transformType = 
transformDefinition.getTransformType();
+                    switch (transformType) {
+                        case JOINER:
+                            
joinRelations.add(getNodeRelation((JoinerDefinition) transformDefinition, 
relation));
+                            shipIterator.remove();
+                            break;
+                        case LOOKUP_JOINER:
+                            assert transformDefinition instanceof 
LookUpJoinerDefinition;
+                            
joinRelations.add(getNodeRelation((LookUpJoinerDefinition) transformDefinition, 
relation));
+                            shipIterator.remove();
+                            break;
+                        case INTERVAL_JOINER:
+                            if (transformDefinition instanceof 
IntervalJoinerDefinition) {
+                                joinRelations
+                                        
.add(getNodeRelation((IntervalJoinerDefinition) transformDefinition, relation));
+                            }
+                            shipIterator.remove();
+                            break;
+                        case TEMPORAL_JOINER:
+                            assert transformDefinition instanceof 
TemporalJoinerDefinition;
+                            joinRelations
+                                    
.add(getNodeRelation((TemporalJoinerDefinition) transformDefinition, relation));
+                            shipIterator.remove();
+                            break;
+                        default:
+                            throw new IllegalArgumentException(
+                                    String.format("Unsupported transformType 
for %s", transformType));
+                    }
                 }
             }
         }
@@ -169,6 +215,88 @@ public class NodeRelationUtils {
         }
     }
 
+    private static NodeRelation getNodeRelation(LookUpJoinerDefinition 
joinerDefinition, NodeRelation nodeRelation) {
+        String leftNode = getNodeName(joinerDefinition.getLeftNode());
+        String rightNode = getNodeName(joinerDefinition.getRightNode());
+        List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
+        List<StreamField> leftJoinFields = 
joinerDefinition.getLeftJoinFields();
+        List<StreamField> rightJoinFields = 
joinerDefinition.getRightJoinFields();
+        List<FilterFunction> filterFunctions = Lists.newArrayList();
+        for (int index = 0; index < leftJoinFields.size(); index++) {
+            StreamField leftField = leftJoinFields.get(index);
+            StreamField rightField = rightJoinFields.get(index);
+            LogicOperator operator;
+            if (index != 0) {
+                operator = AndOperator.getInstance();
+            } else {
+                operator = EmptyOperator.getInstance();
+            }
+            filterFunctions.add(createFilterFunction(leftField, rightField, 
operator));
+        }
+        Map<String, List<FilterFunction>> joinConditions = new HashMap<>();
+        joinConditions.put(rightNode, filterFunctions);
+        FieldInfo systemTime = new FieldInfo(MetaField.PROCESS_TIME.name());
+        return new LeftOuterTemporalJoinRelation(preNodes, 
nodeRelation.getOutputs(), joinConditions, systemTime);
+    }
+
+    private static NodeRelation getNodeRelation(IntervalJoinerDefinition 
joinerDefinition, NodeRelation nodeRelation) {
+        String leftNode = getNodeName(joinerDefinition.getLeftNode());
+        String rightNode = getNodeName(joinerDefinition.getRightNode());
+        List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
+        List<StreamField> leftJoinFields = 
joinerDefinition.getLeftJoinFields();
+        List<StreamField> rightJoinFields = 
joinerDefinition.getRightJoinFields();
+        List<FilterFunction> filterFunctions = Lists.newArrayList();
+        for (int index = 0; index < leftJoinFields.size(); index++) {
+            StreamField leftField = leftJoinFields.get(index);
+            StreamField rightField = rightJoinFields.get(index);
+            LogicOperator operator;
+            if (index != 0) {
+                operator = AndOperator.getInstance();
+            } else {
+                operator = EmptyOperator.getInstance();
+            }
+            filterFunctions.add(createFilterFunction(leftField, rightField, 
operator));
+        }
+        LinkedHashMap<String, List<FilterFunction>> joinConditions = new 
LinkedHashMap<>();
+        joinConditions.put(rightNode, filterFunctions);
+
+        return new IntervalJoinRelation(preNodes, nodeRelation.getOutputs(), 
joinConditions);
+
+    }
+
+    private static NodeRelation getNodeRelation(TemporalJoinerDefinition 
joinerDefinition, NodeRelation nodeRelation) {
+        JoinMode joinMode = joinerDefinition.getJoinMode();
+        String leftNode = getNodeName(joinerDefinition.getLeftNode());
+        String rightNode = getNodeName(joinerDefinition.getRightNode());
+        List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
+        List<StreamField> leftJoinFields = 
joinerDefinition.getLeftJoinFields();
+        List<StreamField> rightJoinFields = 
joinerDefinition.getRightJoinFields();
+        List<FilterFunction> filterFunctions = Lists.newArrayList();
+        for (int index = 0; index < leftJoinFields.size(); index++) {
+            StreamField leftField = leftJoinFields.get(index);
+            StreamField rightField = rightJoinFields.get(index);
+            LogicOperator operator;
+            if (index != 0) {
+                operator = AndOperator.getInstance();
+            } else {
+                operator = EmptyOperator.getInstance();
+            }
+            filterFunctions.add(createFilterFunction(leftField, rightField, 
operator));
+        }
+        Map<String, List<FilterFunction>> joinConditions = new HashMap<>();
+        joinConditions.put(rightNode, filterFunctions);
+        switch (joinMode) {
+            case LEFT_JOIN:
+                return new LeftOuterJoinNodeRelation(preNodes, 
nodeRelation.getOutputs(), joinConditions);
+            case INNER_JOIN:
+                return new InnerJoinNodeRelation(preNodes, 
nodeRelation.getOutputs(), joinConditions);
+            case RIGHT_JOIN:
+                return new RightOuterJoinNodeRelation(preNodes, 
nodeRelation.getOutputs(), joinConditions);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported 
join mode=%s for inlong", joinMode));
+        }
+    }
+
     private static SingleValueFilterFunction createFilterFunction(StreamField 
leftField, StreamField rightField,
             LogicOperator operator) {
         FieldInfo sourceField = new FieldInfo(leftField.getOriginFieldName(), 
leftField.getOriginNodeName(),
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
index 5fe859296..399bf6b40 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
@@ -31,7 +31,10 @@ import 
org.apache.inlong.manager.pojo.transform.TransformDefinition;
 import 
org.apache.inlong.manager.pojo.transform.deduplication.DeDuplicationDefinition;
 import org.apache.inlong.manager.pojo.transform.encrypt.EncryptDefinition;
 import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition;
+import 
org.apache.inlong.manager.pojo.transform.joiner.IntervalJoinerDefinition;
 import org.apache.inlong.manager.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.pojo.transform.joiner.LookUpJoinerDefinition;
+import 
org.apache.inlong.manager.pojo.transform.joiner.TemporalJoinerDefinition;
 import 
org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinition;
 import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition;
 
@@ -53,11 +56,12 @@ public class StreamParseUtils {
 
     public static TransformDefinition parseTransformDefinition(String 
transformDefinition,
             TransformType transformType) {
+        JsonObject joinerJson = GSON.fromJson(transformDefinition, 
JsonObject.class);
         switch (transformType) {
             case FILTER:
                 return GSON.fromJson(transformDefinition, 
FilterDefinition.class);
             case JOINER:
-                return parseJoinerDefinition(transformDefinition);
+                return parseJoinerDefinition(transformDefinition, joinerJson);
             case SPLITTER:
                 return GSON.fromJson(transformDefinition, 
SplitterDefinition.class);
             case DE_DUPLICATION:
@@ -66,19 +70,52 @@ public class StreamParseUtils {
                 return GSON.fromJson(transformDefinition, 
StringReplacerDefinition.class);
             case ENCRYPT:
                 return GSON.fromJson(transformDefinition, 
EncryptDefinition.class);
+            case LOOKUP_JOINER:
+                return parseLookupJoinerDefinition(transformDefinition, 
joinerJson);
+            case TEMPORAL_JOINER:
+                return parseTemporalJoinerDefinition(transformDefinition, 
joinerJson);
+            case INTERVAL_JOINER:
+                return parseIntervalJoinerDefinition(transformDefinition, 
joinerJson);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
transformType for %s", transformType));
         }
     }
 
-    public static JoinerDefinition parseJoinerDefinition(String 
transformDefinition) {
+    public static JoinerDefinition parseJoinerDefinition(String 
transformDefinition, JsonObject joinerJson) {
         JoinerDefinition joinerDefinition = GSON.fromJson(transformDefinition, 
JoinerDefinition.class);
-        JsonObject joinerJson = GSON.fromJson(transformDefinition, 
JsonObject.class);
-        JsonObject leftNode = joinerJson.getAsJsonObject(LEFT_NODE);
-        StreamNode leftStreamNode = parseNode(leftNode);
+        StreamNode leftStreamNode = 
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
+        joinerDefinition.setLeftNode(leftStreamNode);
+        StreamNode rightStreamNode = 
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
+        joinerDefinition.setRightNode(rightStreamNode);
+        return joinerDefinition;
+    }
+
+    public static LookUpJoinerDefinition parseLookupJoinerDefinition(String 
transformDefinition,
+            JsonObject joinerJson) {
+        LookUpJoinerDefinition joinerDefinition = 
GSON.fromJson(transformDefinition, LookUpJoinerDefinition.class);
+        StreamNode leftStreamNode = 
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
+        joinerDefinition.setLeftNode(leftStreamNode);
+        StreamNode rightStreamNode = 
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
+        joinerDefinition.setRightNode(rightStreamNode);
+        return joinerDefinition;
+    }
+
+    public static TemporalJoinerDefinition 
parseTemporalJoinerDefinition(String transformDefinition,
+            JsonObject joinerJson) {
+        TemporalJoinerDefinition joinerDefinition = 
GSON.fromJson(transformDefinition, TemporalJoinerDefinition.class);
+        StreamNode leftStreamNode = 
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
+        joinerDefinition.setLeftNode(leftStreamNode);
+        StreamNode rightStreamNode = 
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
+        joinerDefinition.setRightNode(rightStreamNode);
+        return joinerDefinition;
+    }
+
+    public static IntervalJoinerDefinition 
parseIntervalJoinerDefinition(String transformDefinition,
+            JsonObject joinerJson) {
+        IntervalJoinerDefinition joinerDefinition = 
GSON.fromJson(transformDefinition, IntervalJoinerDefinition.class);
+        StreamNode leftStreamNode = 
parseNode(joinerJson.getAsJsonObject(LEFT_NODE));
         joinerDefinition.setLeftNode(leftStreamNode);
-        JsonObject rightNode = joinerJson.getAsJsonObject("rightNode");
-        StreamNode rightStreamNode = parseNode(rightNode);
+        StreamNode rightStreamNode = 
parseNode(joinerJson.getAsJsonObject(RIGHT_NODE));
         joinerDefinition.setRightNode(rightStreamNode);
         return joinerDefinition;
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisLookupOptions.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisLookupOptions.java
new file mode 100644
index 000000000..e3d7d6c46
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisLookupOptions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * Lookup options
+ */
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString(callSuper = true)
+@ApiModel(value = "Redis Lookup options")
+public class RedisLookupOptions {
+
+    /**
+     * Lookup cache max rows
+     */
+    @ApiModelProperty("Lookup cache max rows")
+    private Long lookupCacheMaxRows;
+    /**
+     * Lookup cache ttl
+     */
+    @ApiModelProperty("Lookup cache ttl")
+    private Long lookupCacheTtl;
+    /**
+     * Lookup max retries
+     */
+    @ApiModelProperty("Lookup max retries")
+    private Integer lookupMaxRetries;
+    /**
+     * Lookup async
+     */
+    @ApiModelProperty("Lookup async")
+    private Boolean lookupAsync;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
index cea322b5e..a91c28b85 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.pojo.source.redis;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -43,69 +42,59 @@ import org.apache.inlong.manager.pojo.source.StreamSource;
 @JsonTypeDefine(value = SourceType.REDIS)
 public class RedisSource extends StreamSource {
 
-    @ApiModelProperty("Username of the redis server")
-    private String username;
+    @ApiModelProperty("Redis primaryKey")
+    private String primaryKey;
 
-    @ApiModelProperty("Password of the redis server")
-    private String password;
+    @ApiModelProperty("Redis host")
+    private String host;
 
-    @ApiModelProperty("Hostname of the redis server")
-    private String hostname;
+    @ApiModelProperty("Redis port")
+    private Integer port;
 
-    @ApiModelProperty("Port of the redis server")
-    @Builder.Default
-    private Integer port = 6379;
+    @ApiModelProperty("Redis username")
+    private String username;
 
-    @ApiModelProperty("Primary key")
-    private String primaryKey;
+    @ApiModelProperty("Redis password")
+    private String password;
 
-    @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank")
-    private String redisCommand;
+    @ApiModelProperty("Redis database")
+    private Integer database;
 
-    @ApiModelProperty("Redis deploy mode, supports: standalone, cluster, 
sentinel")
+    @ApiModelProperty("Redis deploy Mode(standalone/cluster/sentinel)")
     private String redisMode;
 
-    @ApiModelProperty("Cluster node infos only used for redis cluster deploy 
mode")
-    private String clusterNodes;
-
-    @ApiModelProperty("Master name only used for redis sentinel deploy mode")
-    private String masterName;
-
-    @ApiModelProperty("Sentinels info only used for redis sentinel deploy 
mode")
-    private String sentinelsInfo;
+    @ApiModelProperty("supportted in 
Sort-connector-redis(hget/get/zscore/zrevrank)")
+    private String command;
 
-    @ApiModelProperty("Additional key only used for hash/Sorted-set data type")
+    @ApiModelProperty("The additional key connect to redis only used for 
[Hash|Sorted-Set] data type")
     private String additionalKey;
 
-    @ApiModelProperty("Database number connect to redis for redis 
standalone/sentinel deploy modes")
-    private Integer database;
-
-    @ApiModelProperty("Timeout value of connect to redis")
+    @ApiModelProperty("The timeout connect to redis")
     private Integer timeout;
 
-    @ApiModelProperty("Timeout value of read data from redis")
+    @ApiModelProperty("The soTimeout connect to redis")
     private Integer soTimeout;
 
-    @ApiModelProperty("Max connection number to redis")
+    @ApiModelProperty("The maxTotal connect to redis")
     private Integer maxTotal;
 
-    @ApiModelProperty("Max free connection number")
+    @ApiModelProperty("The maxIdle connect to redis")
     private Integer maxIdle;
 
-    @ApiModelProperty("Min free connection number")
+    @ApiModelProperty("The minIdle connect to redis")
     private Integer minIdle;
 
-    @ApiModelProperty("Lookup Async")
-    private Boolean lookupAsync;
+    @ApiModelProperty("The lookup options for connector redis")
+    private RedisLookupOptions lookupOptions;
 
-    @ApiModelProperty("Lookup cache max rows")
-    private Long lookupCacheMaxRows;
+    @ApiModelProperty("The masterName for connector redis")
+    private String masterName;
 
-    @ApiModelProperty("Lookup cache ttl")
-    private Long lookupCacheTtl;
+    @ApiModelProperty("The sentinelsInfo for connector redis")
+    private String sentinelsInfo;
 
-    @ApiModelProperty("Lookup max retry times")
-    private Integer lookupMaxRetries;
+    @ApiModelProperty("The clusterNodes for connector redis")
+    private String clusterNodes;
 
     public RedisSource() {
         this.setSourceType(SourceType.REDIS);
@@ -115,4 +104,5 @@ public class RedisSource extends StreamSource {
     public SourceRequest genSourceRequest() {
         return CommonBeanUtils.copyProperties(this, RedisSourceRequest::new);
     }
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
index 89facbbd6..d3704ae24 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.pojo.source.redis;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -24,10 +26,8 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
-import java.util.Map;
 
 /**
  * redis source info
@@ -38,108 +38,97 @@ import java.util.Map;
 @AllArgsConstructor
 public class RedisSourceDTO {
 
-    @ApiModelProperty("Username of the redis server")
-    private String username;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    @ApiModelProperty("Password of the redis server")
-    private String password;
+    @ApiModelProperty("Redis primaryKey")
+    private String primaryKey;
 
-    @ApiModelProperty("Hostname of the redis server")
-    private String hostname;
+    @ApiModelProperty("Redis host")
+    private String host;
 
-    @ApiModelProperty("Port of the redis server")
+    @ApiModelProperty("Redis port")
     private Integer port;
 
-    @ApiModelProperty("Primary key")
-    private String primaryKey;
-
-    @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank")
-    private String redisCommand;
+    @ApiModelProperty("Redis username")
+    private String username;
 
-    @ApiModelProperty("Redis deploy mode, supports: standalone, cluster, 
sentinel")
-    private String redisMode;
+    @ApiModelProperty("Redis password")
+    private String password;
 
-    @ApiModelProperty("Cluster node infos only used for redis cluster deploy 
mode")
-    private String clusterNodes;
+    @ApiModelProperty("Redis database")
+    private Integer database;
 
-    @ApiModelProperty("Master name only used for redis sentinel deploy mode")
-    private String masterName;
+    @ApiModelProperty("Redis deploy Mode(standalone/cluster/sentinel)")
+    private String redisMode;
 
-    @ApiModelProperty("Sentinels info only used for redis sentinel deploy 
mode")
-    private String sentinelsInfo;
+    @ApiModelProperty("supportted in 
Sort-connector-redis(hget/get/zscore/zrevrank)")
+    private String command;
 
-    @ApiModelProperty("Additional key only used for hash/Sorted-set data type")
+    @ApiModelProperty("The additional key connect to redis only used for 
[Hash|Sorted-Set] data type")
     private String additionalKey;
 
-    @ApiModelProperty("Database number connect to redis for redis 
standalone/sentinel deploy modes")
-    private Integer database;
-
-    @ApiModelProperty("Timeout value of connect to redis")
+    @ApiModelProperty("The timeout connect to redis")
     private Integer timeout;
 
-    @ApiModelProperty("Timeout value of read data from redis")
+    @ApiModelProperty("The soTimeout connect to redis")
     private Integer soTimeout;
 
-    @ApiModelProperty("Max connection number to redis")
+    @ApiModelProperty("The maxTotal connect to redis")
     private Integer maxTotal;
 
-    @ApiModelProperty("Max free connection number")
+    @ApiModelProperty("The maxIdle connect to redis")
     private Integer maxIdle;
 
-    @ApiModelProperty("Min free connection number")
+    @ApiModelProperty("The minIdle connect to redis")
     private Integer minIdle;
 
-    @ApiModelProperty("Lookup cache max rows")
-    private Long lookupCacheMaxRows;
+    @ApiModelProperty("The lookup options for connector redis")
+    private RedisLookupOptions lookupOptions;
 
-    @ApiModelProperty("Lookup cache ttl")
-    private Long lookupCacheTtl;
-
-    @ApiModelProperty("Lookup max retry times")
-    private Integer lookupMaxRetries;
+    @ApiModelProperty("The masterName for connector redis")
+    private String masterName;
 
-    @ApiModelProperty("Lookup Async")
-    private Boolean lookupAsync;
+    @ApiModelProperty("The sentinelsInfo for connector redis")
+    private String sentinelsInfo;
 
-    @ApiModelProperty("Properties for redis")
-    private Map<String, Object> properties;
+    @ApiModelProperty("The clusterNodes for connector redis")
+    private String clusterNodes;
 
     /**
-     * Get the dto instance from request
+     * Get the dto instance from the request
      */
     public static RedisSourceDTO getFromRequest(RedisSourceRequest request) {
         return RedisSourceDTO.builder()
+                .primaryKey(request.getPrimaryKey())
+                .host(request.getHost())
+                .port(request.getPort())
                 .username(request.getUsername())
                 .password(request.getPassword())
-                .hostname(request.getHostname())
-                .port(request.getPort())
-                .primaryKey(request.getPrimaryKey())
-                .redisCommand(request.getRedisCommand())
+                .database(request.getDatabase())
                 .redisMode(request.getRedisMode())
-                .clusterNodes(request.getClusterNodes())
-                .masterName(request.getMasterName())
-                .sentinelsInfo(request.getSentinelsInfo())
+                .command(request.getCommand())
                 .additionalKey(request.getAdditionalKey())
-                .database(request.getDatabase())
                 .timeout(request.getTimeout())
                 .soTimeout(request.getSoTimeout())
                 .maxTotal(request.getMaxTotal())
                 .maxIdle(request.getMaxIdle())
                 .minIdle(request.getMinIdle())
-                .lookupCacheMaxRows(request.getLookupCacheMaxRows())
-                .lookupCacheTtl(request.getLookupCacheTtl())
-                .lookupMaxRetries(request.getLookupMaxRetries())
-                .lookupAsync(request.getLookupAsync())
-                .properties(request.getProperties())
+                .lookupOptions(request.getLookupOptions())
+                .masterName(request.getMasterName())
+                .sentinelsInfo(request.getSentinelsInfo())
+                .clusterNodes(request.getClusterNodes())
                 .build();
     }
 
+    /**
+     * Get the dto instance from the JSON string
+     */
     public static RedisSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            return JsonUtils.parseObject(extParams, RedisSourceDTO.class);
+            
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+            return OBJECT_MAPPER.readValue(extParams, RedisSourceDTO.class);
         } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
-                    String.format("parse extParams of RedisSource failure: 
%s", e.getMessage()));
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
index b53c75725..c5af1e78b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java
@@ -36,68 +36,59 @@ import org.apache.inlong.manager.pojo.source.SourceRequest;
 @JsonTypeDefine(value = SourceType.REDIS)
 public class RedisSourceRequest extends SourceRequest {
 
-    @ApiModelProperty("Username of the redis server")
-    private String username;
+    @ApiModelProperty("Redis primaryKey")
+    private String primaryKey;
 
-    @ApiModelProperty("Password of the redis server")
-    private String password;
+    @ApiModelProperty("Redis host")
+    private String host;
 
-    @ApiModelProperty("Hostname of the redis server")
-    private String hostname;
+    @ApiModelProperty("Redis port")
+    private Integer port;
 
-    @ApiModelProperty("Port of the redis server")
-    private Integer port = 6379;
+    @ApiModelProperty("Redis username")
+    private String username;
 
-    @ApiModelProperty("Primary key")
-    private String primaryKey;
+    @ApiModelProperty("Redis password")
+    private String password;
 
-    @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank")
-    private String redisCommand;
+    @ApiModelProperty("Redis database")
+    private Integer database;
 
-    @ApiModelProperty("Redis deploy mode, supports: standalone, cluster, 
sentinel")
+    @ApiModelProperty("Redis deploy Mode(standalone/cluster/sentinel)")
     private String redisMode;
 
-    @ApiModelProperty("Cluster node infos only used for redis cluster deploy 
mode")
-    private String clusterNodes;
+    @ApiModelProperty("supportted in 
Sort-connector-redis(hget/get/zscore/zrevrank)")
+    private String command;
 
-    @ApiModelProperty("Master name only used for redis sentinel deploy mode")
-    private String masterName;
-
-    @ApiModelProperty("Sentinels info only used for redis sentinel deploy 
mode")
-    private String sentinelsInfo;
-
-    @ApiModelProperty("Additional key only used for hash/Sorted-set data type")
+    @ApiModelProperty("The additional key connect to redis only used for 
[Hash|Sorted-Set] data type")
     private String additionalKey;
 
-    @ApiModelProperty("Database number connect to redis for redis 
standalone/sentinel deploy modes")
-    private Integer database;
-
-    @ApiModelProperty("Timeout value of connect to redis")
+    @ApiModelProperty("The timeout connect to redis")
     private Integer timeout;
 
-    @ApiModelProperty("Timeout value of read data from redis")
+    @ApiModelProperty("The soTimeout connect to redis")
     private Integer soTimeout;
 
-    @ApiModelProperty("Max connection number to redis")
+    @ApiModelProperty("The maxTotal connect to redis")
     private Integer maxTotal;
 
-    @ApiModelProperty("Max free connection number")
+    @ApiModelProperty("The maxIdle connect to redis")
     private Integer maxIdle;
 
-    @ApiModelProperty("Min free connection number")
+    @ApiModelProperty("The minIdle connect to redis")
     private Integer minIdle;
 
-    @ApiModelProperty("Lookup Async")
-    private Boolean lookupAsync;
+    @ApiModelProperty("The lookup options for connector redis")
+    private RedisLookupOptions lookupOptions;
 
-    @ApiModelProperty("Lookup cache max rows")
-    private Long lookupCacheMaxRows;
+    @ApiModelProperty("The masterName for connector redis")
+    private String masterName;
 
-    @ApiModelProperty("Lookup cache ttl")
-    private Long lookupCacheTtl;
+    @ApiModelProperty("The sentinelsInfo for connector redis")
+    private String sentinelsInfo;
 
-    @ApiModelProperty("Lookup max retry times")
-    private Integer lookupMaxRetries;
+    @ApiModelProperty("The clusterNodes for connector redis")
+    private String clusterNodes;
 
     public RedisSourceRequest() {
         this.setSourceType(SourceType.REDIS);
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/IntervalJoinerDefinition.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/IntervalJoinerDefinition.java
new file mode 100644
index 000000000..d38b1f01d
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/IntervalJoinerDefinition.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.transform.joiner;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operations to join two streamNode in one with relation 
defined.
+ * Returns a simple Cartesian product restricted by the join condition and a 
time constraint
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+public class IntervalJoinerDefinition extends TransformDefinition {
+
+    public IntervalJoinerDefinition(StreamNode leftNode,
+            StreamNode rightNode,
+            List<StreamField> leftJoinFields,
+            List<StreamField> rightJoinFields,
+            String leftTimeColumn,
+            String rightTimeColumn,
+            String forwardInterval,
+            String backwardInterval,
+            TimeUnit forwardIntervalUnit,
+            TimeUnit backwardIntervalUnit) {
+        this.transformType = TransformType.INTERVAL_JOINER;
+        this.leftNode = leftNode;
+        this.rightNode = rightNode;
+        this.leftJoinFields = leftJoinFields;
+        this.rightJoinFields = rightJoinFields;
+        this.leftTimeColumn = leftTimeColumn;
+        this.rightTimeColumn = rightTimeColumn;
+        this.forwardInterval = forwardInterval;
+        this.backwardInterval = backwardInterval;
+        this.forwardIntervalUnit = forwardIntervalUnit;
+        this.backwardIntervalUnit = backwardIntervalUnit;
+    }
+
+    /**
+     * Left node for join
+     */
+    private StreamNode leftNode;
+
+    /**
+     * Right node for join
+     */
+    private StreamNode rightNode;
+
+    /**
+     * Join streamFields from left node
+     */
+    private List<StreamField> leftJoinFields;
+
+    /**
+     * Join streamFields from right node
+     */
+    private List<StreamField> rightJoinFields;
+
+    /**
+     * Left table time column
+     */
+    private String leftTimeColumn;
+
+    /**
+     * Right table time column
+     */
+    private String rightTimeColumn;
+
+    /**
+     * Forward interval time (seconds/minutes/hours)
+     */
+    private String forwardInterval;
+
+    /**
+     * Backward interval time (seconds/minutes/hours)
+     */
+    private String backwardInterval;
+
+    /**
+     * Forward interval time (seconds/minutes/hours)
+     */
+    private TimeUnit forwardIntervalUnit;
+
+    /**
+     * Backward interval time (seconds/minutes/hours)
+     */
+    private TimeUnit backwardIntervalUnit;
+
+    @JsonFormat
+    public enum TimeUnit {
+        /**
+         * Time unit for second
+         */
+        SECOND,
+        /**
+         * Time unit for minute
+         */
+        MINUTE,
+        /**
+         * Time unit for hour
+         */
+        HOUR
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/LookUpJoinerDefinition.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/LookUpJoinerDefinition.java
new file mode 100644
index 000000000..9b52db3d3
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/LookUpJoinerDefinition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.transform.joiner;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operations to join two streamNode in one with relation 
defined.
+ * (A lookup join is typically used to enrich a table with data that is 
queried from an external system)
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+public class LookUpJoinerDefinition extends TransformDefinition {
+
+    public LookUpJoinerDefinition(StreamNode leftNode,
+            StreamNode rightNode,
+            List<StreamField> leftJoinFields,
+            List<StreamField> rightJoinFields) {
+        this.transformType = TransformType.LOOKUP_JOINER;
+        this.leftNode = leftNode;
+        this.rightNode = rightNode;
+        this.leftJoinFields = leftJoinFields;
+        this.rightJoinFields = rightJoinFields;
+    }
+
+    /**
+     * Left node for join
+     */
+    private StreamNode leftNode;
+
+    /**
+     * Right node for join
+     */
+    private StreamNode rightNode;
+
+    /**
+     * Join streamFields from left node
+     */
+    private List<StreamField> leftJoinFields;
+
+    /**
+     * Join streamFields from right node
+     */
+    private List<StreamField> rightJoinFields;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/TemporalJoinerDefinition.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/TemporalJoinerDefinition.java
new file mode 100644
index 000000000..86301201d
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/joiner/TemporalJoinerDefinition.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.transform.joiner;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.manager.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operations to join two streamNode in one with relation 
defined.
+ * Temporal joins allow joining against a versioned table
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+public class TemporalJoinerDefinition extends TransformDefinition {
+
+    public TemporalJoinerDefinition(TemporalJoinWay temporalJoinWay,
+            StreamNode leftNode,
+            String leftTimeColumn,
+            String rightTimeColumn,
+            StreamNode rightNode,
+            List<StreamField> leftJoinFields,
+            List<StreamField> rightJoinFields,
+            JoinerDefinition.JoinMode joinMode) {
+        this.transformType = TransformType.TEMPORAL_JOINER;
+        this.temporalJoinWay = temporalJoinWay;
+        this.leftNode = leftNode;
+        this.leftTimeColumn = leftTimeColumn;
+        this.rightTimeColumn = rightTimeColumn;
+        this.rightNode = rightNode;
+        this.leftJoinFields = leftJoinFields;
+        this.rightJoinFields = rightJoinFields;
+        this.joinMode = joinMode;
+    }
+
+    /**
+     * Temporal join time attribute
+     * (Event Time Temporal Join/Processing Time Temporal Join )
+     */
+    private TemporalJoinWay temporalJoinWay;
+
+    /**
+     * Left node for join
+     */
+    private StreamNode leftNode;
+
+    /**
+     * Left table time column
+     */
+    private String leftTimeColumn;
+
+    /**
+     * Right table time column
+     */
+    private String rightTimeColumn;
+
+    /**
+     * Right node for join
+     */
+    private StreamNode rightNode;
+
+    /**
+     * Join streamFields from left node
+     */
+    private List<StreamField> leftJoinFields;
+
+    /**
+     * Join streamFields from right node
+     */
+    private List<StreamField> rightJoinFields;
+
+    @JsonFormat
+    public enum JoinMode {
+        LEFT_JOIN, INNER_JOIN
+    }
+
+    /**
+     * Join mode for join transform
+     */
+    private JoinerDefinition.JoinMode joinMode;
+
+    /**
+     * The TimeUnit class defines an enumeration of time units
+     */
+    public enum TemporalJoinWay {
+        /**
+         * Temporal joins allow joining against a versioned table.
+         * This means a table can be enriched with changing metadata and 
retrieve its value at a certain point in time.
+         */
+        EVENT,
+        /**
+         * A processing time temporal table join uses a processing-time 
attribute to
+         * correlate rows to the latest version of a key in an external 
versioned table.
+         */
+        PROCESSING
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
index 55c5ada3a..380b830dd 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java
@@ -36,7 +36,7 @@ public class RedisSourceServiceTest extends ServiceBaseTest {
     private static final String hostname = "127.0.0.1";
     private static final Integer port = 6379;
     private static final String redisMode = "standalone";
-    private static final String redisCommand = "get";
+    private static final String command = "get";
     private final String sourceName = "stream_source_service_test";
 
     @Autowired
@@ -55,9 +55,9 @@ public class RedisSourceServiceTest extends ServiceBaseTest {
         sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
         sourceInfo.setSourceName(sourceName);
         sourceInfo.setSourceType(SourceType.REDIS);
-        sourceInfo.setHostname(hostname);
+        sourceInfo.setHost(hostname);
         sourceInfo.setPort(port);
-        sourceInfo.setRedisCommand(redisCommand);
+        sourceInfo.setCommand(command);
         sourceInfo.setRedisMode(redisMode);
         return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
     }

Reply via email to