This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 992a143 [FLINK-24408][table-planner] Enable code splitting for large
number of VALUES clause
992a143 is described below
commit 992a1437ef79fb032cb0dccd3c7b362a368a6ab6
Author: tsreaper <[email protected]>
AuthorDate: Sun Oct 31 15:36:04 2021 +0800
[FLINK-24408][table-planner] Enable code splitting for large number of
VALUES clause
This closes #17586
---
.../flink/table/codesplit/JavaCodeSplitter.java | 3 +++
.../planner/codegen/InputFormatCodeGenerator.scala | 22 ++++++++--------
.../runtime/batch/sql/CodeSplitITCase.scala | 29 ++++++++++++++++++++++
3 files changed, 43 insertions(+), 11 deletions(-)
diff --git
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
index 9db15d4..89ba865 100644
---
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
+++
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
@@ -32,6 +32,9 @@ public class JavaCodeSplitter {
public static String split(String code, int maxMethodLength, int
maxClassMemberCount) {
try {
+ if (code.length() <= maxMethodLength) {
+ return code;
+ }
return splitImpl(code, maxMethodLength, maxClassMemberCount);
} catch (Throwable t) {
System.out.println(code);
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
index 63d7c7d..80f677c 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
@@ -72,17 +72,17 @@ object InputFormatCodeGenerator {
@Override
public Object nextRecord(Object reuse) {
- switch (nextIdx) {
- ${records.zipWithIndex.map { case (r, i) =>
- s"""
- |case $i:
- | $r
- |break;
- """.stripMargin
- }.mkString("\n")}
- }
- nextIdx++;
- return $outRecordTerm;
+ ${records.zipWithIndex.map { case (r, i) =>
+ s"""
+ |if (nextIdx == $i) {
+ | $r
+ | nextIdx++;
+ | return $outRecordTerm;
+ |}
+ |""".stripMargin
+ }.mkString("")}
+ throw new IllegalStateException(
+ "Invalid nextIdx " + nextIdx + ". This is a bug. Please file an
issue");
}
}
""".stripMargin
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
index ab1b7023..7b10375 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.sql
import org.apache.flink.core.testutils.FlinkMatchers
import org.apache.flink.table.api.config.TableConfigOptions
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import
org.apache.flink.table.planner.runtime.utils.TestData.{nullablesOfData3,
smallData3, type3}
@@ -92,6 +93,34 @@ class CodeSplitITCase extends BatchTestBase {
runTest(sql.mkString, Seq(result))
}
+ @Test
+ def testManyValues(): Unit = {
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE test_many_values (
+ |${Range(0, 100).map(i => s" f$i INT").mkString(",\n")}
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin
+ ).await()
+
+ tEnv.executeSql(
+ s"""
+ |INSERT INTO test_many_values VALUES
+ |${Range(0, 100)
+ .map(i => "(" + Range(0, 100).map(_ => s"$i").mkString(", ") + ")")
+ .mkString(", ")}
+ |""".stripMargin
+ ).await()
+
+ val expected = new java.util.ArrayList[String]()
+ for (i <- 0 until 100) {
+ expected.add(s"+I[${Range(0, 100).map(_ => s"$i").mkString(", ")}]")
+ }
+ Assert.assertEquals(expected,
TestValuesTableFactory.getResults("test_many_values"))
+ }
+
private[flink] def runTest(sql: String, results: Seq[Row]): Unit = {
tEnv.getConfig.getConfiguration.setInteger(
TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000)