This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 96a142866a0 [FLINK-33447][table-planner] Avoid CompiledPlan
recompilation during loading
96a142866a0 is described below
commit 96a142866a042598bfe85407b46b0871a7b8993a
Author: Timo Walther <[email protected]>
AuthorDate: Fri Nov 3 11:02:19 2023 +0100
[FLINK-33447][table-planner] Avoid CompiledPlan recompilation during loading
---
.../planner/plan/ExecNodeGraphInternalPlan.java | 15 ++++++++++----
.../table/planner/delegation/StreamPlanner.scala | 24 ++++++++++++----------
2 files changed, 24 insertions(+), 15 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
index 7f138c6517a..b921535b8b8 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
@@ -34,17 +34,21 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.List;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/** Implementation of {@link CompiledPlan} backed by an {@link ExecNodeGraph}.
*/
@Internal
public class ExecNodeGraphInternalPlan implements InternalPlan {
- private final String serializedPlan;
+ private final Supplier<String> serializedPlanSupplier;
private final ExecNodeGraph execNodeGraph;
- public ExecNodeGraphInternalPlan(String serializedPlan, ExecNodeGraph
execNodeGraph) {
- this.serializedPlan = serializedPlan;
+ private String serializedPlan;
+
+ public ExecNodeGraphInternalPlan(
+ Supplier<String> serializedPlanSupplier, ExecNodeGraph
execNodeGraph) {
+ this.serializedPlanSupplier = serializedPlanSupplier;
this.execNodeGraph = execNodeGraph;
}
@@ -54,6 +58,9 @@ public class ExecNodeGraphInternalPlan implements
InternalPlan {
@Override
public String asJsonString() {
+ if (serializedPlan == null) {
+ serializedPlan = serializedPlanSupplier.get();
+ }
return serializedPlan;
}
@@ -78,7 +85,7 @@ public class ExecNodeGraphInternalPlan implements
InternalPlan {
Files.createDirectories(file.toPath().getParent());
Files.write(
file.toPath(),
- serializedPlan.getBytes(StandardCharsets.UTF_8),
+ asJsonString().getBytes(StandardCharsets.UTF_8),
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE);
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 1029475b748..fb32326f117 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.ExecutionOptions
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader
import org.apache.flink.streaming.api.graph.StreamGraph
-import org.apache.flink.table.api.{ExplainDetail, ExplainFormat,
PlanReference, TableConfig, TableException}
+import org.apache.flink.table.api._
import org.apache.flink.table.api.PlanReference.{ContentPlanReference,
FilePlanReference, ResourcePlanReference}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
import org.apache.flink.table.delegation.{Executor, InternalPlan}
@@ -190,10 +190,12 @@ class StreamPlanner(
}
new ExecNodeGraphInternalPlan(
- JsonSerdeUtil
- .createObjectWriter(ctx)
- .withDefaultPrettyPrinter()
- .writeValueAsString(execNodeGraph),
+ // ensures that the JSON output is always normalized
+ () =>
+ JsonSerdeUtil
+ .createObjectWriter(ctx)
+ .withDefaultPrettyPrinter()
+ .writeValueAsString(execNodeGraph),
execNodeGraph)
}
@@ -204,12 +206,12 @@ class StreamPlanner(
val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled =
true)
afterTranslation()
- new ExecNodeGraphInternalPlan(
- JsonSerdeUtil
- .createObjectWriter(createSerdeContext)
- .withDefaultPrettyPrinter()
- .writeValueAsString(execGraph),
- execGraph)
+ val compiledJson = JsonSerdeUtil
+ .createObjectWriter(createSerdeContext)
+ .withDefaultPrettyPrinter()
+ .writeValueAsString(execGraph)
+
+ new ExecNodeGraphInternalPlan(() => compiledJson, execGraph)
}
override def translatePlan(plan: InternalPlan): util.List[Transformation[_]]
= {