This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8c40849 [FLINK-25075][table] Refactored PlannerExpressionParser to
avoid instantiation through reflections
8c40849 is described below
commit 8c40849c5249d96532305d6b88302ee863371541
Author: slinkydeveloper <[email protected]>
AuthorDate: Fri Nov 26 11:44:26 2021 +0100
[FLINK-25075][table] Refactored PlannerExpressionParser to avoid
instantiation through reflections
This closes #17931.
---
.../java/internal/StreamTableEnvironmentImpl.java | 4 +-
.../main/java/org/apache/flink/table/api/Over.java | 5 +-
.../flink/table/api/OverWindowPartitioned.java | 4 +-
.../table/api/OverWindowPartitionedOrdered.java | 6 +-
.../api/OverWindowPartitionedOrderedPreceding.java | 6 +-
.../java/org/apache/flink/table/api/Session.java | 4 +-
.../org/apache/flink/table/api/SessionWithGap.java | 4 +-
.../flink/table/api/SessionWithGapOnTime.java | 4 +-
.../java/org/apache/flink/table/api/Slide.java | 4 +-
.../org/apache/flink/table/api/SlideWithSize.java | 4 +-
.../flink/table/api/SlideWithSizeAndSlide.java | 4 +-
.../table/api/SlideWithSizeAndSlideOnTime.java | 4 +-
.../java/org/apache/flink/table/api/Tumble.java | 4 +-
.../org/apache/flink/table/api/TumbleWithSize.java | 4 +-
.../flink/table/api/TumbleWithSizeOnTime.java | 4 +-
.../apache/flink/table/api/internal/TableImpl.java | 93 +++++++++++++---------
.../ExpressionParser.java | 24 +++---
.../ExpressionParserFactory.java} | 29 ++++---
.../table/delegation/PlannerExpressionParser.java | 73 -----------------
.../table/api/WindowCreationValidationTest.java | 57 -------------
.../delegation/DefaultExpressionParserFactory.java | 53 ++++++++++++
.../org.apache.flink.table.factories.Factory | 1 +
...ParserImpl.scala => ExpressionParserImpl.scala} | 19 +++--
.../planner/expressions/KeywordParseTest.scala | 13 ++-
.../planner/expressions/ScalarFunctionsTest.scala | 10 +--
.../expressions/utils/ExpressionTestBase.scala | 7 +-
.../planner/plan/utils/RexNodeExtractorTest.scala | 48 +++++------
.../planner/runtime/utils/BatchTableEnvUtil.scala | 6 +-
.../runtime/utils/CollectionBatchExecTable.scala | 5 +-
.../planner/runtime/utils/StreamTableEnvUtil.scala | 5 +-
30 files changed, 228 insertions(+), 280 deletions(-)
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 72a92d0..b0957f6 100644
---
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -49,10 +49,10 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
@@ -458,7 +458,7 @@ public final class StreamTableEnvironmentImpl extends
TableEnvironmentImpl
@Override
public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
- List<Expression> expressions =
ExpressionParser.parseExpressionList(fields);
+ List<Expression> expressions =
ExpressionParser.INSTANCE.parseExpressionList(fields);
return fromDataStream(dataStream, expressions.toArray(new
Expression[0]));
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
index 7862816..97a1eec 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
import java.util.Arrays;
@@ -53,7 +53,8 @@ public final class Over {
* @return an over window with defined partitioning
*/
public static OverWindowPartitioned partitionBy(String partitionBy) {
- return new
OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy));
+ return new OverWindowPartitioned(
+ ExpressionParser.INSTANCE.parseExpressionList(partitionBy));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
index 7c47444..2d35689 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
import java.util.List;
@@ -49,7 +49,7 @@ public final class OverWindowPartitioned {
*/
@Deprecated
public OverWindowPartitionedOrdered orderBy(String orderBy) {
- return this.orderBy(ExpressionParser.parseExpression(orderBy));
+ return
this.orderBy(ExpressionParser.INSTANCE.parseExpression(orderBy));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
index d1d82a8..92a2360 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import java.util.List;
@@ -49,7 +49,7 @@ public final class OverWindowPartitionedOrdered {
*/
@Deprecated
public OverWindowPartitionedOrderedPreceding preceding(String preceding) {
- return this.preceding(ExpressionParser.parseExpression(preceding));
+ return
this.preceding(ExpressionParser.INSTANCE.parseExpression(preceding));
}
/**
@@ -69,7 +69,7 @@ public final class OverWindowPartitionedOrdered {
* @return the fully defined over window
*/
public OverWindow as(String alias) {
- return as(ExpressionParser.parseExpression(alias));
+ return as(ExpressionParser.INSTANCE.parseExpression(alias));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
index 885306e..32511fb 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
import java.util.List;
import java.util.Optional;
@@ -48,7 +48,7 @@ public final class OverWindowPartitionedOrderedPreceding {
* @return the fully defined over window
*/
public OverWindow as(String alias) {
- return as(ExpressionParser.parseExpression(alias));
+ return as(ExpressionParser.INSTANCE.parseExpression(alias));
}
/**
@@ -70,7 +70,7 @@ public final class OverWindowPartitionedOrderedPreceding {
*/
@Deprecated
public OverWindowPartitionedOrderedPreceding following(String following) {
- return this.following(ExpressionParser.parseExpression(following));
+ return
this.following(ExpressionParser.INSTANCE.parseExpression(following));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
index 50cb165..e075c6d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/**
* Helper class for creating a session window. The boundary of session windows
are defined by
@@ -53,7 +53,7 @@ public final class Session {
*/
@Deprecated
public static SessionWithGap withGap(String gap) {
- return withGap(ExpressionParser.parseExpression(gap));
+ return withGap(ExpressionParser.INSTANCE.parseExpression(gap));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
index 49521bb..0859cd2 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/**
* Session window.
@@ -54,7 +54,7 @@ public final class SessionWithGap {
*/
@Deprecated
public SessionWithGapOnTime on(String timeField) {
- return on(ExpressionParser.parseExpression(timeField));
+ return on(ExpressionParser.INSTANCE.parseExpression(timeField));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
index 202a940..d53c1ed 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/** Session window on time. */
@PublicEvolving
@@ -44,7 +44,7 @@ public final class SessionWithGapOnTime {
* @return this window
*/
public SessionWithGapOnTimeWithAlias as(String alias) {
- return as(ExpressionParser.parseExpression(alias));
+ return as(ExpressionParser.INSTANCE.parseExpression(alias));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
index 6c4673d..4d1bb81 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/**
* Helper class for creating a sliding window. Sliding windows have a fixed
size and slide by a
@@ -61,7 +61,7 @@ public final class Slide {
*/
@Deprecated
public static SlideWithSize over(String size) {
- return over(ExpressionParser.parseExpression(size));
+ return over(ExpressionParser.INSTANCE.parseExpression(size));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
index 1e29478..7b0227f 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/**
* Partially specified sliding window. The size of the window either as time
or row-count interval.
@@ -52,7 +52,7 @@ public final class SlideWithSize {
*/
@Deprecated
public SlideWithSizeAndSlide every(String slide) {
- return every(ExpressionParser.parseExpression(slide));
+ return every(ExpressionParser.INSTANCE.parseExpression(slide));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
index 3713500..d101d6e 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/**
* Sliding window. The size of the window either as time or row-count interval.
@@ -57,7 +57,7 @@ public final class SlideWithSizeAndSlide {
*/
@Deprecated
public SlideWithSizeAndSlideOnTime on(String timeField) {
- return on(ExpressionParser.parseExpression(timeField));
+ return on(ExpressionParser.INSTANCE.parseExpression(timeField));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
index 4a57bdb..d21a01c 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/** Sliding window on time. */
@PublicEvolving
@@ -46,7 +46,7 @@ public final class SlideWithSizeAndSlideOnTime {
* @return this window
*/
public SlideWithSizeAndSlideOnTimeWithAlias as(String alias) {
- return as(ExpressionParser.parseExpression(alias));
+ return as(ExpressionParser.INSTANCE.parseExpression(alias));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
index 87743c9..9a3489b 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/**
* Helper class for creating a tumbling window. Tumbling windows are
consecutive, non-overlapping
@@ -53,7 +53,7 @@ public final class Tumble {
*/
@Deprecated
public static TumbleWithSize over(String size) {
- return over(ExpressionParser.parseExpression(size));
+ return over(ExpressionParser.INSTANCE.parseExpression(size));
}
/**
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
index 563d0c6..20e34c5 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/**
* Tumbling window.
@@ -69,6 +69,6 @@ public final class TumbleWithSize {
*/
@Deprecated
public TumbleWithSizeOnTime on(String timeField) {
- return on(ExpressionParser.parseExpression(timeField));
+ return on(ExpressionParser.INSTANCE.parseExpression(timeField));
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
index 8f06421..083a835 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
/** Tumbling window on time. */
@PublicEvolving
@@ -56,6 +56,6 @@ public final class TumbleWithSizeOnTime {
* @return this window
*/
public TumbleWithSizeOnTimeWithAlias as(String alias) {
- return as(ExpressionParser.parseExpression(alias));
+ return as(ExpressionParser.INSTANCE.parseExpression(alias));
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 682fd93..a1030b26c 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -39,8 +39,8 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaTranslator;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.delegation.ExpressionParser;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
import org.apache.flink.table.expressions.resolver.LookupCallResolver;
import org.apache.flink.table.functions.TemporalTableFunction;
@@ -120,7 +120,8 @@ public class TableImpl implements Table {
@Override
public Table select(String fields) {
- return select(ExpressionParser.parseExpressionList(fields).toArray(new
Expression[0]));
+ return select(
+
ExpressionParser.INSTANCE.parseExpressionList(fields).toArray(new
Expression[0]));
}
@Override
@@ -151,8 +152,8 @@ public class TableImpl implements Table {
public TemporalTableFunction createTemporalTableFunction(
String timeAttribute, String primaryKey) {
return createTemporalTableFunction(
- ExpressionParser.parseExpression(timeAttribute),
- ExpressionParser.parseExpression(primaryKey));
+ ExpressionParser.INSTANCE.parseExpression(timeAttribute),
+ ExpressionParser.INSTANCE.parseExpression(primaryKey));
}
@Override
@@ -171,7 +172,7 @@ public class TableImpl implements Table {
public Table as(String field, String... fields) {
final List<Expression> fieldsExprs;
if (fields.length == 0 &&
operationTree.getResolvedSchema().getColumnCount() > 1) {
- fieldsExprs = ExpressionParser.parseExpressionList(field);
+ fieldsExprs = ExpressionParser.INSTANCE.parseExpressionList(field);
} else {
fieldsExprs = new ArrayList<>();
fieldsExprs.add(lit(field));
@@ -189,7 +190,7 @@ public class TableImpl implements Table {
@Override
public Table filter(String predicate) {
- return filter(ExpressionParser.parseExpression(predicate));
+ return filter(ExpressionParser.INSTANCE.parseExpression(predicate));
}
@Override
@@ -210,7 +211,7 @@ public class TableImpl implements Table {
@Override
public GroupedTable groupBy(String fields) {
- return new GroupedTableImpl(this,
ExpressionParser.parseExpressionList(fields));
+ return new GroupedTableImpl(this,
ExpressionParser.INSTANCE.parseExpressionList(fields));
}
@Override
@@ -230,7 +231,7 @@ public class TableImpl implements Table {
@Override
public Table join(Table right, String joinPredicate) {
- return join(right, ExpressionParser.parseExpression(joinPredicate));
+ return join(right,
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
}
@Override
@@ -245,7 +246,7 @@ public class TableImpl implements Table {
@Override
public Table leftOuterJoin(Table right, String joinPredicate) {
- return leftOuterJoin(right,
ExpressionParser.parseExpression(joinPredicate));
+ return leftOuterJoin(right,
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
}
@Override
@@ -255,7 +256,7 @@ public class TableImpl implements Table {
@Override
public Table rightOuterJoin(Table right, String joinPredicate) {
- return rightOuterJoin(right,
ExpressionParser.parseExpression(joinPredicate));
+ return rightOuterJoin(right,
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
}
@Override
@@ -265,7 +266,7 @@ public class TableImpl implements Table {
@Override
public Table fullOuterJoin(Table right, String joinPredicate) {
- return fullOuterJoin(right,
ExpressionParser.parseExpression(joinPredicate));
+ return fullOuterJoin(right,
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
}
@Override
@@ -297,7 +298,7 @@ public class TableImpl implements Table {
@Override
public Table joinLateral(String tableFunctionCall) {
- return
joinLateral(ExpressionParser.parseExpression(tableFunctionCall));
+ return
joinLateral(ExpressionParser.INSTANCE.parseExpression(tableFunctionCall));
}
@Override
@@ -308,8 +309,8 @@ public class TableImpl implements Table {
@Override
public Table joinLateral(String tableFunctionCall, String joinPredicate) {
return joinLateral(
- ExpressionParser.parseExpression(tableFunctionCall),
- ExpressionParser.parseExpression(joinPredicate));
+ ExpressionParser.INSTANCE.parseExpression(tableFunctionCall),
+ ExpressionParser.INSTANCE.parseExpression(joinPredicate));
}
@Override
@@ -319,7 +320,7 @@ public class TableImpl implements Table {
@Override
public Table leftOuterJoinLateral(String tableFunctionCall) {
- return
leftOuterJoinLateral(ExpressionParser.parseExpression(tableFunctionCall));
+ return
leftOuterJoinLateral(ExpressionParser.INSTANCE.parseExpression(tableFunctionCall));
}
@Override
@@ -330,8 +331,8 @@ public class TableImpl implements Table {
@Override
public Table leftOuterJoinLateral(String tableFunctionCall, String
joinPredicate) {
return leftOuterJoinLateral(
- ExpressionParser.parseExpression(tableFunctionCall),
- ExpressionParser.parseExpression(joinPredicate));
+ ExpressionParser.INSTANCE.parseExpression(tableFunctionCall),
+ ExpressionParser.INSTANCE.parseExpression(joinPredicate));
}
@Override
@@ -406,7 +407,7 @@ public class TableImpl implements Table {
public Table orderBy(String fields) {
return createTable(
operationTreeBuilder.sort(
- ExpressionParser.parseExpressionList(fields),
operationTree));
+ ExpressionParser.INSTANCE.parseExpressionList(fields),
operationTree));
}
@Override
@@ -449,7 +450,7 @@ public class TableImpl implements Table {
@Override
public Table addColumns(String fields) {
- return addColumnsOperation(false,
ExpressionParser.parseExpressionList(fields));
+ return addColumnsOperation(false,
ExpressionParser.INSTANCE.parseExpressionList(fields));
}
@Override
@@ -459,7 +460,7 @@ public class TableImpl implements Table {
@Override
public Table addOrReplaceColumns(String fields) {
- return addColumnsOperation(true,
ExpressionParser.parseExpressionList(fields));
+ return addColumnsOperation(true,
ExpressionParser.INSTANCE.parseExpressionList(fields));
}
@Override
@@ -490,7 +491,7 @@ public class TableImpl implements Table {
public Table renameColumns(String fields) {
return createTable(
operationTreeBuilder.renameColumns(
- ExpressionParser.parseExpressionList(fields),
operationTree));
+ ExpressionParser.INSTANCE.parseExpressionList(fields),
operationTree));
}
@Override
@@ -503,7 +504,7 @@ public class TableImpl implements Table {
public Table dropColumns(String fields) {
return createTable(
operationTreeBuilder.dropColumns(
- ExpressionParser.parseExpressionList(fields),
operationTree));
+ ExpressionParser.INSTANCE.parseExpressionList(fields),
operationTree));
}
@Override
@@ -513,7 +514,7 @@ public class TableImpl implements Table {
@Override
public Table map(String mapFunction) {
- return map(ExpressionParser.parseExpression(mapFunction));
+ return map(ExpressionParser.INSTANCE.parseExpression(mapFunction));
}
@Override
@@ -523,7 +524,7 @@ public class TableImpl implements Table {
@Override
public Table flatMap(String tableFunction) {
- return flatMap(ExpressionParser.parseExpression(tableFunction));
+ return
flatMap(ExpressionParser.INSTANCE.parseExpression(tableFunction));
}
@Override
@@ -533,7 +534,7 @@ public class TableImpl implements Table {
@Override
public AggregatedTable aggregate(String aggregateFunction) {
- return aggregate(ExpressionParser.parseExpression(aggregateFunction));
+ return
aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction));
}
@Override
@@ -642,7 +643,10 @@ public class TableImpl implements Table {
@Override
public Table select(String fields) {
- return
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+ return select(
+ ExpressionParser.INSTANCE
+ .parseExpressionList(fields)
+ .toArray(new Expression[0]));
}
@Override
@@ -666,7 +670,7 @@ public class TableImpl implements Table {
@Override
public AggregatedTable aggregate(String aggregateFunction) {
- return
aggregate(ExpressionParser.parseExpression(aggregateFunction));
+ return
aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction));
}
@Override
@@ -676,7 +680,7 @@ public class TableImpl implements Table {
@Override
public FlatAggregateTable flatAggregate(String tableAggFunction) {
- return
flatAggregate(ExpressionParser.parseExpression(tableAggFunction));
+ return
flatAggregate(ExpressionParser.INSTANCE.parseExpression(tableAggFunction));
}
@Override
@@ -699,7 +703,10 @@ public class TableImpl implements Table {
@Override
public Table select(String fields) {
- return
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+ return select(
+ ExpressionParser.INSTANCE
+ .parseExpressionList(fields)
+ .toArray(new Expression[0]));
}
@Override
@@ -729,7 +736,7 @@ public class TableImpl implements Table {
public Table select(String fields) {
return table.createTable(
table.operationTreeBuilder.project(
- ExpressionParser.parseExpressionList(fields),
+
ExpressionParser.INSTANCE.parseExpressionList(fields),
table.operationTreeBuilder.tableAggregate(
groupKey,
tableAggregateFunction.accept(table.lookupResolver),
@@ -759,7 +766,10 @@ public class TableImpl implements Table {
@Override
public WindowGroupedTable groupBy(String fields) {
- return
groupBy(ExpressionParser.parseExpressionList(fields).toArray(new
Expression[0]));
+ return groupBy(
+ ExpressionParser.INSTANCE
+ .parseExpressionList(fields)
+ .toArray(new Expression[0]));
}
@Override
@@ -791,7 +801,10 @@ public class TableImpl implements Table {
@Override
public Table select(String fields) {
- return
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+ return select(
+ ExpressionParser.INSTANCE
+ .parseExpressionList(fields)
+ .toArray(new Expression[0]));
}
@Override
@@ -816,7 +829,7 @@ public class TableImpl implements Table {
@Override
public AggregatedTable aggregate(String aggregateFunction) {
- return
aggregate(ExpressionParser.parseExpression(aggregateFunction));
+ return
aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction));
}
@Override
@@ -826,7 +839,7 @@ public class TableImpl implements Table {
@Override
public FlatAggregateTable flatAggregate(String tableAggregateFunction)
{
- return
flatAggregate(ExpressionParser.parseExpression(tableAggregateFunction));
+ return
flatAggregate(ExpressionParser.INSTANCE.parseExpression(tableAggregateFunction));
}
@Override
@@ -855,7 +868,10 @@ public class TableImpl implements Table {
@Override
public Table select(String fields) {
- return
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+ return select(
+ ExpressionParser.INSTANCE
+ .parseExpressionList(fields)
+ .toArray(new Expression[0]));
}
@Override
@@ -914,7 +930,10 @@ public class TableImpl implements Table {
@Override
public Table select(String fields) {
- return
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+ return select(
+ ExpressionParser.INSTANCE
+ .parseExpressionList(fields)
+ .toArray(new Expression[0]));
}
@Override
@@ -969,7 +988,7 @@ public class TableImpl implements Table {
public Table select(String fields) {
return table.createTable(
table.operationTreeBuilder.project(
- ExpressionParser.parseExpressionList(fields),
+
ExpressionParser.INSTANCE.parseExpressionList(fields),
table.operationTree,
overWindows));
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java
similarity index 56%
copy from
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
copy to
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java
index 2fbe87d..efd424a 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java
@@ -16,27 +16,27 @@
* limitations under the License.
*/
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.delegation;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.delegation.PlannerExpressionParser;
+import org.apache.flink.table.expressions.Expression;
import java.util.List;
/**
- * Parser for expressions inside a String. This parses exactly the same
expressions that would be
- * accepted by the Scala Expression DSL.
+ * {@link Expression} parser used by Table API to parse strings into AST
expressions. This parses
+ * exactly the same expressions that would be accepted by the Scala Expression
DSL.
*
- * <p>{@link ExpressionParser} use {@link PlannerExpressionParser} to parse
expressions.
+ * @deprecated The Java String Expression DSL is deprecated.
*/
@Internal
-public final class ExpressionParser {
+@Deprecated
+public interface ExpressionParser {
- public static Expression parseExpression(String exprString) {
- return PlannerExpressionParser.create().parseExpression(exprString);
- }
+ /** Default instance of the {@link ExpressionParser}. */
+ ExpressionParser INSTANCE = ExpressionParserFactory.getDefault().create();
- public static List<Expression> parseExpressionList(String expression) {
- return
PlannerExpressionParser.create().parseExpressionList(expression);
- }
+ Expression parseExpression(String exprString);
+
+ List<Expression> parseExpressionList(String expression);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java
similarity index 55%
rename from
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
rename to
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java
index 2fbe87d..a61c1cb 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java
@@ -16,27 +16,30 @@
* limitations under the License.
*/
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.delegation;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.delegation.PlannerExpressionParser;
-
-import java.util.List;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
/**
- * Parser for expressions inside a String. This parses exactly the same
expressions that would be
- * accepted by the Scala Expression DSL.
+ * Factory for {@link ExpressionParser}.
*
- * <p>{@link ExpressionParser} use {@link PlannerExpressionParser} to parse
expressions.
+ * @deprecated The Java String Expression DSL is deprecated.
*/
@Internal
-public final class ExpressionParser {
+@Deprecated
+public interface ExpressionParserFactory extends Factory {
- public static Expression parseExpression(String exprString) {
- return PlannerExpressionParser.create().parseExpression(exprString);
- }
+ /** {@link #factoryIdentifier()} for the default {@link
ExpressionParserFactory}. */
+ String DEFAULT_IDENTIFIER = "default";
+
+ ExpressionParser create();
- public static List<Expression> parseExpressionList(String expression) {
- return
PlannerExpressionParser.create().parseExpressionList(expression);
+ static ExpressionParserFactory getDefault() {
+ return FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ ExpressionParserFactory.class,
+ DEFAULT_IDENTIFIER);
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
deleted file mode 100644
index 9b0e6e9..0000000
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.delegation;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
-
-import java.lang.reflect.Constructor;
-import java.util.List;
-
-/**
- * Temporary utility for parsing expressions inside a String. This parses
exactly the same
- * expressions that would be accepted by the Scala Expression DSL.
- *
- * <p>{@link PlannerExpressionParser} is used by {@link ExpressionParser} to
parse expressions.
- */
-@Internal
-public interface PlannerExpressionParser {
-
- static PlannerExpressionParser create() {
- return SingletonPlannerExpressionParser.getExpressionParser();
- }
-
- Expression parseExpression(String exprString);
-
- List<Expression> parseExpressionList(String expression);
-
- /**
- * Util class to create {@link PlannerExpressionParser} instance. Use
singleton pattern to avoid
- * creating many {@link PlannerExpressionParser}.
- */
- class SingletonPlannerExpressionParser {
-
- private static volatile PlannerExpressionParser expressionParser;
-
- private SingletonPlannerExpressionParser() {}
-
- public static PlannerExpressionParser getExpressionParser() {
-
- if (expressionParser == null) {
- try {
- Class<?> clazz =
- Class.forName(
-
"org.apache.flink.table.expressions.PlannerExpressionParserImpl");
- Constructor<?> con = clazz.getConstructor();
- expressionParser = (PlannerExpressionParser)
con.newInstance();
- } catch (Throwable t) {
- throw new TableException(
- "Construction of PlannerExpressionParserImpl class
failed.", t);
- }
- }
- return expressionParser;
- }
- }
-}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
deleted file mode 100644
index ef1e1f0..0000000
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/** Test failures for the creation of window. */
-public class WindowCreationValidationTest {
-
- @Rule public final ExpectedException exception = ExpectedException.none();
-
- @Test
- public void testTumbleOverForString() {
- exception.expect(TableException.class);
- exception.expectMessage("Construction of PlannerExpressionParserImpl
class failed.");
- Tumble.over("4.hours");
- }
-
- @Test
- public void testSlideOverForString() {
- exception.expect(TableException.class);
- exception.expectMessage("Construction of PlannerExpressionParserImpl
class failed.");
- Slide.over("4.hours");
- }
-
- @Test
- public void testSessionWithGapForString() {
- exception.expect(TableException.class);
- exception.expectMessage("Construction of PlannerExpressionParserImpl
class failed.");
- Session.withGap("4.hours");
- }
-
- @Test
- public void testOverWithPartitionByForString() {
- exception.expect(TableException.class);
- exception.expectMessage("Construction of PlannerExpressionParserImpl
class failed.");
- Over.partitionBy("a");
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java
new file mode 100644
index 0000000..fe1753b
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.delegation.ExpressionParser;
+import org.apache.flink.table.delegation.ExpressionParserFactory;
+import org.apache.flink.table.expressions.ExpressionParserImpl;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Default factory for {@link ExpressionParser}. */
+@Internal
+public final class DefaultExpressionParserFactory implements
ExpressionParserFactory {
+
+ @Override
+ public ExpressionParser create() {
+ return new ExpressionParserImpl();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return ExpressionParserFactory.DEFAULT_IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index ac5f9b6..6d9eff6 100644
---
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,3 +16,4 @@
org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.delegation.DefaultParserFactory
org.apache.flink.table.planner.delegation.DefaultPlannerFactory
+org.apache.flink.table.planner.delegation.DefaultExpressionParserFactory
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala
similarity index 97%
rename from
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
rename to
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala
index 811e4f3..56a688b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala
@@ -19,32 +19,31 @@ package org.apache.flink.table.expressions
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.api._
-import org.apache.flink.table.delegation.PlannerExpressionParser
import ApiExpressionUtils._
+import org.apache.flink.table.delegation.ExpressionParser
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
import
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import _root_.java.math.{BigDecimal => JBigDecimal}
import _root_.java.util.{List => JList}
-
import _root_.scala.collection.JavaConversions._
import _root_.scala.language.implicitConversions
import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
/**
- * The implementation of a [[PlannerExpressionParser]] which parsers
expressions inside a String.
+ * The implementation of an [[ExpressionParser]] which parses expressions
inside a String.
*
* <p><strong>WARNING</strong>: please keep this class in sync with
PlannerExpressionParserImpl
* variant in flink-table-planner module.
*/
-class PlannerExpressionParserImpl extends PlannerExpressionParser {
+class ExpressionParserImpl extends ExpressionParser {
- def parseExpression(exprString: String): Expression = {
- PlannerExpressionParserImpl.parseExpression(exprString)
+ override def parseExpression(exprString: String): Expression = {
+ ExpressionParserImpl.parseExpression(exprString)
}
override def parseExpressionList(expression: String): JList[Expression] = {
- PlannerExpressionParserImpl.parseExpressionList(expression)
+ ExpressionParserImpl.parseExpressionList(expression)
}
}
@@ -56,9 +55,9 @@ class PlannerExpressionParserImpl extends
PlannerExpressionParser {
* available in the Scala Expression DSL. This parser must be kept in sync
with the Scala DSL
* lazy valined in the above files.
*/
-object PlannerExpressionParserImpl extends JavaTokenParsers
+object ExpressionParserImpl extends JavaTokenParsers
with PackratParsers
- with PlannerExpressionParser {
+ with ExpressionParser {
case class Keyword(key: String)
@@ -140,7 +139,7 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
lazy val TRIM_MODE_BOTH: Keyword = Keyword("BOTH")
lazy val TO: Keyword = Keyword("TO")
- def functionIdent: PlannerExpressionParserImpl.Parser[String] = super.ident
+ def functionIdent: ExpressionParserImpl.Parser[String] = super.ident
// symbols
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
index f9a6ced..2e613f9 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
@@ -18,10 +18,9 @@
package org.apache.flink.table.planner.expressions
-import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.delegation.ExpressionParser
import org.apache.flink.table.expressions.ApiExpressionUtils.{lookupCall,
unresolvedCall, unresolvedRef}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -34,30 +33,30 @@ class KeywordParseTest {
def testKeyword(): Unit = {
assertEquals(
unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC,
unresolvedRef("f0")),
- ExpressionParser.parseExpression("f0.asc"))
+ ExpressionParser.INSTANCE.parseExpression("f0.asc"))
assertEquals(
unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC,
unresolvedRef("f0")),
- ExpressionParser.parseExpression("f0.asc()"))
+ ExpressionParser.INSTANCE.parseExpression("f0.asc()"))
}
@Test
def testKeywordAsPrefixInFunctionName(): Unit = {
assertEquals(
lookupCall("ascii", unresolvedRef("f0")),
- ExpressionParser.parseExpression("f0.ascii()"))
+ ExpressionParser.INSTANCE.parseExpression("f0.ascii()"))
}
@Test
def testKeywordAsInfixInFunctionName(): Unit = {
assertEquals(
lookupCall("iiascii", unresolvedRef("f0")),
- ExpressionParser.parseExpression("f0.iiascii()"))
+ ExpressionParser.INSTANCE.parseExpression("f0.iiascii()"))
}
@Test
def testKeywordAsSuffixInFunctionName(): Unit = {
assertEquals(
lookupCall("iiasc", unresolvedRef("f0")),
- ExpressionParser.parseExpression("f0.iiasc()"))
+ ExpressionParser.INSTANCE.parseExpression("f0.iiasc()"))
}
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index d6b7101..396ebca 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -19,9 +19,9 @@
package org.apache.flink.table.planner.expressions
import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.{Expression, ExpressionParser,
TimeIntervalUnit, TimePointUnit}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.{Expression, TimeIntervalUnit,
TimePointUnit}
import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
-
import org.junit.Test
class ScalarFunctionsTest extends ScalarTypesTestBase {
@@ -112,7 +112,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
cases.foreach(x => {
testAllApis(
- ExpressionParser.parseExpression(x._1),
+ ExpressionParser.INSTANCE.parseExpression(x._1),
x._1,
x._2,
x._3
@@ -3959,7 +3959,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
val tableApiString = x._1.format(field)
val sqlApiString = x._2.format(field)
testAllApis(
- ExpressionParser.parseExpression(tableApiString),
+ ExpressionParser.INSTANCE.parseExpression(tableApiString),
tableApiString,
sqlApiString,
"null"
@@ -3985,7 +3985,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
.format(x)
testAllApis(
- ExpressionParser.parseExpression(tableApiString),
+ ExpressionParser.INSTANCE.parseExpression(tableApiString),
tableApiString,
sqlApiString,
"null"
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 7cec8b0..5204b94 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -39,7 +39,8 @@ import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.conversion.{DataStructureConverter,
DataStructureConverters}
import org.apache.flink.table.data.util.DataFormatConverters
import
org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
ExprCodeGenerator, FunctionCodeGenerator}
import org.apache.flink.table.planner.delegation.PlannerBase
@@ -55,7 +56,6 @@ import org.junit.{After, Before, Rule}
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
abstract class ExpressionTestBase {
@@ -276,7 +276,8 @@ abstract class ExpressionTestBase {
}
private def testTableApiTestExpr(tableApiString: String, expected: String):
Unit = {
- addTableApiTestExpr(ExpressionParser.parseExpression(tableApiString),
expected, validExprs)
+ addTableApiTestExpr(
+ ExpressionParser.INSTANCE.parseExpression(tableApiString), expected,
validExprs)
}
private def addSqlTestExpr(
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 0388d32..6658c36 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.api.{DataTypes, TableConfig}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall,
unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS,
GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQUAL}
import org.apache.flink.table.functions.{AggregateFunctionDefinition,
FunctionIdentifier}
import org.apache.flink.table.module.ModuleManager
@@ -32,13 +32,13 @@ import
org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
import org.apache.flink.table.planner.utils.{DateTimeTestUtil,
IntSumAggFunction}
import org.apache.flink.table.utils.CatalogManagerMocks
-
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.SqlPostfixOperator
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction}
import org.apache.calcite.util.{DateString, TimeString, TimestampString}
+import org.apache.flink.table.delegation.ExpressionParser
import org.hamcrest.CoreMatchers.is
import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat,
assertTrue}
import org.junit.Test
@@ -46,7 +46,6 @@ import org.junit.Test
import java.math.BigDecimal
import java.time.ZoneId
import java.util.{Arrays, TimeZone, List => JList}
-
import scala.collection.JavaConverters._
/**
@@ -112,8 +111,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
val builder: RexBuilder = new FlinkRexBuilder(typeFactory)
val expr = buildConditionExpr()
- val firstExp = ExpressionParser.parseExpression("id > 6")
- val secondExp = ExpressionParser.parseExpression("amount * price < 100")
+ val firstExp = ExpressionParser.INSTANCE.parseExpression("id > 6")
+ val secondExp = ExpressionParser.INSTANCE.parseExpression("amount * price
< 100")
val expected: Array[Expression] = Array(firstExp, secondExp)
val (convertedExpressions, unconvertedRexNodes) =
@@ -147,7 +146,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
relBuilder,
functionCatalog)
- val expected: Array[Expression] =
Array(ExpressionParser.parseExpression("amount >= id"))
+ val expected: Array[Expression] = Array(
+ ExpressionParser.INSTANCE.parseExpression("amount >= id"))
assertExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -197,9 +197,9 @@ class RexNodeExtractorTest extends RexNodeTestBase {
functionCatalog)
val expected: Array[Expression] = Array(
- ExpressionParser.parseExpression("amount < 100 || price == 100 || price
=== 200"),
- ExpressionParser.parseExpression("id > 100 || price == 100 || price ===
200"),
- ExpressionParser.parseExpression("!(amount <= id)"))
+ ExpressionParser.INSTANCE.parseExpression("amount < 100 || price == 100
|| price === 200"),
+ ExpressionParser.INSTANCE.parseExpression("id > 100 || price == 100 ||
price === 200"),
+ ExpressionParser.INSTANCE.parseExpression("!(amount <= id)"))
assertExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -237,10 +237,10 @@ class RexNodeExtractorTest extends RexNodeTestBase {
functionCatalog)
val expected: Array[Expression] = Array(
- ExpressionParser.parseExpression("amount < 100"),
- ExpressionParser.parseExpression("amount <= id"),
- ExpressionParser.parseExpression("id > 100"),
- ExpressionParser.parseExpression("price === 100")
+ ExpressionParser.INSTANCE.parseExpression("amount < 100"),
+ ExpressionParser.INSTANCE.parseExpression("amount <= id"),
+ ExpressionParser.INSTANCE.parseExpression("id > 100"),
+ ExpressionParser.INSTANCE.parseExpression("price === 100")
)
assertExpressionArrayEquals(expected, convertedExpressions)
@@ -393,16 +393,16 @@ class RexNodeExtractorTest extends RexNodeTestBase {
functionCatalog)
val expected: Array[Expression] = Array(
- ExpressionParser.parseExpression("amount < id"),
- ExpressionParser.parseExpression("amount <= id"),
- ExpressionParser.parseExpression("amount <> id"),
- ExpressionParser.parseExpression("amount == id"),
- ExpressionParser.parseExpression("amount >= id"),
- ExpressionParser.parseExpression("amount > id"),
- ExpressionParser.parseExpression("amount + id == 100"),
- ExpressionParser.parseExpression("amount - id == 100"),
- ExpressionParser.parseExpression("amount * id == 100"),
- ExpressionParser.parseExpression("amount / id == 100")
+ ExpressionParser.INSTANCE.parseExpression("amount < id"),
+ ExpressionParser.INSTANCE.parseExpression("amount <= id"),
+ ExpressionParser.INSTANCE.parseExpression("amount <> id"),
+ ExpressionParser.INSTANCE.parseExpression("amount == id"),
+ ExpressionParser.INSTANCE.parseExpression("amount >= id"),
+ ExpressionParser.INSTANCE.parseExpression("amount > id"),
+ ExpressionParser.INSTANCE.parseExpression("amount + id == 100"),
+ ExpressionParser.INSTANCE.parseExpression("amount - id == 100"),
+ ExpressionParser.INSTANCE.parseExpression("amount * id == 100"),
+ ExpressionParser.INSTANCE.parseExpression("amount / id == 100")
)
assertExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
@@ -508,7 +508,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
assertEquals(0, unconvertedRexNodes.length)
assertExpressionArrayEquals(
- Array(ExpressionParser.parseExpression("amount <= id")),
+ Array(ExpressionParser.INSTANCE.parseExpression("amount <= id")),
Array(convertedExpressions(1)))
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
index d801362..f389897 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
@@ -24,14 +24,13 @@ import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.internal.TableEnvironmentImpl
import org.apache.flink.table.api.{Table, TableEnvironment}
-import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.delegation.ExpressionParser
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.utils.TableTestUtil
import _root_.java.util.UUID
-
import _root_.scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -222,7 +221,8 @@ object BatchTableEnvUtil {
fieldNames: Option[Array[String]],
fieldNullables: Option[Array[Boolean]],
statistic: Option[FlinkStatistic]): Unit = {
- val fields = fieldNames.map((f: Array[String]) =>
f.map(ExpressionParser.parseExpression))
+ val fields = fieldNames.map((f: Array[String]) =>
+ f.map(ExpressionParser.INSTANCE.parseExpression))
// for tests we know that this stream is definitely bounded
ExecNodeUtil.makeLegacySourceTransformationsBounded(boundedStream.getTransformation)
TableTestUtil.createTemporaryView(
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
index b5bc5b8..c30a090 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
@@ -19,7 +19,8 @@ package org.apache.flink.table.planner.runtime.utils
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.Expression
import scala.collection.mutable
import scala.util.Random
@@ -284,7 +285,7 @@ object CollectionBatchExecTable {
if (fields == null) {
null
} else {
- ExpressionParser.parseExpressionList(fields).toArray(Array[Expression]())
+
ExpressionParser.INSTANCE.parseExpressionList(fields).toArray(Array[Expression]())
}
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala
index 27dd611..1bebf6d 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala
@@ -21,7 +21,8 @@ package org.apache.flink.table.planner.runtime.utils
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.utils.TableTestUtil
@@ -44,7 +45,7 @@ object StreamTableEnvUtil {
fieldNullables: Option[Array[Boolean]],
statistic: Option[FlinkStatistic]): Unit = {
val fields: Option[Array[Expression]] = fieldNames match {
- case Some(names) => Some(names.map(ExpressionParser.parseExpression))
+ case Some(names) =>
Some(names.map(ExpressionParser.INSTANCE.parseExpression))
case _ => None
}
TableTestUtil.createTemporaryView(tEnv, name, dataStream, fields,
fieldNullables, statistic)