TanYuxin-tyx commented on code in PR #22573:
URL: https://github.com/apache/flink/pull/22573#discussion_r1196044055
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -69,28 +67,19 @@ class TableSinkITCase extends BatchTestBase {
s"insert into MySink /*+ OPTIONS('path' = '$newPath2') */ select * from
MyTable")
stmtSet.execute().await()
- Assert.assertTrue(TableTestUtil.readFromFile(resultPath).isEmpty)
+ assertThat(TableTestUtil.readFromFile(resultPath).isEmpty).isTrue
val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
val result1 = TableTestUtil.readFromFile(newPath1)
- Assert.assertEquals(expected.sorted, result1.sorted)
+ assertThat(expected.sorted).isEqualTo(result1.sorted)
val result2 = TableTestUtil.readFromFile(newPath2)
- Assert.assertEquals(expected.sorted, result2.sorted)
+ assertThat(expected.sorted).isEqualTo(result2.sorted)
}
@Test
def testCollectSinkConfiguration(): Unit = {
tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE,
MemorySize.parse("1b"))
- try {
- checkResult("SELECT 1", Seq(row(1)))
- Assert.fail("Expecting exception thrown from collect sink")
- } catch {
- case e: Exception =>
- MatcherAssert.assertThat(
- e,
- FlinkMatchers.containsMessage(
- "Please consider increasing max bytes per batch value " +
- "by setting collect-sink.batch-size.max"))
- }
+ assertThatThrownBy(() => checkResult("SELECT 1",
Seq(row(1)))).rootCause.toString.contains(
+ "Please consider increasing max bytes per batch value by setting
collect-sink.batch-size.max")
Review Comment:
Fixed.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala:
##########
@@ -528,82 +527,97 @@ class MiscITCase extends BatchTestBase {
Seq(row(false, 1), row(true, 3)))
}
- @Test(expected = classOf[org.apache.flink.table.api.ValidationException])
+ @Test
def testTableGenerateFunction(): Unit = {
- checkResult(
- "SELECT f, g, v FROM testTable," +
- "LATERAL TABLE(STRING_SPLIT(f, ' ')) AS T(v)",
- Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg",
null, "fg"))
- )
+ assertThatThrownBy(
+ () =>
+ checkResult(
+ "SELECT f, g, v FROM testTable," +
+ "LATERAL TABLE(STRING_SPLIT(f, ' ')) AS T(v)",
+ Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg",
null, "fg"))
+
)).isInstanceOf(classOf[org.apache.flink.table.api.ValidationException])
// BuildInFunctions in SQL is case insensitive
- checkResult(
- "SELECT f, g, v FROM testTable," +
- "LATERAL TABLE(sTRING_sPLIT(f, ' ')) AS T(v)",
- Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg",
null, "fg"))
- )
-
- checkResult(
- "SELECT f, g, v FROM testTable," +
- "LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v)",
- Seq(
- row("abcd", "f%g", 0),
- row(null, "hij_k", 0),
- row(null, "hij_k", 1),
- row("e fg", null, 0),
- row("e fg", null, 1),
- row("e fg", null, 2))
- )
-
- checkResult(
- "SELECT f, g, v FROM testTable," +
- "LATERAL TABLE(JSON_TUPLE('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\":
\"b3\"}'," +
- "'a1', f)) AS T(v)",
- Seq(
- row("abcd", "f%g", "b1"),
- row("abcd", "f%g", null),
- row(null, "hij_k", "b1"),
- row(null, "hij_k", null),
- row("e fg", null, "b1"),
- row("e fg", null, "b3"))
- )
-
- checkResult(
- "SELECT f, g, v FROM " +
- "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
- "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS
T(v) " +
- "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) =
CHAR_LENGTH(v) + 3",
- Seq(
- row("abcd", "f%g", "b1"),
- row(null, "hij_k", "b1"),
- row("e fg", null, "b1"),
- row("e fg", null, "b3"))
- )
-
- checkResult(
- "SELECT f, g, v FROM " +
- "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
- "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS
T(v) " +
- "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) =
CHAR_LENGTH(v) + 3",
- Seq(
- row("abcd", "f%g", "b1"),
- row(null, "hij_k", "b1"),
- row("e fg", null, "b1"),
- row("e fg", null, "b3"))
- )
+ assertThatThrownBy(
+ () =>
+ checkResult(
+ "SELECT f, g, v FROM testTable," +
+ "LATERAL TABLE(sTRING_sPLIT(f, ' ')) AS T(v)",
+ Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg",
null, "fg"))
+
)).isInstanceOf(classOf[org.apache.flink.table.api.ValidationException])
+
+ assertThatThrownBy(
+ () =>
+ checkResult(
+ "SELECT f, g, v FROM testTable," +
+ "LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v)",
+ Seq(
+ row("abcd", "f%g", 0),
+ row(null, "hij_k", 0),
+ row(null, "hij_k", 1),
+ row("e fg", null, 0),
+ row("e fg", null, 1),
+ row("e fg", null, 2))
+
)).isInstanceOf(classOf[org.apache.flink.table.api.ValidationException])
+
+ assertThatThrownBy(
+ () =>
+ checkResult(
+ "SELECT f, g, v FROM testTable," +
+ "LATERAL TABLE(JSON_TUPLE('{\"a1\": \"b1\", \"a2\": \"b2\", \"e
fg\": \"b3\"}'," +
+ "'a1', f)) AS T(v)",
+ Seq(
+ row("abcd", "f%g", "b1"),
+ row("abcd", "f%g", null),
+ row(null, "hij_k", "b1"),
+ row(null, "hij_k", null),
+ row("e fg", null, "b1"),
+ row("e fg", null, "b3"))
+
)).isInstanceOf(classOf[org.apache.flink.table.api.ValidationException])
+
+ assertThatThrownBy(
+ () =>
+ checkResult(
+ "SELECT f, g, v FROM " +
+ "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
+ "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f))
AS T(v) " +
+ "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) =
CHAR_LENGTH(v) + 3",
+ Seq(
+ row("abcd", "f%g", "b1"),
+ row(null, "hij_k", "b1"),
+ row("e fg", null, "b1"),
+ row("e fg", null, "b3"))
+
)).isInstanceOf(classOf[org.apache.flink.table.api.ValidationException])
+
+ assertThatThrownBy(
+ () =>
+ checkResult(
+ "SELECT f, g, v FROM " +
+ "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
+ "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f))
AS T(v) " +
+ "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) =
CHAR_LENGTH(v) + 3",
+ Seq(
+ row("abcd", "f%g", "b1"),
+ row(null, "hij_k", "b1"),
+ row("e fg", null, "b1"),
+ row("e fg", null, "b3"))
+
)).isInstanceOf(classOf[org.apache.flink.table.api.ValidationException])
}
/**
* Due to the improper translation of TableFunction left outer join (see
CALCITE-2004), the join
* predicate can only be empty or literal true (the restriction should be
removed in FLINK-7865).
*/
- @Test(expected = classOf[org.apache.flink.table.api.ValidationException])
+ @Test
def testTableGenerateFunctionLeftJoin(): Unit = {
- checkResult(
- "SELECT f, g, v FROM " +
- "testTable LEFT OUTER JOIN LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS
INTEGER))) AS T(v) " +
- "ON LENGTH(f) = v + 2 OR LENGTH(g) = v + 4",
- Seq(row(null, "hij_k", 1), row("e fg", null, 2))
- )
+ assertThatThrownBy(
+ () =>
+ checkResult(
+ "SELECT f, g, v FROM " +
+ "testTable LEFT OUTER JOIN LATERAL TABLE(GENERATE_SERIES(0, CAST(b
AS INTEGER))) AS T(v) " +
+ "ON LENGTH(f) = v + 2 OR LENGTH(g) = v + 4",
+ Seq(row(null, "hij_k", 1), row("e fg", null, 2))
+
)).isInstanceOf(classOf[org.apache.flink.table.api.ValidationException])
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]