This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e5bf5dc [FLINK-18369] Fix instable
TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
e5bf5dc is described below
commit e5bf5dc8e9c42a9d1e63c4025a71f8bdb29a50dd
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jun 19 13:12:49 2020 +0200
[FLINK-18369] Fix instable
TableEnvironmentITCase#testStatementSetWithSameSinkTableNames
Replaced TestingOverwritableTableSink with UnsafeMemoryAppendTableSink,
because the former writes via DataSet#writeAsText and therefore cannot be
used twice with the same path in a single JobGraph.
---
.../table/runtime/batch/sql/TableEnvironmentITCase.scala | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index a490467..7b5057b 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -534,7 +534,6 @@ class TableEnvironmentITCase(
}
@Test
- @Ignore
def testStatementSetWithSameSinkTableNames(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
@@ -543,15 +542,15 @@ class TableEnvironmentITCase(
val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a,
'b, 'c)
tEnv.registerTable("MyTable", t)
- val sinkPath = _tempFolder.newFile().getAbsolutePath
- val configuredSink = new TestingOverwritableTableSink(sinkPath)
+ MemoryTableSourceSinkUtil.clear()
+ val configuredSink = new
MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink()
.configure(Array("d", "e", "f"), Array(INT, LONG, STRING))
-
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink",
configuredSink)
- assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty)
+ tEnv.asInstanceOf[TableEnvironmentInternal]
+ .registerTableSinkInternal("MySink", configuredSink)
val stmtSet = tEnv.createStatementSet()
- stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a >
2"), true)
- .addInsertSql("INSERT OVERWRITE MySink SELECT a, b, c FROM MyTable where
a <= 2")
+ stmtSet.addInsert("MySink", tEnv.sqlQuery("select * from MyTable where a >
2"))
+ .addInsertSql("INSERT INTO MySink SELECT a, b, c FROM MyTable where a <=
2")
val tableResult = stmtSet.execute()
// wait job finished