[ 
https://issues.apache.org/jira/browse/FLINK-10261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605842#comment-16605842
 ] 

ASF GitHub Bot commented on FLINK-10261:
----------------------------------------

asfgit closed pull request #6648: [FLINK-10261][table] fix insert into with 
order by
URL: https://github.com/apache/flink/pull/6648
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 37f6d024a07..195812d1d1c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -711,10 +711,10 @@ abstract class TableEnvironment(val config: TableConfig) {
       case insert: SqlInsert =>
         // validate the SQL query
         val query = insert.getSource
-        planner.validate(query)
+        val validatedQuery = planner.validate(query)
 
         // get query result as Table
-        val queryResult = new Table(this, 
LogicalRelNode(planner.rel(query).rel))
+        val queryResult = new Table(this, 
LogicalRelNode(planner.rel(validatedQuery).rel))
 
         // get name of sink table
         val targetTableName = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index 083ed9468bf..644b0c3fb83 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -38,7 +38,6 @@ class SortValidationTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, "")
   }
 
-
   // test should fail because time is not the primary order field
   @Test(expected = classOf[TableException])
   def testSortProcessingTimeSecondaryField(): Unit = {
@@ -54,4 +53,12 @@ class SortValidationTest extends TableTestBase {
     val sqlQuery = "SELECT a FROM MyTable LIMIT 3"
     streamUtil.verifySql(sqlQuery, "")
   }
+
+  // test should fail because time is not order field
+  @Test(expected = classOf[TableException])
+  def testNonTimeSorting(): Unit = {
+
+    val sqlQuery = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM 
sourceTable ORDER BY a"
+    streamUtil.verifySql(sqlQuery, "")
+  }
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 19db2a031b4..e7b79a5a196 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -18,15 +18,17 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import 
org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import 
org.apache.flink.table.runtime.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
@@ -105,6 +107,36 @@ class SortITCase extends StreamingWithStateTestBase {
       "20")
     assertEquals(expected, SortITCase.testResults)
   }
+
+  @Test
+  def testInsertIntoMemoryTableOrderBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, 
Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+      "FROM sourceTable ORDER BY rowtime, a desc"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "3,2,Hello world,1970-01-01 00:00:00.002",
+      "2,2,Hello,1970-01-01 00:00:00.002")
+    assertEquals(expected, MemoryTableSourceSinkUtil.tableDataStrings)
+  }
 }
 
 object SortITCase {
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index cb0ad436a18..1edd79fca56 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -119,8 +119,10 @@ object MemoryTableSourceSinkUtil {
     }
 
     override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      val inputParallelism = dataStream.getParallelism
       dataStream
         .addSink(new MemoryAppendSink)
+        .setParallelism(inputParallelism)
         .name(TableConnectorUtil.generateRuntimeName(this.getClass, 
getFieldNames))
     }
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> INSERT INTO does not work with ORDER BY clause
> ----------------------------------------------
>
>                 Key: FLINK-10261
>                 URL: https://issues.apache.org/jira/browse/FLINK-10261
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>              Labels: pull-request-available
>
> It seems that INSERT INTO and ORDER BY do not work well together.
> An AssertionError is thrown and the ORDER BY clause is duplicated. I guess 
> this is a Calcite issue.
> Example:
> {code}
> @Test
>   def testInsertIntoMemoryTable(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     MemoryTableSourceSinkUtil.clear()
>     val t = StreamTestData.getSmall3TupleDataStream(env)
>         .assignAscendingTimestamps(x => x._2)
>       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
>     tEnv.registerTable("sourceTable", t)
>     val fieldNames = Array("d", "e", "f", "t")
>     val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, 
> Types.SQL_TIMESTAMP)
>       .asInstanceOf[Array[TypeInformation[_]]]
>     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
>     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
>     val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM 
> sourceTable ORDER BY a"
>     tEnv.sqlUpdate(sql)
>     env.execute()
> {code}
> Error:
> {code}
> java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, 
> `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
> FROM `sourceTable` AS `sourceTable`
> ORDER BY `a`
> ORDER BY `a`
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
>       at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
>       at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
>       at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
>       at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>       at 
> org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to