Repository: flink Updated Branches: refs/heads/release-1.5 cdbd5e1cc -> 7cf56bc51
[FLINK-8274] [table] Split generated methods for preventing compiler exceptions This closes #5613. This closes #5174. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cf56bc5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cf56bc5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cf56bc5 Branch: refs/heads/release-1.5 Commit: 7cf56bc51c47d8d7cd943f44d2dad83e50324d05 Parents: cdbd5e1 Author: Timo Walther <twal...@apache.org> Authored: Thu Mar 1 16:26:21 2018 +0100 Committer: Timo Walther <twal...@apache.org> Committed: Mon Mar 12 11:33:05 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/table/api/TableConfig.scala | 33 ++++- .../flink/table/codegen/CodeGenerator.scala | 143 +++++++++++++++---- .../table/codegen/CollectorCodeGenerator.scala | 66 +++++---- .../table/codegen/FunctionCodeGenerator.scala | 96 +++++++------ .../codegen/InputFormatCodeGenerator.scala | 6 +- .../runtime/batch/table/CorrelateITCase.scala | 15 ++ .../table/runtime/stream/sql/SqlITCase.scala | 54 ++++++- .../table/runtime/utils/StreamTestData.scala | 6 + .../utils/StreamingWithStateTestBase.scala | 3 +- 9 files changed, 320 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala index c78a022..51c9a37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -40,13 +40,19 @@ 
class TableConfig { /** * Defines the configuration of Calcite for Table API and SQL queries. */ - private var calciteConfig = CalciteConfig.DEFAULT + private var calciteConfig: CalciteConfig = CalciteConfig.DEFAULT /** * Defines the default context for decimal division calculation. * We use Scala's default MathContext.DECIMAL128. */ - private var decimalContext = MathContext.DECIMAL128 + private var decimalContext: MathContext = MathContext.DECIMAL128 + + /** + * Specifies a threshold where generated code will be split into sub-function calls. Java has a + * maximum method length of 64 KB. This setting allows for finer granularity if necessary. + */ + private var maxGeneratedCodeLength: Int = 64000 // just an estimate /** * Sets the timezone for date/time/timestamp conversions. @@ -59,12 +65,12 @@ class TableConfig { /** * Returns the timezone for date/time/timestamp conversions. */ - def getTimeZone = timeZone + def getTimeZone: TimeZone = timeZone /** * Returns the NULL check. If enabled, all fields need to be checked for NULL first. */ - def getNullCheck = nullCheck + def getNullCheck: Boolean = nullCheck /** * Sets the NULL check. If enabled, all fields need to be checked for NULL first. @@ -99,6 +105,25 @@ class TableConfig { def setDecimalContext(mathContext: MathContext): Unit = { this.decimalContext = mathContext } + + /** + * Returns the current threshold where generated code will be split into sub-function calls. + * Java has a maximum method length of 64 KB. This setting allows for finer granularity if + * necessary. Default is 64000. + */ + def getMaxGeneratedCodeLength: Int = maxGeneratedCodeLength + + /** + * Sets the threshold where generated code will be split into sub-function calls. + * Java has a maximum method length of 64 KB. This setting allows for finer granularity if + * necessary. Default is 64000. 
+ */ + def setMaxGeneratedCodeLength(maxGeneratedCodeLength: Int): Unit = { + if (maxGeneratedCodeLength <= 0) { + throw new IllegalArgumentException("Length must be greater than 0.") + } + this.maxGeneratedCodeLength = maxGeneratedCodeLength + } } object TableConfig { http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index e4064d6..44885e3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -109,31 +109,45 @@ abstract class CodeGenerator( // set of member statements that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableMemberStatements = mutable.LinkedHashSet[String]() + protected val reusableMemberStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() // set of constructor statements that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableInitStatements = mutable.LinkedHashSet[String]() + protected val reusableInitStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() // set of open statements for RichFunction that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableOpenStatements = mutable.LinkedHashSet[String]() + protected val reusableOpenStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() // set of close statements for RichFunction that will be added only once // we use a LinkedHashSet to keep the insertion order 
- protected val reusableCloseStatements = mutable.LinkedHashSet[String]() + protected val reusableCloseStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() - // set of statements that will be added only once per record + // set of statements that will be added only once per record; + // code should only update member variables because local variables are not accessible if + // the code needs to be split; // we use a LinkedHashSet to keep the insertion order - protected val reusablePerRecordStatements = mutable.LinkedHashSet[String]() + protected val reusablePerRecordStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() // map of initial input unboxing expressions that will be added only once // (inputTerm, index) -> expr - protected val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]() + protected val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] = + mutable.Map[(String, Int), GeneratedExpression]() // set of constructor statements that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableConstructorStatements = mutable.LinkedHashSet[(String, String)]() + protected val reusableConstructorStatements: mutable.LinkedHashSet[(String, String)] = + mutable.LinkedHashSet[(String, String)]() + + /** + * Flag that indicates that the generated code needed to be split into several methods. 
+ */ + protected var hasCodeSplits: Boolean = false /** * @return code block of statements that need to be placed in the member area of the Function @@ -384,7 +398,7 @@ abstract class CodeGenerator( returnType match { case ri: RowTypeInfo => addReusableOutRecord(ri) - val resultSetters: String = boxedFieldExprs.zipWithIndex map { + val resultSetters = boxedFieldExprs.zipWithIndex map { case (fieldExpr, i) => if (nullCheck) { s""" @@ -403,13 +417,15 @@ abstract class CodeGenerator( |$outRecordTerm.setField($i, ${fieldExpr.resultTerm}); |""".stripMargin } - } mkString "\n" + } + + val code = generateCodeSplits(resultSetters) - GeneratedExpression(outRecordTerm, "false", resultSetters, returnType) + GeneratedExpression(outRecordTerm, NEVER_NULL, code, returnType) case pt: PojoTypeInfo[_] => addReusableOutRecord(pt) - val resultSetters: String = boxedFieldExprs.zip(resultFieldNames) map { + val resultSetters = boxedFieldExprs.zip(resultFieldNames) map { case (fieldExpr, fieldName) => val accessor = getFieldAccessor(pt.getTypeClass, fieldName) @@ -474,13 +490,15 @@ abstract class CodeGenerator( |""".stripMargin } } - } mkString "\n" + } + + val code = generateCodeSplits(resultSetters) - GeneratedExpression(outRecordTerm, "false", resultSetters, returnType) + GeneratedExpression(outRecordTerm, NEVER_NULL, code, returnType) case tup: TupleTypeInfo[_] => addReusableOutRecord(tup) - val resultSetters: String = boxedFieldExprs.zipWithIndex map { + val resultSetters = boxedFieldExprs.zipWithIndex map { case (fieldExpr, i) => val fieldName = "f" + i if (nullCheck) { @@ -500,11 +518,13 @@ abstract class CodeGenerator( |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm}; |""".stripMargin } - } mkString "\n" + } + + val code = generateCodeSplits(resultSetters) - GeneratedExpression(outRecordTerm, "false", resultSetters, returnType) + GeneratedExpression(outRecordTerm, NEVER_NULL, code, returnType) - case cc: CaseClassTypeInfo[_] => + case _: CaseClassTypeInfo[_] => val 
fieldCodes: String = boxedFieldExprs.map(_.code).mkString("\n") val constructorParams: String = boxedFieldExprs.map(_.resultTerm).mkString(", ") val resultTerm = newName(outRecordTerm) @@ -528,9 +548,10 @@ abstract class CodeGenerator( |$returnTypeTerm $resultTerm = new $returnTypeTerm($constructorParams); |""".stripMargin - GeneratedExpression(resultTerm, "false", resultCode, returnType) + // case classes are not splittable + GeneratedExpression(resultTerm, NEVER_NULL, resultCode, returnType) - case t: TypeInformation[_] => + case _: TypeInformation[_] => val fieldExpr = boxedFieldExprs.head val nullCheckCode = if (nullCheck) { s""" @@ -547,7 +568,8 @@ abstract class CodeGenerator( |$nullCheckCode |""".stripMargin - GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, returnType) + // other types are not splittable + GeneratedExpression(fieldExpr.resultTerm, fieldExpr.nullTerm, resultCode, returnType) case _ => throw new CodeGenException(s"Unsupported result type: $returnType") @@ -1024,6 +1046,55 @@ abstract class CodeGenerator( // generator helping methods // ---------------------------------------------------------------------------------------------- + private def generateCodeSplits(splits: Seq[String]): String = { + val totalLen = splits.map(_.length + 1).sum // 1 for a line break + + // split + if (totalLen > config.getMaxGeneratedCodeLength) { + + hasCodeSplits = true + + // add input unboxing to member area such that all split functions can access it + reusableInputUnboxingExprs.foreach { case (_, expr) => + + // declaration + val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType) + if (nullCheck) { + reusableMemberStatements.add(s"private boolean ${expr.nullTerm};") + } + reusableMemberStatements.add(s"private $resultTypeTerm ${expr.resultTerm};") + + // assignment + if (nullCheck) { + reusablePerRecordStatements.add(s"this.${expr.nullTerm} = ${expr.nullTerm};") + } + reusablePerRecordStatements.add(s"this.${expr.resultTerm} = 
${expr.resultTerm};") + } + + // add split methods to the member area and return the code necessary to call those methods + val methodCalls = splits.map { split => + val methodName = newName(s"split") + + val method = + s""" + |private final void $methodName() throws Exception { + | $split + |} + |""".stripMargin + reusableMemberStatements.add(method) + + // create method call + s"$methodName();" + } + + methodCalls.mkString("\n") + } + // don't split + else { + splits.mkString("\n") + } + } + private def generateFieldAccess(refExpr: GeneratedExpression, index: Int): GeneratedExpression = { val fieldAccessExpr = generateFieldAccess( @@ -1644,9 +1715,13 @@ abstract class CodeGenerator( def addReusableTimestamp(): String = { val fieldTerm = s"timestamp" + // declaration + reusableMemberStatements.add(s"private long $fieldTerm;") + + // assignment val field = s""" - |final long $fieldTerm = java.lang.System.currentTimeMillis(); + |$fieldTerm = java.lang.System.currentTimeMillis(); |""".stripMargin reusablePerRecordStatements.add(field) fieldTerm @@ -1660,9 +1735,13 @@ abstract class CodeGenerator( val timestamp = addReusableTimestamp() + // declaration + reusableMemberStatements.add(s"private long $fieldTerm;") + + // assignment val field = s""" - |final long $fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset(timestamp); + |$fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset($timestamp); |""".stripMargin reusablePerRecordStatements.add(field) fieldTerm @@ -1676,10 +1755,14 @@ abstract class CodeGenerator( val timestamp = addReusableTimestamp() + // declaration + reusableMemberStatements.add(s"private int $fieldTerm;") + + // assignment // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime() val field = s""" - |final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY}); + |$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY}); |if (time < 0) { | time += ${DateTimeUtils.MILLIS_PER_DAY}; |} @@ 
-1696,10 +1779,14 @@ abstract class CodeGenerator( val localtimestamp = addReusableLocalTimestamp() + // declaration + reusableMemberStatements.add(s"private int $fieldTerm;") + + // assignment // adopted from org.apache.calcite.runtime.SqlFunctions.localTime() val field = s""" - |final int $fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY}); + |$fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY}); |""".stripMargin reusablePerRecordStatements.add(field) fieldTerm @@ -1715,10 +1802,14 @@ abstract class CodeGenerator( val timestamp = addReusableTimestamp() val time = addReusableTime() + // declaration + reusableMemberStatements.add(s"private int $fieldTerm;") + + // assignment // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate() val field = s""" - |final int $fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY}); + |$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY}); |if ($time < 0) { | $fieldTerm -= 1; |} http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala index 70f6638..9fc76e3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala @@ -61,38 +61,54 @@ class CollectorCodeGenerator( * @return instance of GeneratedCollector */ def generateTableFunctionCollector( - name: String, - bodyCode: String, - collectedType: TypeInformation[Any]) - : GeneratedCollector = { + name: String, + bodyCode: String, + collectedType: 
TypeInformation[Any]) + : GeneratedCollector = { val className = newName(name) val input1TypeClass = boxedTypeTermForTypeInfo(input1) val input2TypeClass = boxedTypeTermForTypeInfo(collectedType) - val funcCode = j""" - public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} { - - ${reuseMemberCode()} + // declaration in case of code splits + val recordMember = if (hasCodeSplits) { + s"private $input2TypeClass $input2Term;" + } else { + "" + } - public $className() throws Exception { - ${reuseInitCode()} - } + // assignment in case of code splits + val recordAssignment = if (hasCodeSplits) { + s"$input2Term" // use member + } else { + s"$input2TypeClass $input2Term" // local variable + } - @Override - public void collect(Object record) throws Exception { - super.collect(record); - $input1TypeClass $input1Term = ($input1TypeClass) getInput(); - $input2TypeClass $input2Term = ($input2TypeClass) record; - ${reuseInputUnboxingCode()} - $bodyCode - } - - @Override - public void close() { - } - } - """.stripMargin + val funcCode = j""" + |public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} { + | + | $recordMember + | ${reuseMemberCode()} + | + | public $className() throws Exception { + | ${reuseInitCode()} + | } + | + | @Override + | public void collect(Object record) throws Exception { + | super.collect(record); + | $input1TypeClass $input1Term = ($input1TypeClass) getInput(); + | $recordAssignment = ($input2TypeClass) record; + | ${reuseInputUnboxingCode()} + | ${reusePerRecordCode()} + | $bodyCode + | } + | + | @Override + | public void close() { + | } + |} + |""".stripMargin GeneratedCollector(className, funcCode) } http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala index 2bd2fe7..8ac18cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.TableConfig import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName} import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.util.Collector /** * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s. @@ -85,22 +86,23 @@ class FunctionCodeGenerator( * @return instance of GeneratedFunction */ def generateFunction[F <: Function, T <: Any]( - name: String, - clazz: Class[F], - bodyCode: String, - returnType: TypeInformation[T]) - : GeneratedFunction[F, T] = { + name: String, + clazz: Class[F], + bodyCode: String, + returnType: TypeInformation[T]) + : GeneratedFunction[F, T] = { val funcName = newName(name) + val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName // Janino does not support generics, that's why we need // manual casting here - val samHeader = + val (functionClass, signature, inputStatements) = // FlatMapFunction if (clazz == classOf[FlatMapFunction[_, _]]) { val baseClass = classOf[RichFlatMapFunction[_, _]] val inputTypeTerm = boxedTypeTermForTypeInfo(input1) (baseClass, - s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)", + s"void flatMap(Object _in1, $collectorTypeTerm $collectorTerm)", List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) } @@ -120,7 +122,7 @@ class FunctionCodeGenerator( val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse( throw 
new CodeGenException("Input 2 for FlatJoinFunction should not be null"))) (baseClass, - s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)", + s"void join(Object _in1, Object _in2, $collectorTypeTerm $collectorTerm)", List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) } @@ -141,11 +143,22 @@ class FunctionCodeGenerator( else if (clazz == classOf[ProcessFunction[_, _]]) { val baseClass = classOf[ProcessFunction[_, _]] val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + val contextTypeTerm = classOf[ProcessFunction[Any, Any]#Context].getCanonicalName + + // make context accessible also for split code + val globalContext = if (hasCodeSplits) { + // declaration + reusableMemberStatements.add(s"private $contextTypeTerm $contextTerm;") + // assignment + List(s"this.$contextTerm = $contextTerm;") + } else { + Nil + } + (baseClass, - s"void processElement(Object _in1, " + - s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," + - s"org.apache.flink.util.Collector $collectorTerm)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + s"void processElement(Object _in1, $contextTypeTerm $contextTerm, " + + s"$collectorTypeTerm $collectorTerm)", + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") ++ globalContext) } else { // TODO more functions @@ -153,36 +166,35 @@ class FunctionCodeGenerator( } val funcCode = j""" - public class $funcName - extends ${samHeader._1.getCanonicalName} { - - ${reuseMemberCode()} - - public $funcName() throws Exception { - ${reuseInitCode()} - } - - ${reuseConstructorCode(funcName)} - - @Override - public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception { - ${reuseOpenCode()} - } - - @Override - public ${samHeader._2} throws Exception { - ${samHeader._3.mkString("\n")} - ${reusePerRecordCode()} - ${reuseInputUnboxingCode()} - $bodyCode - } - - @Override - 
public void close() throws Exception { - ${reuseCloseCode()} - } - } - """.stripMargin + |public class $funcName extends ${functionClass.getCanonicalName} { + | + | ${reuseMemberCode()} + | + | public $funcName() throws Exception { + | ${reuseInitCode()} + | } + | + | ${reuseConstructorCode(funcName)} + | + | @Override + | public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception { + | ${reuseOpenCode()} + | } + | + | @Override + | public $signature throws Exception { + | ${inputStatements.mkString("\n")} + | ${reuseInputUnboxingCode()} + | ${reusePerRecordCode()} + | $bodyCode + | } + | + | @Override + | public void close() throws Exception { + | ${reuseCloseCode()} + | } + |} + |""".stripMargin GeneratedFunction(funcName, returnType, funcCode) } http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala index 6d6e1b6..30d3300 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala @@ -71,12 +71,16 @@ class InputFormatCodeGenerator( } @Override - public Object nextRecord(Object reuse) { + public Object nextRecord(Object reuse) throws java.io.IOException { switch (nextIdx) { ${records.zipWithIndex.map { case (r, i) => s""" |case $i: + |try { | $r + |} catch (Exception e) { + | throw new java.io.IOException(e); + |} |break; """.stripMargin }.mkString("\n")} 
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala index 828a9e2..b385015 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala @@ -81,6 +81,21 @@ class CorrelateITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testLeftOuterJoinWithSplit(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + tableEnv.getConfig.setMaxGeneratedCodeLength(1) // split every field + val in = testData(env).toTable(tableEnv).as('a, 'b, 'c) + + val func2 = new TableFunc2 + val result = in.leftOuterJoin(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row] + val results = result.collect() + val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" + + "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + /** * Common join predicates are temporarily forbidden (see FLINK-7865). 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 1e2cf9c..b7950b7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -173,7 +173,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) - val result = tEnv.sql(sqlQuery).toRetractStream[Row] + val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row] result.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() @@ -208,7 +208,7 @@ class SqlITCase extends StreamingWithStateTestBase { tEnv.registerTable("MyTable", env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)) - val result = tEnv.sql(sqlQuery).toRetractStream[Row] + val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row] result.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() @@ -261,6 +261,27 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testSelectExpressionWithSplitFromTable(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setMaxGeneratedCodeLength(1) // split every field + StreamITCase.clear + + val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable" + + val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) + 
tEnv.registerTable("MyTable", t) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = List("2,0", "4,1", "6,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + /** test filtering with registered table **/ @Test def testSimpleFilter(): Unit = { @@ -580,7 +601,7 @@ class SqlITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toAppendStream[Row] + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() @@ -638,6 +659,33 @@ class SqlITCase extends StreamingWithStateTestBase { "3,3300") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testVeryBigQuery(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t = StreamTestData.getSingletonDataStream(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", t) + + val sqlQuery = new StringBuilder + sqlQuery.append("SELECT ") + val expected = new StringBuilder + for (i <- 0 until 500) { + sqlQuery.append(s"a + b + $i, ") + expected.append((1 + 42L + i).toString + ",") + } + sqlQuery.append("c FROM MyTable") + expected.append("Hi") + + val result = tEnv.sqlQuery(sqlQuery.toString()).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + assertEquals(List(expected.toString()), StreamITCase.testResults.sorted) + } } object SqlITCase { http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala index 58d3c63..ef98791 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala @@ -25,6 +25,12 @@ import scala.collection.mutable object StreamTestData { + def getSingletonDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = { + val data = new mutable.MutableList[(Int, Long, String)] + data.+=((1, 42L, "Hi")) + env.fromCollection(data) + } + def getSmall3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = { val data = new mutable.MutableList[(Int, Long, String)] data.+=((1, 1L, "Hi")) http://git-wip-us.apache.org/repos/asf/flink/blob/7cf56bc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala index 5cfab4a..b3eeb59 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.runtime.utils import org.apache.flink.contrib.streaming.state.RocksDBStateBackend +import org.apache.flink.runtime.state.StateBackend import org.apache.flink.test.util.AbstractTestBase import org.junit.Rule import org.junit.rules.TemporaryFolder @@ -29,7 +30,7 @@ class StreamingWithStateTestBase extends AbstractTestBase { @Rule def tempFolder: TemporaryFolder = _tempFolder - def 
getStateBackend: RocksDBStateBackend = { + def getStateBackend: StateBackend = { val dbPath = tempFolder.newFolder().getAbsolutePath val checkpointPath = tempFolder.newFolder().toURI.toString val backend = new RocksDBStateBackend(checkpointPath)