Repository: flink
Updated Branches:
  refs/heads/master 2cb58960e -> 2b76ecab8


[hotfix] [table] Fix IndexOOBE in DataStreamSortRule.

This closes #5370.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b76ecab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b76ecab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b76ecab

Branch: refs/heads/master
Commit: 2b76ecab8cca8ec8de9b1187fcd656019d36f9f5
Parents: 2cb5896
Author: Fabian Hueske <[email protected]>
Authored: Fri Jan 26 10:19:17 2018 +0100
Committer: twalthr <[email protected]>
Committed: Wed Jan 31 13:25:28 2018 +0100

----------------------------------------------------------------------
 .../plan/rules/datastream/DataStreamSortRule.scala      |  4 ++++
 .../apache/flink/table/runtime/aggregate/SortUtil.scala |  4 ++++
 .../apache/flink/table/api/stream/sql/SortTest.scala    |  4 ++--
 .../api/stream/sql/validation/SortValidationTest.scala  | 12 ++++++++++--
 4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
index 17a643a..0186347 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
@@ -71,6 +71,10 @@ class DataStreamSortRule
   def checkTimeOrder(sort: FlinkLogicalSort): Boolean = {
 
     val sortCollation = sort.collation
+    if (sortCollation.getFieldCollations.size() == 0) {
+      // no sort fields defined
+      return false
+    }
     // get type of first sort field
     val firstSortType = SortUtil.getFirstSortField(sortCollation, sort.getRowType).getType
     // get direction of first sort field

http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index 2bafed9..67bb603 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -35,6 +35,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Preconditions
 
 import java.util.Comparator
 
@@ -60,6 +61,7 @@ object SortUtil {
     inputTypeInfo: TypeInformation[Row],
     execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
 
+    Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
     val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex
 
     val collectionRowComparator = if (collationSort.getFieldCollations.size() > 1) {
@@ -158,6 +160,7 @@ object SortUtil {
    * @return The direction of the first sort field.
    */
   def getFirstSortDirection(collationSort: RelCollation): Direction = {
+    Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
     collationSort.getFieldCollations.get(0).direction
   }
 
@@ -169,6 +172,7 @@ object SortUtil {
    * @return The first sort field.
    */
   def getFirstSortField(collationSort: RelCollation, rowType: RelDataType): RelDataTypeField = {
+    Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
     val idx = collationSort.getFieldCollations.get(0).getFieldIndex
     rowType.getFieldList.get(idx)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
index d20002a..087ae7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
@@ -31,7 +31,7 @@ class SortTest extends TableTestBase {
       'proctime.proctime, 'rowtime.rowtime)
 
   @Test
-  def testSortProcessingTime() = {
+  def testSortProcessingTime(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"
 
@@ -47,7 +47,7 @@ class SortTest extends TableTestBase {
   }
 
   @Test
-  def testSortRowTime() = {
+  def testSortRowTime(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY rowtime, c"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index ae7486d..083ed94 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -32,7 +32,7 @@ class SortValidationTest extends TableTestBase {
 
   // test should fail because time order is descending
   @Test(expected = classOf[TableException])
-  def testSortProcessingTimeDesc() = {
+  def testSortProcessingTimeDesc(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime DESC, c"
     streamUtil.verifySql(sqlQuery, "")
@@ -41,9 +41,17 @@ class SortValidationTest extends TableTestBase {
 
   // test should fail because time is not the primary order field
   @Test(expected = classOf[TableException])
-  def testSortProcessingTimeSecondaryField() = {
+  def testSortProcessingTimeSecondaryField(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
     streamUtil.verifySql(sqlQuery, "")
   }
+
+  // test should fail because LIMIT is not supported without sorting
+  @Test(expected = classOf[TableException])
+  def testLimitWithoutSorting(): Unit = {
+
+    val sqlQuery = "SELECT a FROM MyTable LIMIT 3"
+    streamUtil.verifySql(sqlQuery, "")
+  }
 }
