Repository: flink
Updated Branches:
  refs/heads/release-1.5 cdbd5e1cc -> 7cf56bc51


[FLINK-8274] [table] Split generated methods for preventing compiler exceptions

This closes #5613.
This closes #5174.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cf56bc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cf56bc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cf56bc5

Branch: refs/heads/release-1.5
Commit: 7cf56bc51c47d8d7cd943f44d2dad83e50324d05
Parents: cdbd5e1
Author: Timo Walther <twal...@apache.org>
Authored: Thu Mar 1 16:26:21 2018 +0100
Committer: Timo Walther <twal...@apache.org>
Committed: Mon Mar 12 11:33:05 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/table/api/TableConfig.scala    |  33 ++++-
 .../flink/table/codegen/CodeGenerator.scala     | 143 +++++++++++++++----
 .../table/codegen/CollectorCodeGenerator.scala  |  66 +++++----
 .../table/codegen/FunctionCodeGenerator.scala   |  96 +++++++------
 .../codegen/InputFormatCodeGenerator.scala      |   6 +-
 .../runtime/batch/table/CorrelateITCase.scala   |  15 ++
 .../table/runtime/stream/sql/SqlITCase.scala    |  54 ++++++-
 .../table/runtime/utils/StreamTestData.scala    |   6 +
 .../utils/StreamingWithStateTestBase.scala      |   3 +-
 9 files changed, 320 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index c78a022..51c9a37 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -40,13 +40,19 @@ class TableConfig {
   /**
     * Defines the configuration of Calcite for Table API and SQL queries.
     */
-  private var calciteConfig = CalciteConfig.DEFAULT
+  private var calciteConfig: CalciteConfig = CalciteConfig.DEFAULT
 
   /**
     * Defines the default context for decimal division calculation.
     * We use Scala's default MathContext.DECIMAL128.
     */
-  private var decimalContext = MathContext.DECIMAL128
+  private var decimalContext: MathContext = MathContext.DECIMAL128
+
+  /**
+    * Specifies a threshold where generated code will be split into 
sub-function calls. Java has a
+    * maximum method length of 64 KB. This setting allows for finer 
granularity if necessary.
+    */
+  private var maxGeneratedCodeLength: Int = 64000 // just an estimate
 
   /**
    * Sets the timezone for date/time/timestamp conversions.
@@ -59,12 +65,12 @@ class TableConfig {
   /**
    * Returns the timezone for date/time/timestamp conversions.
    */
-  def getTimeZone = timeZone
+  def getTimeZone: TimeZone = timeZone
 
   /**
    * Returns the NULL check. If enabled, all fields need to be checked for 
NULL first.
    */
-  def getNullCheck = nullCheck
+  def getNullCheck: Boolean = nullCheck
 
   /**
    * Sets the NULL check. If enabled, all fields need to be checked for NULL 
first.
@@ -99,6 +105,25 @@ class TableConfig {
   def setDecimalContext(mathContext: MathContext): Unit = {
     this.decimalContext = mathContext
   }
+
+  /**
+    * Returns the current threshold where generated code will be split into 
sub-function calls.
+    * Java has a maximum method length of 64 KB. This setting allows for finer 
granularity if
+    * necessary. Default is 64000.
+    */
+  def getMaxGeneratedCodeLength: Int = maxGeneratedCodeLength
+
+  /**
+    * Sets the threshold where generated code will be split into sub-function calls.
+    * Java has a maximum method length of 64 KB. This setting allows for finer 
granularity if
+    * necessary. Default is 64000.
+    */
+  def setMaxGeneratedCodeLength(maxGeneratedCodeLength: Int): Unit = {
+    if (maxGeneratedCodeLength <= 0) {
+      throw new IllegalArgumentException("Length must be greater than 0.")
+    }
+    this.maxGeneratedCodeLength = maxGeneratedCodeLength
+  }
 }
 
 object TableConfig {

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index e4064d6..44885e3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -109,31 +109,45 @@ abstract class CodeGenerator(
 
   // set of member statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableMemberStatements = mutable.LinkedHashSet[String]()
+  protected val reusableMemberStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
 
   // set of constructor statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableInitStatements = mutable.LinkedHashSet[String]()
+  protected val reusableInitStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
 
   // set of open statements for RichFunction that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableOpenStatements = mutable.LinkedHashSet[String]()
+  protected val reusableOpenStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
 
   // set of close statements for RichFunction that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableCloseStatements = mutable.LinkedHashSet[String]()
+  protected val reusableCloseStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
 
-  // set of statements that will be added only once per record
+  // set of statements that will be added only once per record;
+  // code should only update member variables because local variables are not 
accessible if
+  // the code needs to be split;
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusablePerRecordStatements = mutable.LinkedHashSet[String]()
+  protected val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
 
   // map of initial input unboxing expressions that will be added only once
   // (inputTerm, index) -> expr
-  protected val reusableInputUnboxingExprs = mutable.Map[(String, Int), 
GeneratedExpression]()
+  protected val reusableInputUnboxingExprs: mutable.Map[(String, Int), 
GeneratedExpression] =
+    mutable.Map[(String, Int), GeneratedExpression]()
 
   // set of constructor statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableConstructorStatements = mutable.LinkedHashSet[(String, 
String)]()
+  protected val reusableConstructorStatements: mutable.LinkedHashSet[(String, 
String)] =
+    mutable.LinkedHashSet[(String, String)]()
+
+  /**
+    * Flag that indicates that the generated code needed to be split into 
several methods.
+    */
+  protected var hasCodeSplits: Boolean = false
 
   /**
     * @return code block of statements that need to be placed in the member 
area of the Function
@@ -384,7 +398,7 @@ abstract class CodeGenerator(
     returnType match {
       case ri: RowTypeInfo =>
         addReusableOutRecord(ri)
-        val resultSetters: String = boxedFieldExprs.zipWithIndex map {
+        val resultSetters = boxedFieldExprs.zipWithIndex map {
           case (fieldExpr, i) =>
             if (nullCheck) {
               s"""
@@ -403,13 +417,15 @@ abstract class CodeGenerator(
               |$outRecordTerm.setField($i, ${fieldExpr.resultTerm});
               |""".stripMargin
             }
-        } mkString "\n"
+        }
+
+        val code = generateCodeSplits(resultSetters)
 
-        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
+        GeneratedExpression(outRecordTerm, NEVER_NULL, code, returnType)
 
       case pt: PojoTypeInfo[_] =>
         addReusableOutRecord(pt)
-        val resultSetters: String = boxedFieldExprs.zip(resultFieldNames) map {
+        val resultSetters = boxedFieldExprs.zip(resultFieldNames) map {
           case (fieldExpr, fieldName) =>
             val accessor = getFieldAccessor(pt.getTypeClass, fieldName)
 
@@ -474,13 +490,15 @@ abstract class CodeGenerator(
                     |""".stripMargin
                 }
               }
-          } mkString "\n"
+          }
+
+        val code = generateCodeSplits(resultSetters)
 
-        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
+        GeneratedExpression(outRecordTerm, NEVER_NULL, code, returnType)
 
       case tup: TupleTypeInfo[_] =>
         addReusableOutRecord(tup)
-        val resultSetters: String = boxedFieldExprs.zipWithIndex map {
+        val resultSetters = boxedFieldExprs.zipWithIndex map {
           case (fieldExpr, i) =>
             val fieldName = "f" + i
             if (nullCheck) {
@@ -500,11 +518,13 @@ abstract class CodeGenerator(
                 |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
                 |""".stripMargin
             }
-        } mkString "\n"
+        }
+
+        val code = generateCodeSplits(resultSetters)
 
-        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
+        GeneratedExpression(outRecordTerm, NEVER_NULL, code, returnType)
 
-      case cc: CaseClassTypeInfo[_] =>
+      case _: CaseClassTypeInfo[_] =>
         val fieldCodes: String = boxedFieldExprs.map(_.code).mkString("\n")
         val constructorParams: String = 
boxedFieldExprs.map(_.resultTerm).mkString(", ")
         val resultTerm = newName(outRecordTerm)
@@ -528,9 +548,10 @@ abstract class CodeGenerator(
             |$returnTypeTerm $resultTerm = new 
$returnTypeTerm($constructorParams);
             |""".stripMargin
 
-        GeneratedExpression(resultTerm, "false", resultCode, returnType)
+        // case classes are not splittable
+        GeneratedExpression(resultTerm, NEVER_NULL, resultCode, returnType)
 
-      case t: TypeInformation[_] =>
+      case _: TypeInformation[_] =>
         val fieldExpr = boxedFieldExprs.head
         val nullCheckCode = if (nullCheck) {
           s"""
@@ -547,7 +568,8 @@ abstract class CodeGenerator(
             |$nullCheckCode
             |""".stripMargin
 
-        GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, 
returnType)
+        // other types are not splittable
+        GeneratedExpression(fieldExpr.resultTerm, fieldExpr.nullTerm, 
resultCode, returnType)
 
       case _ =>
         throw new CodeGenException(s"Unsupported result type: $returnType")
@@ -1024,6 +1046,55 @@ abstract class CodeGenerator(
   // generator helping methods
   // 
----------------------------------------------------------------------------------------------
 
+  private def generateCodeSplits(splits: Seq[String]): String = {
+    val totalLen = splits.map(_.length + 1).sum // 1 for a line break
+
+    // split
+    if (totalLen > config.getMaxGeneratedCodeLength) {
+
+      hasCodeSplits = true
+
+      // add input unboxing to member area such that all split functions can 
access it
+      reusableInputUnboxingExprs.foreach { case (_, expr) =>
+
+        // declaration
+        val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType)
+        if (nullCheck) {
+          reusableMemberStatements.add(s"private boolean ${expr.nullTerm};")
+        }
+        reusableMemberStatements.add(s"private $resultTypeTerm 
${expr.resultTerm};")
+
+        // assignment
+        if (nullCheck) {
+          reusablePerRecordStatements.add(s"this.${expr.nullTerm} = 
${expr.nullTerm};")
+        }
+        reusablePerRecordStatements.add(s"this.${expr.resultTerm} = 
${expr.resultTerm};")
+      }
+
+      // add split methods to the member area and return the code necessary to 
call those methods
+      val methodCalls = splits.map { split =>
+        val methodName = newName(s"split")
+
+        val method =
+          s"""
+            |private final void $methodName() throws Exception {
+            |  $split
+            |}
+            |""".stripMargin
+        reusableMemberStatements.add(method)
+
+        // create method call
+        s"$methodName();"
+      }
+
+      methodCalls.mkString("\n")
+    }
+    // don't split
+    else {
+      splits.mkString("\n")
+    }
+  }
+
   private def generateFieldAccess(refExpr: GeneratedExpression, index: Int): 
GeneratedExpression = {
 
     val fieldAccessExpr = generateFieldAccess(
@@ -1644,9 +1715,13 @@ abstract class CodeGenerator(
   def addReusableTimestamp(): String = {
     val fieldTerm = s"timestamp"
 
+    // declaration
+    reusableMemberStatements.add(s"private long $fieldTerm;")
+
+    // assignment
     val field =
       s"""
-        |final long $fieldTerm = java.lang.System.currentTimeMillis();
+        |$fieldTerm = java.lang.System.currentTimeMillis();
         |""".stripMargin
     reusablePerRecordStatements.add(field)
     fieldTerm
@@ -1660,9 +1735,13 @@ abstract class CodeGenerator(
 
     val timestamp = addReusableTimestamp()
 
+    // declaration
+    reusableMemberStatements.add(s"private long $fieldTerm;")
+
+    // assignment
     val field =
       s"""
-        |final long $fieldTerm = $timestamp + 
java.util.TimeZone.getDefault().getOffset(timestamp);
+        |$fieldTerm = $timestamp + 
java.util.TimeZone.getDefault().getOffset($timestamp);
         |""".stripMargin
     reusablePerRecordStatements.add(field)
     fieldTerm
@@ -1676,10 +1755,14 @@ abstract class CodeGenerator(
 
     val timestamp = addReusableTimestamp()
 
+    // declaration
+    reusableMemberStatements.add(s"private int $fieldTerm;")
+
+    // assignment
     // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
     val field =
       s"""
-        |final int $fieldTerm = (int) ($timestamp % 
${DateTimeUtils.MILLIS_PER_DAY});
+        |$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
         |if (time < 0) {
         |  time += ${DateTimeUtils.MILLIS_PER_DAY};
         |}
@@ -1696,10 +1779,14 @@ abstract class CodeGenerator(
 
     val localtimestamp = addReusableLocalTimestamp()
 
+    // declaration
+    reusableMemberStatements.add(s"private int $fieldTerm;")
+
+    // assignment
     // adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
     val field =
       s"""
-        |final int $fieldTerm = (int) ($localtimestamp % 
${DateTimeUtils.MILLIS_PER_DAY});
+        |$fieldTerm = (int) ($localtimestamp % 
${DateTimeUtils.MILLIS_PER_DAY});
         |""".stripMargin
     reusablePerRecordStatements.add(field)
     fieldTerm
@@ -1715,10 +1802,14 @@ abstract class CodeGenerator(
     val timestamp = addReusableTimestamp()
     val time = addReusableTime()
 
+    // declaration
+    reusableMemberStatements.add(s"private int $fieldTerm;")
+
+    // assignment
     // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
     val field =
       s"""
-        |final int $fieldTerm = (int) ($timestamp / 
${DateTimeUtils.MILLIS_PER_DAY});
+        |$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
         |if ($time < 0) {
         |  $fieldTerm -= 1;
         |}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
index 70f6638..9fc76e3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
@@ -61,38 +61,54 @@ class CollectorCodeGenerator(
     * @return instance of GeneratedCollector
     */
   def generateTableFunctionCollector(
-    name: String,
-    bodyCode: String,
-    collectedType: TypeInformation[Any])
-  : GeneratedCollector = {
+      name: String,
+      bodyCode: String,
+      collectedType: TypeInformation[Any])
+    : GeneratedCollector = {
 
     val className = newName(name)
     val input1TypeClass = boxedTypeTermForTypeInfo(input1)
     val input2TypeClass = boxedTypeTermForTypeInfo(collectedType)
 
-    val funcCode = j"""
-      public class $className extends 
${classOf[TableFunctionCollector[_]].getCanonicalName} {
-
-        ${reuseMemberCode()}
+    // declaration in case of code splits
+    val recordMember = if (hasCodeSplits) {
+      s"private $input2TypeClass $input2Term;"
+    } else {
+      ""
+    }
 
-        public $className() throws Exception {
-          ${reuseInitCode()}
-        }
+    // assignment in case of code splits
+    val recordAssignment = if (hasCodeSplits) {
+      s"$input2Term" // use member
+    } else {
+      s"$input2TypeClass $input2Term" // local variable
+    }
 
-        @Override
-        public void collect(Object record) throws Exception {
-          super.collect(record);
-          $input1TypeClass $input1Term = ($input1TypeClass) getInput();
-          $input2TypeClass $input2Term = ($input2TypeClass) record;
-          ${reuseInputUnboxingCode()}
-          $bodyCode
-        }
-
-        @Override
-        public void close() {
-        }
-      }
-    """.stripMargin
+    val funcCode = j"""
+      |public class $className extends 
${classOf[TableFunctionCollector[_]].getCanonicalName} {
+      |
+      |  $recordMember
+      |  ${reuseMemberCode()}
+      |
+      |  public $className() throws Exception {
+      |    ${reuseInitCode()}
+      |  }
+      |
+      |  @Override
+      |  public void collect(Object record) throws Exception {
+      |    super.collect(record);
+      |    $input1TypeClass $input1Term = ($input1TypeClass) getInput();
+      |    $recordAssignment = ($input2TypeClass) record;
+      |    ${reuseInputUnboxingCode()}
+      |    ${reusePerRecordCode()}
+      |    $bodyCode
+      |  }
+      |
+      |  @Override
+      |  public void close() {
+      |  }
+      |}
+      |""".stripMargin
 
     GeneratedCollector(className, funcCode)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
index 2bd2fe7..8ac18cd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
@@ -24,6 +24,7 @@ import 
org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, 
newName}
 import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.util.Collector
 
 /**
   * A code generator for generating Flink 
[[org.apache.flink.api.common.functions.Function]]s.
@@ -85,22 +86,23 @@ class FunctionCodeGenerator(
     * @return instance of GeneratedFunction
     */
   def generateFunction[F <: Function, T <: Any](
-    name: String,
-    clazz: Class[F],
-    bodyCode: String,
-    returnType: TypeInformation[T])
-  : GeneratedFunction[F, T] = {
+      name: String,
+      clazz: Class[F],
+      bodyCode: String,
+      returnType: TypeInformation[T])
+    : GeneratedFunction[F, T] = {
     val funcName = newName(name)
+    val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
 
     // Janino does not support generics, that's why we need
     // manual casting here
-    val samHeader =
+    val (functionClass, signature, inputStatements) =
     // FlatMapFunction
     if (clazz == classOf[FlatMapFunction[_, _]]) {
       val baseClass = classOf[RichFlatMapFunction[_, _]]
       val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
       (baseClass,
-        s"void flatMap(Object _in1, org.apache.flink.util.Collector 
$collectorTerm)",
+        s"void flatMap(Object _in1, $collectorTypeTerm $collectorTerm)",
         List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
     }
 
@@ -120,7 +122,7 @@ class FunctionCodeGenerator(
       val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
         throw new CodeGenException("Input 2 for FlatJoinFunction should not be 
null")))
       (baseClass,
-        s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector 
$collectorTerm)",
+        s"void join(Object _in1, Object _in2, $collectorTypeTerm 
$collectorTerm)",
         List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
              s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
     }
@@ -141,11 +143,22 @@ class FunctionCodeGenerator(
     else if (clazz == classOf[ProcessFunction[_, _]]) {
       val baseClass = classOf[ProcessFunction[_, _]]
       val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+      val contextTypeTerm = classOf[ProcessFunction[Any, 
Any]#Context].getCanonicalName
+
+      // make context accessible also for split code
+      val globalContext = if (hasCodeSplits) {
+        // declaration
+        reusableMemberStatements.add(s"private $contextTypeTerm $contextTerm;")
+        // assignment
+        List(s"this.$contextTerm = $contextTerm;")
+      } else {
+        Nil
+      }
+
       (baseClass,
-        s"void processElement(Object _in1, " +
-          s"org.apache.flink.streaming.api.functions.ProcessFunction.Context 
$contextTerm," +
-          s"org.apache.flink.util.Collector $collectorTerm)",
-        List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+        s"void processElement(Object _in1, $contextTypeTerm $contextTerm, " +
+          s"$collectorTypeTerm $collectorTerm)",
+        List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") ++ 
globalContext)
     }
     else {
       // TODO more functions
@@ -153,36 +166,35 @@ class FunctionCodeGenerator(
     }
 
     val funcCode = j"""
-      public class $funcName
-          extends ${samHeader._1.getCanonicalName} {
-
-        ${reuseMemberCode()}
-
-        public $funcName() throws Exception {
-          ${reuseInitCode()}
-        }
-
-        ${reuseConstructorCode(funcName)}
-
-        @Override
-        public void open(${classOf[Configuration].getCanonicalName} 
parameters) throws Exception {
-          ${reuseOpenCode()}
-        }
-
-        @Override
-        public ${samHeader._2} throws Exception {
-          ${samHeader._3.mkString("\n")}
-          ${reusePerRecordCode()}
-          ${reuseInputUnboxingCode()}
-          $bodyCode
-        }
-
-        @Override
-        public void close() throws Exception {
-          ${reuseCloseCode()}
-        }
-      }
-    """.stripMargin
+      |public class $funcName extends ${functionClass.getCanonicalName} {
+      |
+      |  ${reuseMemberCode()}
+      |
+      |  public $funcName() throws Exception {
+      |    ${reuseInitCode()}
+      |  }
+      |
+      |  ${reuseConstructorCode(funcName)}
+      |
+      |  @Override
+      |  public void open(${classOf[Configuration].getCanonicalName} 
parameters) throws Exception {
+      |    ${reuseOpenCode()}
+      |  }
+      |
+      |  @Override
+      |  public $signature throws Exception {
+      |    ${inputStatements.mkString("\n")}
+      |    ${reuseInputUnboxingCode()}
+      |    ${reusePerRecordCode()}
+      |    $bodyCode
+      |  }
+      |
+      |  @Override
+      |  public void close() throws Exception {
+      |    ${reuseCloseCode()}
+      |  }
+      |}
+      |""".stripMargin
 
     GeneratedFunction(funcName, returnType, funcCode)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
index 6d6e1b6..30d3300 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
@@ -71,12 +71,16 @@ class InputFormatCodeGenerator(
         }
 
         @Override
-        public Object nextRecord(Object reuse) {
+        public Object nextRecord(Object reuse) throws java.io.IOException {
           switch (nextIdx) {
             ${records.zipWithIndex.map { case (r, i) =>
               s"""
                  |case $i:
+                 |try {
                  |  $r
+                 |} catch (Exception e) {
+                 |  throw new java.io.IOException(e);
+                 |}
                  |break;
                        """.stripMargin
             }.mkString("\n")}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 828a9e2..b385015 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -81,6 +81,21 @@ class CorrelateITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testLeftOuterJoinWithSplit(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.getConfig.setMaxGeneratedCodeLength(1) // split every field
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+
+    val func2 = new TableFunc2
+    val result = in.leftOuterJoin(func2('c) as ('s, 'l)).select('c, 's, 
'l).toDataSet[Row]
+    val results = result.collect()
+    val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
+      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + 
"nosharp,null,null"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
   /**
     * Common join predicates are temporarily forbidden (see FLINK-7865).
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 1e2cf9c..b7950b7 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -173,7 +173,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 
'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
@@ -208,7 +208,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("MyTable",
       env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c))
 
-    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
@@ -261,6 +261,27 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testSelectExpressionWithSplitFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1) // split every field
+    StreamITCase.clear
+
+    val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("2,0", "4,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
   /** test filtering with registered table **/
   @Test
   def testSimpleFilter(): Unit = {
@@ -580,7 +601,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -638,6 +659,33 @@ class SqlITCase extends StreamingWithStateTestBase {
       "3,3300")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testVeryBigQuery(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.getSingletonDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = new StringBuilder
+    sqlQuery.append("SELECT ")
+    val expected = new StringBuilder
+    for (i <- 0 until 500) {
+      sqlQuery.append(s"a + b + $i, ")
+      expected.append((1 + 42L + i).toString + ",")
+    }
+    sqlQuery.append("c FROM MyTable")
+    expected.append("Hi")
+
+    val result = tEnv.sqlQuery(sqlQuery.toString()).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    assertEquals(List(expected.toString()), StreamITCase.testResults.sorted)
+  }
 }
 
 object SqlITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala
index 58d3c63..ef98791 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala
@@ -25,6 +25,12 @@ import scala.collection.mutable
 
 object StreamTestData {
 
+  def getSingletonDataStream(env: StreamExecutionEnvironment): 
DataStream[(Int, Long, String)] = {
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 42L, "Hi"))
+    env.fromCollection(data)
+  }
+
   def getSmall3TupleDataStream(env: StreamExecutionEnvironment): 
DataStream[(Int, Long, String)] = {
     val data = new mutable.MutableList[(Int, Long, String)]
     data.+=((1, 1L, "Hi"))

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
index 5cfab4a..b3eeb59 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.runtime.utils
 
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.runtime.state.StateBackend
 import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Rule
 import org.junit.rules.TemporaryFolder
@@ -29,7 +30,7 @@ class StreamingWithStateTestBase extends AbstractTestBase {
   @Rule
   def tempFolder: TemporaryFolder = _tempFolder
 
-  def getStateBackend: RocksDBStateBackend = {
+  def getStateBackend: StateBackend = {
     val dbPath = tempFolder.newFolder().getAbsolutePath
     val checkpointPath = tempFolder.newFolder().toURI.toString
     val backend = new RocksDBStateBackend(checkpointPath)

Reply via email to