This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 58e15df09 [INLONG-5613][Sort] Add interval join support for
FlinkSqlParser (#5614)
58e15df09 is described below
commit 58e15df09cbaf0f8d4b3c7eb33836f8e1a4f1f91
Author: Charles <[email protected]>
AuthorDate: Mon Aug 22 17:08:35 2022 +0800
[INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614)
---
.../protocol/transformation/FieldRelation.java | 2 +-
.../protocol/transformation/FilterFunction.java | 6 +-
.../sort/protocol/transformation/Function.java | 17 +-
.../protocol/transformation/FunctionParam.java | 10 +-
.../transformation/function/AddFunction.java | 67 ++++++++
.../transformation/function/BetweenFunction.java | 84 +++++++++
.../CastFunction.java} | 52 +++---
.../transformation/function/IntervalFunction.java | 70 ++++++++
.../transformation/function/SubtractFunction.java | 67 ++++++++
...elation.java => InnerTemporalJoinRelation.java} | 6 +-
...tionRelation.java => IntervalJoinRelation.java} | 31 ++--
.../transformation/relation/JoinRelation.java | 5 +-
...ion.java => LeftOuterTemporalJoinRelation.java} | 4 +-
.../transformation/relation/NodeRelation.java | 5 +-
.../relation/TemporalJoinRelation.java | 4 +-
.../transformation/function/AddFunctionTest.java | 44 +++++
.../function/BetweenFunctionTest.java | 49 ++++++
.../function/IntervalFunctionTest.java | 41 +++++
.../function/SubtractFunctionTest.java | 44 +++++
.../relation/InnerTemporalJoinRelationTest.java | 56 ++++++
.../relation/IntervalJoinRelationTest.java | 55 ++++++
.../relation/LeftTemporalJoinRelationTest.java | 56 ++++++
.../inlong/sort/parser/impl/FlinkSqlParser.java | 24 ++-
.../parser/IntervalJoinRelationSqlParseTest.java | 187 +++++++++++++++++++++
.../MySqlTemporalJoinRelationSqlParseTest.java | 8 +-
.../RedisTemporalJoinRelationSqlParseTest.java | 8 +-
26 files changed, 938 insertions(+), 64 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
index 1955dcb29..36ca76779 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
@@ -45,7 +45,7 @@ public class FieldRelation {
@JsonCreator
public FieldRelation(@JsonProperty("inputField") FunctionParam inputField,
- @JsonProperty("outputField") FieldInfo outputField) {
+ @JsonProperty("outputField") FieldInfo outputField) {
this.inputField = inputField;
this.outputField = outputField;
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
index 69a145cc2..bae643d30 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.transformation;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
import
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -31,8 +32,9 @@ import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilter
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = SingleValueFilterFunction.class, name =
"singleValueFilter"),
- @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name =
"multiValueFilter")}
-)
+ @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name =
"multiValueFilter"),
+ @JsonSubTypes.Type(value = BetweenFunction.class, name =
"betweenFunction")
+})
public interface FilterFunction extends Function {
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
index 2ef8be7cb..42747e9b0 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
@@ -20,10 +20,16 @@ package org.apache.inlong.sort.protocol.transformation;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.AddFunction;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
import
org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction;
import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
import
org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
import
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction;
@@ -32,6 +38,7 @@ import
org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.SubtractFunction;
import
org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
import
org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
@@ -58,7 +65,15 @@ import java.util.List;
@JsonSubTypes.Type(value = SplitIndexFunction.class, name =
"splitIndex"),
@JsonSubTypes.Type(value = RegexpReplaceFunction.class, name =
"regexpReplace"),
@JsonSubTypes.Type(value = RegexpReplaceFirstFunction.class, name =
"regexpReplaceFirst"),
- @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name =
"cascadeFunctionWrapper")
+ @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name =
"cascadeFunctionWrapper"),
+ @JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"),
+ @JsonSubTypes.Type(value = JsonGetterFunction.class, name =
"jsonGetterFunction"),
+ @JsonSubTypes.Type(value = CustomFunction.class, name =
"customFunction"),
+ @JsonSubTypes.Type(value = BetweenFunction.class, name =
"betweenFunction"),
+ @JsonSubTypes.Type(value = IntervalFunction.class, name =
"intervalFunction"),
+ @JsonSubTypes.Type(value = AddFunction.class, name = "addFunction"),
+ @JsonSubTypes.Type(value = SubtractFunction.class, name =
"subtractFunction")
+
})
public interface Function extends FunctionParam {
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index effd06200..ccda5f42a 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -22,12 +22,15 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.transformation.function.AddFunction;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
import
org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction;
import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
import
org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction;
import
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
@@ -37,6 +40,7 @@ import
org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.SubtractFunction;
import
org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
import
org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
@@ -100,7 +104,11 @@ import
org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
@JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name =
"cascadeFunctionWrapper"),
@JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"),
@JsonSubTypes.Type(value = JsonGetterFunction.class, name =
"jsonGetterFunction"),
- @JsonSubTypes.Type(value = CustomFunction.class, name =
"customFunction")
+ @JsonSubTypes.Type(value = CustomFunction.class, name =
"customFunction"),
+ @JsonSubTypes.Type(value = BetweenFunction.class, name =
"betweenFunction"),
+ @JsonSubTypes.Type(value = IntervalFunction.class, name =
"intervalFunction"),
+ @JsonSubTypes.Type(value = AddFunction.class, name = "addFunction"),
+ @JsonSubTypes.Type(value = SubtractFunction.class, name =
"subtractFunction")
})
public interface FunctionParam {
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java
new file mode 100644
index 000000000..75025b583
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for add
+ */
+@JsonTypeName("addFunction")
+@Data
+public class AddFunction implements Function {
+
+ @Nonnull
+ @JsonProperty("leftField")
+ private final FunctionParam leftField;
+ @Nonnull
+ @JsonProperty("rightField")
+ private final FunctionParam rightField;
+
+ @JsonCreator
+ public AddFunction(@Nonnull @JsonProperty("leftField") FunctionParam
leftField,
+ @Nonnull @JsonProperty("rightField") FunctionParam rightField) {
+ this.leftField = Preconditions.checkNotNull(leftField, "leftField is
null");
+ this.rightField = Preconditions.checkNotNull(rightField, "rightField
is null");
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(leftField, rightField);
+ }
+
+ @Override
+ public String getName() {
+ return "+";
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s %s %s", leftField.format(), getName(),
rightField.format());
+ }
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java
new file mode 100644
index 000000000..df7cbbcd4
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for between
+ */
+@JsonTypeName("betweenFunction")
+@Data
+public class BetweenFunction implements FilterFunction {
+
+ @Nonnull
+ @JsonProperty("field")
+ private final FunctionParam field;
+ @Nonnull
+ @JsonProperty("start")
+ private final FunctionParam start;
+ @Nonnull
+ @JsonProperty("end")
+ private final FunctionParam end;
+ @Nonnull
+ @JsonProperty("logicOperator")
+ private final LogicOperator logicOperator;
+
+ @JsonCreator
+ public BetweenFunction(
+ @Nonnull @JsonProperty("logicOperator") LogicOperator
logicOperator,
+ @Nonnull @JsonProperty("field") FunctionParam field,
+ @Nonnull @JsonProperty("start") FunctionParam start,
+ @Nonnull @JsonProperty("end") FunctionParam end) {
+ this.field = Preconditions.checkNotNull(field, "field is null");
+ this.start = Preconditions.checkNotNull(start, "start is null");
+ this.end = Preconditions.checkNotNull(end, "end is null");
+ this.logicOperator = Preconditions.checkNotNull(logicOperator,
"logicOperator is null");
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(logicOperator, field, start, end);
+ }
+
+ @Override
+ public String getName() {
+ return "BETWEEN";
+ }
+
+ @Override
+ public String format() {
+ String format = "%s %s %s %s AND %s";
+ if (logicOperator == EmptyOperator.getInstance()) {
+ format = "%s%s %s %s AND %s";
+ }
+ return String.format(format, logicOperator.format(), field.format(),
getName(), start.format(), end.format());
+ }
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java
similarity index 51%
copy from
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java
index 1955dcb29..6dcd858c1 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java
@@ -15,38 +15,50 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.transformation;
+package org.apache.inlong.sort.protocol.transformation.function;
+import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.NoArgsConstructor;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import java.util.Arrays;
+import java.util.List;
-/**
- * Defines the relation between fields from input to output field
- */
-@JsonTypeInfo(
- use = JsonTypeInfo.Id.NAME,
- include = JsonTypeInfo.As.PROPERTY,
- property = "type")
-@JsonTypeName("fieldRelation")
+@JsonTypeName("cast")
@Data
@NoArgsConstructor
-public class FieldRelation {
+public class CastFunction implements Function {
+
+ @JsonProperty("field")
+ private FunctionParam field;
- @JsonProperty("inputField")
- private FunctionParam inputField;
- @JsonProperty("outputField")
- private FieldInfo outputField;
+ private String type;
@JsonCreator
- public FieldRelation(@JsonProperty("inputField") FunctionParam inputField,
- @JsonProperty("outputField") FieldInfo outputField) {
- this.inputField = inputField;
- this.outputField = outputField;
+ public CastFunction(@JsonProperty("field") FunctionParam field,
+ @JsonProperty("type") String type) {
+ this.field = Preconditions.checkNotNull(field, "field is null");
+ this.type = Preconditions.checkNotNull(type, "type is null");
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(field, new ConstantParam(type));
+ }
+
+ @Override
+ public String getName() {
+ return "CAST";
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s AS %s)", getName(), field.format(), type);
}
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java
new file mode 100644
index 000000000..2c6897dca
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for interval
+ */
+@JsonTypeName("intervalFunction")
+@Data
+public class IntervalFunction implements Function {
+
+ @Nonnull
+ @JsonProperty("interval")
+ private final StringConstantParam interval;
+
+ @Nonnull
+ @JsonProperty("timeUnit")
+ private final TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public IntervalFunction(@Nonnull @JsonProperty("interval")
StringConstantParam interval,
+ @Nonnull @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit)
{
+ this.interval = Preconditions.checkNotNull(interval, "interval is
null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is
null");
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "INTERVAL";
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s %s %s", getName(), interval.format(),
timeUnit.format());
+ }
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java
new file mode 100644
index 000000000..24133695a
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for subtract
+ */
+@JsonTypeName("subtractFunction")
+@Data
+public class SubtractFunction implements Function {
+
+ @Nonnull
+ @JsonProperty("leftField")
+ private final FunctionParam leftField;
+ @Nonnull
+ @JsonProperty("rightField")
+ private final FunctionParam rightField;
+
+ @JsonCreator
+ public SubtractFunction(@Nonnull @JsonProperty("leftField") FunctionParam
leftField,
+ @Nonnull @JsonProperty("rightField") FunctionParam rightField) {
+ this.leftField = Preconditions.checkNotNull(leftField, "leftField is
null");
+ this.rightField = Preconditions.checkNotNull(rightField, "rightField
is null");
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(leftField, rightField);
+ }
+
+ @Override
+ public String getName() {
+ return "-";
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s %s %s", leftField.format(), getName(),
rightField.format());
+ }
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java
similarity index 92%
copy from
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java
index 2b9b5782d..e1c283fcf 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java
@@ -37,7 +37,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation {
+public class InnerTemporalJoinRelation extends TemporalJoinRelation {
/**
* Constructor
@@ -50,11 +50,11 @@ public class InnerTemporalJoinRelationRelation extends
TemporalJoinRelation {
* @param systemTime The system time for temporal join
*/
@JsonCreator
- public InnerTemporalJoinRelationRelation(
+ public InnerTemporalJoinRelation(
@JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
@JsonProperty("joinConditionMap") Map<String,
List<FilterFunction>> joinConditionMap,
- @Nullable @JsonProperty("systemTimeMap") FieldInfo systemTime) {
+ @Nullable @JsonProperty("systemTime") FieldInfo systemTime) {
super(inputs, outputs, joinConditionMap, systemTime);
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java
similarity index 64%
rename from
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
rename to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java
index 2b9b5782d..e1a0d1153 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java
@@ -17,27 +17,25 @@
package org.apache.inlong.sort.protocol.transformation.relation;
+import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
-import javax.annotation.Nullable;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
/**
- * Inner temporal join relation
+ * This class defines the interval join relation.In interval join, the join
conditions is same as filters,
+ * and so we forbid the filters for interval join. And the same time,
+ * the joinConditionMap will be allowed to have only one value.
*/
-@JsonTypeName("innerTemporalJoin")
+@JsonTypeName("intervalJoin")
@EqualsAndHashCode(callSuper = true)
@Data
-@NoArgsConstructor
-public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation {
+public class IntervalJoinRelation extends JoinRelation {
/**
* Constructor
@@ -47,19 +45,18 @@ public class InnerTemporalJoinRelationRelation extends
TemporalJoinRelation {
* @param joinConditionMap The joinConditionMap is a map of join conditions
* the key of joinConditionMap is the node id of join node
* the value of joinConditionMap is a list of join contidition
- * @param systemTime The system time for temporal join
*/
- @JsonCreator
- public InnerTemporalJoinRelationRelation(
- @JsonProperty("inputs") List<String> inputs,
+ public IntervalJoinRelation(@JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
- @JsonProperty("joinConditionMap") Map<String,
List<FilterFunction>> joinConditionMap,
- @Nullable @JsonProperty("systemTimeMap") FieldInfo systemTime) {
- super(inputs, outputs, joinConditionMap, systemTime);
+ @JsonProperty("joinConditionMap") LinkedHashMap<String,
List<FilterFunction>> joinConditionMap) {
+ super(inputs, outputs, joinConditionMap);
+ Preconditions.checkState(joinConditionMap.size() == 1,
+ String.format("The size of joinConditionMap must be one for
%s", this.getClass().getSimpleName()));
}
@Override
public String format() {
- return "INNER JOIN";
+ throw new UnsupportedOperationException(String.format("Format is not
supported for %s",
+ this.getClass().getSimpleName()));
}
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
index c98e2be30..8ec80c4ee 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
@@ -41,8 +41,9 @@ import java.util.Map;
@JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name =
"innerJoin"),
@JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name =
"leftOuterJoin"),
@JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name =
"rightOutJoin"),
- @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class,
name = "innerTemporalJoin"),
- @JsonSubTypes.Type(value =
LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
+ @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name =
"innerTemporalJoin"),
+ @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name =
"leftOuterTemporalJoin"),
+ @JsonSubTypes.Type(value = IntervalJoinRelation.class, name =
"intervalJoin")
})
@EqualsAndHashCode(callSuper = true)
@Data
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java
similarity index 94%
rename from
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
rename to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java
index 2a873381e..ad61b93ab 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java
@@ -37,7 +37,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class LeftOuterTemporalJoinRelationRelation extends
TemporalJoinRelation {
+public class LeftOuterTemporalJoinRelation extends TemporalJoinRelation {
/**
* LeftOuterTemporalJoin Constructor
@@ -50,7 +50,7 @@ public class LeftOuterTemporalJoinRelationRelation extends
TemporalJoinRelation
* @param systemTime The system time for temporal join
*/
@JsonCreator
- public LeftOuterTemporalJoinRelationRelation(
+ public LeftOuterTemporalJoinRelation(
@JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
@JsonProperty("joinConditionMap") Map<String,
List<FilterFunction>> joinConditionMap,
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
index 43a3e725c..bcfa7058f 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
@@ -40,8 +40,9 @@ import java.util.List;
@JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name =
"innerJoin"),
@JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name =
"leftOuterJoin"),
@JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name =
"rightOutJoin"),
- @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class,
name = "innerTemporalJoin"),
- @JsonSubTypes.Type(value =
LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin"),
+ @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name =
"innerTemporalJoin"),
+ @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name =
"leftOuterTemporalJoin"),
+ @JsonSubTypes.Type(value = IntervalJoinRelation.class, name =
"intervalJoin"),
@JsonSubTypes.Type(value = UnionNodeRelation.class, name = "union"),
@JsonSubTypes.Type(value = NodeRelation.class, name = "baseRelation")
})
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
index 6a4fc1725..f25dc4b06 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
@@ -40,8 +40,8 @@ import java.util.Map;
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class,
name = "innerTemporalJoin"),
- @JsonSubTypes.Type(value =
LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
+ @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name =
"innerTemporalJoin"),
+ @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name =
"leftOuterTemporalJoin")
})
@EqualsAndHashCode(callSuper = true)
@Data
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java
new file mode 100644
index 000000000..344c2e8ac
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+/**
+ * Test for {@link AddFunction}
+ */
+public class AddFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getTestObject() {
+ return new AddFunction(new FieldInfo("event_time", new
TimestampFormatInfo()),
+ new IntervalFunction(new StringConstantParam("5"),
+ new TimeUnitConstantParam(TimeUnit.SECOND)));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "`event_time` + INTERVAL '5' SECOND";
+ }
+}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java
new file mode 100644
index 000000000..89518c7e8
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+
+/**
+ * Test for {@link BetweenFunction}
+ */
+public class BetweenFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getTestObject() {
+ return new BetweenFunction(EmptyOperator.getInstance(), new
FieldInfo("order_time", new TimestampFormatInfo()),
+ new SubtractFunction(new FieldInfo("update_time", new
TimestampFormatInfo()),
+ new IntervalFunction(new StringConstantParam("5"),
+ new TimeUnitConstantParam(TimeUnit.SECOND))),
+ new AddFunction(new FieldInfo("update_time", new
TimestampFormatInfo()),
+ new IntervalFunction(new StringConstantParam("5"),
+ new TimeUnitConstantParam(TimeUnit.SECOND))));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "`order_time` BETWEEN `update_time` - INTERVAL '5' SECOND AND
`update_time` + INTERVAL '5' SECOND";
+ }
+}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java
new file mode 100644
index 000000000..640d6d3b0
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+/**
+ * Test for {@link IntervalFunction}
+ */
+public class IntervalFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getTestObject() {
+ return new IntervalFunction(new StringConstantParam("5"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "INTERVAL '5' SECOND";
+ }
+}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java
new file mode 100644
index 000000000..57a3eef16
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+/**
+ * Test for {@link SubtractFunction}
+ */
+public class SubtractFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getTestObject() {
+ return new SubtractFunction(new FieldInfo("event_time", new
TimestampFormatInfo()),
+ new IntervalFunction(new StringConstantParam("5"),
+ new TimeUnitConstantParam(TimeUnit.SECOND)));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "`event_time` - INTERVAL '5' SECOND";
+ }
+}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java
new file mode 100644
index 000000000..5109f874a
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.relation;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * Tests for {@link InnerTemporalJoinRelation}
+ */
+public class InnerTemporalJoinRelationTest extends
SerializeBaseTest<InnerTemporalJoinRelation> {
+
+ @Override
+ public InnerTemporalJoinRelation getTestObject() {
+ LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new
LinkedHashMap<>();
+ joinConditionMap.put("2", Arrays.asList(
+ new SingleValueFilterFunction(EmptyOperator.getInstance(),
+ new FieldInfo("name", "1", new StringFormatInfo()),
+ EqualOperator.getInstance(), new FieldInfo("name", "2",
+ new StringFormatInfo())),
+ new SingleValueFilterFunction(AndOperator.getInstance(),
+ new FieldInfo("name", "1", new StringFormatInfo()),
+ NotEqualOperator.getInstance(), new
ConstantParam("test"))));
+ return new InnerTemporalJoinRelation(Arrays.asList("1", "2", "3"),
+ Collections.singletonList("4"), joinConditionMap, new
FieldInfo("ts", new TimestampFormatInfo()));
+ }
+}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java
new file mode 100644
index 000000000..9e365ae04
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.relation;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * Tests for {@link IntervalJoinRelation}
+ */
+public class IntervalJoinRelationTest extends
SerializeBaseTest<IntervalJoinRelation> {
+
+ @Override
+ public IntervalJoinRelation getTestObject() {
+ LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new
LinkedHashMap<>();
+ joinConditionMap.put("2", Arrays.asList(
+ new SingleValueFilterFunction(EmptyOperator.getInstance(),
+ new FieldInfo("name", "1", new StringFormatInfo()),
+ EqualOperator.getInstance(), new FieldInfo("name", "2",
+ new StringFormatInfo())),
+ new SingleValueFilterFunction(AndOperator.getInstance(),
+ new FieldInfo("name", "1", new StringFormatInfo()),
+ NotEqualOperator.getInstance(), new
ConstantParam("test"))));
+ return new IntervalJoinRelation(Arrays.asList("1", "2", "3"),
+ Collections.singletonList("4"), joinConditionMap);
+ }
+}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java
new file mode 100644
index 000000000..1831aa903
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.relation;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * Tests for {@link LeftOuterTemporalJoinRelation}
+ */
+public class LeftTemporalJoinRelationTest extends
SerializeBaseTest<LeftOuterTemporalJoinRelation> {
+
+ @Override
+ public LeftOuterTemporalJoinRelation getTestObject() {
+ LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new
LinkedHashMap<>();
+ joinConditionMap.put("2", Arrays.asList(
+ new SingleValueFilterFunction(EmptyOperator.getInstance(),
+ new FieldInfo("name", "1", new StringFormatInfo()),
+ EqualOperator.getInstance(), new FieldInfo("name", "2",
+ new StringFormatInfo())),
+ new SingleValueFilterFunction(AndOperator.getInstance(),
+ new FieldInfo("name", "1", new StringFormatInfo()),
+ NotEqualOperator.getInstance(), new
ConstantParam("test"))));
+ return new LeftOuterTemporalJoinRelation(Arrays.asList("1", "2", "3"),
+ Collections.singletonList("4"), joinConditionMap, new
FieldInfo("ts", new TimestampFormatInfo()));
+ }
+}
diff --git
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index faa7fdf71..02722a9c5 100644
---
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -46,6 +46,7 @@ import
org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.Function;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import
org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation;
import org.apache.inlong.sort.protocol.transformation.relation.JoinRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import
org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
@@ -198,7 +199,6 @@ public class FlinkSqlParser implements Parser {
/**
* parse node relation
- * <p/>
* Here we only parse the output node in the relation,
* and the input node parsing is achieved by parsing the dependent node
parsing of the output node.
*
@@ -427,6 +427,15 @@ public class FlinkSqlParser implements Parser {
Map<String, List<FilterFunction>> conditionMap =
relation.getJoinConditionMap();
if (relation instanceof TemporalJoinRelation) {
parseTemporalJoin((TemporalJoinRelation) relation, nodeMap,
tableNameAliasMap, conditionMap, sb);
+ } else if (relation instanceof IntervalJoinRelation) {
+ Preconditions.checkState(filters == null || filters.isEmpty(),
+ String.format("filters must be empty for %s",
relation.getClass().getSimpleName()));
+ parseIntervalJoin((IntervalJoinRelation) relation, nodeMap,
tableNameAliasMap, sb);
+ List<FilterFunction> conditions =
conditionMap.values().stream().findFirst().orElse(null);
+ Preconditions.checkState(conditions != null &&
!conditions.isEmpty(),
+ String.format("Join conditions must no be empty for %s",
relation.getClass().getSimpleName()));
+ fillOutTableNameAlias(new ArrayList<>(conditions),
tableNameAliasMap);
+ parseFilterFields(FilterStrategy.RETAIN, conditions, sb);
} else {
parseRegularJoin(relation, nodeMap, tableNameAliasMap,
conditionMap, sb);
}
@@ -443,6 +452,15 @@ public class FlinkSqlParser implements Parser {
return sb.toString();
}
+ private void parseIntervalJoin(IntervalJoinRelation relation, Map<String,
Node> nodeMap,
+ Map<String, String> tableNameAliasMap, StringBuilder sb) {
+ for (int i = 1; i < relation.getInputs().size(); i++) {
+ String inputId = relation.getInputs().get(i);
+ sb.append(", ").append(nodeMap.get(inputId).genTableName())
+ .append(" ").append(tableNameAliasMap.get(inputId));
+ }
+ }
+
private void parseRegularJoin(JoinRelation relation, Map<String, Node>
nodeMap,
Map<String, String> tableNameAliasMap, Map<String,
List<FilterFunction>> conditionMap, StringBuilder sb) {
for (int i = 1; i < relation.getInputs().size(); i++) {
@@ -590,9 +608,9 @@ public class FlinkSqlParser implements Parser {
*/
private void parseFilterFields(FilterStrategy filterStrategy,
List<FilterFunction> filters, StringBuilder sb) {
if (filters != null && !filters.isEmpty()) {
- sb.append("\n WHERE ");
+ sb.append("\nWHERE ");
String subSql = StringUtils
-
.join(filters.stream().map(FunctionParam::format).collect(Collectors.toList()),
" ");
+
.join(filters.stream().map(FunctionParam::format).collect(Collectors.toList()),
"\n ");
if (filterStrategy == FilterStrategy.REMOVE) {
sb.append("not (").append(subSql).append(")");
} else {
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
new file mode 100644
index 000000000..2fc4fa03d
--- /dev/null
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.function.AddFunction;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import
org.apache.inlong.sort.protocol.transformation.function.SubtractFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import
org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Test for Interval join for {@link IntervalJoinRelation} {@link
FlinkSqlParser} with {@link KafkaExtractNode}
+ */
+public class IntervalJoinRelationSqlParseTest extends AbstractTestBase {
+
+ private KafkaExtractNode buildIntervalJoinLeftStream() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("currency", new StringFormatInfo()),
+ new FieldInfo("order_time", new TimestampFormatInfo(3)),
+ new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+ );
+ return new KafkaExtractNode("1", "kafka_input_1", fields,
+ new WatermarkField(new FieldInfo("order_time", new
TimestampFormatInfo(3))),
+ null, "orders", "localhost:9092",
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+ "groupId_1", null);
+ }
+
+ private KafkaExtractNode buildIntervalJoinRightStream() {
+ List<FieldInfo> fields = Arrays.asList(
+ new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("currency", new StringFormatInfo()),
+ new FieldInfo("update_time", new TimestampFormatInfo(3)),
+ new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+ );
+ return new KafkaExtractNode("2", "kafka_input_2", fields,
+ new WatermarkField(new FieldInfo("update_time", new
TimestampFormatInfo(3))),
+ null, "currency_rates", "localhost:9092",
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+ "groupId_2", null);
+ }
+
+ private KafkaLoadNode buildKafkaLoadNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("currency", new StringFormatInfo()),
+ new FieldInfo("order_time", new TimestampFormatInfo(3)),
+ new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2))
+ );
+ List<FieldRelation> relations = Arrays.asList(
+ new FieldRelation(new FieldInfo("id", "1", new
LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("price", "1", new
DecimalFormatInfo(32, 2)),
+ new FieldInfo("price", new DecimalFormatInfo(32, 2))),
+ new FieldRelation(new FieldInfo("currency", "1", new
StringFormatInfo()),
+ new FieldInfo("currency", new StringFormatInfo())),
+ new FieldRelation(new FieldInfo("order_time", "1", new
TimestampFormatInfo(3)),
+ new FieldInfo("order_time", new
TimestampFormatInfo(3))),
+ new FieldRelation(new FieldInfo("conversion_rate", "2", new
DecimalFormatInfo(32, 2)),
+ new FieldInfo("conversion_rate", new
DecimalFormatInfo(32, 2)))
+ );
+ return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
+ null, "orders_output", "localhost:9092", new CanalJsonFormat(),
+ null, null, null);
+ }
+
+ /**
+ * build node relation
+ *
+ * @param inputs extract node
+ * @param outputs load node
+ * @return node relation
+ */
+ private IntervalJoinRelation buildNodeRelation(List<Node> inputs,
List<Node> outputs) {
+ List<String> inputIds =
inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds =
outputs.stream().map(Node::getId).collect(Collectors.toList());
+ LinkedHashMap<String, List<FilterFunction>> conditionMap = new
LinkedHashMap<>();
+ conditionMap.put("2", Arrays.asList(
+ new SingleValueFilterFunction(
+ EmptyOperator.getInstance(),
+ new FieldInfo("currency", "1", new StringFormatInfo()),
+ EqualOperator.getInstance(),
+ new FieldInfo("currency", "2", new StringFormatInfo())
+ ),
+ new BetweenFunction(
+ AndOperator.getInstance(),
+ new FieldInfo("order_time", "1", new
TimestampFormatInfo()),
+ new SubtractFunction(new FieldInfo("update_time", "2",
new TimestampFormatInfo()),
+ new IntervalFunction(new
StringConstantParam("10"), new TimeUnitConstantParam(
+ TimeUnit.SECOND))),
+ new AddFunction(new FieldInfo("update_time", "2", new
TimestampFormatInfo()),
+ new IntervalFunction(new
StringConstantParam("5"), new TimeUnitConstantParam(
+ TimeUnit.SECOND)))
+ )
+ ));
+ return new IntervalJoinRelation(inputIds, outputIds, conditionMap);
+ }
+
+ /**
+ * Test inner temporal join with event time for extract is mysql {@link
KafkaExtractNode}
+ * and load is mysql {@link KafkaLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testIntervalJoinParse() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node leftStream = buildIntervalJoinLeftStream();
+ Node rightStream = buildIntervalJoinRightStream();
+ Node kafkaLoadNode = buildKafkaLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1",
+ Arrays.asList(leftStream, rightStream, kafkaLoadNode),
+ Collections.singletonList(
+ buildNodeRelation(Arrays.asList(leftStream,
rightStream),
+ Collections.singletonList(kafkaLoadNode)))
+ );
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
index 69453fd95..8fe23b450 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
@@ -46,8 +46,8 @@ import
org.apache.inlong.sort.protocol.transformation.WatermarkField;
import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
-import
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelation;
+import
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation;
import
org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -146,9 +146,9 @@ public class MySqlTemporalJoinRelationSqlParseTest extends
AbstractTestBase {
new FieldInfo("currency", "1", new LongFormatInfo()),
EqualOperator.getInstance(), new FieldInfo("currency", "2",
new StringFormatInfo()))));
if (left) {
- return new LeftOuterTemporalJoinRelationRelation(inputIds,
outputIds, conditionMap, systemTime);
+ return new LeftOuterTemporalJoinRelation(inputIds, outputIds,
conditionMap, systemTime);
}
- return new InnerTemporalJoinRelationRelation(inputIds, outputIds,
conditionMap, systemTime);
+ return new InnerTemporalJoinRelation(inputIds, outputIds,
conditionMap, systemTime);
}
/**
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
index 36b3977e5..fdb28ccf7 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
@@ -44,8 +44,8 @@ import
org.apache.inlong.sort.protocol.transformation.FilterFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
-import
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelation;
+import
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation;
import
org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -144,10 +144,10 @@ public class RedisTemporalJoinRelationSqlParseTest
extends AbstractTestBase {
new FieldInfo("id", "1", new LongFormatInfo()),
EqualOperator.getInstance(), new FieldInfo("k", "5", new
StringFormatInfo()))));
if (left) {
- return new LeftOuterTemporalJoinRelationRelation(inputIds,
outputIds, conditionMap,
+ return new LeftOuterTemporalJoinRelation(inputIds, outputIds,
conditionMap,
new FieldInfo("proc_time"));
}
- return new InnerTemporalJoinRelationRelation(inputIds, outputIds,
conditionMap, new FieldInfo("proc_time"));
+ return new InnerTemporalJoinRelation(inputIds, outputIds,
conditionMap, new FieldInfo("proc_time"));
}
/**