This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5bf5dc  [FLINK-18369] Fix unstable TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
e5bf5dc is described below

commit e5bf5dc8e9c42a9d1e63c4025a71f8bdb29a50dd
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jun 19 13:12:49 2020 +0200

    [FLINK-18369] Fix unstable TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
    
    Replaced TestingOverwritableTableSink with UnsafeMemoryAppendTableSink,
    because the former uses DataSet#writeAsText, and a sink based on it
    cannot be used twice with the same path in a single JobGraph.
---
 .../table/runtime/batch/sql/TableEnvironmentITCase.scala    | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index a490467..7b5057b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -534,7 +534,6 @@ class TableEnvironmentITCase(
   }
 
   @Test
-  @Ignore
   def testStatementSetWithSameSinkTableNames(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env)
@@ -543,15 +542,15 @@ class TableEnvironmentITCase(
     val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val sinkPath = _tempFolder.newFile().getAbsolutePath
-    val configuredSink = new TestingOverwritableTableSink(sinkPath)
+    MemoryTableSourceSinkUtil.clear()
+    val configuredSink = new 
MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink()
       .configure(Array("d", "e", "f"), Array(INT, LONG, STRING))
-    
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", 
configuredSink)
-    assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty)
+    tEnv.asInstanceOf[TableEnvironmentInternal]
+      .registerTableSinkInternal("MySink", configuredSink)
 
     val stmtSet = tEnv.createStatementSet()
-    stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 
2"), true)
-      .addInsertSql("INSERT OVERWRITE MySink SELECT a, b, c FROM MyTable where 
a <= 2")
+    stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a > 
2"))
+      .addInsertSql("INSERT INTO MySink SELECT a, b, c FROM MyTable where a <= 
2")
 
     val tableResult = stmtSet.execute()
     // wait job finished

Reply via email to