[
https://issues.apache.org/jira/browse/FLINK-10261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605842#comment-16605842
]
ASF GitHub Bot commented on FLINK-10261:
----------------------------------------
asfgit closed pull request #6648: [FLINK-10261][table] fix insert into with
order by
URL: https://github.com/apache/flink/pull/6648
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 37f6d024a07..195812d1d1c 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -711,10 +711,10 @@ abstract class TableEnvironment(val config: TableConfig) {
case insert: SqlInsert =>
// validate the SQL query
val query = insert.getSource
- planner.validate(query)
+ val validatedQuery = planner.validate(query)
// get query result as Table
- val queryResult = new Table(this,
LogicalRelNode(planner.rel(query).rel))
+ val queryResult = new Table(this,
LogicalRelNode(planner.rel(validatedQuery).rel))
// get name of sink table
val targetTableName =
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index 083ed9468bf..644b0c3fb83 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -38,7 +38,6 @@ class SortValidationTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, "")
}
-
// test should fail because time is not the primary order field
@Test(expected = classOf[TableException])
def testSortProcessingTimeSecondaryField(): Unit = {
@@ -54,4 +53,12 @@ class SortValidationTest extends TableTestBase {
val sqlQuery = "SELECT a FROM MyTable LIMIT 3"
streamUtil.verifySql(sqlQuery, "")
}
+
+ // test should fail because time is not order field
+ @Test(expected = classOf[TableException])
+ def testNonTimeSorting(): Unit = {
+
+ val sqlQuery = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM
sourceTable ORDER BY a"
+ streamUtil.verifySql(sqlQuery, "")
+ }
}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 19db2a031b4..e7b79a5a196 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -18,15 +18,17 @@
package org.apache.flink.table.runtime.stream.sql
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import
org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import
org.apache.flink.table.runtime.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.runtime.utils.{StreamITCase,
StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData,
StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
@@ -105,6 +107,36 @@ class SortITCase extends StreamingWithStateTestBase {
"20")
assertEquals(expected, SortITCase.testResults)
}
+
+ @Test
+ def testInsertIntoMemoryTableOrderBy(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSourceSinkUtil.clear()
+
+ val t = StreamTestData.getSmall3TupleDataStream(env)
+ .assignAscendingTimestamps(x => x._2)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("d", "e", "f", "t")
+ val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING,
Types.SQL_TIMESTAMP)
+ .asInstanceOf[Array[TypeInformation[_]]]
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+ val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+ "FROM sourceTable ORDER BY rowtime, a desc"
+ tEnv.sqlUpdate(sql)
+ env.execute()
+
+ val expected = List(
+ "1,1,Hi,1970-01-01 00:00:00.001",
+ "3,2,Hello world,1970-01-01 00:00:00.002",
+ "2,2,Hello,1970-01-01 00:00:00.002")
+ assertEquals(expected, MemoryTableSourceSinkUtil.tableDataStrings)
+ }
}
object SortITCase {
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index cb0ad436a18..1edd79fca56 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -119,8 +119,10 @@ object MemoryTableSourceSinkUtil {
}
override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+ val inputParallelism = dataStream.getParallelism
dataStream
.addSink(new MemoryAppendSink)
+ .setParallelism(inputParallelism)
.name(TableConnectorUtil.generateRuntimeName(this.getClass,
getFieldNames))
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> INSERT INTO does not work with ORDER BY clause
> ----------------------------------------------
>
> Key: FLINK-10261
> URL: https://issues.apache.org/jira/browse/FLINK-10261
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
> Labels: pull-request-available
>
> It seems that INSERT INTO and ORDER BY do not work well together.
> An AssertionError is thrown and the ORDER BY clause is duplicated. I guess
> this is a Calcite issue.
> Example:
> {code}
> @Test
> def testInsertIntoMemoryTable(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> MemoryTableSourceSinkUtil.clear()
> val t = StreamTestData.getSmall3TupleDataStream(env)
> .assignAscendingTimestamps(x => x._2)
> .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
> tEnv.registerTable("sourceTable", t)
> val fieldNames = Array("d", "e", "f", "t")
> val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING,
> Types.SQL_TIMESTAMP)
> .asInstanceOf[Array[TypeInformation[_]]]
> val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
> tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
> val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM
> sourceTable ORDER BY a"
> tEnv.sqlUpdate(sql)
> env.execute()
> {code}
> Error:
> {code}
> java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`,
> `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
> FROM `sourceTable` AS `sourceTable`
> ORDER BY `a`
> ORDER BY `a`
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
> at
> org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
> at
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
> at
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
> at
> org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)