This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 03eeb11  [FLINK-15430][table-planner-blink] Split generated methods for Calc to prevent compiler exceptions (#10737)
03eeb11 is described below

commit 03eeb117b067c767576553040ac950ff6ba3249d
Author: Benchao Li <[email protected]>
AuthorDate: Mon Jan 6 19:50:00 2020 +0800

    [FLINK-15430][table-planner-blink] Split generated methods for Calc to prevent compiler exceptions (#10737)
---
 .../table/planner/codegen/CalcCodeGenerator.scala  |  9 ++++---
 .../planner/codegen/CodeGeneratorContext.scala     | 31 +++++++++++++++++++---
 .../table/planner/codegen/ExprCodeGenerator.scala  | 31 ++++++++++++++++++----
 .../planner/codegen/GeneratedExpression.scala      |  4 +--
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  3 ++-
 .../planner/runtime/stream/sql/CalcITCase.scala    | 24 +++++++++++++++++
 6 files changed, 88 insertions(+), 14 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 2ab0223..ef65f2f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -56,7 +56,8 @@ object CalcCodeGenerator {
       calcProgram,
       condition,
       eagerInputUnboxingCode = true,
-      retainHeader = retainHeader)
+      retainHeader = retainHeader,
+      allowSplit = true)
 
     val genOperator =
       OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, BaseRow](
@@ -119,7 +120,8 @@ object CalcCodeGenerator {
       collectorTerm: String = CodeGenUtils.DEFAULT_OPERATOR_COLLECTOR_TERM,
       eagerInputUnboxingCode: Boolean,
       retainHeader: Boolean = false,
-      outputDirectly: Boolean = false): String = {
+      outputDirectly: Boolean = false,
+      allowSplit: Boolean = false): String = {
 
     val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
     val exprGenerator = new ExprCodeGenerator(ctx, false)
@@ -151,7 +153,8 @@ object CalcCodeGenerator {
         exprGenerator.generateResultExpression(
           projectionExprs,
           outRowType,
-          outRowClass)
+          outRowClass,
+          allowSplit = allowSplit)
       }
 
       val projectionExpressionCode = projectionExpression.code
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index cc7d681..e624a1a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -108,6 +108,11 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     */
   private var currentMethodNameForLocalVariables = "DEFAULT"
 
+  /**
+   * Flag that indicates whether the generated code is split into several 
methods.
+   */
+  private var isCodeSplit = false
+
   // map of local variable statements. It will be placed in method if method 
code not excess
   // max code length, otherwise will be placed in member area of the class. 
The statements
   // are maintained for multiple methods, so that it's a map from method_name 
to variables.
@@ -143,6 +148,13 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     reusableLocalVariableStatements(methodName) = 
mutable.LinkedHashSet[String]()
   }
 
+  /**
+   * Set the flag [[isCodeSplit]] to be true, which indicates the generated 
code is split into
+   * several methods.
+   */
+  def setCodeSplit(): Unit = {
+    isCodeSplit = true
+  }
 
   /**
     * Adds a reusable local variable statement with the given type term and 
field name.
@@ -197,7 +209,15 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     *         (e.g. member variables and their initialization)
     */
   def reuseMemberCode(): String = {
-    reusableMemberStatements.mkString("\n")
+    val result = reusableMemberStatements.mkString("\n")
+    if (isCodeSplit) {
+      val localVariableAsMember = reusableLocalVariableStatements.map(
+        statements => statements._2.map("private " + _).mkString("\n")
+      ).mkString("\n")
+      result + "\n" + localVariableAsMember
+    } else {
+      result
+    }
   }
 
   /**
@@ -205,7 +225,9 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     *         if generated code is split or in local variables of method
     */
   def reuseLocalVariableCode(methodName: String = null): String = {
-    if (methodName == null) {
+    if (isCodeSplit) {
+      GeneratedExpression.NO_CODE
+    } else if (methodName == null) {
       
reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n")
     } else {
       reusableLocalVariableStatements(methodName).mkString("\n")
@@ -408,9 +430,12 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     */
   def addReusableTimestamp(): String = {
     val fieldTerm = s"timestamp"
+
+    reusableMemberStatements.add(s"private $SQL_TIMESTAMP $fieldTerm;")
+
     val field =
       s"""
-         |final $SQL_TIMESTAMP $fieldTerm =
+         |$fieldTerm =
          |  
$SQL_TIMESTAMP.fromEpochMillis(java.lang.System.currentTimeMillis());
          |""".stripMargin
     reusablePerRecordStatements.add(field)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 5a57705..1c60918 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -224,10 +224,11 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, 
nullableInput: Boolean)
       outRow: String = DEFAULT_OUT_RECORD_TERM,
       outRowWriter: Option[String] = Some(DEFAULT_OUT_RECORD_WRITER_TERM),
       reusedOutRow: Boolean = true,
-      outRowAlreadyExists: Boolean = false): GeneratedExpression = {
+      outRowAlreadyExists: Boolean = false,
+      allowSplit: Boolean = false): GeneratedExpression = {
     val fieldExprIdxToOutputRowPosMap = fieldExprs.indices.map(i => i -> 
i).toMap
     generateResultExpression(fieldExprs, fieldExprIdxToOutputRowPosMap, 
returnType,
-      returnTypeClazz, outRow, outRowWriter, reusedOutRow, outRowAlreadyExists)
+      returnTypeClazz, outRow, outRowWriter, reusedOutRow, 
outRowAlreadyExists, allowSplit)
   }
 
   /**
@@ -253,7 +254,8 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, 
nullableInput: Boolean)
     outRow: String,
     outRowWriter: Option[String],
     reusedOutRow: Boolean,
-    outRowAlreadyExists: Boolean)
+    outRowAlreadyExists: Boolean,
+    allowSplit: Boolean)
   : GeneratedExpression = {
     // initial type check
     if (returnType.getFieldCount != fieldExprs.length) {
@@ -285,11 +287,30 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, 
nullableInput: Boolean)
       case _ => // ok
     }
 
-    val setFieldsCode = fieldExprs.zipWithIndex.map { case (fieldExpr, index) 
=>
+    val setFieldsCodes = fieldExprs.zipWithIndex.map { case (fieldExpr, index) 
=>
       val pos = fieldExprIdxToOutputRowPosMap.getOrElse(index,
         throw new CodeGenException(s"Illegal field expr index: $index"))
       baseRowSetField(ctx, returnTypeClazz, outRow, pos.toString, fieldExpr, 
outRowWriter)
-    }.mkString("\n")
+    }
+    val totalLen = setFieldsCodes.map(_.length).sum
+    val maxCodeLength = ctx.tableConfig.getMaxGeneratedCodeLength
+    val setFieldsCode = if (allowSplit && totalLen > maxCodeLength) {
+      // do the split.
+      ctx.setCodeSplit()
+      setFieldsCodes.map(project => {
+        val methodName = newName("split")
+        val method =
+          s"""
+            |private void $methodName() throws Exception {
+            |  $project
+            |}
+            |""".stripMargin
+        ctx.addReusableMember(method)
+        s"$methodName();"
+      }).mkString("\n")
+    } else {
+      setFieldsCodes.mkString("\n")
+    }
 
     val outRowInitCode = if (!outRowAlreadyExists) {
       val initCode = generateRecordStatement(returnType, returnTypeClazz, 
outRow, outRowWriter)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
index 90eddc8..aaec539 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
@@ -77,14 +77,14 @@ case class GeneratedExpression(
   def deepCopy(ctx: CodeGeneratorContext): GeneratedExpression = {
     // only copy when type is mutable
     if (TypeCheckUtils.isMutable(resultType)) {
-      val newResultTerm = newName("field")
       // if the type need copy, it must be a boxed type
       val typeTerm = boxedTypeTermForType(resultType)
       val serTerm = ctx.addReusableTypeSerializer(resultType)
+      val newResultTerm = ctx.addReusableLocalVariable(typeTerm, "field")
       val newCode =
         s"""
            |$code
-           |$typeTerm $newResultTerm = $resultTerm;
+           |$newResultTerm = $resultTerm;
            |if (!$nullTerm) {
            |  $newResultTerm = ($typeTerm) ($serTerm.copy($newResultTerm));
            |}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 098adbf..1d1a176 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -405,7 +405,8 @@ object HashAggCodeGenHelper {
       outRow = currentAggBufferTerm,
       outRowWriter = None,
       reusedOutRow = true,
-      outRowAlreadyExists = true
+      outRowAlreadyExists = true,
+      allowSplit = false
     )
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index fbb37fa..dea17c7 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -260,4 +260,28 @@ class CalcITCase extends StreamingTestBase {
     val expected = List("2,2,Hello", "3,2,Hello world")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testLongProjectionList(): Unit = {
+
+    val t = env.fromCollection(TestData.smallTupleData3)
+      .toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val selectList = Stream.range(3, 200)
+      .map(i => s"CASE WHEN a IS NOT NULL AND a > $i THEN 0 WHEN a < 0 THEN 0 
ELSE $i END")
+      .mkString(",")
+    val sqlQuery = s"select $selectList from MyTable"
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = Stream.range(3, 200).map(_.toString).mkString(",")
+    assertEquals(sink.getAppendResults.size, TestData.smallTupleData3.size)
+    sink.getAppendResults.foreach( result =>
+      assertEquals(expected, result)
+    )
+  }
 }

Reply via email to