This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 992a143  [FLINK-24408][table-planner] Enable code splitting for large 
number of VALUES clause
992a143 is described below

commit 992a1437ef79fb032cb0dccd3c7b362a368a6ab6
Author: tsreaper <[email protected]>
AuthorDate: Sun Oct 31 15:36:04 2021 +0800

    [FLINK-24408][table-planner] Enable code splitting for large number of 
VALUES clause
    
    This closes #17586
---
 .../flink/table/codesplit/JavaCodeSplitter.java    |  3 +++
 .../planner/codegen/InputFormatCodeGenerator.scala | 22 ++++++++--------
 .../runtime/batch/sql/CodeSplitITCase.scala        | 29 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
index 9db15d4..89ba865 100644
--- 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
+++ 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
@@ -32,6 +32,9 @@ public class JavaCodeSplitter {
 
     public static String split(String code, int maxMethodLength, int 
maxClassMemberCount) {
         try {
+            if (code.length() <= maxMethodLength) {
+                return code;
+            }
             return splitImpl(code, maxMethodLength, maxClassMemberCount);
         } catch (Throwable t) {
             System.out.println(code);
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
index 63d7c7d..80f677c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
@@ -72,17 +72,17 @@ object InputFormatCodeGenerator {
 
         @Override
         public Object nextRecord(Object reuse) {
-          switch (nextIdx) {
-            ${records.zipWithIndex.map { case (r, i) =>
-              s"""
-                 |case $i:
-                 |  $r
-                 |break;
-                       """.stripMargin
-            }.mkString("\n")}
-          }
-          nextIdx++;
-          return $outRecordTerm;
+          ${records.zipWithIndex.map { case (r, i) =>
+            s"""
+              |if (nextIdx == $i) {
+              |  $r
+              |  nextIdx++;
+              |  return $outRecordTerm;
+              |}
+              |""".stripMargin
+          }.mkString("")}
+          throw new IllegalStateException(
+            "Invalid nextIdx " + nextIdx + ". This is a bug. Please file an 
issue");
         }
       }
     """.stripMargin
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
index ab1b7023..7b10375 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.sql
 
 import org.apache.flink.core.testutils.FlinkMatchers
 import org.apache.flink.table.api.config.TableConfigOptions
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import 
org.apache.flink.table.planner.runtime.utils.TestData.{nullablesOfData3, 
smallData3, type3}
@@ -92,6 +93,34 @@ class CodeSplitITCase extends BatchTestBase {
     runTest(sql.mkString, Seq(result))
   }
 
+  @Test
+  def testManyValues(): Unit = {
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE test_many_values (
+         |${Range(0, 100).map(i => s"  f$i INT").mkString(",\n")}
+         |) WITH (
+         |  'connector' = 'values'
+         |)
+         |""".stripMargin
+    ).await()
+
+    tEnv.executeSql(
+      s"""
+         |INSERT INTO test_many_values VALUES
+         |${Range(0, 100)
+        .map(i => "(" + Range(0, 100).map(_ => s"$i").mkString(", ") + ")")
+        .mkString(", ")}
+         |""".stripMargin
+    ).await()
+
+    val expected = new java.util.ArrayList[String]()
+    for (i <- 0 until 100) {
+      expected.add(s"+I[${Range(0, 100).map(_ => s"$i").mkString(", ")}]")
+    }
+    Assert.assertEquals(expected, 
TestValuesTableFactory.getResults("test_many_values"))
+  }
+
   private[flink] def runTest(sql: String, results: Seq[Row]): Unit = {
     tEnv.getConfig.getConfiguration.setInteger(
       TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000)

Reply via email to