This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 46bb8ee [FLINK-18569][table] Support limit() for unordered tables
46bb8ee is described below
commit 46bb8eee3fdc69ec0049c6cb1cc40e9b40e3aaa7
Author: Timo Walther <[email protected]>
AuthorDate: Tue Jul 14 13:56:27 2020 +0200
[FLINK-18569][table] Support limit() for unordered tables
This closes #12904.
---
docs/dev/table/tableApi.md | 25 +++----
flink-python/pyflink/table/table.py | 25 ++++---
.../pyflink/table/tests/test_table_completeness.py | 2 +-
.../java/org/apache/flink/table/api/Table.java | 46 +++++++++----
.../operations/utils/OperationTreeBuilder.java | 9 +--
.../operations/utils/SortOperationFactory.java | 76 ++++++++++------------
.../table/validation/SortValidationTest.scala | 16 -----
.../validation/UnsupportedOpsValidationTest.scala | 24 -------
.../planner/runtime/batch/sql/LimitITCase.scala | 9 ---
.../planner/runtime/batch/table/LimitITCase.scala | 58 +++++++++++++++++
.../table/validation/SortValidationTest.scala | 16 -----
.../validation/UnsupportedOpsValidationTest.scala | 18 -----
12 files changed, 161 insertions(+), 163 deletions(-)
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 1fc323d..9e80729 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1924,10 +1924,10 @@ result = left.select("a, b,
c").where("a.in(RightTable)")
<tr>
<td>
<strong>Order By</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL ORDER BY clause. Returns records globally sorted
across all parallel partitions.</p>
+ <p>Similar to a SQL ORDER BY clause. Returns records globally sorted
across all parallel partitions. For unbounded tables, this operation requires a
sorting on a time attribute or a subsequent fetch operation.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy($("a").asc());
@@ -1938,10 +1938,10 @@ Table result = in.orderBy($("a").asc()");
<tr>
<td>
<strong>Offset & Fetch</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span>
</td>
<td>
- <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit
the number of records returned from a sorted result. Offset and Fetch are
technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. The offset operation
limits a (possibly sorted) result from an offset position. The fetch operation
limits a (possibly sorted) result to the first n rows. Usually, the two
operations are preceded by an ordering operator. For unbounded tables, a fetch
operation is required for an offset operation.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
@@ -1973,10 +1973,10 @@ Table result3 =
in.orderBy($("a").asc()).offset(10).fetch(5);
<tr>
<td>
<strong>Order By</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL ORDER BY clause. Returns records globally sorted
across all parallel partitions.</p>
+ <p>Similar to a SQL ORDER BY clause. Returns records globally sorted
across all parallel partitions. For unbounded tables, this operation requires a
sorting on a time attribute or a subsequent fetch operation.</p>
{% highlight scala %}
val in = ds.toTable(tableEnv, $"a", $"b", $"c")
val result = in.orderBy($"a".asc)
@@ -1987,10 +1987,11 @@ val result = in.orderBy($"a".asc)
<tr>
<td>
<strong>Offset & Fetch</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span>
+ <span class="label label-info">Result Updating</span>
</td>
<td>
- <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit
the number of records returned from a sorted result. Offset and Fetch are
technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. The offset operation
limits a (possibly sorted) result from an offset position. The fetch operation
limits a (possibly sorted) result to the first n rows. Usually, the two
operations are preceded by an ordering operator. For unbounded tables, a fetch
operation is required for an offset operation.</p>
{% highlight scala %}
val in = ds.toTable(tableEnv, $"a", $"b", $"c")
@@ -2021,10 +2022,10 @@ val result3: Table =
in.orderBy($"a".asc).offset(10).fetch(5)
<tr>
<td>
<strong>Order By</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span>
</td>
<td>
- <p>Similar to a SQL ORDER BY clause. Returns records globally sorted
across all parallel partitions.</p>
+ <p>Similar to a SQL ORDER BY clause. Returns records globally sorted
across all parallel partitions. For unbounded tables, this operation requires a
sorting on a time attribute or a subsequent fetch operation.</p>
{% highlight python %}
in = table_env.from_path("Source1").select("a, b, c")
result = in.order_by("a.asc")
@@ -2035,10 +2036,10 @@ result = in.order_by("a.asc")
<tr>
<td>
<strong>Offset & Fetch</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span>
</td>
<td>
- <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit
the number of records returned from a sorted result. Offset and Fetch are
technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. The offset operation
limits a (possibly sorted) result from an offset position. The fetch operation
limits a (possibly sorted) result to the first n rows. Usually, the two
operations are preceded by an ordering operator. For unbounded tables, a fetch
operation is required for an offset operation.</p>
{% highlight python %}
in = table_env.from_path("Source1").select("a, b, c")
diff --git a/flink-python/pyflink/table/table.py
b/flink-python/pyflink/table/table.py
index e140825..2f55680 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -478,6 +478,9 @@ class Table(object):
>>> tab.order_by("name.desc")
+ For unbounded tables, this operation requires a sorting on a time
attribute or a subsequent
+ fetch operation.
+
:param fields: Order fields expression string.
:type fields: str
:return: The result table.
@@ -487,11 +490,11 @@ class Table(object):
def offset(self, offset):
"""
- Limits a sorted result from an offset position.
- Similar to a SQL OFFSET clause. Offset is technically part of the
Order By operator and
- thus must be preceded by it.
- :func:`~pyflink.table.Table.offset` can be combined with a subsequent
- :func:`~pyflink.table.Table.fetch` call to return n rows after
skipping the first o rows.
+ Limits a (possibly sorted) result from an offset position.
+
+ This method can be combined with a preceding
:func:`~pyflink.table.Table.order_by` call for
+ a deterministic order and a subsequent
:func:`~pyflink.table.Table.fetch` call to return n
+ rows after skipping the first o rows.
Example:
::
@@ -501,6 +504,8 @@ class Table(object):
# skips the first 10 rows and returns the next 5 rows.
>>> tab.order_by("name.desc").offset(10).fetch(5)
+ For unbounded tables, this operation requires a subsequent fetch
operation.
+
:param offset: Number of records to skip.
:type offset: int
:return: The result table.
@@ -510,11 +515,11 @@ class Table(object):
def fetch(self, fetch):
"""
- Limits a sorted result to the first n rows.
- Similar to a SQL FETCH clause. Fetch is technically part of the Order
By operator and
- thus must be preceded by it.
- :func:`~pyflink.table.Table.offset` can be combined with a preceding
- :func:`~pyflink.table.Table.fetch` call to return n rows after
skipping the first o rows.
+ Limits a (possibly sorted) result to the first n rows.
+
+ This method can be combined with a preceding
:func:`~pyflink.table.Table.order_by` call for
+ a deterministic order and :func:`~pyflink.table.Table.offset` call to
return n rows after
+ skipping the first o rows.
Example:
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py
b/flink-python/pyflink/table/tests/test_table_completeness.py
index 20e81e1..31007af 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -43,7 +43,7 @@ class
TableAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase
# complete type system, which does not exist currently. It will be
implemented after
# FLINK-12408 is merged. So we exclude this method for the time being.
return {'map', 'flatMap', 'flatAggregate', 'aggregate',
'leftOuterJoinLateral',
- 'createTemporalTableFunction', 'joinLateral',
'getQueryOperation'}
+ 'createTemporalTableFunction', 'joinLateral',
'getQueryOperation', 'limit'}
@classmethod
def java_method_name(cls, python_method_name):
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 1a5825a..61d07ef 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -933,10 +933,11 @@ public interface Table {
Table orderBy(String fields);
/**
- * Sorts the given {@link Table}. Similar to SQL ORDER BY.
- * The resulting Table is globally sorted across all parallel
partitions.
+ * Sorts the given {@link Table}. Similar to SQL {@code ORDER BY}.
*
- * <p>Scala Example:
+ * <p>The resulting Table is globally sorted across all parallel
partitions.
+ *
+ * <p>Java Example:
*
* <pre>
* {@code
@@ -951,16 +952,17 @@ public interface Table {
* tab.orderBy($"name".desc)
* }
* </pre>
+ *
+ * <p>For unbounded tables, this operation requires a sorting on a time
attribute or a subsequent
+ * fetch operation.
*/
Table orderBy(Expression... fields);
/**
- * Limits a sorted result from an offset position.
- * Similar to a SQL OFFSET clause. Offset is technically part of the
Order By operator and
- * thus must be preceded by it.
+ * Limits a (possibly sorted) result from an offset position.
*
- * {@link Table#offset(int offset)} can be combined with a subsequent
- * {@link Table#fetch(int fetch)} call to return n rows after skipping
the first o rows.
+ * <p>This method can be combined with a preceding {@link
#orderBy(Expression...)} call for a deterministic
+ * order and a subsequent {@link #fetch(int)} call to return n rows
after skipping the first o rows.
*
* <pre>
* {@code
@@ -971,17 +973,17 @@ public interface Table {
* }
* </pre>
*
+ * <p>For unbounded tables, this operation requires a subsequent fetch
operation.
+ *
* @param offset number of records to skip
*/
Table offset(int offset);
/**
- * Limits a sorted result to the first n rows.
- * Similar to a SQL FETCH clause. Fetch is technically part of the
Order By operator and
- * thus must be preceded by it.
+ * Limits a (possibly sorted) result to the first n rows.
*
- * {@link Table#fetch(int fetch)} can be combined with a preceding
- * {@link Table#offset(int offset)} call to return n rows after
skipping the first o rows.
+ * <p>This method can be combined with a preceding {@link
#orderBy(Expression...)} call for a deterministic
+ * order and {@link #offset(int)} call to return n rows after skipping
the first o rows.
*
* <pre>
* {@code
@@ -997,6 +999,24 @@ public interface Table {
Table fetch(int fetch);
/**
+ * Limits a (possibly sorted) result to the first n rows.
+ *
+ * <p>This method is a synonym for {@link #fetch(int)}.
+ */
+ default Table limit(int fetch) {
+ return fetch(fetch);
+ }
+
+ /**
+ * Limits a (possibly sorted) result to the first n rows from an offset
position.
+ *
+ * <p>This method is a synonym for {@link #offset(int)} followed by
{@link #fetch(int)}.
+ */
+ default Table limit(int offset, int fetch) {
+ return offset(offset).fetch(fetch);
+ }
+
+ /**
* Writes the {@link Table} to a {@link TableSink} that was registered
under the specified path.
* For the path resolution algorithm see {@link
TableEnvironment#useDatabase(String)}.
*
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
index 9aad88a..057dfd3 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
@@ -138,7 +138,7 @@ public final class OperationTreeBuilder {
typeFactory,
tableReferenceLookup,
new ProjectionOperationFactory(),
- new SortOperationFactory(isStreamingMode),
+ new SortOperationFactory(),
new CalculatedTableFactory(),
new SetOperationFactory(isStreamingMode),
new AggregateOperationFactory(isStreamingMode),
@@ -402,7 +402,6 @@ public final class OperationTreeBuilder {
}
public QueryOperation sort(List<Expression> fields, QueryOperation
child) {
-
ExpressionResolver resolver = getResolver(child);
List<ResolvedExpression> resolvedFields =
resolver.resolve(fields);
@@ -410,11 +409,13 @@ public final class OperationTreeBuilder {
}
public QueryOperation limitWithOffset(int offset, QueryOperation child)
{
- return sortOperationFactory.createLimitWithOffset(offset,
child);
+ ExpressionResolver resolver = getResolver(child);
+ return sortOperationFactory.createLimitWithOffset(offset,
child, resolver.postResolverFactory());
}
public QueryOperation limitWithFetch(int fetch, QueryOperation child) {
- return sortOperationFactory.createLimitWithFetch(fetch, child);
+ ExpressionResolver resolver = getResolver(child);
+ return sortOperationFactory.createLimitWithFetch(fetch, child,
resolver.postResolverFactory());
}
public QueryOperation alias(List<Expression> fields, QueryOperation
child) {
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/SortOperationFactory.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/SortOperationFactory.java
index 7147641..472e9b6 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/SortOperationFactory.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/SortOperationFactory.java
@@ -22,11 +22,12 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import
org.apache.flink.table.expressions.resolver.ExpressionResolver.PostResolverFactory;
import
org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.SortQueryOperation;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -40,28 +41,22 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ORDER_
@Internal
final class SortOperationFactory {
- private final boolean isStreamingMode;
-
- SortOperationFactory(boolean isStreamingMode) {
- this.isStreamingMode = isStreamingMode;
- }
-
/**
- * Creates a valid {@link SortQueryOperation} operation.
+ * Creates a valid {@link SortQueryOperation}.
*
- * <p><b>NOTE:</b> if the collation is not explicitly specified for any
expression, it is wrapped in a
- * default ascending order
+ * <p><b>NOTE:</b> If the collation is not explicitly specified for an
expression, the expression
+ * is wrapped in a default ascending order. If no expression is
specified, the result is not sorted
+ * but only limited.
*
- * @param orders expressions describing order,
+ * @param orders expressions describing order
* @param child relational expression on top of which to apply the sort
operation
+ * @param postResolverFactory factory for creating resolved expressions
* @return valid sort operation
*/
QueryOperation createSort(
List<ResolvedExpression> orders,
QueryOperation child,
- ExpressionResolver.PostResolverFactory
postResolverFactory) {
- failIfStreaming();
-
+ PostResolverFactory postResolverFactory) {
final OrderWrapper orderWrapper = new
OrderWrapper(postResolverFactory);
List<ResolvedExpression> convertedOrders = orders.stream()
@@ -71,14 +66,18 @@ final class SortOperationFactory {
}
/**
- * Adds offset to the underlying {@link SortQueryOperation} if it is a
valid one.
+ * Creates a valid {@link SortQueryOperation} with offset (possibly
merged into a preceding {@link SortQueryOperation}).
*
- * @param offset offset to add
- * @param child should be {@link SortQueryOperation}
+ * @param offset offset to start from
+ * @param child relational expression on top of which to apply the sort
operation
+ * @param postResolverFactory factory for creating resolved expressions
* @return valid sort operation with applied offset
*/
- QueryOperation createLimitWithOffset(int offset, QueryOperation child) {
- SortQueryOperation previousSort =
validateAndGetChildSort(child);
+ QueryOperation createLimitWithOffset(
+ int offset,
+ QueryOperation child,
+ PostResolverFactory postResolverFactory) {
+ SortQueryOperation previousSort =
validateAndGetChildSort(child, postResolverFactory);
if (offset < 0) {
throw new ValidationException("Offset should be greater
or equal 0");
@@ -92,14 +91,18 @@ final class SortOperationFactory {
}
/**
- * Adds fetch to the underlying {@link SortQueryOperation} if it is a
valid one.
+ * Creates a valid {@link SortQueryOperation} with fetch (possibly
merged into a preceding {@link SortQueryOperation}).
*
- * @param fetch fetch number to add
- * @param child should be {@link SortQueryOperation}
- * @return valid sort operation with applied fetch
+ * @param fetch number of rows to fetch
+ * @param child relational expression on top of which to apply the sort
operation
+ * @param postResolverFactory factory for creating resolved expressions
+ * @return valid sort operation with applied fetch
*/
- QueryOperation createLimitWithFetch(int fetch, QueryOperation child) {
- SortQueryOperation previousSort =
validateAndGetChildSort(child);
+ QueryOperation createLimitWithFetch(
+ int fetch,
+ QueryOperation child,
+ PostResolverFactory postResolverFactory) {
+ SortQueryOperation previousSort =
validateAndGetChildSort(child, postResolverFactory);
if (fetch < 0) {
throw new ValidationException("Fetch should be greater
or equal 0");
@@ -110,15 +113,14 @@ final class SortOperationFactory {
return new SortQueryOperation(previousSort.getOrder(),
previousSort.getChild(), offset, fetch);
}
- private SortQueryOperation validateAndGetChildSort(QueryOperation
child) {
- failIfStreaming();
-
- if (!(child instanceof SortQueryOperation)) {
- throw new ValidationException("A limit operation must
be preceded by a sort operation.");
+ private SortQueryOperation validateAndGetChildSort(QueryOperation
child, PostResolverFactory postResolverFactory) {
+ final SortQueryOperation previousSort;
+ if (child instanceof SortQueryOperation) {
+ previousSort = (SortQueryOperation) child;
+ } else {
+ previousSort = (SortQueryOperation)
createSort(Collections.emptyList(), child, postResolverFactory);
}
- SortQueryOperation previousSort = (SortQueryOperation) child;
-
if ((previousSort).getFetch() != -1) {
throw new ValidationException("FETCH is already
defined.");
}
@@ -126,17 +128,11 @@ final class SortOperationFactory {
return previousSort;
}
- private void failIfStreaming() {
- if (isStreamingMode) {
- throw new ValidationException("A limit operation on
unbounded tables is currently not supported.");
- }
- }
-
private static class OrderWrapper extends
ResolvedExpressionDefaultVisitor<ResolvedExpression> {
- private ExpressionResolver.PostResolverFactory
postResolverFactory;
+ private PostResolverFactory postResolverFactory;
- OrderWrapper(ExpressionResolver.PostResolverFactory
postResolverFactory) {
+ OrderWrapper(PostResolverFactory postResolverFactory) {
this.postResolverFactory = postResolverFactory;
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/SortValidationTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/SortValidationTest.scala
index 713a4e1..b4aeeb5 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/SortValidationTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/SortValidationTest.scala
@@ -27,22 +27,6 @@ import org.junit._
class SortValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
- def testOffsetWithoutOrder(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTableSource[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- ds.offset(5)
- }
-
- @Test(expected = classOf[ValidationException])
- def testFetchWithoutOrder(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTableSource[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- ds.fetch(5)
- }
-
- @Test(expected = classOf[ValidationException])
def testFetchBeforeOffset(): Unit = {
val util = batchTestUtil()
val ds = util.addTableSource[(Int, Long, String)]("Table3", 'a, 'b, 'c)
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
index 484b1d3..769ee54 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -31,13 +31,6 @@ import org.junit.Test
class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
- def testSort(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
-
env.fromCollection(TestData.smallTupleData3).toTable(tEnv).orderBy('_1.desc)
- }
-
- @Test(expected = classOf[ValidationException])
def testJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
@@ -90,21 +83,4 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
t1.minusAll(t2)
}
-
- @Test(expected = classOf[ValidationException])
- def testOffset(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
- val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
- t1.offset(5)
- }
-
- @Test(expected = classOf[ValidationException])
- def testFetch(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
- val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv)
- t1.fetch(5)
- }
-
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
index 783e152..1f49b67 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
@@ -19,11 +19,9 @@
package org.apache.flink.table.planner.runtime.batch.sql
import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.TestData._
import org.apache.flink.table.planner.utils.TestLimitableTableSource
-
import org.junit._
class LimitITCase extends BatchTestBase {
@@ -100,13 +98,6 @@ class LimitITCase extends BatchTestBase {
5)
}
- @Test(expected = classOf[ValidationException])
- def testTableLimitWithLimitTable(): Unit = {
- Assert.assertEquals(
- executeQuery(tEnv.from("LimitTable").fetch(5)).size,
- 5)
- }
-
@Test
def testLessThanOffset(): Unit = {
checkSize(
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
new file mode 100644
index 0000000..9562a40
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/LimitITCase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.table
+
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.table.planner.runtime.utils.TestData._
+import org.apache.flink.table.planner.utils.TestLimitableTableSource
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class LimitITCase extends BatchTestBase {
+
+ @Before
+ override def before(): Unit = {
+ super.before()
+
+ TestLimitableTableSource.createTemporaryTable(
+ tEnv, data3, new TableSchema(Array("a", "b", "c"), type3.getFieldTypes),
"LimitTable")
+ }
+
+ @Test
+ def testFetch(): Unit = {
+ assertEquals(
+ executeQuery(tEnv.from("LimitTable").fetch(5)).size,
+ 5)
+ }
+
+ @Test
+ def testOffset(): Unit = {
+ assertEquals(
+ executeQuery(tEnv.from("LimitTable").offset(5)).size,
+ 16)
+ }
+
+ @Test
+ def testOffsetAndFetch(): Unit = {
+ assertEquals(
+ executeQuery(tEnv.from("LimitTable").limit(5, 5)).size,
+ 5)
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
index 3575833..95c9e86 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
@@ -27,22 +27,6 @@ import org.junit._
class SortValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
- def testOffsetWithoutOrder(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- ds.offset(5)
- }
-
- @Test(expected = classOf[ValidationException])
- def testFetchWithoutOrder(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- ds.fetch(5)
- }
-
- @Test(expected = classOf[ValidationException])
def testFetchBeforeOffset(): Unit = {
val util = batchTestUtil()
val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index 73d8e77..f86e828 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -33,11 +33,6 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,
settings)
@Test(expected = classOf[ValidationException])
- def testSort(): Unit = {
-
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc)
- }
-
- @Test(expected = classOf[ValidationException])
def testJoin(): Unit = {
val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
@@ -78,17 +73,4 @@ class UnsupportedOpsValidationTest extends AbstractTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
t1.minusAll(t2)
}
-
- @Test(expected = classOf[ValidationException])
- def testOffset(): Unit = {
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.offset(5)
- }
-
- @Test(expected = classOf[ValidationException])
- def testFetch(): Unit = {
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.fetch(5)
- }
-
}