dawidwys opened a new pull request, #24834:
URL: https://github.com/apache/flink/pull/24834
## What is the purpose of the change
Make the `BlockStatementGrouper` work with less memory for complex `CASE
WHEN` statements.
## Verifying this change
1. All existing tests pass
2. If you run the added test with complex `CASE WHEN` with `-Xmx128mb` the
test quickly fails with:
```
java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please
file an issue.
at
org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37)
at
org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58)
at
org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43)
at
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:167)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:258)
at
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:178)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:167)
at
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:188)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1277)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:1055)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1120)
at
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:477)
at
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:376)
at
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:336)
at
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:115)
at
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:95)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
at
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
at
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
at
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:142)
at
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:110)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at
java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
at
java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at
java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:110)
at
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:44)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
Caused by: java.lang.OutOfMemoryError: Java heap space
```
After the change it works fine:

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no**
/ don't know)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't
know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs /
JavaDocs / not documented)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]