This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 03eeb11 [FLINK-15430][table-planner-blink] Split generated methods for Calc to prevent compiler exceptions (#10737)
03eeb11 is described below
commit 03eeb117b067c767576553040ac950ff6ba3249d
Author: Benchao Li <[email protected]>
AuthorDate: Mon Jan 6 19:50:00 2020 +0800
[FLINK-15430][table-planner-blink] Split generated methods for Calc to prevent compiler exceptions (#10737)
---
.../table/planner/codegen/CalcCodeGenerator.scala | 9 ++++---
.../planner/codegen/CodeGeneratorContext.scala | 31 +++++++++++++++++++---
.../table/planner/codegen/ExprCodeGenerator.scala | 31 ++++++++++++++++++----
.../planner/codegen/GeneratedExpression.scala | 4 +--
.../codegen/agg/batch/HashAggCodeGenHelper.scala | 3 ++-
.../planner/runtime/stream/sql/CalcITCase.scala | 24 +++++++++++++++++
6 files changed, 88 insertions(+), 14 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 2ab0223..ef65f2f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -56,7 +56,8 @@ object CalcCodeGenerator {
calcProgram,
condition,
eagerInputUnboxingCode = true,
- retainHeader = retainHeader)
+ retainHeader = retainHeader,
+ allowSplit = true)
val genOperator =
OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, BaseRow](
@@ -119,7 +120,8 @@ object CalcCodeGenerator {
collectorTerm: String = CodeGenUtils.DEFAULT_OPERATOR_COLLECTOR_TERM,
eagerInputUnboxingCode: Boolean,
retainHeader: Boolean = false,
- outputDirectly: Boolean = false): String = {
+ outputDirectly: Boolean = false,
+ allowSplit: Boolean = false): String = {
val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
val exprGenerator = new ExprCodeGenerator(ctx, false)
@@ -151,7 +153,8 @@ object CalcCodeGenerator {
exprGenerator.generateResultExpression(
projectionExprs,
outRowType,
- outRowClass)
+ outRowClass,
+ allowSplit = allowSplit)
}
val projectionExpressionCode = projectionExpression.code
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index cc7d681..e624a1a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -108,6 +108,11 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
*/
private var currentMethodNameForLocalVariables = "DEFAULT"
+ /**
+ * Flag that indicates whether the generated code is split into several
methods.
+ */
+ private var isCodeSplit = false
+
// map of local variable statements. It will be placed in method if method
code not excess
// max code length, otherwise will be placed in member area of the class.
The statements
// are maintained for multiple methods, so that it's a map from method_name
to variables.
@@ -143,6 +148,13 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
reusableLocalVariableStatements(methodName) =
mutable.LinkedHashSet[String]()
}
+ /**
+ * Set the flag [[isCodeSplit]] to be true, which indicates the generated
code is split into
+ * several methods.
+ */
+ def setCodeSplit(): Unit = {
+ isCodeSplit = true
+ }
/**
* Adds a reusable local variable statement with the given type term and
field name.
@@ -197,7 +209,15 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
* (e.g. member variables and their initialization)
*/
def reuseMemberCode(): String = {
- reusableMemberStatements.mkString("\n")
+ val result = reusableMemberStatements.mkString("\n")
+ if (isCodeSplit) {
+ val localVariableAsMember = reusableLocalVariableStatements.map(
+ statements => statements._2.map("private " + _).mkString("\n")
+ ).mkString("\n")
+ result + "\n" + localVariableAsMember
+ } else {
+ result
+ }
}
/**
@@ -205,7 +225,9 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
* if generated code is split or in local variables of method
*/
def reuseLocalVariableCode(methodName: String = null): String = {
- if (methodName == null) {
+ if (isCodeSplit) {
+ GeneratedExpression.NO_CODE
+ } else if (methodName == null) {
reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n")
} else {
reusableLocalVariableStatements(methodName).mkString("\n")
@@ -408,9 +430,12 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
*/
def addReusableTimestamp(): String = {
val fieldTerm = s"timestamp"
+
+ reusableMemberStatements.add(s"private $SQL_TIMESTAMP $fieldTerm;")
+
val field =
s"""
- |final $SQL_TIMESTAMP $fieldTerm =
+ |$fieldTerm =
|
$SQL_TIMESTAMP.fromEpochMillis(java.lang.System.currentTimeMillis());
|""".stripMargin
reusablePerRecordStatements.add(field)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 5a57705..1c60918 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -224,10 +224,11 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
outRow: String = DEFAULT_OUT_RECORD_TERM,
outRowWriter: Option[String] = Some(DEFAULT_OUT_RECORD_WRITER_TERM),
reusedOutRow: Boolean = true,
- outRowAlreadyExists: Boolean = false): GeneratedExpression = {
+ outRowAlreadyExists: Boolean = false,
+ allowSplit: Boolean = false): GeneratedExpression = {
val fieldExprIdxToOutputRowPosMap = fieldExprs.indices.map(i => i ->
i).toMap
generateResultExpression(fieldExprs, fieldExprIdxToOutputRowPosMap,
returnType,
- returnTypeClazz, outRow, outRowWriter, reusedOutRow, outRowAlreadyExists)
+ returnTypeClazz, outRow, outRowWriter, reusedOutRow,
outRowAlreadyExists, allowSplit)
}
/**
@@ -253,7 +254,8 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
outRow: String,
outRowWriter: Option[String],
reusedOutRow: Boolean,
- outRowAlreadyExists: Boolean)
+ outRowAlreadyExists: Boolean,
+ allowSplit: Boolean)
: GeneratedExpression = {
// initial type check
if (returnType.getFieldCount != fieldExprs.length) {
@@ -285,11 +287,30 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
case _ => // ok
}
- val setFieldsCode = fieldExprs.zipWithIndex.map { case (fieldExpr, index)
=>
+ val setFieldsCodes = fieldExprs.zipWithIndex.map { case (fieldExpr, index)
=>
val pos = fieldExprIdxToOutputRowPosMap.getOrElse(index,
throw new CodeGenException(s"Illegal field expr index: $index"))
baseRowSetField(ctx, returnTypeClazz, outRow, pos.toString, fieldExpr,
outRowWriter)
- }.mkString("\n")
+ }
+ val totalLen = setFieldsCodes.map(_.length).sum
+ val maxCodeLength = ctx.tableConfig.getMaxGeneratedCodeLength
+ val setFieldsCode = if (allowSplit && totalLen > maxCodeLength) {
+ // do the split.
+ ctx.setCodeSplit()
+ setFieldsCodes.map(project => {
+ val methodName = newName("split")
+ val method =
+ s"""
+ |private void $methodName() throws Exception {
+ | $project
+ |}
+ |""".stripMargin
+ ctx.addReusableMember(method)
+ s"$methodName();"
+ }).mkString("\n")
+ } else {
+ setFieldsCodes.mkString("\n")
+ }
val outRowInitCode = if (!outRowAlreadyExists) {
val initCode = generateRecordStatement(returnType, returnTypeClazz,
outRow, outRowWriter)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
index 90eddc8..aaec539 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
@@ -77,14 +77,14 @@ case class GeneratedExpression(
def deepCopy(ctx: CodeGeneratorContext): GeneratedExpression = {
// only copy when type is mutable
if (TypeCheckUtils.isMutable(resultType)) {
- val newResultTerm = newName("field")
// if the type need copy, it must be a boxed type
val typeTerm = boxedTypeTermForType(resultType)
val serTerm = ctx.addReusableTypeSerializer(resultType)
+ val newResultTerm = ctx.addReusableLocalVariable(typeTerm, "field")
val newCode =
s"""
|$code
- |$typeTerm $newResultTerm = $resultTerm;
+ |$newResultTerm = $resultTerm;
|if (!$nullTerm) {
| $newResultTerm = ($typeTerm) ($serTerm.copy($newResultTerm));
|}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 098adbf..1d1a176 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -405,7 +405,8 @@ object HashAggCodeGenHelper {
outRow = currentAggBufferTerm,
outRowWriter = None,
reusedOutRow = true,
- outRowAlreadyExists = true
+ outRowAlreadyExists = true,
+ allowSplit = false
)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index fbb37fa..dea17c7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -260,4 +260,28 @@ class CalcITCase extends StreamingTestBase {
val expected = List("2,2,Hello", "3,2,Hello world")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
+
+ @Test
+ def testLongProjectionList(): Unit = {
+
+ val t = env.fromCollection(TestData.smallTupleData3)
+ .toTable(tEnv, 'a, 'b, 'c)
+ tEnv.createTemporaryView("MyTable", t)
+
+ val selectList = Stream.range(3, 200)
+ .map(i => s"CASE WHEN a IS NOT NULL AND a > $i THEN 0 WHEN a < 0 THEN 0
ELSE $i END")
+ .mkString(",")
+ val sqlQuery = s"select $selectList from MyTable"
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = Stream.range(3, 200).map(_.toString).mkString(",")
+ assertEquals(sink.getAppendResults.size, TestData.smallTupleData3.size)
+ sink.getAppendResults.foreach( result =>
+ assertEquals(expected, result)
+ )
+ }
}