This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 96a142866a0 [FLINK-33447][table-planner] Avoid CompiledPlan 
recompilation during loading
96a142866a0 is described below

commit 96a142866a042598bfe85407b46b0871a7b8993a
Author: Timo Walther <[email protected]>
AuthorDate: Fri Nov 3 11:02:19 2023 +0100

    [FLINK-33447][table-planner] Avoid CompiledPlan recompilation during loading
---
 .../planner/plan/ExecNodeGraphInternalPlan.java    | 15 ++++++++++----
 .../table/planner/delegation/StreamPlanner.scala   | 24 ++++++++++++----------
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
index 7f138c6517a..b921535b8b8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
@@ -34,17 +34,21 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.List;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /** Implementation of {@link CompiledPlan} backed by an {@link ExecNodeGraph}. 
*/
 @Internal
 public class ExecNodeGraphInternalPlan implements InternalPlan {
 
-    private final String serializedPlan;
+    private final Supplier<String> serializedPlanSupplier;
     private final ExecNodeGraph execNodeGraph;
 
-    public ExecNodeGraphInternalPlan(String serializedPlan, ExecNodeGraph 
execNodeGraph) {
-        this.serializedPlan = serializedPlan;
+    private String serializedPlan;
+
+    public ExecNodeGraphInternalPlan(
+            Supplier<String> serializedPlanSupplier, ExecNodeGraph 
execNodeGraph) {
+        this.serializedPlanSupplier = serializedPlanSupplier;
         this.execNodeGraph = execNodeGraph;
     }
 
@@ -54,6 +58,9 @@ public class ExecNodeGraphInternalPlan implements 
InternalPlan {
 
     @Override
     public String asJsonString() {
+        if (serializedPlan == null) {
+            serializedPlan = serializedPlanSupplier.get();
+        }
         return serializedPlan;
     }
 
@@ -78,7 +85,7 @@ public class ExecNodeGraphInternalPlan implements 
InternalPlan {
             Files.createDirectories(file.toPath().getParent());
             Files.write(
                     file.toPath(),
-                    serializedPlan.getBytes(StandardCharsets.UTF_8),
+                    asJsonString().getBytes(StandardCharsets.UTF_8),
                     StandardOpenOption.CREATE,
                     StandardOpenOption.TRUNCATE_EXISTING,
                     StandardOpenOption.WRITE);
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 1029475b748..fb32326f117 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ExecutionOptions
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader
 import org.apache.flink.streaming.api.graph.StreamGraph
-import org.apache.flink.table.api.{ExplainDetail, ExplainFormat, 
PlanReference, TableConfig, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.PlanReference.{ContentPlanReference, 
FilePlanReference, ResourcePlanReference}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.delegation.{Executor, InternalPlan}
@@ -190,10 +190,12 @@ class StreamPlanner(
     }
 
     new ExecNodeGraphInternalPlan(
-      JsonSerdeUtil
-        .createObjectWriter(ctx)
-        .withDefaultPrettyPrinter()
-        .writeValueAsString(execNodeGraph),
+      // ensures that the JSON output is always normalized
+      () =>
+        JsonSerdeUtil
+          .createObjectWriter(ctx)
+          .withDefaultPrettyPrinter()
+          .writeValueAsString(execNodeGraph),
       execNodeGraph)
   }
 
@@ -204,12 +206,12 @@ class StreamPlanner(
     val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = 
true)
     afterTranslation()
 
-    new ExecNodeGraphInternalPlan(
-      JsonSerdeUtil
-        .createObjectWriter(createSerdeContext)
-        .withDefaultPrettyPrinter()
-        .writeValueAsString(execGraph),
-      execGraph)
+    val compiledJson = JsonSerdeUtil
+      .createObjectWriter(createSerdeContext)
+      .withDefaultPrettyPrinter()
+      .writeValueAsString(execGraph)
+
+    new ExecNodeGraphInternalPlan(() => compiledJson, execGraph)
   }
 
   override def translatePlan(plan: InternalPlan): util.List[Transformation[_]] 
= {

Reply via email to