This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dd68edd [FLINK-11785][table-api] Replace case class Null(type) by nullOf(type) expression
dd68edd is described below
commit dd68edd8e6519013d2ab3bd2d4d815f1997ac0fe
Author: Timo Walther <[email protected]>
AuthorDate: Thu Feb 28 15:33:09 2019 +0100
[FLINK-11785][table-api] Replace case class Null(type) by nullOf(type) expression
This introduces `nullOf(type)` for representing typed nulls in the Table API.
It decouples the API from the expression case classes and enables us to offer
both `nullOf(type)` and `null` in the future, once we have introduced a NULL
type and proper type inference.
Furthermore, it integrates better with the existing expressions, which all
start with lower-case characters.
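For illustration, the pattern can be sketched without any Flink dependency. The snippet below is a simplified model, not the Flink implementation: `TypeInfo`, `IntType`, `StringType`, and `NullOfSketch` are hypothetical stand-ins (Flink uses `TypeInformation[_]`), while `Null` and `nullOf` only mirror the shapes shown in this commit.

```scala
// Hypothetical stand-ins for Flink's TypeInformation; NOT the real Flink classes.
sealed trait TypeInfo
case object IntType extends TypeInfo
case object StringType extends TypeInfo

trait Expression

// Stand-in for the existing expression case class that callers used directly.
case class Null(resultType: TypeInfo) extends Expression {
  override def toString: String = "null"
}

// Lower-case DSL entry point: callers depend on nullOf, not on the case class,
// so the case class can later be changed or hidden without breaking the API.
object nullOf {
  def apply(typeInfo: TypeInfo): Expression = Null(typeInfo)
}

object NullOfSketch {
  def main(args: Array[String]): Unit = {
    val typedNull: Expression = nullOf(IntType)
    println(typedNull)                  // uses the overridden toString
    println(typedNull == Null(IntType)) // same value as the direct constructor
  }
}
```

In the Table API itself, the tests changed by this commit call it as `nullOf(Types.INT)` in Scala expressions and as `nullOf(INT)` in string expressions.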
This closes #7864.
---
docs/dev/table/tableApi.md | 18 ++++----
.../flink/table/api/scala/expressionDsl.scala | 17 +++++++
.../flink/table/expressions/ExpressionParser.scala | 3 +-
.../apache/flink/table/expressions/literals.scala | 3 ++
.../table/api/batch/table/SetOperatorsTest.scala | 3 +-
.../flink/table/api/stream/sql/JoinTest.scala | 3 +-
.../flink/table/expressions/ArrayTypeTest.scala | 8 ++--
.../flink/table/expressions/MapTypeTest.scala | 4 +-
.../flink/table/expressions/RowTypeTest.scala | 4 +-
.../table/expressions/ScalarFunctionsTest.scala | 52 +++++++++++-----------
.../table/expressions/ScalarOperatorsTest.scala | 34 +++++++-------
.../UserDefinedScalarFunctionTest.scala | 20 ++++-----
.../table/runtime/stream/sql/JoinITCase.scala | 11 +++--
.../runtime/stream/table/AggregateITCase.scala | 5 +--
.../table/runtime/stream/table/JoinITCase.scala | 18 ++++----
15 files changed, 111 insertions(+), 92 deletions(-)
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 08a5a8a..a509d85 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1780,7 +1780,7 @@ atom = ( "(" , expression , ")" ) | literal | fieldReference ;
fieldReference = "*" | identifier ;
-nullLiteral = "Null(" , dataType , ")" ;
+nullLiteral = "nullOf(" , dataType , ")" ;
timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" |
"DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" |
"HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
@@ -1794,18 +1794,20 @@ timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
{% endhighlight %}
-Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`).
+**Literals:** Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`).
-The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax.
+**Null literals:** Null literals must have a type attached. Use `nullOf(type)` (e.g. `nullOf(INT)`) for creating a null value.
-Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.
+**Field references:** The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax.
-If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
+**Function calls:** Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.
-In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. *Note:* [...]
+**Decimals:** If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
-Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`).
+**Time representation:** In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings int [...]
-Scala expressions use implicit conversions. Therefore, make sure to add the wildcard import `org.apache.flink.table.api.scala._` to your programs. In case a literal is not treated as an expression, use `.toExpr` such as `3.toExpr` to force a literal to be converted.
+**Temporal intervals:** Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`).
+
+**Scala expressions:** Scala expressions use implicit conversions. Therefore, make sure to add the wildcard import `org.apache.flink.table.api.scala._` to your programs. In case a literal is not treated as an expression, use `.toExpr` such as `3.toExpr` to force a literal to be converted.
{% top %}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index d101717..5e6549d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1414,4 +1414,21 @@ object uuid {
}
}
+/**
+ * Returns a null literal value of a given type.
+ *
+ * e.g. nullOf(Types.INT)
+ */
+object nullOf {
+
+ /**
+ * Returns a null literal value of a given type.
+ *
+ * e.g. nullOf(Types.INT)
+ */
+ def apply(typeInfo: TypeInformation[_]): Expression = {
+ Null(typeInfo)
+ }
+}
+
// scalastyle:on object.name
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index b45ce80..b502827 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -53,6 +53,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val ASC: Keyword = Keyword("asc")
lazy val DESC: Keyword = Keyword("desc")
lazy val NULL: Keyword = Keyword("Null")
+ lazy val NULL_OF: Keyword = Keyword("nullOf")
lazy val IF: Keyword = Keyword("?")
lazy val TO_DATE: Keyword = Keyword("toDate")
lazy val TO_TIME: Keyword = Keyword("toTime")
@@ -207,7 +208,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
str => Literal(str.toBoolean)
}
- lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ {
+ lazy val nullLiteral: PackratParser[Expression] = (NULL | NULL_OF) ~ "(" ~> dataType <~ ")" ^^ {
dt => Null(dt)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 24bae90..bf39d99 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -122,6 +122,9 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
}
}
+@deprecated(
+ "Use nullOf(TypeInformation) instead. It is available through the implicit Scala DSL.",
+ "1.8.0")
case class Null(resultType: TypeInformation[_]) extends LeafExpression {
override def toString = s"null"
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 9226200..f0f1ca3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.Null
import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
@@ -88,7 +87,7 @@ class SetOperatorsTest extends TableTestBase {
val in = t.select('a)
.unionAll(
- t.select(('c > 0) ? ('b, Null(createTypeInformation[(Int, String)]))))
+ t.select(('c > 0) ? ('b, nullOf(createTypeInformation[(Int, String)]))))
val expected = binaryNode(
"DataSetUnion",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 37d5bc1..8e24413 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
-import org.apache.flink.table.expressions.Null
import org.apache.flink.table.plan.logical.TumblingGroupWindow
import org.apache.flink.table.runtime.join.WindowJoinUtil
import org.apache.flink.table.utils.TableTestUtil.{term, _}
@@ -256,7 +255,7 @@ class JoinTest extends TableTestBase {
val streamUtil: StreamTableTestUtil = streamTestUtil()
val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime)
- .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)
+ .select('a, 'b, 'c, 'proctime, nullOf(Types.LONG) as 'nullField)
val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime)
.select('a, 'b, 'c, 'proctime, 12L as 'nullField)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
index e0292b2..a7aae1c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -54,14 +54,14 @@ class ArrayTypeTest extends ArrayTypeTestBase {
"[2, 9]")
testAllApis(
- array(Null(Types.INT), 1),
- "array(Null(INT), 1)",
+ array(nullOf(Types.INT), 1),
+ "array(nullOf(INT), 1)",
"ARRAY[NULLIF(1,1), 1]",
"[null, 1]")
testAllApis(
- array(array(Null(Types.INT), 1)),
- "array(array(Null(INT), 1))",
+ array(array(nullOf(Types.INT), 1)),
+ "array(array(nullOf(INT), 1))",
"ARRAY[ARRAY[NULLIF(1,1), 1]]",
"[[null, 1]]")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index 0a30eb0..56cfd0f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -54,8 +54,8 @@ class MapTypeTest extends MapTypeTestBase {
"{2=2, 3=9}")
testAllApis(
- map(1, Null(Types.INT)),
- "map(1, Null(INT))",
+ map(1, nullOf(Types.INT)),
+ "map(1, nullOf(INT))",
"map[1, NULLIF(1,1)]",
"{1=null}")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
index df84a84..7893e05 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
@@ -52,8 +52,8 @@ class RowTypeTest extends RowTypeTestBase {
"1985-04-11,0.1,[1, 2, 3],{foo=bar},1,true") // string flatten
testAllApis(
- row(1 + 1, 2 * 3, Null(Types.STRING)),
- "row(1 + 1, 2 * 3, Null(STRING))",
+ row(1 + 1, 2 * 3, nullOf(Types.STRING)),
+ "row(1 + 1, 2 * 3, nullOf(STRING))",
"ROW(1 + 1, 2 * 3, NULLIF(1,1))",
"2,6,null"
)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 799f636..e5e8dc2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -115,14 +115,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"null")
testAllApis(
- 'f0.replace(Null(Types.STRING), ""),
- "f0.replace(Null(STRING), '')",
+ 'f0.replace(nullOf(Types.STRING), ""),
+ "f0.replace(nullOf(STRING), '')",
"REPLACE(f0, NULLIF('', ''), '')",
"null")
testAllApis(
- 'f0.replace(" ", Null(Types.STRING)),
- "f0.replace(' ', Null(STRING))",
+ 'f0.replace(" ", nullOf(Types.STRING)),
+ "f0.replace(' ', nullOf(STRING))",
"REPLACE(f0, ' ', NULLIF('', ''))",
"null")
}
@@ -440,8 +440,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"2A")
testAllApis(
- Null(Types.BYTE).hex(),
- "hex(Null(BYTE))",
+ nullOf(Types.BYTE).hex(),
+ "hex(nullOf(BYTE))",
"HEX(CAST(NULL AS TINYINT))",
"null")
@@ -529,8 +529,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
def testBin(): Unit = {
testAllApis(
- Null(Types.BYTE).bin(),
- "bin(Null(BYTE))",
+ nullOf(Types.BYTE).bin(),
+ "bin(nullOf(BYTE))",
"BIN((CAST(NULL AS TINYINT)))",
"null")
@@ -648,8 +648,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
// This test was added for the null literal problem in string expression parsing (FLINK-10463).
testAllApis(
- Null(Types.STRING).regexpReplace("oo|ar", 'f33),
- "Null(STRING).regexpReplace('oo|ar', f33)",
+ nullOf(Types.STRING).regexpReplace("oo|ar", 'f33),
+ "nullOf(STRING).regexpReplace('oo|ar', f33)",
"REGEXP_REPLACE(CAST(NULL AS VARCHAR), 'oo|ar', f33)",
"null")
}
@@ -2645,17 +2645,17 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
testAllApis(
- timestampDiff(TimePointUnit.DAY, Null(Types.SQL_TIMESTAMP),
+ timestampDiff(TimePointUnit.DAY, nullOf(Types.SQL_TIMESTAMP),
"2016-02-24 12:42:25".toTimestamp),
- "timestampDiff(DAY, Null(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)",
+ "timestampDiff(DAY, nullOf(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)",
"TIMESTAMPDIFF(DAY, CAST(NULL AS TIMESTAMP), TIMESTAMP '2016-02-24 12:42:25')",
"null"
)
testAllApis(
timestampDiff(TimePointUnit.DAY, "2016-02-24 12:42:25".toTimestamp,
- Null(Types.SQL_TIMESTAMP)),
- "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp, Null(SQL_TIMESTAMP))",
+ nullOf(Types.SQL_TIMESTAMP)),
+ "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp, nullOf(SQL_TIMESTAMP))",
"TIMESTAMPDIFF(DAY, TIMESTAMP '2016-02-24 12:42:25', CAST(NULL AS TIMESTAMP))",
"null"
)
@@ -2779,20 +2779,20 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
testAllApis(
- "2016-02-24 12:42:25".toTimestamp + Null(Types.INTERVAL_MILLIS),
- "'2016-02-24 12:42:25'.toTimestamp + Null(INTERVAL_MILLIS)",
+ "2016-02-24 12:42:25".toTimestamp + nullOf(Types.INTERVAL_MILLIS),
+ "'2016-02-24 12:42:25'.toTimestamp + nullOf(INTERVAL_MILLIS)",
"TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP '2016-02-24 12:42:25')",
"null")
testAllApis(
- Null(Types.SQL_TIMESTAMP) + -200.hours,
- "Null(SQL_TIMESTAMP) + -200.hours",
+ nullOf(Types.SQL_TIMESTAMP) + -200.hours,
+ "nullOf(SQL_TIMESTAMP) + -200.hours",
"TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))",
"null")
testAllApis(
- Null(Types.SQL_TIMESTAMP) + 3.months,
- "Null(SQL_TIMESTAMP) + 3.months",
+ nullOf(Types.SQL_TIMESTAMP) + 3.months,
+ "nullOf(SQL_TIMESTAMP) + 3.months",
"TIMESTAMPADD(MONTH, 3, CAST(NULL AS TIMESTAMP))",
"null")
@@ -2827,13 +2827,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"timestampadd(SECOND, 1, date '2016-06-15')",
"2016-06-15 00:00:01.0")
- testAllApis(Null(Types.SQL_TIMESTAMP) + 1.second,
- "Null(SQL_TIMESTAMP) + 1.second",
+ testAllApis(nullOf(Types.SQL_TIMESTAMP) + 1.second,
+ "nullOf(SQL_TIMESTAMP) + 1.second",
"timestampadd(SECOND, 1, cast(null as date))",
"null")
- testAllApis(Null(Types.SQL_TIMESTAMP) + 1.day,
- "Null(SQL_TIMESTAMP) + 1.day",
+ testAllApis(nullOf(Types.SQL_TIMESTAMP) + 1.day,
+ "nullOf(SQL_TIMESTAMP) + 1.day",
"timestampadd(DAY, 1, cast(null as date))",
"null")
@@ -2986,8 +2986,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"null")
testAllApis(
- "test".sha2(Null(Types.INT)),
- "sha2('test', Null(INT))",
+ "test".sha2(nullOf(Types.INT)),
+ "sha2('test', nullOf(INT))",
"SHA2('test', CAST(NULL AS INT))",
"null")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index d61627b..088ba65 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -296,8 +296,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
)
testTableApi(
- 'f10.in("This is a test String.", "String", "Hello world", "Comment#1", Null(Types.STRING)),
- "f10.in('This is a test String.', 'String', 'Hello world', 'Comment#1', Null(STRING))",
+ 'f10.in("This is a test String.", "String", "Hello world", "Comment#1", nullOf(Types.STRING)),
+ "f10.in('This is a test String.', 'String', 'Hello world', 'Comment#1', nullOf(STRING))",
"true"
)
@@ -308,8 +308,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
)
testTableApi(
- 'f10.in("FAIL", "FAIL", Null(Types.STRING)),
- "f10.in('FAIL', 'FAIL', Null(STRING))",
+ 'f10.in("FAIL", "FAIL", nullOf(Types.STRING)),
+ "f10.in('FAIL', 'FAIL', nullOf(STRING))",
"null"
)
}
@@ -350,10 +350,10 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
"true")
// null
- testAllApis(Null(Types.INT), "Null(INT)", "CAST(NULL AS INT)", "null")
+ testAllApis(nullOf(Types.INT), "nullOf(INT)", "CAST(NULL AS INT)", "null")
testAllApis(
- Null(Types.STRING) === "",
- "Null(STRING) === ''",
+ nullOf(Types.STRING) === "",
+ "nullOf(STRING) === ''",
"CAST(NULL AS VARCHAR) = ''",
"null")
@@ -416,26 +416,26 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
def testBetween(): Unit = {
// between
testAllApis(
- 4.between(Null(Types.INT), 3),
- "4.between(Null(INT), 3)",
+ 4.between(nullOf(Types.INT), 3),
+ "4.between(nullOf(INT), 3)",
"4 BETWEEN NULL AND 3",
"false"
)
testAllApis(
- 4.between(Null(Types.INT), 12),
- "4.between(Null(INT), 12)",
+ 4.between(nullOf(Types.INT), 12),
+ "4.between(nullOf(INT), 12)",
"4 BETWEEN NULL AND 12",
"null"
)
testAllApis(
- 4.between(Null(Types.INT), 3),
- "4.between(Null(INT), 3)",
+ 4.between(nullOf(Types.INT), 3),
+ "4.between(nullOf(INT), 3)",
"4 BETWEEN 5 AND NULL",
"false"
)
testAllApis(
- 4.between(Null(Types.INT), 12),
- "4.between(Null(INT), 12)",
+ 4.between(nullOf(Types.INT), 12),
+ "4.between(nullOf(INT), 12)",
"4 BETWEEN 0 AND NULL",
"null"
)
@@ -490,8 +490,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
// not between
testAllApis(
- 2.notBetween(Null(Types.INT), 3),
- "2.notBetween(Null(INT), 3)",
+ 2.notBetween(nullOf(Types.INT), 3),
+ "2.notBetween(nullOf(INT), 3)",
"2 NOT BETWEEN NULL AND 3",
"null"
)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 1534344..35faa2b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -121,26 +121,26 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
@Test
def testNullableParameters(): Unit = {
testAllApis(
- Func3(Null(INT_TYPE_INFO), Null(STRING_TYPE_INFO)),
- "Func3(Null(INT), Null(STRING))",
+ Func3(nullOf(INT_TYPE_INFO), nullOf(STRING_TYPE_INFO)),
+ "Func3(nullOf(INT), nullOf(STRING))",
"Func3(NULL, NULL)",
"null and null")
testAllApis(
- Func3(Null(INT_TYPE_INFO), "Test"),
- "Func3(Null(INT), 'Test')",
+ Func3(nullOf(INT_TYPE_INFO), "Test"),
+ "Func3(nullOf(INT), 'Test')",
"Func3(NULL, 'Test')",
"null and Test")
testAllApis(
- Func3(42, Null(STRING_TYPE_INFO)),
- "Func3(42, Null(STRING))",
+ Func3(42, nullOf(STRING_TYPE_INFO)),
+ "Func3(42, nullOf(STRING))",
"Func3(42, NULL)",
"42 and null")
testAllApis(
- Func0(Null(INT_TYPE_INFO)),
- "Func0(Null(INT))",
+ Func0(nullOf(INT_TYPE_INFO)),
+ "Func0(nullOf(INT))",
"Func0(NULL)",
"-1")
}
@@ -349,8 +349,8 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"7591 and 43810000 and 655906210000")
testAllApis(
- JavaFunc1(Null(Types.SQL_TIME), 15, Null(Types.SQL_TIMESTAMP)),
- "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))",
+ JavaFunc1(nullOf(Types.SQL_TIME), 15, nullOf(Types.SQL_TIMESTAMP)),
+ "JavaFunc1(nullOf(SQL_TIME), 15, nullOf(SQL_TIMESTAMP))",
"JavaFunc1(NULL, 15, NULL)",
"null and 15 and null")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index c5787c2..ddaf7fc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -25,9 +25,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.Types
-import org.apache.flink.table.expressions.Null
+import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
@@ -69,9 +68,9 @@ class JoinITCase extends StreamingWithStateTestBase {
data2.+=((2, 2L, "HeHe"))
val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
- .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
+ .select(('a === 1)?(nullOf(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
- .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
+ .select(('a === 1)?(nullOf(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
@@ -947,9 +946,9 @@ class JoinITCase extends StreamingWithStateTestBase {
data2.+=((3, 2L, "HeHe"))
val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
- .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+ .select(('a === 3) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c)
val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
- .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+ .select(('a === 3) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c)
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index 3b2268c..c0a3e24 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -23,10 +23,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink
import org.apache.flink.table.api.{StreamQueryConfig, Types}
-import org.apache.flink.table.expressions.Null
import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, DataViewTestAgg, WeightedAvg}
+import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink
import org.apache.flink.table.runtime.utils.{JavaUserDefinedAggFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
@@ -142,7 +141,7 @@ class AggregateITCase extends StreamingWithStateTestBase {
StreamITCase.clear
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('b, Null(Types.LONG)).distinct()
+ .select('b, nullOf(Types.LONG)).distinct()
val results = t.toRetractStream[Row](queryConfig)
results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index 4a69ec5..926319f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -18,20 +18,20 @@
package org.apache.flink.table.runtime.stream.table
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{StreamQueryConfig, Types}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.junit.Assert._
-import org.junit.Test
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.expressions.Literal
import org.apache.flink.table.expressions.utils.Func20
-import org.apache.flink.table.expressions.{Literal, Null}
import org.apache.flink.table.functions.aggfunctions.CountAggFunction
import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, WeightedAvg}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
import scala.collection.mutable
@@ -441,9 +441,9 @@ class JoinITCase extends StreamingWithStateTestBase {
env.setStateBackend(getStateBackend)
val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select(('a === 21) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+ .select(('a === 21) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c)
val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
- .select(('e === 15) ? (Null(Types.INT), 'd) as 'd, 'e, 'f, 'g, 'h)
+ .select(('e === 15) ? (nullOf(Types.INT), 'd) as 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)