This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e78d355 [FLINK-23267][table-planner] Enable Java code splitting for
all generated classes
e78d355 is described below
commit e78d3551c5048c85f38d1e1c0327c341b44f85fa
Author: tsreaper <[email protected]>
AuthorDate: Mon Jul 12 10:30:47 2021 +0800
[FLINK-23267][table-planner] Enable Java code splitting for all generated
classes
This closes #16349
---
.../generated/table_config_configuration.html | 4 +-
.../org/apache/flink/table/api/TableConfig.java | 13 ++-
.../flink/table/api/config/TableConfigOptions.java | 14 ++-
flink-table/flink-table-code-splitter/pom.xml | 26 ++++-
.../src/main/resources/META-INF/NOTICE | 2 +-
.../{LICENSE.antlr => LICENSE.antlr-runtime} | 0
flink-table/flink-table-planner/pom.xml | 1 +
.../abilities/source/WatermarkPushDownSpec.java | 3 +-
.../planner/codegen/CollectorCodeGenerator.scala | 6 +-
.../planner/codegen/EqualiserCodeGenerator.scala | 3 +-
.../planner/codegen/FunctionCodeGenerator.scala | 6 +-
.../table/planner/codegen/HashCodeGenerator.scala | 3 +-
.../planner/codegen/InputFormatCodeGenerator.scala | 3 +-
.../planner/codegen/LookupJoinCodeGenerator.scala | 6 +-
.../table/planner/codegen/MatchCodeGenerator.scala | 5 +-
.../planner/codegen/OperatorCodeGenerator.scala | 6 +-
.../planner/codegen/ProjectionCodeGenerator.scala | 3 +-
.../codegen/WatermarkGeneratorCodeGenerator.scala | 4 +-
.../codegen/agg/AggsHandlerCodeGenerator.scala | 11 +-
...ltiFieldRangeBoundComparatorCodeGenerator.scala | 4 +-
.../over/RangeBoundComparatorCodeGenerator.scala | 4 +-
.../codegen/sort/ComparatorCodeGenerator.scala | 3 +-
.../planner/codegen/sort/SortCodeGenerator.scala | 2 +-
.../runtime/batch/sql/CodeSplitITCase.scala | 112 +++++++++++++++++++++
flink-table/flink-table-runtime/pom.xml | 6 ++
.../generated/GeneratedAggsHandleFunction.java | 13 ++-
.../table/runtime/generated/GeneratedClass.java | 30 +++++-
.../runtime/generated/GeneratedCollector.java | 15 ++-
.../table/runtime/generated/GeneratedFunction.java | 15 ++-
.../runtime/generated/GeneratedHashFunction.java | 10 +-
.../table/runtime/generated/GeneratedInput.java | 8 +-
.../runtime/generated/GeneratedJoinCondition.java | 16 ++-
.../GeneratedNamespaceAggsHandleFunction.java | 13 ++-
.../GeneratedNamespaceTableAggsHandleFunction.java | 13 ++-
.../generated/GeneratedNormalizedKeyComputer.java | 15 ++-
.../table/runtime/generated/GeneratedOperator.java | 9 +-
.../runtime/generated/GeneratedProjection.java | 16 ++-
.../generated/GeneratedRecordComparator.java | 16 ++-
.../generated/GeneratedRecordEqualiser.java | 16 ++-
.../runtime/generated/GeneratedResultFuture.java | 15 ++-
.../GeneratedTableAggsHandleFunction.java | 9 +-
.../generated/GeneratedWatermarkGenerator.java | 11 +-
42 files changed, 409 insertions(+), 81 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html
b/docs/layouts/shortcodes/generated/table_config_configuration.html
index 1bd84f8..5d24c14 100644
--- a/docs/layouts/shortcodes/generated/table_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/table_config_configuration.html
@@ -22,9 +22,9 @@
</tr>
<tr>
<td><h5>table.generated-code.max-length</h5><br> <span
class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span></td>
- <td style="word-wrap: break-word;">64000</td>
+ <td style="word-wrap: break-word;">4000</td>
<td>Integer</td>
- <td>Specifies a threshold where generated code will be split into
sub-function calls. Java has a maximum method length of 64 KB. This setting
allows for finer granularity if necessary.</td>
+ <td>Specifies a threshold where generated code will be split into
sub-function calls. Java has a maximum method length of 64 KB. This setting
allows for finer granularity if necessary. Default value is 4000 instead of
64KB as by default JIT refuses to work on methods with more than 8K byte
code.</td>
</tr>
<tr>
<td><h5>table.local-time-zone</h5><br> <span class="label
label-primary">Batch</span> <span class="label
label-primary">Streaming</span></td>
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index e1084bd..3493b4f 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -231,16 +231,21 @@ public class TableConfig {
/**
* Returns the current threshold where generated code will be split into
sub-function calls.
* Java has a maximum method length of 64 KB. This setting allows for
finer granularity if
- * necessary. Default is 64000.
+ * necessary.
+ *
+ * <p>Default value is 4000 instead of 64KB as by default JIT refuses to
work on methods with
+ * more than 8K byte code.
*/
public Integer getMaxGeneratedCodeLength() {
return
this.configuration.getInteger(TableConfigOptions.MAX_LENGTH_GENERATED_CODE);
}
/**
- * Returns the current threshold where generated code will be split into
sub-function calls.
- * Java has a maximum method length of 64 KB. This setting allows for
finer granularity if
- * necessary. Default is 64000.
+ * Sets current threshold where generated code will be split into
sub-function calls. Java has a
+ * maximum method length of 64 KB. This setting allows for finer
granularity if necessary.
+ *
+ * <p>Default value is 4000 instead of 64KB as by default JIT refuses to
work on methods with
+ * more than 8K byte code.
*/
public void setMaxGeneratedCodeLength(Integer maxGeneratedCodeLength) {
this.configuration.setInteger(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
index 3302c06..33cff53 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
@@ -94,8 +94,18 @@ public class TableConfigOptions {
public static final ConfigOption<Integer> MAX_LENGTH_GENERATED_CODE =
key("table.generated-code.max-length")
.intType()
- .defaultValue(64000)
+ .defaultValue(4000)
.withDescription(
"Specifies a threshold where generated code will
be split into sub-function calls. "
- + "Java has a maximum method length of 64
KB. This setting allows for finer granularity if necessary.");
+ + "Java has a maximum method length of 64
KB. This setting allows for finer granularity if necessary. "
+ + "Default value is 4000 instead of 64KB
as by default JIT refuses to work on methods with more than 8K byte code.");
+
+ @Documentation.ExcludeFromDocumentation(
+ "This option is rarely used. The default value is good enough for
almost all cases.")
+ public static final ConfigOption<Integer> MAX_MEMBERS_GENERATED_CODE =
+ key("table.generated-code.max-members")
+ .intType()
+ .defaultValue(10000)
+ .withDescription(
+ "Specifies a threshold where class members of
generated code will be grouped into arrays by types.");
}
diff --git a/flink-table/flink-table-code-splitter/pom.xml
b/flink-table/flink-table-code-splitter/pom.xml
index 8f2c1c5..0fb2dc7 100644
--- a/flink-table/flink-table-code-splitter/pom.xml
+++ b/flink-table/flink-table-code-splitter/pom.xml
@@ -29,7 +29,7 @@ under the License.
</parent>
<artifactId>flink-table-code-splitter</artifactId>
- <name>Flink : Table : Code Splitter </name>
+ <name>Flink : Table : Code Splitter</name>
<description>
This module contains a tool to split generated Java code
so that each method does not exceed the limit of 64KB.
@@ -83,6 +83,30 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.antlr:antlr4-runtime</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.antlr.v4.runtime</pattern>
+ <shadedPattern>org.apache.flink.table.shaded.org.antlr.v4.runtime</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git
a/flink-table/flink-table-code-splitter/src/main/resources/META-INF/NOTICE
b/flink-table/flink-table-code-splitter/src/main/resources/META-INF/NOTICE
index f75bf43..e5d304d 100644
--- a/flink-table/flink-table-code-splitter/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-code-splitter/src/main/resources/META-INF/NOTICE
@@ -7,7 +7,7 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the BSD 3-clause license.
See bundled license files for details.
-- antlr:4.7
+- org.antlr:antlr4-runtime:4.7
This project bundles the following files under the BSD license.
See bundled license files for details.
diff --git
a/flink-table/flink-table-code-splitter/src/main/resources/META-INF/licenses/LICENSE.antlr
b/flink-table/flink-table-code-splitter/src/main/resources/META-INF/licenses/LICENSE.antlr-runtime
similarity index 100%
rename from
flink-table/flink-table-code-splitter/src/main/resources/META-INF/licenses/LICENSE.antlr
rename to
flink-table/flink-table-code-splitter/src/main/resources/META-INF/licenses/LICENSE.antlr-runtime
diff --git a/flink-table/flink-table-planner/pom.xml
b/flink-table/flink-table-planner/pom.xml
index 89db4ac..767641e 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -384,6 +384,7 @@ under the License.
<!--
flink-table-runtime dependencies -->
<include>org.codehaus.janino:*</include>
+ <include>org.apache.flink:flink-table-code-splitter</include>
<!--
Tools to unify display format for different languages -->
<include>com.ibm.icu:icu4j</include>
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
index 61f6d91..6c8dd3f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
@@ -131,7 +131,8 @@ public class WatermarkPushDownSpec extends
SourceAbilitySpecBase {
new GeneratedWatermarkGenerator(
generatedWatermarkGenerator.getClassName(),
generatedWatermarkGenerator.getCode(),
- references.toArray())
+ references.toArray(),
+ configuration)
.newInstance(Thread.currentThread().getContextClassLoader());
try {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
index a6babf0..03b5e6e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CollectorCodeGenerator.scala
@@ -93,7 +93,8 @@ object CollectorCodeGenerator {
}
""".stripMargin
- new GeneratedCollector(funcName, funcCode, ctx.references.toArray)
+ new GeneratedCollector(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
@@ -155,7 +156,8 @@ object CollectorCodeGenerator {
}
""".stripMargin
- new GeneratedCollector(funcName, funcCode, ctx.references.toArray)
+ new GeneratedCollector(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
def addToContext(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
index 79d8098..1d58657 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
@@ -79,7 +79,8 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
}
""".stripMargin
- new GeneratedRecordEqualiser(className, classCode, ctx.references.toArray)
+ new GeneratedRecordEqualiser(
+ className, classCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
private def getEqualsMethodName(idx: Int) = s"""equalsAtIndex$idx"""
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
index d02ad1f..44a4c23 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala
@@ -159,7 +159,8 @@ object FunctionCodeGenerator {
}
""".stripMargin
- new GeneratedFunction(funcName, funcCode, ctx.references.toArray)
+ new GeneratedFunction(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
@@ -215,6 +216,7 @@ object FunctionCodeGenerator {
}
""".stripMargin
- new GeneratedJoinCondition(funcName, funcCode, ctx.references.toArray)
+ new GeneratedJoinCondition(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala
index 6106e74..a925e02 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala
@@ -79,7 +79,8 @@ object HashCodeGenerator {
}
""".stripMargin
- new GeneratedHashFunction(className, code, ctx.references.toArray)
+ new GeneratedHashFunction(
+ className, code, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
private def generateCodeBody(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
index 2996a8f..63d7c7d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/InputFormatCodeGenerator.scala
@@ -87,7 +87,8 @@ object InputFormatCodeGenerator {
}
""".stripMargin
- new GeneratedInput(funcName, funcCode, ctx.references.toArray)
+ new GeneratedInput(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 102bf59..f0615a7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -408,7 +408,8 @@ object LookupJoinCodeGenerator {
}
""".stripMargin
- new GeneratedCollector(funcName, funcCode, ctx.references.toArray)
+ new GeneratedCollector(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
@@ -498,7 +499,8 @@ object LookupJoinCodeGenerator {
}
""".stripMargin
- new GeneratedResultFuture(funcName, funcCode, ctx.references.toArray)
+ new GeneratedResultFuture(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
index 673d25a..8c14843 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala
@@ -37,7 +37,7 @@ import
org.apache.flink.table.planner.plan.utils.MatchUtil.AggregationPatternVar
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore
import org.apache.flink.table.runtime.generated.GeneratedFunction
import
org.apache.flink.table.runtime.operators.`match`.{IterativeConditionRunner,
PatternProcessFunctionRunner}
-import org.apache.flink.table.types.logical.{LocalZonedTimestampType, RowType,
TimestampKind, TimestampType}
+import org.apache.flink.table.types.logical.{LocalZonedTimestampType, RowType,
TimestampKind}
import org.apache.flink.table.utils.EncodingUtils
import org.apache.flink.util.Collector
import org.apache.flink.util.MathUtils.checkedDownCast
@@ -292,7 +292,8 @@ class MatchCodeGenerator(
}
""".stripMargin
- new GeneratedFunction[F](funcName, funcCode, ctx.references.toArray)
+ new GeneratedFunction[F](
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
private def generateOneRowPerMatchExpression(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index 8e3a9ed..2a10507 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -120,7 +120,8 @@ object OperatorCodeGenerator extends Logging {
""".stripMargin
LOG.debug(s"Compiling OneInputStreamOperator Code:\n$name")
- new GeneratedOperator(operatorName, operatorCode, ctx.references.toArray)
+ new GeneratedOperator(
+ operatorName, operatorCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
def generateTwoInputStreamOperator[IN1 <: Any, IN2 <: Any, OUT <: Any](
@@ -247,7 +248,8 @@ object OperatorCodeGenerator extends Logging {
""".stripMargin
LOG.debug(s"Compiling TwoInputStreamOperator Code:\n$name")
- new GeneratedOperator(operatorName, operatorCode, ctx.references.toArray)
+ new GeneratedOperator(
+ operatorName, operatorCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
private def generateInputTerm(inputTypeTerm: String): String = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
index bc5726b..5642705 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
@@ -117,7 +117,8 @@ object ProjectionCodeGenerator {
|}
""".stripMargin
- new GeneratedProjection(className, code, ctx.references.toArray)
+ new GeneratedProjection(
+ className, code, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
index 3f21e88..780a0e0 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
@@ -117,7 +117,9 @@ object WatermarkGeneratorCodeGenerator {
}
}
""".stripMargin
- new GeneratedWatermarkGenerator(funcName, funcCode, ctx.references.toArray)
+
+ new GeneratedWatermarkGenerator(
+ funcName, funcCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index d6493fd..9395bf0 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -417,7 +417,8 @@ class AggsHandlerCodeGenerator(
}
""".stripMargin
- new GeneratedAggsHandleFunction(functionName, functionCode,
ctx.references.toArray)
+ new GeneratedAggsHandleFunction(
+ functionName, functionCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
@@ -568,7 +569,8 @@ class AggsHandlerCodeGenerator(
}
""".stripMargin
- new GeneratedTableAggsHandleFunction(functionName, functionCode,
ctx.references.toArray)
+ new GeneratedTableAggsHandleFunction(
+ functionName, functionCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
@@ -694,7 +696,8 @@ class AggsHandlerCodeGenerator(
}
""".stripMargin
- new GeneratedNamespaceAggsHandleFunction[N](functionName, functionCode,
ctx.references.toArray)
+ new GeneratedNamespaceAggsHandleFunction[N](
+ functionName, functionCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
/**
@@ -847,7 +850,7 @@ class AggsHandlerCodeGenerator(
""".stripMargin
new GeneratedNamespaceTableAggsHandleFunction[N](
- functionName, functionCode, ctx.references.toArray)
+ functionName, functionCode, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
private def genCreateAccumulators(): String = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
index 1a55e87..9b101aa 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/MultiFieldRangeBoundComparatorCodeGenerator.scala
@@ -74,7 +74,9 @@ class MultiFieldRangeBoundComparatorCodeGenerator(
}
""".stripMargin
- new GeneratedRecordComparator(className, code, ctx.references.toArray)
+
+ new GeneratedRecordComparator(
+ className, code, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
index 77267a3..ff2a468 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/over/RangeBoundComparatorCodeGenerator.scala
@@ -23,7 +23,6 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.CodeGenUtils.{ROW_DATA, newName}
import org.apache.flink.table.planner.codegen.Indenter.toISC
import org.apache.flink.table.planner.codegen.{CodeGenUtils,
CodeGeneratorContext, ExprCodeGenerator, GenerateUtils}
-import
org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec.SortFieldSpec
import org.apache.flink.table.runtime.generated.{GeneratedRecordComparator,
RecordComparator}
import org.apache.flink.table.types.logical.{BigIntType, IntType, LogicalType,
LogicalTypeRoot, RowType}
@@ -123,7 +122,8 @@ class RangeBoundComparatorCodeGenerator(
}
""".stripMargin
- new GeneratedRecordComparator(className, code, ctx.references.toArray)
+ new GeneratedRecordComparator(
+ className, code, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
private def getComparatorCode(inputValue: String, currentValue: String):
String = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
index dad4d99..0829073 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
@@ -74,7 +74,8 @@ object ComparatorCodeGenerator {
}
""".stripMargin
- new GeneratedRecordComparator(className, code, ctx.references.toArray)
+ new GeneratedRecordComparator(
+ className, code, ctx.references.toArray,
ctx.tableConfig.getConfiguration)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
index 4b153fd..5c61aae 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
@@ -183,7 +183,7 @@ class SortCodeGenerator(
}
""".stripMargin
- new GeneratedNormalizedKeyComputer(className, code)
+ new GeneratedNormalizedKeyComputer(className, code, conf.getConfiguration)
}
def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String]
= {
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
new file mode 100644
index 0000000..cfec3a7
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql
+
+import org.apache.flink.core.testutils.FlinkMatchers
+import org.apache.flink.table.api.config.TableConfigOptions
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import
org.apache.flink.table.planner.runtime.utils.TestData.{nullablesOfData3,
smallData3, type3}
+import org.apache.flink.types.Row
+
+import org.hamcrest.MatcherAssert
+import org.junit.{Assert, Before, Test}
+
+import scala.collection.Seq
+
+class CodeSplitITCase extends BatchTestBase {
+
+ @Before
+ override def before(): Unit = {
+ super.before()
+ registerCollection("SmallTable3", smallData3, type3, "a, b, c",
nullablesOfData3)
+ }
+
+ @Test
+ def testSelectManyColumns(): Unit = {
+ val sql = new StringBuilder("SELECT ")
+ for (i <- 1 to 1000) {
+ sql.append(s"a + $i * b, ")
+ }
+ sql.append("a, b FROM SmallTable3")
+
+ val results = new scala.collection.mutable.ArrayBuffer[Row]()
+ for ((a, b) <- Seq((1, 1), (2, 2), (3, 2))) {
+ val r = new Row(1002)
+ for (i <- 1 to 1000) {
+ r.setField(i - 1, a + i * b)
+ }
+ r.setField(1000, a)
+ r.setField(1001, b)
+ results.append(r)
+ }
+
+ runTest(sql.mkString, results)
+ }
+
+ @Test
+ def testManyOrsInCondition(): Unit = {
+ val sql = new StringBuilder("SELECT a, b FROM SmallTable3 WHERE ")
+ for (i <- 1 to 300) {
+ sql.append(s"(a + b > $i AND a * b > $i) OR ")
+ }
+ sql.append("CAST((a + b > 1 AND a * b > 1) AS VARCHAR) = 'true'")
+
+ runTest(sql.mkString, Seq(row(2, 2), row(3, 2)))
+ }
+
+ @Test
+ def testManyAggregations(): Unit = {
+ val sql = new StringBuilder("SELECT ")
+ for (i <- 1 to 300) {
+ sql.append(s"SUM(a + $i * b)")
+ if (i != 300) {
+ sql.append(", ")
+ }
+ }
+ sql.append(" FROM SmallTable3")
+
+ val result = new Row(300)
+ for (i <- 1 to 300) {
+ result.setField(i - 1, 6 + 5 * i)
+ }
+
+ runTest(sql.mkString, Seq(result))
+ }
+
+ private[flink] def runTest(sql: String, results: Seq[Row]): Unit = {
+ tEnv.getConfig.getConfiguration.setInteger(
+ TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000)
+ tEnv.getConfig.getConfiguration.setInteger(
+ TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 10000)
+ checkResult(sql.mkString, results)
+
+ tEnv.getConfig.getConfiguration.setInteger(
+ TableConfigOptions.MAX_LENGTH_GENERATED_CODE, Int.MaxValue)
+ tEnv.getConfig.getConfiguration.setInteger(
+ TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, Int.MaxValue)
+ try {
+ checkResult(sql, results)
+ Assert.fail("Expecting compiler exception")
+ } catch {
+ case e: Exception =>
+ MatcherAssert.assertThat(e, FlinkMatchers.containsMessage("grows
beyond 64 KB"))
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/pom.xml
b/flink-table/flink-table-runtime/pom.xml
index a63cfca..260ac4d 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -60,6 +60,12 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-code-splitter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
index 45a58a2..ec983f9 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedAggsHandleFunction.java
@@ -18,12 +18,21 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link AggsHandleFunction}. */
public class GeneratedAggsHandleFunction extends
GeneratedClass<AggsHandleFunction> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+ @VisibleForTesting
public GeneratedAggsHandleFunction(String className, String code, Object[]
references) {
- super(className, code, references);
+ super(className, code, references, new Configuration());
+ }
+
+ public GeneratedAggsHandleFunction(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
index 039a3e0..09fbef0 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java
@@ -18,6 +18,13 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.codesplit.JavaCodeSplitter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -28,18 +35,30 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public abstract class GeneratedClass<T> implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(GeneratedClass.class);
+
private final String className;
private final String code;
+ private final String splitCode;
private final Object[] references;
private transient Class<T> compiledClass;
- protected GeneratedClass(String className, String code, Object[] references) {
+ protected GeneratedClass(
+ String className, String code, Object[] references, Configuration conf) {
checkNotNull(className, "name must not be null");
checkNotNull(code, "code must not be null");
checkNotNull(references, "references must not be null");
+ checkNotNull(conf, "conf must not be null");
this.className = className;
this.code = code;
+ this.splitCode =
+ code.isEmpty()
+ ? code
+ : JavaCodeSplitter.split(
+ code,
+ conf.getInteger(TableConfigOptions.MAX_LENGTH_GENERATED_CODE),
+ conf.getInteger(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE));
this.references = references;
}
@@ -74,7 +93,14 @@ public abstract class GeneratedClass<T> implements Serializable {
public Class<T> compile(ClassLoader classLoader) {
if (compiledClass == null) {
// cache the compiled class
- compiledClass = CompileUtils.compile(classLoader, className, code);
+ try {
+ // first try to compile the split code
+ compiledClass = CompileUtils.compile(classLoader, className, splitCode);
+ } catch (Throwable t) {
+ // compile the original code as fallback
+ LOG.warn("Failed to compile split code, falling back to original code", t);
+ compiledClass = CompileUtils.compile(classLoader, className, code);
+ }
}
return compiledClass;
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
index f69e876..4932144 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedCollector.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
@@ -27,7 +29,12 @@ import org.apache.flink.util.Collector;
*/
public class GeneratedCollector<C extends Collector<?>> extends
GeneratedClass<C> {
- private static final long serialVersionUID = -7355875544905245676L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedCollector(String className, String code, Object[]
references) {
+ super(className, code, references, new Configuration());
+ }
/**
* Creates a GeneratedCollector.
@@ -35,8 +42,10 @@ public class GeneratedCollector<C extends Collector<?>>
extends GeneratedClass<C
* @param className class name of the generated Collector.
* @param code code of the generated Collector.
* @param references referenced objects of the generated Collector.
+ * @param conf configuration when generating Collector.
*/
- public GeneratedCollector(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedCollector(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
index 84b494c..5c99a68 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFunction.java
@@ -18,7 +18,9 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.configuration.Configuration;
/**
* Describes a generated {@link Function}.
@@ -27,7 +29,12 @@ import org.apache.flink.api.common.functions.Function;
*/
public class GeneratedFunction<F extends Function> extends GeneratedClass<F> {
- private static final long serialVersionUID = -7355875544905245676L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedFunction(String className, String code, Object[]
references) {
+ super(className, code, references, new Configuration());
+ }
/**
* Creates a GeneratedFunction.
@@ -35,8 +42,10 @@ public class GeneratedFunction<F extends Function> extends
GeneratedClass<F> {
* @param className class name of the generated Function.
* @param code code of the generated Function.
* @param references referenced objects of the generated Function.
+ * @param conf configuration when generating Function.
*/
- public GeneratedFunction(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedFunction(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
index d8ed2d0..1896556 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedHashFunction.java
@@ -18,10 +18,12 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link HashFunction}. */
public class GeneratedHashFunction extends GeneratedClass<HashFunction> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
/**
* Creates a GeneratedHashFunction.
@@ -29,8 +31,10 @@ public class GeneratedHashFunction extends
GeneratedClass<HashFunction> {
* @param className class name of the generated Function.
* @param code code of the generated Function.
* @param references referenced objects of the generated Function.
+ * @param conf configuration when generating Function.
*/
- public GeneratedHashFunction(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedHashFunction(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
index f1e52f4..80cb268 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedInput.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.generated;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
/**
* Describes a generated {@link InputFormat}.
@@ -27,7 +28,7 @@ import org.apache.flink.api.common.io.InputFormat;
*/
public class GeneratedInput<F extends InputFormat<?, ?>> extends
GeneratedClass<F> {
- private static final long serialVersionUID = -7355875544905245676L;
+ private static final long serialVersionUID = 2L;
/**
* Creates a GeneratedInput.
@@ -35,8 +36,9 @@ public class GeneratedInput<F extends InputFormat<?, ?>>
extends GeneratedClass<
* @param className class name of the generated Function.
* @param code code of the generated Function.
* @param references referenced objects of the generated Function.
+ * @param conf configuration when generating Function.
*/
- public GeneratedInput(String className, String code, Object[] references) {
- super(className, code, references);
+ public GeneratedInput(String className, String code, Object[] references,
Configuration conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
index ed50bf9..d1c5c01 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedJoinCondition.java
@@ -18,10 +18,18 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link JoinCondition}. */
public class GeneratedJoinCondition extends GeneratedClass<JoinCondition> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedJoinCondition(String className, String code, Object[]
references) {
+ super(className, code, references, new Configuration());
+ }
/**
* Creates a GeneratedJoinCondition.
@@ -29,8 +37,10 @@ public class GeneratedJoinCondition extends
GeneratedClass<JoinCondition> {
* @param className class name of the generated JoinCondition.
* @param code code of the generated JoinCondition.
* @param references referenced objects of the generated JoinCondition.
+ * @param conf configuration when generating JoinCondition.
*/
- public GeneratedJoinCondition(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedJoinCondition(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
index 68a010a..312331b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceAggsHandleFunction.java
@@ -18,14 +18,23 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link NamespaceAggsHandleFunction}. */
public class GeneratedNamespaceAggsHandleFunction<N>
extends GeneratedClass<NamespaceAggsHandleFunction<N>> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+ @VisibleForTesting
public GeneratedNamespaceAggsHandleFunction(
String className, String code, Object[] references) {
- super(className, code, references);
+ super(className, code, references, new Configuration());
+ }
+
+ public GeneratedNamespaceAggsHandleFunction(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
index 6a108d2..b596e36 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNamespaceTableAggsHandleFunction.java
@@ -18,14 +18,23 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link NamespaceTableAggsHandleFunction}. */
public class GeneratedNamespaceTableAggsHandleFunction<N>
extends GeneratedClass<NamespaceTableAggsHandleFunction<N>> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+ @VisibleForTesting
public GeneratedNamespaceTableAggsHandleFunction(
String className, String code, Object[] references) {
- super(className, code, references);
+ super(className, code, references, new Configuration());
+ }
+
+ public GeneratedNamespaceTableAggsHandleFunction(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java
index 79196a4..9400c12 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedNormalizedKeyComputer.java
@@ -18,18 +18,27 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link NormalizedKeyComputer}. */
public class GeneratedNormalizedKeyComputer extends
GeneratedClass<NormalizedKeyComputer> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedNormalizedKeyComputer(String className, String code) {
+ super(className, code, new Object[0], new Configuration());
+ }
/**
* Creates a GeneratedNormalizedKeyComputer.
*
* @param className class name of the generated class.
* @param code code of the generated class.
+ * @param conf configuration when generating the generated class.
*/
- public GeneratedNormalizedKeyComputer(String className, String code) {
- super(className, code, new Object[0]);
+ public GeneratedNormalizedKeyComputer(String className, String code,
Configuration conf) {
+ super(className, code, new Object[0], conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
index 09b9015..b13398fa 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamOperator;
/**
@@ -27,7 +28,7 @@ import
org.apache.flink.streaming.api.operators.StreamOperator;
*/
public class GeneratedOperator<C extends StreamOperator<?>> extends
GeneratedClass<C> {
- private static final long serialVersionUID = -7355875544905245676L;
+ private static final long serialVersionUID = 2L;
/**
* Creates a GeneratedOperator.
@@ -35,8 +36,10 @@ public class GeneratedOperator<C extends StreamOperator<?>>
extends GeneratedCla
* @param className class name of the generated StreamOperator.
* @param code code of the generated StreamOperator.
* @param references referenced objects of the generated StreamOperator.
+ * @param conf configuration when generating StreamOperator.
*/
- public GeneratedOperator(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedOperator(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
index ff702dd..20daf9b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedProjection.java
@@ -18,10 +18,18 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link Projection}. */
public class GeneratedProjection extends GeneratedClass<Projection> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedProjection(String className, String code, Object[]
references) {
+ super(className, code, references, new Configuration());
+ }
/**
* Creates a GeneratedProjection.
@@ -29,8 +37,10 @@ public class GeneratedProjection extends
GeneratedClass<Projection> {
* @param className class name of the generated Function.
* @param code code of the generated Function.
* @param references referenced objects of the generated Function.
+ * @param conf configuration when generating Function.
*/
- public GeneratedProjection(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedProjection(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
index 9bbba60..6e2b799 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordComparator.java
@@ -18,10 +18,18 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link RecordComparator}. */
public class GeneratedRecordComparator extends
GeneratedClass<RecordComparator> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedRecordComparator(String className, String code, Object[]
references) {
+ super(className, code, references, new Configuration());
+ }
/**
* Creates a GeneratedRecordComparator.
@@ -29,8 +37,10 @@ public class GeneratedRecordComparator extends
GeneratedClass<RecordComparator>
* @param className class name of the generated class.
* @param code code of the generated class.
* @param references referenced objects of the generated class.
+ * @param conf configuration when generating the generated class.
*/
- public GeneratedRecordComparator(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedRecordComparator(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
index f266e57..1f6f020 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedRecordEqualiser.java
@@ -18,10 +18,18 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link RecordEqualiser}. */
public class GeneratedRecordEqualiser extends GeneratedClass<RecordEqualiser> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedRecordEqualiser(String className, String code, Object[]
references) {
+ super(className, code, references, new Configuration());
+ }
/**
* Creates a GeneratedRecordEqualiser.
@@ -29,8 +37,10 @@ public class GeneratedRecordEqualiser extends
GeneratedClass<RecordEqualiser> {
* @param className class name of the generated class.
* @param code code of the generated class.
* @param references referenced objects of the generated class.
+ * @param conf configuration when generating the generated class.
*/
- public GeneratedRecordEqualiser(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedRecordEqualiser(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
index fd5ca2c..3d949da 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedResultFuture.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
/**
@@ -27,7 +29,12 @@ import
org.apache.flink.streaming.api.functions.async.ResultFuture;
*/
public class GeneratedResultFuture<T extends ResultFuture<?>> extends
GeneratedClass<T> {
- private static final long serialVersionUID = -7355875544905245676L;
+ private static final long serialVersionUID = 2L;
+
+ @VisibleForTesting
+ public GeneratedResultFuture(String className, String code, Object[]
references) {
+ super(className, code, references, new Configuration());
+ }
/**
* Creates a GeneratedResultFuture.
@@ -35,8 +42,10 @@ public class GeneratedResultFuture<T extends
ResultFuture<?>> extends GeneratedC
* @param className class name of the generated ResultFuture.
* @param code code of the generated ResultFuture.
* @param references referenced objects of the generated ResultFuture.
+ * @param conf configuration when generating ResultFuture.
*/
- public GeneratedResultFuture(String className, String code, Object[]
references) {
- super(className, code, references);
+ public GeneratedResultFuture(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
index 395135b..6d95ab7 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedTableAggsHandleFunction.java
@@ -18,12 +18,15 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link TableAggsHandleFunction}. */
public class GeneratedTableAggsHandleFunction extends
GeneratedClass<TableAggsHandleFunction> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- public GeneratedTableAggsHandleFunction(String className, String code,
Object[] references) {
- super(className, code, references);
+ public GeneratedTableAggsHandleFunction(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
index 47a7cde..e189eae 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGenerator.java
@@ -18,12 +18,19 @@
package org.apache.flink.table.runtime.generated;
+import org.apache.flink.configuration.Configuration;
+
/** Describes a generated {@link WatermarkGenerator}. */
public class GeneratedWatermarkGenerator extends
GeneratedClass<WatermarkGenerator> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
public GeneratedWatermarkGenerator(String className, String code, Object[]
references) {
- super(className, code, references);
+ super(className, code, references, new Configuration());
+ }
+
+ public GeneratedWatermarkGenerator(
+ String className, String code, Object[] references, Configuration
conf) {
+ super(className, code, references, conf);
}
}