This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a54c2e7 [FLINK-25282][table-planner][table-runtime] Move runtime code
from table-planner to table-runtime
a54c2e7 is described below
commit a54c2e75eb8f214552643adbdc2a5ce2abc2506c
Author: slinkydeveloper <[email protected]>
AuthorDate: Thu Dec 9 12:30:18 2021 +0100
[FLINK-25282][table-planner][table-runtime] Move runtime code from
table-planner to table-runtime
- Removes the dependency on SqlFunctions from Calcite
- Move DefaultWatermarkGeneratorSupplier to runtime and rename to
GeneratedWatermarkGeneratorSupplier
- Remove dependency on BuiltInMethod from Calcite for floor, ceil and abs
- Copy from Calcite json functions in SqlJsonUtils. Now jackson and
jsonpath are shipped by runtime.
- Move various Flink functions
This closes #18108.
---
.../apache/flink/table/functions/SqlLikeUtils.java | 24 +-
.../apache/flink/table/utils/DateTimeUtils.java | 89 ++++
flink-table/flink-table-planner/pom.xml | 8 +-
.../abilities/source/WatermarkPushDownSpec.java | 92 +---
.../nodes/exec/stream/StreamExecIntervalJoin.java | 75 +--
.../stream/StreamExecLegacyTableSourceScan.java | 82 +---
.../src/main/resources/META-INF/NOTICE | 4 -
.../table/planner/codegen/ExprCodeGenerator.scala | 18 +-
.../table/planner/codegen/GenerateUtils.scala | 32 +-
.../planner/codegen/calls/BuiltInMethods.scala | 86 ++--
.../planner/codegen/calls/FloorCeilCallGen.scala | 14 +-
.../planner/codegen/calls/FunctionGenerator.scala | 81 ++--
.../planner/codegen/calls/JsonValueCallGen.scala | 17 +-
.../table/planner/codegen/calls/LikeCallGen.scala | 9 +-
.../planner/codegen/calls/StringCallGen.scala | 7 +-
flink-table/flink-table-runtime/pom.xml | 38 ++
.../table/runtime/functions/SqlFunctionUtils.java | 148 ++++++
.../table/runtime/functions/SqlJsonUtils.java | 517 ++++++++++++++++++++-
.../GeneratedWatermarkGeneratorSupplier.java | 109 +++++
.../join/interval/FilterAllFlatMapFunction.java | 48 ++
.../join/interval/PaddingLeftMapFunction.java | 53 +++
.../join/interval/PaddingRightMapFunction.java | 53 +++
.../PeriodicWatermarkAssignerWrapper.java | 57 +++
.../PunctuatedWatermarkAssignerWrapper.java | 74 +++
.../src/main/resources/META-INF/NOTICE | 9 +
flink-table/pom.xml | 5 +-
26 files changed, 1393 insertions(+), 356 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
index aa22466..5c3efaf 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
@@ -46,12 +46,30 @@ public class SqlLikeUtils {
private SqlLikeUtils() {}
- /** SQL like function with escape. */
+ /** SQL {@code LIKE} function. */
+ public static boolean like(String s, String pattern) {
+ final String regex = sqlToRegexLike(pattern, null);
+ return Pattern.matches(regex, s);
+ }
+
+ /** SQL {@code LIKE} function with escape. */
public static boolean like(String s, String pattern, String escape) {
final String regex = sqlToRegexLike(pattern, escape);
return Pattern.matches(regex, s);
}
+ /** SQL {@code SIMILAR} function. */
+ public static boolean similar(String s, String pattern) {
+ final String regex = sqlToRegexSimilar(pattern, null);
+ return Pattern.matches(regex, s);
+ }
+
+ /** SQL {@code SIMILAR} function with escape. */
+ public static boolean similar(String s, String pattern, String escape) {
+ final String regex = sqlToRegexSimilar(pattern, escape);
+ return Pattern.matches(regex, s);
+ }
+
/** Translates a SQL LIKE pattern to Java regex pattern, with optional
escape string. */
public static String sqlToRegexLike(String sqlPattern, CharSequence
escapeStr) {
final char escapeChar;
@@ -192,7 +210,7 @@ public class SqlLikeUtils {
}
/** Translates a SQL SIMILAR pattern to Java regex pattern, with optional
escape string. */
- static String sqlToRegexSimilar(String sqlPattern, CharSequence escapeStr)
{
+ public static String sqlToRegexSimilar(String sqlPattern, CharSequence
escapeStr) {
final char escapeChar;
if (escapeStr != null) {
if (escapeStr.length() != 1) {
@@ -206,7 +224,7 @@ public class SqlLikeUtils {
}
/** Translates SQL SIMILAR pattern to Java regex pattern. */
- static String sqlToRegexSimilar(String sqlPattern, char escapeChar) {
+ public static String sqlToRegexSimilar(String sqlPattern, char escapeChar)
{
similarEscapeRuleChecking(sqlPattern, escapeChar);
boolean insideCharacterEnumeration = false;
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index 566956a..51b0090 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -1624,6 +1624,95 @@ public class DateTimeUtils {
}
//
--------------------------------------------------------------------------------------------
+ // ADD/REMOVE months
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Adds a given number of months to a timestamp, represented as the number
of milliseconds since
+ * the epoch.
+ */
+ public static long addMonths(long timestamp, int m) {
+ final long millis = DateTimeUtils.floorMod(timestamp,
DateTimeUtils.MILLIS_PER_DAY);
+ timestamp -= millis;
+ final long x = addMonths((int) (timestamp /
DateTimeUtils.MILLIS_PER_DAY), m);
+ return x * DateTimeUtils.MILLIS_PER_DAY + millis;
+ }
+
+ /**
+ * Adds a given number of months to a date, represented as the number of
days since the epoch.
+ */
+ public static int addMonths(int date, int m) {
+ int y0 = (int) extractFromDate(TimeUnitRange.YEAR, date);
+ int m0 = (int) extractFromDate(TimeUnitRange.MONTH, date);
+ int d0 = (int) extractFromDate(TimeUnitRange.DAY, date);
+ m0 += m;
+ int deltaYear = (int) DateTimeUtils.floorDiv(m0, 12);
+ y0 += deltaYear;
+ m0 = (int) DateTimeUtils.floorMod(m0, 12);
+ if (m0 == 0) {
+ y0 -= 1;
+ m0 += 12;
+ }
+
+ int last = lastDay(y0, m0);
+ if (d0 > last) {
+ d0 = last;
+ }
+ return ymdToUnixDate(y0, m0, d0);
+ }
+
+ private static int lastDay(int y, int m) {
+ switch (m) {
+ case 2:
+ return y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) ? 29 : 28;
+ case 4:
+ case 6:
+ case 9:
+ case 11:
+ return 30;
+ default:
+ return 31;
+ }
+ }
+
+ /**
+ * Finds the number of months between two dates, each represented as the
number of days since
+ * the epoch.
+ */
+ public static int subtractMonths(int date0, int date1) {
+ if (date0 < date1) {
+ return -subtractMonths(date1, date0);
+ }
+ // Start with an estimate.
+ // Since no month has more than 31 days, the estimate is <= the true
value.
+ int m = (date0 - date1) / 31;
+ while (true) {
+ int date2 = addMonths(date1, m);
+ if (date2 >= date0) {
+ return m;
+ }
+ int date3 = addMonths(date1, m + 1);
+ if (date3 > date0) {
+ return m;
+ }
+ ++m;
+ }
+ }
+
+ public static int subtractMonths(long t0, long t1) {
+ final long millis0 = DateTimeUtils.floorMod(t0,
DateTimeUtils.MILLIS_PER_DAY);
+ final int d0 = (int) DateTimeUtils.floorDiv(t0 - millis0,
DateTimeUtils.MILLIS_PER_DAY);
+ final long millis1 = DateTimeUtils.floorMod(t1,
DateTimeUtils.MILLIS_PER_DAY);
+ final int d1 = (int) DateTimeUtils.floorDiv(t1 - millis1,
DateTimeUtils.MILLIS_PER_DAY);
+ int x = subtractMonths(d0, d1);
+ final long d2 = addMonths(d1, x);
+ if (d2 == d0 && millis0 < millis1) {
+ --x;
+ }
+ return x;
+ }
+
+ //
--------------------------------------------------------------------------------------------
// TimeUnit and TimeUnitRange enums
//
--------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/pom.xml
b/flink-table/flink-table-planner/pom.xml
index 8dd4dc9..d89cfc3 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -389,10 +389,6 @@ under the License.
<include>com.esri.geometry:esri-geometry-api</include>
<include>com.google.guava:guava</include>
<include>com.google.guava:failureaccess</include>
-
<include>com.jayway.jsonpath:json-path</include>
-
<include>com.fasterxml.jackson.core:jackson-core</include>
-
<include>com.fasterxml.jackson.core:jackson-databind</include>
-
<include>com.fasterxml.jackson.core:jackson-annotations</include>
<include>commons-codec:commons-codec</include>
<include>commons-io:commons-io</include>
@@ -426,12 +422,14 @@ under the License.
<shadedPattern>org.apache.flink.calcite.shaded.com.google</shadedPattern>
</relocation>
<relocation>
+ <!--
Packaged in runtime -->
<pattern>com.jayway</pattern>
<shadedPattern>org.apache.flink.calcite.shaded.com.jayway</shadedPattern>
</relocation>
<relocation>
+ <!--
Packaged in runtime -->
<pattern>com.fasterxml</pattern>
-
<shadedPattern>org.apache.flink.calcite.shaded.com.fasterxml</shadedPattern>
+
<shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.codec</pattern>
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
index 2648d31..8d20453 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
@@ -18,10 +18,7 @@
package org.apache.flink.table.planner.plan.abilities.source;
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableException;
@@ -32,6 +29,7 @@ import
org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+import
org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -41,9 +39,6 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.calcite.rex.RexNode;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import scala.Option;
@@ -86,7 +81,7 @@ public class WatermarkPushDownSpec extends
SourceAbilitySpecBase {
Configuration configuration =
context.getTableConfig().getConfiguration();
WatermarkGeneratorSupplier<RowData> supplier =
- new DefaultWatermarkGeneratorSupplier(
+ new GeneratedWatermarkGeneratorSupplier(
configuration, generatedWatermarkGenerator);
WatermarkStrategy<RowData> watermarkStrategy =
WatermarkStrategy.forGenerator(supplier);
@@ -116,87 +111,4 @@ public class WatermarkPushDownSpec extends
SourceAbilitySpecBase {
}
return String.format("watermark=[%s]", expressionStr);
}
-
- /**
- * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to
create {@link
- * WatermarkGenerator}. The {@link DefaultWatermarkGeneratorSupplier} uses
the {@link
- * WatermarkGeneratorSupplier.Context} to init the generated watermark
generator.
- */
- public static class DefaultWatermarkGeneratorSupplier
- implements WatermarkGeneratorSupplier<RowData> {
- private static final long serialVersionUID = 1L;
-
- private final Configuration configuration;
- private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
-
- public DefaultWatermarkGeneratorSupplier(
- Configuration configuration,
- GeneratedWatermarkGenerator generatedWatermarkGenerator) {
- this.configuration = configuration;
- this.generatedWatermarkGenerator = generatedWatermarkGenerator;
- }
-
- @Override
- public WatermarkGenerator<RowData> createWatermarkGenerator(Context
context) {
-
- List<Object> references =
- new
ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
- references.add(context);
-
- org.apache.flink.table.runtime.generated.WatermarkGenerator
innerWatermarkGenerator =
- new GeneratedWatermarkGenerator(
- generatedWatermarkGenerator.getClassName(),
- generatedWatermarkGenerator.getCode(),
- references.toArray(),
- configuration)
-
.newInstance(Thread.currentThread().getContextClassLoader());
-
- try {
- innerWatermarkGenerator.open(configuration);
- } catch (Exception e) {
- throw new RuntimeException("Fail to instantiate generated
watermark generator.", e);
- }
- return new
DefaultWatermarkGeneratorSupplier.DefaultWatermarkGenerator(
- innerWatermarkGenerator);
- }
-
- /**
- * Wrapper of the code-generated {@link
- * org.apache.flink.table.runtime.generated.WatermarkGenerator}.
- */
- public static class DefaultWatermarkGenerator implements
WatermarkGenerator<RowData> {
- private static final long serialVersionUID = 1L;
-
- private final
org.apache.flink.table.runtime.generated.WatermarkGenerator
- innerWatermarkGenerator;
- private Long currentWatermark = Long.MIN_VALUE;
-
- public DefaultWatermarkGenerator(
- org.apache.flink.table.runtime.generated.WatermarkGenerator
- watermarkGenerator) {
- this.innerWatermarkGenerator = watermarkGenerator;
- }
-
- @Override
- public void onEvent(RowData event, long eventTimestamp,
WatermarkOutput output) {
- try {
- Long watermark =
innerWatermarkGenerator.currentWatermark(event);
- if (watermark != null) {
- currentWatermark = watermark;
- }
- } catch (Exception e) {
- throw new RuntimeException(
- String.format(
- "Generated WatermarkGenerator fails to
generate for row: %s.",
- event),
- e);
- }
- }
-
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- output.emitWatermark(new Watermark(currentWatermark));
- }
- }
- }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index b5a8163..036d71e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -18,11 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
@@ -47,12 +43,14 @@ import
org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import
org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
+import
org.apache.flink.table.runtime.operators.join.interval.FilterAllFlatMapFunction;
import
org.apache.flink.table.runtime.operators.join.interval.IntervalJoinFunction;
+import
org.apache.flink.table.runtime.operators.join.interval.PaddingLeftMapFunction;
+import
org.apache.flink.table.runtime.operators.join.interval.PaddingRightMapFunction;
import
org.apache.flink.table.runtime.operators.join.interval.ProcTimeIntervalJoin;
import
org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
@@ -196,73 +194,6 @@ public class StreamExecIntervalJoin extends
ExecNodeBase<RowData>
}
}
- private static class FilterAllFlatMapFunction
- implements FlatMapFunction<RowData, RowData>,
ResultTypeQueryable<RowData> {
- private static final long serialVersionUID = 1L;
-
- private final InternalTypeInfo<RowData> outputTypeInfo;
-
- public FilterAllFlatMapFunction(InternalTypeInfo<RowData>
inputTypeInfo) {
- this.outputTypeInfo = inputTypeInfo;
- }
-
- @Override
- public void flatMap(RowData value, Collector<RowData> out) {}
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return outputTypeInfo;
- }
- }
-
- private static class PaddingLeftMapFunction
- implements MapFunction<RowData, RowData>,
ResultTypeQueryable<RowData> {
- private static final long serialVersionUID = 1L;
-
- private final OuterJoinPaddingUtil paddingUtil;
- private final InternalTypeInfo<RowData> outputTypeInfo;
-
- public PaddingLeftMapFunction(
- OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData>
returnType) {
- this.paddingUtil = paddingUtil;
- this.outputTypeInfo = returnType;
- }
-
- @Override
- public RowData map(RowData value) {
- return paddingUtil.padLeft(value);
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return outputTypeInfo;
- }
- }
-
- private static class PaddingRightMapFunction
- implements MapFunction<RowData, RowData>,
ResultTypeQueryable<RowData> {
- private static final long serialVersionUID = 1L;
-
- private final OuterJoinPaddingUtil paddingUtil;
- private final InternalTypeInfo<RowData> outputTypeInfo;
-
- public PaddingRightMapFunction(
- OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData>
returnType) {
- this.paddingUtil = paddingUtil;
- this.outputTypeInfo = returnType;
- }
-
- @Override
- public RowData map(RowData value) {
- return paddingUtil.padRight(value);
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return outputTypeInfo;
- }
- }
-
private Transformation<RowData> createNegativeWindowSizeJoin(
JoinSpec joinSpec,
Transformation<RowData> leftInputTransform,
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
index 50b87ab..e8b182d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
@@ -25,12 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator;
@@ -41,6 +36,8 @@ import org.apache.flink.table.planner.plan.utils.ScanUtil;
import org.apache.flink.table.planner.sources.TableSourceUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import
org.apache.flink.table.runtime.operators.wmassigners.PeriodicWatermarkAssignerWrapper;
+import
org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkAssignerWrapper;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
@@ -48,9 +45,7 @@ import
org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
@@ -176,77 +171,4 @@ public class StreamExecLegacyTableSourceScan extends
CommonExecLegacyTableSource
.name(tableSource.explainSource())
.getTransformation();
}
-
- /** Generates periodic watermarks based on a {@link
PeriodicWatermarkAssigner}. */
- private static class PeriodicWatermarkAssignerWrapper
- implements AssignerWithPeriodicWatermarks<RowData> {
- private static final long serialVersionUID = 1L;
- private final PeriodicWatermarkAssigner assigner;
- private final int timeFieldIdx;
-
- /**
- * @param timeFieldIdx the index of the rowtime attribute.
- * @param assigner the watermark assigner.
- */
- private PeriodicWatermarkAssignerWrapper(
- PeriodicWatermarkAssigner assigner, int timeFieldIdx) {
- this.assigner = assigner;
- this.timeFieldIdx = timeFieldIdx;
- }
-
- @Nullable
- @Override
- public Watermark getCurrentWatermark() {
- return assigner.getWatermark();
- }
-
- @Override
- public long extractTimestamp(RowData row, long recordTimestamp) {
- long timestamp = row.getTimestamp(timeFieldIdx,
3).getMillisecond();
- assigner.nextTimestamp(timestamp);
- return 0;
- }
- }
-
- /** Generates periodic watermarks based on a
[[PunctuatedWatermarkAssigner]]. */
- private static class PunctuatedWatermarkAssignerWrapper
- implements AssignerWithPunctuatedWatermarks<RowData> {
- private static final long serialVersionUID = 1L;
- private final PunctuatedWatermarkAssigner assigner;
- private final int timeFieldIdx;
- private final DataFormatConverters.DataFormatConverter<RowData, Row>
converter;
-
- /**
- * @param timeFieldIdx the index of the rowtime attribute.
- * @param assigner the watermark assigner.
- * @param sourceType the type of source
- */
- @SuppressWarnings("unchecked")
- private PunctuatedWatermarkAssignerWrapper(
- PunctuatedWatermarkAssigner assigner, int timeFieldIdx,
DataType sourceType) {
- this.assigner = assigner;
- this.timeFieldIdx = timeFieldIdx;
- DataType originDataType;
- if (sourceType instanceof FieldsDataType) {
- originDataType = sourceType;
- } else {
- originDataType = DataTypes.ROW(DataTypes.FIELD("f0",
sourceType));
- }
- converter =
- DataFormatConverters.getConverterForDataType(
- originDataType.bridgedTo(Row.class));
- }
-
- @Nullable
- @Override
- public Watermark checkAndGetNextWatermark(RowData row, long
extractedTimestamp) {
- long timestamp = row.getLong(timeFieldIdx);
- return assigner.getWatermark(converter.toExternal(row), timestamp);
- }
-
- @Override
- public long extractTimestamp(RowData element, long recordTimestamp) {
- return 0;
- }
- }
}
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
index 08a07c7..2b53463 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
@@ -9,10 +9,6 @@ This project bundles the following dependencies under the
Apache Software Licens
- com.google.guava:guava:29.0-jre
- com.google.guava:failureaccess:1.0.1
- com.esri.geometry:esri-geometry-api:2.2.0
-- com.fasterxml.jackson.core:jackson-annotations:2.13.0
-- com.fasterxml.jackson.core:jackson-core:2.13.0
-- com.fasterxml.jackson.core:jackson-databind:2.13.0
-- com.jayway.jsonpath:json-path:2.4.0
- org.apache.calcite:calcite-core:1.26.0
- org.apache.calcite:calcite-linq4j:1.26.0
- org.apache.calcite.avatica:avatica-core:1.17.0
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 6009cc1..f083c39 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -18,6 +18,10 @@
package org.apache.flink.table.planner.codegen
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.{SqlKind, SqlOperator}
+import org.apache.calcite.util.{Sarg, TimestampString}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.TableException
import org.apache.flink.table.data.RowData
@@ -42,10 +46,6 @@ import
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isNumeric, isTem
import org.apache.flink.table.types.logical._
import
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount,
isCompositeType}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
-import org.apache.calcite.sql.{SqlKind, SqlOperator}
-import org.apache.calcite.util.{Sarg, TimestampString}
import scala.collection.JavaConversions._
@@ -418,15 +418,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
case _ =>
literal.getValue3
}
- // Make sure to convert avatica time types to flink internal types
- val convertedValue = value match {
- case tu: org.apache.calcite.avatica.util.TimeUnit =>
- org.apache.flink.table.utils.DateTimeUtils.TimeUnit.valueOf(tu.name())
- case tur: org.apache.calcite.avatica.util.TimeUnitRange =>
-
org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange.valueOf(tur.name())
- case _ => value
- }
- generateLiteral(ctx, resultType, convertedValue)
+ generateLiteral(ctx, resultType, value)
}
override def visitCorrelVariable(correlVariable: RexCorrelVariable):
GeneratedExpression = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 3fa5a27..fcf92b4 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.codegen
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{AtomicType => AtomicTypeInfo}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.table.api.{JsonExistsOnError, JsonQueryOnEmptyOrError,
JsonQueryWrapper}
import org.apache.flink.table.data._
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.utils.JoinedRowData
@@ -30,13 +30,14 @@ import
org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL,
import org.apache.flink.table.planner.codegen.calls.CurrentTimePointCallGen
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec
import org.apache.flink.table.planner.plan.utils.SortUtil
+import
org.apache.flink.table.planner.utils.TimestampStringUtils.toLocalDateTime
import
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString,
isReference, isTemporal}
import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical._
import
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount,
getFieldTypes}
-import
org.apache.flink.table.planner.utils.TimestampStringUtils.toLocalDateTime
import org.apache.calcite.avatica.util.ByteString
+import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior,
SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior}
import org.apache.calcite.util.TimestampString
import org.apache.commons.lang3.StringEscapeUtils
@@ -443,13 +444,34 @@ object GenerateUtils {
}
}
- def generateSymbol(enum: Enum[_]): GeneratedExpression = {
+ def generateSymbol(value: Enum[_]): GeneratedExpression = {
+ // Make sure to convert calcite enum types to flink types
+ val convertedValue = value match {
+ case tu: org.apache.calcite.avatica.util.TimeUnit =>
+ org.apache.flink.table.utils.DateTimeUtils.TimeUnit.valueOf(tu.name())
+ case tur: org.apache.calcite.avatica.util.TimeUnitRange =>
+
org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange.valueOf(tur.name())
+ case jeeb: SqlJsonExistsErrorBehavior =>
+ JsonExistsOnError.valueOf(jeeb.name())
+ case jqeeb: SqlJsonQueryEmptyOrErrorBehavior =>
+ JsonQueryOnEmptyOrError.valueOf(jqeeb.name())
+ case jqwb: SqlJsonQueryWrapperBehavior => jqwb match {
+ case SqlJsonQueryWrapperBehavior.WITHOUT_ARRAY =>
+ JsonQueryWrapper.WITHOUT_ARRAY
+ case SqlJsonQueryWrapperBehavior.WITH_CONDITIONAL_ARRAY =>
+ JsonQueryWrapper.CONDITIONAL_ARRAY
+ case SqlJsonQueryWrapperBehavior.WITH_UNCONDITIONAL_ARRAY =>
+ JsonQueryWrapper.UNCONDITIONAL_ARRAY
+ }
+ case _ => value
+ }
+
GeneratedExpression(
- qualifyEnum(enum),
+ qualifyEnum(convertedValue),
NEVER_NULL,
NO_CODE,
new SymbolType(false),
- literalValue = Some(enum))
+ literalValue = Some(convertedValue))
}
/**
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 0ee5b96..0b5e6a7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -18,14 +18,15 @@
package org.apache.flink.table.planner.codegen.calls
+import org.apache.flink.table.api.{JsonExistsOnError, JsonQueryOnEmptyOrError,
JsonQueryWrapper, JsonValueOnEmptyOrError}
+import org.apache.flink.table.data.binary.{BinaryStringData,
BinaryStringDataUtil}
import org.apache.flink.table.data.{DecimalData, DecimalDataUtils,
TimestampData}
+import org.apache.flink.table.functions.SqlLikeUtils
import org.apache.flink.table.runtime.functions._
import org.apache.flink.table.utils.DateTimeUtils
import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
+
import org.apache.calcite.linq4j.tree.Types
-import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
-import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior,
SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior,
SqlJsonValueEmptyOrErrorBehavior}
-import org.apache.flink.table.data.binary.{BinaryStringData,
BinaryStringDataUtil}
import java.lang.reflect.Method
import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short =>
JShort}
@@ -82,17 +83,22 @@ object BuiltInMethods {
val POWER_DEC_NUM = Types.lookupMethod(
classOf[SqlFunctionUtils], "power", classOf[DecimalData], classOf[Double])
-
// TRIGONOMETRIC FUNCTIONS
val LN = Types.lookupMethod(classOf[SqlFunctionUtils], "log",
classOf[Double])
val LN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "log",
classOf[DecimalData])
- val ABS = Types.lookupMethod(classOf[SqlFunctions], "abs", classOf[Double])
+ val ABS = Types.lookupMethod(classOf[SqlFunctionUtils], "abs",
classOf[Double])
val ABS_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "abs",
classOf[DecimalData])
+ val FLOOR = Types.lookupMethod(classOf[SqlFunctionUtils], "floor",
classOf[Double])
+ val FLOOR_INTEGRAL = Types.lookupMethod(classOf[SqlFunctionUtils], "floor",
classOf[Int],
+ classOf[Int])
val FLOOR_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "floor",
classOf[DecimalData])
+ val CEIL = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil",
classOf[Double])
+ val CEIL_INTEGRAL = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil",
classOf[Int],
+ classOf[Int])
val CEIL_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil",
classOf[DecimalData])
val SIN = Types.lookupMethod(classOf[Math], "sin", classOf[Double])
@@ -104,7 +110,7 @@ object BuiltInMethods {
val TAN = Types.lookupMethod(classOf[Math], "tan", classOf[Double])
val TAN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "tan",
classOf[DecimalData])
- val COT = Types.lookupMethod(classOf[SqlFunctions], "cot", classOf[Double])
+ val COT = Types.lookupMethod(classOf[SqlFunctionUtils], "cot",
classOf[Double])
val COT_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "cot",
classOf[DecimalData])
val ASIN = Types.lookupMethod(classOf[Math], "asin", classOf[Double])
@@ -147,10 +153,12 @@ object BuiltInMethods {
val SIGN_LONG = Types.lookupMethod(classOf[JLong], "signum", classOf[Long])
val SIGN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "sign",
classOf[DecimalData])
- val ROUND_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "sround",
classOf[Double],
+ val ROUND_DOUBLE = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
classOf[Double],
+ classOf[Int])
+ val ROUND_INT = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
classOf[Int],
+ classOf[Int])
+ val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
classOf[Long],
classOf[Int])
- val ROUND_INT = Types.lookupMethod(classOf[SqlFunctions], "sround",
classOf[Int], classOf[Int])
- val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctions], "sround",
classOf[Long], classOf[Int])
val ROUND_BYTE = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
classOf[Byte], classOf[Int])
val ROUND_SHORT = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
@@ -399,24 +407,24 @@ object BuiltInMethods {
val STRING_TO_TIME = Types.lookupMethod(
classOf[DateTimeUtils], "parseTime", classOf[String])
- val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctions],
"struncate",
+ val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[Double])
val TRUNCATE_FLOAT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[Float])
- val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+ val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[Int])
- val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctions],
"struncate",
+ val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[Long])
val TRUNCATE_DEC_ONE = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[DecimalData])
- val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+ val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[Double], classOf[Int])
val TRUNCATE_FLOAT = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[Float], classOf[Int])
- val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+ val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
classOf[Int], classOf[Int])
- val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+ val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctionUtils],
"struncate",
classOf[Long], classOf[Int])
val TRUNCATE_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
classOf[DecimalData], classOf[Int])
@@ -439,29 +447,41 @@ object BuiltInMethods {
val TRUNCATE_SQL_TIMESTAMP = Types.lookupMethod(classOf[DateTimeUtils],
"truncate",
classOf[TimestampData], classOf[Int])
- val ADD_MONTHS = Types.lookupMethod(classOf[SqlFunctions], "addMonths",
+ val ADD_MONTHS = Types.lookupMethod(classOf[DateTimeUtils], "addMonths",
classOf[Long], classOf[Int])
- val SUBTRACT_MONTHS = Types.lookupMethod(classOf[SqlFunctions],
"subtractMonths",
+ val SUBTRACT_MONTHS = Types.lookupMethod(classOf[DateTimeUtils],
"subtractMonths",
classOf[Long], classOf[Long])
// JSON functions
- val JSON_EXISTS = Types.lookupMethod(classOf[JsonFunctions], "jsonExists",
+ val JSON_EXISTS = Types.lookupMethod(classOf[SqlJsonUtils], "jsonExists",
classOf[String], classOf[String])
- val JSON_EXISTS_ON_ERROR = Types.lookupMethod(classOf[JsonFunctions],
"jsonExists",
- classOf[String], classOf[String], classOf[SqlJsonExistsErrorBehavior])
+ val JSON_EXISTS_ON_ERROR = Types.lookupMethod(classOf[SqlJsonUtils],
"jsonExists",
+ classOf[String], classOf[String], classOf[JsonExistsOnError])
- val JSON_VALUE = Types.lookupMethod(classOf[JsonFunctions], "jsonValue",
+ val JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "jsonValue",
classOf[String], classOf[String],
- classOf[SqlJsonValueEmptyOrErrorBehavior], classOf[Any],
- classOf[SqlJsonValueEmptyOrErrorBehavior], classOf[Any]
+ classOf[JsonValueOnEmptyOrError], classOf[Any],
+ classOf[JsonValueOnEmptyOrError], classOf[Any]
)
- val JSON_QUERY = Types.lookupMethod(classOf[JsonFunctions], "jsonQuery",
- classOf[String], classOf[String], classOf[SqlJsonQueryWrapperBehavior],
- classOf[SqlJsonQueryEmptyOrErrorBehavior],
classOf[SqlJsonQueryEmptyOrErrorBehavior])
+ val JSON_QUERY = Types.lookupMethod(classOf[SqlJsonUtils], "jsonQuery",
+ classOf[String], classOf[String], classOf[JsonQueryWrapper],
+ classOf[JsonQueryOnEmptyOrError], classOf[JsonQueryOnEmptyOrError])
+
+ val IS_JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonValue",
+ classOf[String])
+
+ val IS_JSON_OBJECT = Types.lookupMethod(classOf[SqlJsonUtils],
"isJsonObject",
+ classOf[String])
+
+ val IS_JSON_ARRAY = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonArray",
+ classOf[String])
+
+ val IS_JSON_SCALAR = Types.lookupMethod(classOf[SqlJsonUtils],
"isJsonScalar",
+ classOf[String])
// STRING functions
@@ -522,6 +542,20 @@ object BuiltInMethods {
val STRING_DATA_TO_TIMESTAMP_WITH_ZONE = Types.lookupMethod(
classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData],
classOf[TimeZone])
+ val STRING_LIKE = Types.lookupMethod(
+ classOf[SqlLikeUtils], "like", classOf[String], classOf[String])
+
+ val STRING_LIKE_WITH_ESCAPE = Types.lookupMethod(
+ classOf[SqlLikeUtils], "like", classOf[String], classOf[String],
classOf[String])
+
+ val STRING_SIMILAR = Types.lookupMethod(
+ classOf[SqlLikeUtils], "similar", classOf[String], classOf[String])
+
+ val STRING_SIMILAR_WITH_ESCAPE = Types.lookupMethod(
+ classOf[SqlLikeUtils], "similar", classOf[String], classOf[String],
classOf[String])
+
+ val STRING_INITCAP = Types.lookupMethod(classOf[SqlFunctionUtils],
"initcap", classOf[String])
+
// DecimalData functions
val DECIMAL_TO_DECIMAL = Types.lookupMethod(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
index e9e9df8..8333af7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
@@ -22,7 +22,6 @@ import
org.apache.flink.table.planner.codegen.CodeGenUtils.{TIMESTAMP_DATA, getE
import
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
GeneratedExpression}
import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
-
import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange._
@@ -34,6 +33,8 @@ import java.util.TimeZone
*/
class FloorCeilCallGen(
arithmeticMethod: Method,
+ arithmeticIntegralMethod: Option[Method] = None,
+ decimalMethod: Option[Method] = None,
temporalMethod: Option[Method] = None)
extends MethodCallGen(arithmeticMethod) {
@@ -49,7 +50,7 @@ class FloorCeilCallGen(
case LogicalTypeRoot.DECIMAL =>
generateCallIfArgsNotNull(ctx, returnType, operands) {
operandResultTerms =>
-
s"${qualifyMethod(arithmeticMethod)}(${operandResultTerms.mkString(", ")})"
+
s"${qualifyMethod(decimalMethod.get)}(${operandResultTerms.mkString(", ")})"
}
case _ =>
operands.head // no floor/ceil necessary
@@ -98,13 +99,14 @@ class FloorCeilCallGen(
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
val longTerm = s"${terms.head}.getMillisecond()"
s"""
-
|$TIMESTAMP_DATA.fromEpochMillis(${qualifyMethod(arithmeticMethod)}(
- | $longTerm,
- | (long) ${unit.startUnit.multiplier.intValue()}))
+ |$TIMESTAMP_DATA.fromEpochMillis(
+ | ${qualifyMethod(arithmeticIntegralMethod.get)}(
+ | $longTerm,
+ | (long) ${unit.startUnit.multiplier.intValue()}))
""".stripMargin
case _ =>
s"""
- |${qualifyMethod(arithmeticMethod)}(
+ |${qualifyMethod(arithmeticIntegralMethod.get)}(
| ($internalType) ${terms.head},
| ($internalType)
${unit.startUnit.multiplier.intValue()})
|""".stripMargin
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index b7ce412..22cb1a9 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.codegen.calls
+import org.apache.calcite.sql.SqlOperator
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api.TableConfig
@@ -26,11 +27,7 @@ import
org.apache.flink.table.runtime.types.PlannerTypeUtils.isPrimitive
import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.util.BuiltInMethod
-
import java.lang.reflect.Method
-
import scala.collection.mutable
class FunctionGenerator private(config: TableConfig) {
@@ -115,22 +112,22 @@ class FunctionGenerator private(config: TableConfig) {
addSqlFunction(
FLOOR,
Seq(DOUBLE),
- new FloorCeilCallGen(BuiltInMethod.FLOOR.method))
+ new FloorCeilCallGen(BuiltInMethods.FLOOR))
- addSqlFunction(
+ addSqlFunctionMethod(
FLOOR,
Seq(DECIMAL),
- new FloorCeilCallGen(BuiltInMethods.FLOOR_DEC))
+ BuiltInMethods.FLOOR_DEC)
addSqlFunction(
CEIL,
Seq(DOUBLE),
- new FloorCeilCallGen(BuiltInMethod.CEIL.method))
+ new FloorCeilCallGen(BuiltInMethods.CEIL))
- addSqlFunction(
+ addSqlFunctionMethod(
CEIL,
Seq(DECIMAL),
- new FloorCeilCallGen(BuiltInMethods.CEIL_DEC))
+ BuiltInMethods.CEIL_DEC)
addSqlFunctionMethod(
SIN,
@@ -462,28 +459,36 @@ class FunctionGenerator private(config: TableConfig) {
FLOOR,
Seq(DATE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.FLOOR.method,
+ BuiltInMethods.FLOOR,
+ Some(BuiltInMethods.FLOOR_INTEGRAL),
+ Some(BuiltInMethods.FLOOR_DEC),
Some(BuiltInMethods.UNIX_DATE_FLOOR)))
addSqlFunction(
FLOOR,
Seq(TIME_WITHOUT_TIME_ZONE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.FLOOR.method,
+ BuiltInMethods.FLOOR,
+ Some(BuiltInMethods.FLOOR_INTEGRAL),
+ Some(BuiltInMethods.FLOOR_DEC),
Some(BuiltInMethods.UNIX_DATE_FLOOR)))
addSqlFunction(
FLOOR,
Seq(TIMESTAMP_WITHOUT_TIME_ZONE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.FLOOR.method,
+ BuiltInMethods.FLOOR,
+ Some(BuiltInMethods.FLOOR_INTEGRAL),
+ Some(BuiltInMethods.FLOOR_DEC),
Some(BuiltInMethods.UNIX_TIMESTAMP_FLOOR)))
addSqlFunction(
FLOOR,
Seq(TIMESTAMP_WITH_LOCAL_TIME_ZONE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.FLOOR.method,
+ BuiltInMethods.FLOOR,
+ Some(BuiltInMethods.FLOOR_INTEGRAL),
+ Some(BuiltInMethods.FLOOR_DEC),
Some(BuiltInMethods.TIMESTAMP_FLOOR_TIME_ZONE)))
// TODO: fixme if CALCITE-3199 fixed
@@ -492,28 +497,36 @@ class FunctionGenerator private(config: TableConfig) {
CEIL,
Seq(DATE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.CEIL.method,
+ BuiltInMethods.CEIL,
+ Some(BuiltInMethods.CEIL_INTEGRAL),
+ Some(BuiltInMethods.CEIL_DEC),
Some(BuiltInMethods.UNIX_DATE_CEIL)))
addSqlFunction(
CEIL,
Seq(TIME_WITHOUT_TIME_ZONE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.CEIL.method,
+ BuiltInMethods.CEIL,
+ Some(BuiltInMethods.CEIL_INTEGRAL),
+ Some(BuiltInMethods.CEIL_DEC),
Some(BuiltInMethods.UNIX_DATE_CEIL)))
addSqlFunction(
CEIL,
Seq(TIMESTAMP_WITHOUT_TIME_ZONE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.CEIL.method,
+ BuiltInMethods.CEIL,
+ Some(BuiltInMethods.CEIL_INTEGRAL),
+ Some(BuiltInMethods.CEIL_DEC),
Some(BuiltInMethods.UNIX_TIMESTAMP_CEIL)))
addSqlFunction(
CEIL,
Seq(TIMESTAMP_WITH_LOCAL_TIME_ZONE, SYMBOL),
new FloorCeilCallGen(
- BuiltInMethod.CEIL.method,
+ BuiltInMethods.CEIL,
+ Some(BuiltInMethods.CEIL_INTEGRAL),
+ Some(BuiltInMethods.CEIL_DEC),
Some(BuiltInMethods.TIMESTAMP_CEIL_TIME_ZONE)))
addSqlFunction(
@@ -821,44 +834,44 @@ class FunctionGenerator private(config: TableConfig) {
BuiltInMethods.JSON_QUERY)
addSqlFunctionMethod(IS_JSON_VALUE, Seq(CHAR),
- BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_VALUE, argsNullable = true)
addSqlFunctionMethod(IS_JSON_VALUE, Seq(VARCHAR),
- BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_VALUE, argsNullable = true)
addSqlFunctionMethod(IS_JSON_OBJECT, Seq(CHAR),
- BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)
addSqlFunctionMethod(IS_JSON_OBJECT, Seq(VARCHAR),
- BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)
addSqlFunctionMethod(IS_JSON_ARRAY, Seq(CHAR),
- BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)
addSqlFunctionMethod(IS_JSON_ARRAY, Seq(VARCHAR),
- BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)
addSqlFunctionMethod(IS_JSON_SCALAR, Seq(CHAR),
- BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)
addSqlFunctionMethod(IS_JSON_SCALAR, Seq(VARCHAR),
- BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)
+ BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)
addSqlFunction(IS_NOT_JSON_VALUE, Seq(CHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_VALUE.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_VALUE,
argsNullable = true)))
addSqlFunction(IS_NOT_JSON_VALUE, Seq(VARCHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_VALUE.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_VALUE,
argsNullable = true)))
addSqlFunction(IS_NOT_JSON_OBJECT, Seq(CHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_OBJECT.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_OBJECT,
argsNullable = true)))
addSqlFunction(IS_NOT_JSON_OBJECT, Seq(VARCHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_OBJECT.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_OBJECT,
argsNullable = true)))
addSqlFunction(IS_NOT_JSON_ARRAY, Seq(CHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_ARRAY.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_ARRAY,
argsNullable = true)))
addSqlFunction(IS_NOT_JSON_ARRAY, Seq(VARCHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_ARRAY.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_ARRAY,
argsNullable = true)))
addSqlFunction(IS_NOT_JSON_SCALAR, Seq(CHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_SCALAR.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR,
argsNullable = true)))
addSqlFunction(IS_NOT_JSON_SCALAR, Seq(VARCHAR),
- new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_SCALAR.method,
argsNullable = true)))
+ new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR,
argsNullable = true)))
//
----------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
index 0b54a5c..376b46b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
@@ -17,7 +17,9 @@
*/
package org.apache.flink.table.planner.codegen.calls
+
import org.apache.calcite.sql.{SqlJsonEmptyOrError,
SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.flink.table.api.JsonValueOnEmptyOrError
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING,
qualifyEnum, qualifyMethod}
import
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils,
CodeGeneratorContext, GeneratedExpression}
@@ -81,16 +83,16 @@ class JsonValueCallGen extends CallGenerator {
*/
private def getBehavior(
operands: Seq[GeneratedExpression],
- mode: SqlJsonEmptyOrError): (SqlJsonValueEmptyOrErrorBehavior, String) =
{
+ mode: SqlJsonEmptyOrError): (JsonValueOnEmptyOrError, String) = {
operands.indexWhere(expr => expr.literalValue.contains(mode)) match {
- case -1 => (SqlJsonValueEmptyOrErrorBehavior.NULL, null)
+ case -1 => (JsonValueOnEmptyOrError.NULL, null)
case modeIndex => operands(modeIndex - 1).literalValue.get match {
// Case for [NULL | ERROR] ON [EMPTY | ERROR]
- case behavior: SqlJsonValueEmptyOrErrorBehavior => (behavior, null)
+ case behavior: SqlJsonValueEmptyOrErrorBehavior =>
(convertCalciteEnum(behavior), null)
case _ => operands(modeIndex - 2).literalValue.get match {
// Case for DEFAULT <expr> ON [EMPTY | ERROR]
case behavior: SqlJsonValueEmptyOrErrorBehavior =>
- (behavior, operands(modeIndex - 1).resultTerm)
+ (convertCalciteEnum(behavior), operands(modeIndex - 1).resultTerm)
case _ =>
throw new CodeGenException("Invalid combination of arguments for
JSON_VALUE. "
+ "This is a bug. Please consider filing an issue.")
@@ -98,4 +100,11 @@ class JsonValueCallGen extends CallGenerator {
}
}
}
+
+ private def convertCalciteEnum(
+ behavior: SqlJsonValueEmptyOrErrorBehavior): JsonValueOnEmptyOrError =
behavior match {
+ case SqlJsonValueEmptyOrErrorBehavior.ERROR =>
JsonValueOnEmptyOrError.ERROR
+ case SqlJsonValueEmptyOrErrorBehavior.NULL => JsonValueOnEmptyOrError.NULL
+ case SqlJsonValueEmptyOrErrorBehavior.DEFAULT =>
JsonValueOnEmptyOrError.DEFAULT
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
index 49b7dc2..cd8035d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
@@ -19,14 +19,12 @@
package org.apache.flink.table.planner.codegen.calls
import org.apache.flink.table.functions.SqlLikeUtils
-import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className,
newName, qualifyMethod}
import
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
GeneratedExpression}
import org.apache.flink.table.runtime.functions.SqlLikeChainChecker
import org.apache.flink.table.types.logical.{BooleanType, LogicalType}
-import org.apache.calcite.runtime.SqlFunctions
-
import java.util.regex.Pattern
/**
@@ -161,12 +159,11 @@ class LikeCallGen extends CallGenerator {
terms =>
val str1 = s"${terms.head}.toString()"
val str2 = s"${terms(1)}.toString()"
- val clsName = className[SqlFunctions]
if (terms.length == 2) {
- s"$clsName.like($str1, $str2)"
+ s"${qualifyMethod(BuiltInMethods.STRING_LIKE)}($str1, $str2)"
} else {
val str3 = s"${terms(2)}.toString()"
- s"$clsName.like($str1, $str2, $str3)"
+ s"${qualifyMethod(BuiltInMethods.STRING_LIKE_WITH_ESCAPE)}($str1,
$str2, $str3)"
}
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 7619486..9de99fc 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -29,7 +29,6 @@ import
org.apache.flink.table.runtime.functions.SqlFunctionUtils
import
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString,
isTimestamp, isTimestampWithLocalZone}
import org.apache.flink.table.types.logical._
-import org.apache.calcite.runtime.SqlFunctions
import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING,
TRAILING}
@@ -341,9 +340,8 @@ object StringCallGen {
def generateSimilarTo(
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression]): GeneratedExpression = {
- val className = classOf[SqlFunctions].getCanonicalName
generateCallIfArgsNotNull(ctx, new BooleanType(), operands) {
- terms => s"$className.similar(${toStringTerms(terms, operands)})"
+ terms =>
s"${qualifyMethod(BuiltInMethods.STRING_SIMILAR)}(${toStringTerms(terms,
operands)})"
}
}
@@ -424,9 +422,8 @@ object StringCallGen {
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression],
returnType: LogicalType): GeneratedExpression = {
- val className = classOf[SqlFunctions].getCanonicalName
generateStringResultCallIfArgsNotNull(ctx, operands, returnType) {
- terms => s"$className.initcap(${terms.head}.toString())"
+ terms =>
s"${qualifyMethod(BuiltInMethods.STRING_INITCAP)}(${terms.head}.toString())"
}
}
diff --git a/flink-table/flink-table-runtime/pom.xml
b/flink-table/flink-table-runtime/pom.xml
index a6a6f9a..0c85dcc 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -84,6 +84,14 @@ under the License.
<version>${janino.version}</version>
</dependency>
+ <!-- Jackson -->
+
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>${jsonpath.version}</version>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
@@ -131,6 +139,36 @@ under the License.
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes
combine.children="append">
+
<include>com.jayway.jsonpath:json-path</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+
<pattern>com.jayway</pattern>
+
<shadedPattern>org.apache.flink.calcite.shaded.com.jayway</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>com.fasterxml</pattern>
+
<shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index c9016eb..06628ba 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -47,6 +47,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.flink.table.data.DecimalDataUtils.castFrom;
+import static org.apache.flink.table.data.DecimalDataUtils.castToIntegral;
import static org.apache.flink.table.data.DecimalDataUtils.doubleValue;
/**
@@ -140,6 +141,11 @@ public class SqlFunctionUtils {
return Math.tanh(doubleValue(a));
}
+ /** SQL <code>COT</code> operator applied to double values. */
+ public static double cot(double b0) {
+ return 1.0d / Math.tan(b0);
+ }
+
public static double cot(DecimalData a) {
return 1.0d / Math.tan(doubleValue(a));
}
@@ -152,14 +158,92 @@ public class SqlFunctionUtils {
return Math.toRadians(doubleValue(angdeg));
}
+ /** SQL <code>ABS</code> operator applied to byte values. */
+ public static byte abs(byte b0) {
+ return (byte) Math.abs(b0);
+ }
+
+ /** SQL <code>ABS</code> operator applied to short values. */
+ public static short abs(short b0) {
+ return (short) Math.abs(b0);
+ }
+
+ /** SQL <code>ABS</code> operator applied to int values. */
+ public static int abs(int b0) {
+ return Math.abs(b0);
+ }
+
+ /** SQL <code>ABS</code> operator applied to long values. */
+ public static long abs(long b0) {
+ return Math.abs(b0);
+ }
+
+ /** SQL <code>ABS</code> operator applied to float values. */
+ public static float abs(float b0) {
+ return Math.abs(b0);
+ }
+
+ /** SQL <code>ABS</code> operator applied to double values. */
+ public static double abs(double b0) {
+ return Math.abs(b0);
+ }
+
public static DecimalData abs(DecimalData a) {
return DecimalDataUtils.abs(a);
}
+ public static double floor(double b0) {
+ return Math.floor(b0);
+ }
+
+ public static float floor(float b0) {
+ return (float) Math.floor(b0);
+ }
+
+ /** SQL <code>FLOOR</code> operator applied to int values. */
+ public static int floor(int b0, int b1) {
+ int r = b0 % b1;
+ if (r < 0) {
+ r += b1;
+ }
+ return b0 - r;
+ }
+
+ /** SQL <code>FLOOR</code> operator applied to long values. */
+ public static long floor(long b0, long b1) {
+ long r = b0 % b1;
+ if (r < 0) {
+ r += b1;
+ }
+ return b0 - r;
+ }
+
public static DecimalData floor(DecimalData a) {
return DecimalDataUtils.floor(a);
}
+ public static double ceil(double b0) {
+ return Math.ceil(b0);
+ }
+
+ public static float ceil(float b0) {
+ return (float) Math.ceil(b0);
+ }
+
+ /** SQL <code>CEIL</code> operator applied to int values. */
+ public static int ceil(int b0, int b1) {
+ int r = b0 % b1;
+ if (r > 0) {
+ r -= b1;
+ }
+ return b0 - r;
+ }
+
+ /** SQL <code>CEIL</code> operator applied to long values. */
+ public static long ceil(long b0, long b1) {
+ return floor(b0 + b1 - 1, b1);
+ }
+
public static DecimalData ceil(DecimalData a) {
return DecimalDataUtils.ceil(a);
}
@@ -1103,6 +1187,24 @@ public class SqlFunctionUtils {
return UUID.nameUUIDFromBytes(b).toString();
}
+ /** SQL <code>TRUNCATE</code> operator applied to int values. */
+ public static int struncate(int b0) {
+ return struncate(b0, 0);
+ }
+
+ public static int struncate(int b0, int b1) {
+ return (int) struncate((long) b0, b1);
+ }
+
+ /** SQL <code>TRUNCATE</code> operator applied to long values. */
+ public static long struncate(long b0) {
+ return struncate(b0, 0);
+ }
+
+ public static long struncate(long b0, int b1) {
+ return castToIntegral(struncate(castFrom(b0, 38, 18), b1));
+ }
+
/** SQL <code>TRUNCATE</code> operator applied to BigDecimal values. */
public static DecimalData struncate(DecimalData b0) {
return struncate(b0, 0);
@@ -1137,6 +1239,15 @@ public class SqlFunctionUtils {
return (float) doubleValue(struncate(castFrom((double) b0, 38, 18),
b1));
}
+ /** SQL <code>TRUNCATE</code> operator applied to double values. */
+ public static double struncate(double b0) {
+ return struncate(b0, 0);
+ }
+
+ public static double struncate(double b0, int b1) {
+ return doubleValue(struncate(castFrom(b0, 38, 18), b1));
+ }
+
/**
* Compares two byte arrays in lexicographical order.
*
@@ -1160,4 +1271,41 @@ public class SqlFunctionUtils {
}
return array1.length - array2.length;
}
+
+ /** SQL INITCAP(string) function. */
+ public static String initcap(String s) {
+ // Assumes Alpha as [A-Za-z0-9]
+ // white space is treated as everything else.
+ final int len = s.length();
+ boolean start = true;
+ final StringBuilder newS = new StringBuilder();
+
+ for (int i = 0; i < len; i++) {
+ char curCh = s.charAt(i);
+ final int c = (int) curCh;
+ if (start) { // curCh is whitespace or first character of word.
+ if (c > 47 && c < 58) { // 0-9
+ start = false;
+ } else if (c > 64 && c < 91) { // A-Z
+ start = false;
+ } else if (c > 96 && c < 123) { // a-z
+ start = false;
+ curCh = (char) (c - 32); // Uppercase this character
+ }
+ // else {} whitespace
+ } else { // Inside of a word or white space after end of word.
+ if (c > 47 && c < 58) { // 0-9
+ // noop
+ } else if (c > 64 && c < 91) { // A-Z
+ curCh = (char) (c + 32); // Lowercase this character
+ } else if (c > 96 && c < 123) { // a-z
+ // noop
+ } else { // whitespace
+ start = true;
+ }
+ }
+ newS.append(curCh);
+ } // for each character in s
+ return newS.toString();
+ }
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
index 40641ee..7602a19 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
@@ -19,8 +19,14 @@
package org.apache.flink.table.runtime.functions;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonQueryOnEmptyOrError;
+import org.apache.flink.table.api.JsonQueryWrapper;
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.FlinkRuntimeException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -30,6 +36,23 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Arra
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
+import com.jayway.jsonpath.spi.mapper.MappingProvider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
* Utilities for JSON functions.
*
@@ -42,6 +65,14 @@ public class SqlJsonUtils {
private static final ObjectMapper MAPPER =
new ObjectMapper(JSON_FACTORY)
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS,
true);
+ private static final Pattern JSON_PATH_BASE =
+ Pattern.compile(
+ "^\\s*(?<mode>strict|lax)\\s+(?<spec>.+)$",
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL |
Pattern.MULTILINE);
+ private static final JacksonJsonProvider JSON_PATH_JSON_PROVIDER = new
JacksonJsonProvider();
+ private static final MappingProvider JSON_PATH_MAPPING_PROVIDER = new
JacksonMappingProvider();
+
+ private SqlJsonUtils() {}
/** Returns the {@link JsonNodeFactory} for creating nodes. */
public static JsonNodeFactory getNodeFactory() {
@@ -71,5 +102,489 @@ public class SqlJsonUtils {
}
}
- private SqlJsonUtils() {}
+ public static Boolean jsonExists(String input, String pathSpec) {
+ return jsonExists(jsonApiCommonSyntax(input, pathSpec),
JsonExistsOnError.FALSE);
+ }
+
+ public static Boolean jsonExists(
+ String input, String pathSpec, JsonExistsOnError errorBehavior) {
+ return jsonExists(jsonApiCommonSyntax(input, pathSpec), errorBehavior);
+ }
+
+ private static Boolean jsonExists(JsonPathContext context,
JsonExistsOnError errorBehavior) {
+ if (context.hasException()) {
+ switch (errorBehavior) {
+ case TRUE:
+ return Boolean.TRUE;
+ case FALSE:
+ return Boolean.FALSE;
+ case ERROR:
+ throw toUnchecked(context.exc);
+ case UNKNOWN:
+ return null;
+ default:
+ throw
illegalErrorBehaviorInJsonExistsFunc(errorBehavior.toString());
+ }
+ } else {
+ return context.obj != null;
+ }
+ }
+
+ public static Object jsonValue(
+ String input,
+ String pathSpec,
+ JsonValueOnEmptyOrError emptyBehavior,
+ Object defaultValueOnEmpty,
+ JsonValueOnEmptyOrError errorBehavior,
+ Object defaultValueOnError) {
+ return jsonValue(
+ jsonApiCommonSyntax(input, pathSpec),
+ emptyBehavior,
+ defaultValueOnEmpty,
+ errorBehavior,
+ defaultValueOnError);
+ }
+
+ private static Object jsonValue(
+ JsonPathContext context,
+ JsonValueOnEmptyOrError emptyBehavior,
+ Object defaultValueOnEmpty,
+ JsonValueOnEmptyOrError errorBehavior,
+ Object defaultValueOnError) {
+ final Exception exc;
+ if (context.hasException()) {
+ exc = context.exc;
+ } else {
+ Object value = context.obj;
+ if (value == null || context.mode == PathMode.LAX &&
!isScalarObject(value)) {
+ switch (emptyBehavior) {
+ case ERROR:
+ throw emptyResultOfJsonValueFuncNotAllowed();
+ case NULL:
+ return null;
+ case DEFAULT:
+ return defaultValueOnEmpty;
+ default:
+ throw
illegalEmptyBehaviorInJsonValueFunc(emptyBehavior.toString());
+ }
+ } else if (context.mode == PathMode.STRICT &&
!isScalarObject(value)) {
+ exc =
scalarValueRequiredInStrictModeOfJsonValueFunc(value.toString());
+ } else {
+ return value;
+ }
+ }
+ switch (errorBehavior) {
+ case ERROR:
+ throw toUnchecked(exc);
+ case NULL:
+ return null;
+ case DEFAULT:
+ return defaultValueOnError;
+ default:
+ throw
illegalErrorBehaviorInJsonValueFunc(errorBehavior.toString());
+ }
+ }
+
+ public static String jsonQuery(
+ String input,
+ String pathSpec,
+ JsonQueryWrapper wrapperBehavior,
+ JsonQueryOnEmptyOrError emptyBehavior,
+ JsonQueryOnEmptyOrError errorBehavior) {
+ return jsonQuery(
+ jsonApiCommonSyntax(input, pathSpec),
+ wrapperBehavior,
+ emptyBehavior,
+ errorBehavior);
+ }
+
+ private static String jsonQuery(
+ JsonPathContext context,
+ JsonQueryWrapper wrapperBehavior,
+ JsonQueryOnEmptyOrError emptyBehavior,
+ JsonQueryOnEmptyOrError errorBehavior) {
+ final Exception exc;
+ if (context.hasException()) {
+ exc = context.exc;
+ } else {
+ Object value;
+ if (context.obj == null) {
+ value = null;
+ } else {
+ switch (wrapperBehavior) {
+ case WITHOUT_ARRAY:
+ value = context.obj;
+ break;
+ case UNCONDITIONAL_ARRAY:
+ value = Collections.singletonList(context.obj);
+ break;
+ case CONDITIONAL_ARRAY:
+ if (context.obj instanceof Collection) {
+ value = context.obj;
+ } else {
+ value = Collections.singletonList(context.obj);
+ }
+ break;
+ default:
+ throw
illegalWrapperBehaviorInJsonQueryFunc(wrapperBehavior.toString());
+ }
+ }
+ if (value == null || context.mode == PathMode.LAX &&
isScalarObject(value)) {
+ switch (emptyBehavior) {
+ case ERROR:
+ throw emptyResultOfJsonQueryFuncNotAllowed();
+ case NULL:
+ return null;
+ case EMPTY_ARRAY:
+ return "[]";
+ case EMPTY_OBJECT:
+ return "{}";
+ default:
+ throw
illegalEmptyBehaviorInJsonQueryFunc(emptyBehavior.toString());
+ }
+ } else if (context.mode == PathMode.STRICT &&
isScalarObject(value)) {
+ exc =
arrayOrObjectValueRequiredInStrictModeOfJsonQueryFunc(value.toString());
+ } else {
+ try {
+ return jsonize(value);
+ } catch (Exception e) {
+ exc = e;
+ }
+ }
+ }
+ switch (errorBehavior) {
+ case ERROR:
+ throw toUnchecked(exc);
+ case NULL:
+ return null;
+ case EMPTY_ARRAY:
+ return "[]";
+ case EMPTY_OBJECT:
+ return "{}";
+ default:
+ throw
illegalErrorBehaviorInJsonQueryFunc(errorBehavior.toString());
+ }
+ }
+
+ public static boolean isJsonValue(String input) {
+ try {
+ dejsonize(input);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ public static boolean isJsonObject(String input) {
+ try {
+ Object o = dejsonize(input);
+ return o instanceof Map;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ public static boolean isJsonArray(String input) {
+ try {
+ Object o = dejsonize(input);
+ return o instanceof Collection;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ public static boolean isJsonScalar(String input) {
+ try {
+ Object o = dejsonize(input);
+ return !(o instanceof Map) && !(o instanceof Collection);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private static boolean isScalarObject(Object obj) {
+ if (obj instanceof Collection) {
+ return false;
+ }
+ if (obj instanceof Map) {
+ return false;
+ }
+ return true;
+ }
+
+ private static String jsonize(Object input) {
+ return JSON_PATH_JSON_PROVIDER.toJson(input);
+ }
+
+ private static Object dejsonize(String input) {
+ return JSON_PATH_JSON_PROVIDER.parse(input);
+ }
+
+ private static JsonValueContext jsonValueExpression(String input) {
+ try {
+ return JsonValueContext.withJavaObj(dejsonize(input));
+ } catch (Exception e) {
+ return JsonValueContext.withException(e);
+ }
+ }
+
+ private static JsonPathContext jsonApiCommonSyntax(String input, String
pathSpec) {
+ return jsonApiCommonSyntax(jsonValueExpression(input), pathSpec);
+ }
+
+ private static JsonPathContext jsonApiCommonSyntax(JsonValueContext input,
String pathSpec) {
+ PathMode mode;
+ String pathStr;
+ try {
+ Matcher matcher = JSON_PATH_BASE.matcher(pathSpec);
+ if (!matcher.matches()) {
+ mode = PathMode.STRICT;
+ pathStr = pathSpec;
+ } else {
+ mode =
PathMode.valueOf(matcher.group(1).toUpperCase(Locale.ROOT));
+ pathStr = matcher.group(2);
+ }
+ DocumentContext ctx;
+ switch (mode) {
+ case STRICT:
+ if (input.hasException()) {
+ return JsonPathContext.withStrictException(pathSpec,
input.exc);
+ }
+ ctx =
+ JsonPath.parse(
+ input.obj,
+ Configuration.builder()
+
.jsonProvider(JSON_PATH_JSON_PROVIDER)
+
.mappingProvider(JSON_PATH_MAPPING_PROVIDER)
+ .build());
+ break;
+ case LAX:
+ if (input.hasException()) {
+ return JsonPathContext.withJavaObj(PathMode.LAX, null);
+ }
+ ctx =
+ JsonPath.parse(
+ input.obj,
+ Configuration.builder()
+
.options(Option.SUPPRESS_EXCEPTIONS)
+
.jsonProvider(JSON_PATH_JSON_PROVIDER)
+
.mappingProvider(JSON_PATH_MAPPING_PROVIDER)
+ .build());
+ break;
+ default:
+ throw illegalJsonPathModeInPathSpec(mode.toString(),
pathSpec);
+ }
+ try {
+ return JsonPathContext.withJavaObj(mode, ctx.read(pathStr));
+ } catch (Exception e) {
+ return JsonPathContext.withStrictException(pathSpec, e);
+ }
+ } catch (Exception e) {
+ return JsonPathContext.withUnknownException(e);
+ }
+ }
+
+ private static RuntimeException toUnchecked(Exception e) {
+ if (e instanceof RuntimeException) {
+ return (RuntimeException) e;
+ }
+ return new RuntimeException(e);
+ }
+
+ private static RuntimeException illegalJsonPathModeInPathSpec(
+ String pathMode, String pathSpec) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal jsonpath mode ''%s'' in jsonpath spec:
''%s''",
+ pathMode, pathSpec));
+ }
+
+ private static RuntimeException illegalJsonPathMode(String pathMode) {
+ return new FlinkRuntimeException(String.format("Illegal jsonpath mode
''%s''", pathMode));
+ }
+
+ private static RuntimeException illegalJsonPathSpec(String pathSpec) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal jsonpath spec ''%s'', format of the spec
should be: ''<lax|strict> $'{'expr'}'''",
+ pathSpec));
+ }
+
+ private static RuntimeException strictPathModeRequiresNonEmptyValue() {
+ return new FlinkRuntimeException(
+ "Strict jsonpath mode requires a non empty returned value, but
is null");
+ }
+
+ private static RuntimeException
illegalErrorBehaviorInJsonExistsFunc(String errorBehavior) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal error behavior ''{0}'' specified in
JSON_EXISTS function",
+ errorBehavior));
+ }
+
+ private static RuntimeException emptyResultOfJsonValueFuncNotAllowed() {
+ return new FlinkRuntimeException("Empty result of JSON_VALUE function
is not allowed");
+ }
+
+ private static RuntimeException illegalEmptyBehaviorInJsonValueFunc(String
emptyBehavior) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal empty behavior ''{0}'' specified in
JSON_VALUE function",
+ emptyBehavior));
+ }
+
+ private static RuntimeException illegalErrorBehaviorInJsonValueFunc(String
errorBehavior) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal error behavior ''%s'' specified in JSON_VALUE
function",
+ errorBehavior));
+ }
+
+ private static RuntimeException
scalarValueRequiredInStrictModeOfJsonValueFunc(String value) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Strict jsonpath mode requires scalar value, and the
actual value is: ''%s''",
+ value));
+ }
+
+ private static RuntimeException
illegalWrapperBehaviorInJsonQueryFunc(String wrapperBehavior) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal wrapper behavior ''%s'' specified in
JSON_QUERY function",
+ wrapperBehavior));
+ }
+
+ private static RuntimeException emptyResultOfJsonQueryFuncNotAllowed() {
+ return new FlinkRuntimeException("Empty result of JSON_QUERY function
is not allowed");
+ }
+
+ private static RuntimeException illegalEmptyBehaviorInJsonQueryFunc(String
emptyBehavior) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal empty behavior ''%s'' specified in JSON_VALUE
function",
+ emptyBehavior));
+ }
+
+ private static RuntimeException
arrayOrObjectValueRequiredInStrictModeOfJsonQueryFunc(
+ String value) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Strict jsonpath mode requires array or object value,
and the actual value is: ''%s''",
+ value));
+ }
+
+ private static RuntimeException illegalErrorBehaviorInJsonQueryFunc(String
errorBehavior) {
+ return new FlinkRuntimeException(
+ String.format(
+ "Illegal error behavior ''%s'' specified in JSON_VALUE
function",
+ errorBehavior));
+ }
+
+ /**
+ * Path spec has two different modes: lax mode and strict mode. Lax mode
suppresses any thrown
+ * exception and returns null, whereas strict mode throws exceptions.
+ */
+ public enum PathMode {
+ LAX,
+ STRICT,
+ UNKNOWN,
+ NONE
+ }
+
+ /** Returned path context of JsonApiCommonSyntax, public for testing. */
+ private static class JsonPathContext {
+ public final PathMode mode;
+ public final Object obj;
+ public final Exception exc;
+
+ private JsonPathContext(Object obj, Exception exc) {
+ this(PathMode.NONE, obj, exc);
+ }
+
+ private JsonPathContext(PathMode mode, Object obj, Exception exc) {
+ assert obj == null || exc == null;
+ this.mode = mode;
+ this.obj = obj;
+ this.exc = exc;
+ }
+
+ public boolean hasException() {
+ return exc != null;
+ }
+
+ public static JsonPathContext withUnknownException(Exception exc) {
+ return new JsonPathContext(PathMode.UNKNOWN, null, exc);
+ }
+
+ public static JsonPathContext withStrictException(Exception exc) {
+ return new JsonPathContext(PathMode.STRICT, null, exc);
+ }
+
+ public static JsonPathContext withStrictException(String pathSpec,
Exception exc) {
+ if (exc.getClass() == InvalidPathException.class) {
+ exc = illegalJsonPathSpec(pathSpec);
+ }
+ return withStrictException(exc);
+ }
+
+ public static JsonPathContext withJavaObj(PathMode mode, Object obj) {
+ if (mode == PathMode.UNKNOWN) {
+ throw illegalJsonPathMode(mode.toString());
+ }
+ if (mode == PathMode.STRICT && obj == null) {
+ throw strictPathModeRequiresNonEmptyValue();
+ }
+ return new JsonPathContext(mode, obj, null);
+ }
+
+ @Override
+ public String toString() {
+ return "JsonPathContext{" + "mode=" + mode + ", obj=" + obj + ",
exc=" + exc + '}';
+ }
+ }
+
+ private static class JsonValueContext {
+ @JsonValue public final Object obj;
+ public final Exception exc;
+
+ private JsonValueContext(Object obj, Exception exc) {
+ assert obj == null || exc == null;
+ this.obj = obj;
+ this.exc = exc;
+ }
+
+ public static JsonValueContext withJavaObj(Object obj) {
+ return new JsonValueContext(obj, null);
+ }
+
+ public static JsonValueContext withException(Exception exc) {
+ return new JsonValueContext(null, exc);
+ }
+
+ public boolean hasException() {
+ return exc != null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JsonValueContext jsonValueContext = (JsonValueContext) o;
+ return Objects.equals(obj, jsonValueContext.obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(obj);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toString(obj);
+ }
+ }
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
new file mode 100644
index 0000000..583b3c2
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to create
{@link
+ * org.apache.flink.api.common.eventtime.WatermarkGenerator}. The {@link
+ * GeneratedWatermarkGeneratorSupplier} uses the {@link Context} to init the
generated watermark
+ * generator.
+ */
+@Internal
+public class GeneratedWatermarkGeneratorSupplier implements
WatermarkGeneratorSupplier<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ private final Configuration configuration;
+ private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
+
+ public GeneratedWatermarkGeneratorSupplier(
+ Configuration configuration, GeneratedWatermarkGenerator
generatedWatermarkGenerator) {
+ this.configuration = configuration;
+ this.generatedWatermarkGenerator = generatedWatermarkGenerator;
+ }
+
+ @Override
+ public org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData>
+ createWatermarkGenerator(Context context) {
+
+ List<Object> references =
+ new
ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
+ references.add(context);
+
+ WatermarkGenerator innerWatermarkGenerator =
+ new GeneratedWatermarkGenerator(
+ generatedWatermarkGenerator.getClassName(),
+ generatedWatermarkGenerator.getCode(),
+ references.toArray(),
+ configuration)
+
.newInstance(Thread.currentThread().getContextClassLoader());
+
+ try {
+ innerWatermarkGenerator.open(configuration);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to instantiate generated
watermark generator.", e);
+ }
+ return new
GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator(
+ innerWatermarkGenerator);
+ }
+
+ /** Wrapper of the code-generated {@link WatermarkGenerator}. */
+ public static class DefaultWatermarkGenerator
+ implements
org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ private final WatermarkGenerator innerWatermarkGenerator;
+ private Long currentWatermark = Long.MIN_VALUE;
+
+ public DefaultWatermarkGenerator(WatermarkGenerator
watermarkGenerator) {
+ this.innerWatermarkGenerator = watermarkGenerator;
+ }
+
+ @Override
+ public void onEvent(RowData event, long eventTimestamp,
WatermarkOutput output) {
+ try {
+ Long watermark =
innerWatermarkGenerator.currentWatermark(event);
+ if (watermark != null) {
+ currentWatermark = watermark;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Generated WatermarkGenerator fails to
generate for row: %s.",
+ event),
+ e);
+ }
+ }
+
+ @Override
+ public void onPeriodicEmit(WatermarkOutput output) {
+ output.emitWatermark(new Watermark(currentWatermark));
+ }
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java
new file mode 100644
index 0000000..a379b2b
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.interval;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+/** Function filtering out all the input records. */
+@Internal
+public class FilterAllFlatMapFunction
+ implements FlatMapFunction<RowData, RowData>,
ResultTypeQueryable<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ private final InternalTypeInfo<RowData> outputTypeInfo;
+
+ public FilterAllFlatMapFunction(InternalTypeInfo<RowData> inputTypeInfo) {
+ this.outputTypeInfo = inputTypeInfo;
+ }
+
+ @Override
+ public void flatMap(RowData value, Collector<RowData> out) {}
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return outputTypeInfo;
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java
new file mode 100644
index 0000000..ecd84cb
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.interval;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+/** Function performing left padding. */
+@Internal
+public class PaddingLeftMapFunction
+ implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData>
{
+ private static final long serialVersionUID = 1L;
+
+ private final OuterJoinPaddingUtil paddingUtil;
+ private final InternalTypeInfo<RowData> outputTypeInfo;
+
+ public PaddingLeftMapFunction(
+ OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData>
returnType) {
+ this.paddingUtil = paddingUtil;
+ this.outputTypeInfo = returnType;
+ }
+
+ @Override
+ public RowData map(RowData value) {
+ return paddingUtil.padLeft(value);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return outputTypeInfo;
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java
new file mode 100644
index 0000000..5cb02dd
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.interval;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+/** Function performing right padding. */
+@Internal
+public class PaddingRightMapFunction
+ implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData>
{
+ private static final long serialVersionUID = 1L;
+
+ private final OuterJoinPaddingUtil paddingUtil;
+ private final InternalTypeInfo<RowData> outputTypeInfo;
+
+ public PaddingRightMapFunction(
+ OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData>
returnType) {
+ this.paddingUtil = paddingUtil;
+ this.outputTypeInfo = returnType;
+ }
+
+ @Override
+ public RowData map(RowData value) {
+ return paddingUtil.padRight(value);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return outputTypeInfo;
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
new file mode 100644
index 0000000..3ed9556
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.wmassigners;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
+
+import javax.annotation.Nullable;
+
+/** Generates periodic watermarks based on a {@link
PeriodicWatermarkAssigner}. */
+@Internal
+public class PeriodicWatermarkAssignerWrapper implements
AssignerWithPeriodicWatermarks<RowData> {
+ private static final long serialVersionUID = 1L;
+ private final PeriodicWatermarkAssigner assigner;
+ private final int timeFieldIdx;
+
+ /**
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ */
+ public PeriodicWatermarkAssignerWrapper(PeriodicWatermarkAssigner
assigner, int timeFieldIdx) {
+ this.assigner = assigner;
+ this.timeFieldIdx = timeFieldIdx;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return assigner.getWatermark();
+ }
+
+ @Override
+ public long extractTimestamp(RowData row, long recordTimestamp) {
+ long timestamp = row.getTimestamp(timeFieldIdx, 3).getMillisecond();
+ assigner.nextTimestamp(timestamp);
+ return 0;
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
new file mode 100644
index 0000000..a3f99e2
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.wmassigners;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+/** Generates periodic watermarks based on a {@link
PunctuatedWatermarkAssigner}. */
+@Internal
+public class PunctuatedWatermarkAssignerWrapper
+ implements AssignerWithPunctuatedWatermarks<RowData> {
+ private static final long serialVersionUID = 1L;
+ private final PunctuatedWatermarkAssigner assigner;
+ private final int timeFieldIdx;
+ private final DataFormatConverters.DataFormatConverter<RowData, Row>
converter;
+
+ /**
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ * @param sourceType the type of source
+ */
+ @SuppressWarnings("unchecked")
+ public PunctuatedWatermarkAssignerWrapper(
+ PunctuatedWatermarkAssigner assigner, int timeFieldIdx, DataType
sourceType) {
+ this.assigner = assigner;
+ this.timeFieldIdx = timeFieldIdx;
+ DataType originDataType;
+ if (sourceType instanceof FieldsDataType) {
+ originDataType = sourceType;
+ } else {
+ originDataType = DataTypes.ROW(DataTypes.FIELD("f0", sourceType));
+ }
+ converter =
+
DataFormatConverters.getConverterForDataType(originDataType.bridgedTo(Row.class));
+ }
+
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(RowData row, long
extractedTimestamp) {
+ long timestamp = row.getLong(timeFieldIdx);
+ return assigner.getWatermark(converter.toExternal(row), timestamp);
+ }
+
+ @Override
+ public long extractTimestamp(RowData element, long recordTimestamp) {
+ return 0;
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..33fe52a
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,9 @@
+flink-table-runtime
+Copyright 2014-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.jayway.jsonpath:json-path:2.6.0
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 63f6371..850931e 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -72,9 +72,10 @@ under the License.
</dependencyManagement>
<properties>
- <!-- When updating Janino, make sure that Calcite supports it
as well. -->
- <janino.version>3.0.11</janino.version>
<calcite.version>1.26.0</calcite.version>
+ <!-- Keep Janino in sync with calcite. -->
+ <janino.version>3.0.11</janino.version>
+ <jsonpath.version>2.6.0</jsonpath.version>
<guava.version>29.0-jre</guava.version>
</properties>
</project>