twalthr commented on a change in pull request #18573:
URL: https://github.com/apache/flink/pull/18573#discussion_r801843526



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -57,53 +47,140 @@ public static PlanReference fromFile(Path path) {
 
     /** Create a reference starting from a file path. */
     public static PlanReference fromFile(File file) {
-        return new PlanReference(file, null);
+        return new FilePlanReference(file);
     }
 
     /** Create a reference starting from a JSON string. */
     public static PlanReference fromJsonString(String jsonString) {
-        return new PlanReference(null, jsonString);
+        return new ContentPlanReference(jsonString);
     }
 
     /**
      * Create a reference from a file in the classpath, using {@code
      * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}.
-     *
-     * @throws TableException if the classpath resource cannot be found
      */
-    public static PlanReference fromClasspath(String classpathFilePath) {
-        return fromClasspath(Thread.currentThread().getContextClassLoader(), 
classpathFilePath);
+    public static PlanReference fromResource(String resourcePath) {
+        return fromResource(Thread.currentThread().getContextClassLoader(), 
resourcePath);
     }
 
-    /**
-     * Create a reference from a file in the classpath.
-     *
-     * @throws TableException if the classpath resource cannot be found
-     */
-    public static PlanReference fromClasspath(ClassLoader classLoader, String 
classpathFilePath)
-            throws TableException {
-        URL url = classLoader.getResource(classpathFilePath);
-        if (url == null) {
-            throw new TableException(
-                    "Cannot load the plan reference from classpath, resource 
not found: "
-                            + classpathFilePath);
-        }
-
-        try {
-            return PlanReference.fromFile(new File(url.toURI()));
-        } catch (URISyntaxException e) {
-            throw new TableException(
-                    "Cannot load the plan reference from classpath, invalid 
URI: "
-                            + classpathFilePath,
-                    e);
+    /** Create a reference from a file in the classpath. */
+    public static PlanReference fromResource(ClassLoader classLoader, String 
resourcePath) {
+        return new ResourcePlanReference(classLoader, resourcePath);
+    }
+
+    /** Plan reference to a file in the local filesystem. */
+    public static class FilePlanReference extends PlanReference {
+
+        private final File file;
+
+        private FilePlanReference(File file) {
+            this.file = file;
+        }
+
+        public File getFile() {
+            return file;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            FilePlanReference that = (FilePlanReference) o;
+            return Objects.equals(file, that.file);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(file);
+        }
+
+        @Override
+        public String toString() {
+            return "Plan from file '" + file + "'";
         }
     }
 
-    public Optional<File> getFile() {
-        return Optional.ofNullable(file);
+    /** Plan reference to a string containing the serialized persisted plan in 
JSON. */
+    public static class ContentPlanReference extends PlanReference {
+
+        private final String content;
+
+        public ContentPlanReference(String content) {

Review comment:
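       Please make this constructor private, like the one in `FilePlanReference`; instances should only be created through the static factory method `fromJsonString`.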

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -57,53 +47,140 @@ public static PlanReference fromFile(Path path) {
 
     /** Create a reference starting from a file path. */
     public static PlanReference fromFile(File file) {
-        return new PlanReference(file, null);
+        return new FilePlanReference(file);
     }
 
     /** Create a reference starting from a JSON string. */
     public static PlanReference fromJsonString(String jsonString) {
-        return new PlanReference(null, jsonString);
+        return new ContentPlanReference(jsonString);
     }
 
     /**
      * Create a reference from a file in the classpath, using {@code
      * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}.
-     *
-     * @throws TableException if the classpath resource cannot be found
      */
-    public static PlanReference fromClasspath(String classpathFilePath) {
-        return fromClasspath(Thread.currentThread().getContextClassLoader(), 
classpathFilePath);
+    public static PlanReference fromResource(String resourcePath) {
+        return fromResource(Thread.currentThread().getContextClassLoader(), 
resourcePath);
     }
 
-    /**
-     * Create a reference from a file in the classpath.
-     *
-     * @throws TableException if the classpath resource cannot be found
-     */
-    public static PlanReference fromClasspath(ClassLoader classLoader, String 
classpathFilePath)
-            throws TableException {
-        URL url = classLoader.getResource(classpathFilePath);
-        if (url == null) {
-            throw new TableException(
-                    "Cannot load the plan reference from classpath, resource 
not found: "
-                            + classpathFilePath);
-        }
-
-        try {
-            return PlanReference.fromFile(new File(url.toURI()));
-        } catch (URISyntaxException e) {
-            throw new TableException(
-                    "Cannot load the plan reference from classpath, invalid 
URI: "
-                            + classpathFilePath,
-                    e);
+    /** Create a reference from a file in the classpath. */
+    public static PlanReference fromResource(ClassLoader classLoader, String 
resourcePath) {
+        return new ResourcePlanReference(classLoader, resourcePath);
+    }
+
+    /** Plan reference to a file in the local filesystem. */
+    public static class FilePlanReference extends PlanReference {
+
+        private final File file;
+
+        private FilePlanReference(File file) {
+            this.file = file;
+        }
+
+        public File getFile() {
+            return file;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            FilePlanReference that = (FilePlanReference) o;
+            return Objects.equals(file, that.file);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(file);
+        }
+
+        @Override
+        public String toString() {
+            return "Plan from file '" + file + "'";
         }
     }
 
-    public Optional<File> getFile() {
-        return Optional.ofNullable(file);
+    /** Plan reference to a string containing the serialized persisted plan in 
JSON. */
+    public static class ContentPlanReference extends PlanReference {
+
+        private final String content;
+
+        public ContentPlanReference(String content) {
+            this.content = content;
+        }
+
+        public String getContent() {
+            return content;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ContentPlanReference that = (ContentPlanReference) o;
+            return Objects.equals(content, that.content);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(content);
+        }
+
+        @Override
+        public String toString() {
+            return "Plan '" + content + "'";
+        }
     }
 
-    public Optional<String> getContent() {
-        return Optional.ofNullable(content);
+    /** Plan reference to a file in the provided {@link ClassLoader}. */
+    public static class ResourcePlanReference extends PlanReference {
+
+        private final ClassLoader classLoader;
+        private final String resourcePath;
+
+        public ResourcePlanReference(ClassLoader classLoader, String 
resourcePath) {

Review comment:
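       Please make this constructor private, like the one in `FilePlanReference`; instances should only be created through the static factory method `fromResource`.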

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
##########
@@ -59,4 +62,12 @@ default void writeToFile(Path path) throws IOException {
     /** Write this plan to a file using the JSON representation. */
     void writeToFile(Path path, boolean ignoreIfExists)
             throws IOException, UnsupportedOperationException;
+
+    // --- Accessors
+
+    /** Get the flink version used to compile the plan. */
+    String getFlinkVersion();
+
+    /** This returns an ordered list of sink identifiers, if any. */
+    List<String> getSinkIdentifiers();

Review comment:
       Please mark this method as `@Internal`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
##########
@@ -59,4 +62,12 @@ default void writeToFile(Path path) throws IOException {
     /** Write this plan to a file using the JSON representation. */
     void writeToFile(Path path, boolean ignoreIfExists)
             throws IOException, UnsupportedOperationException;
+
+    // --- Accessors
+
+    /** Get the flink version used to compile the plan. */
+    String getFlinkVersion();

Review comment:
       Can we use a `FlinkVersion` instance instead of a string? 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -57,53 +47,140 @@ public static PlanReference fromFile(Path path) {
 
     /** Create a reference starting from a file path. */
     public static PlanReference fromFile(File file) {
-        return new PlanReference(file, null);
+        return new FilePlanReference(file);

Review comment:
       Add null checks either here or in the constructors to identify API misuse early.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
##########
@@ -57,53 +47,140 @@ public static PlanReference fromFile(Path path) {
 
     /** Create a reference starting from a file path. */
     public static PlanReference fromFile(File file) {
-        return new PlanReference(file, null);
+        return new FilePlanReference(file);
     }
 
     /** Create a reference starting from a JSON string. */
     public static PlanReference fromJsonString(String jsonString) {
-        return new PlanReference(null, jsonString);
+        return new ContentPlanReference(jsonString);
     }
 
     /**
      * Create a reference from a file in the classpath, using {@code
      * Thread.currentThread().getContextClassLoader()} as {@link ClassLoader}.
-     *
-     * @throws TableException if the classpath resource cannot be found
      */
-    public static PlanReference fromClasspath(String classpathFilePath) {
-        return fromClasspath(Thread.currentThread().getContextClassLoader(), 
classpathFilePath);
+    public static PlanReference fromResource(String resourcePath) {
+        return fromResource(Thread.currentThread().getContextClassLoader(), 
resourcePath);
     }
 
-    /**
-     * Create a reference from a file in the classpath.
-     *
-     * @throws TableException if the classpath resource cannot be found
-     */
-    public static PlanReference fromClasspath(ClassLoader classLoader, String 
classpathFilePath)
-            throws TableException {
-        URL url = classLoader.getResource(classpathFilePath);
-        if (url == null) {
-            throw new TableException(
-                    "Cannot load the plan reference from classpath, resource 
not found: "
-                            + classpathFilePath);
-        }
-
-        try {
-            return PlanReference.fromFile(new File(url.toURI()));
-        } catch (URISyntaxException e) {
-            throw new TableException(
-                    "Cannot load the plan reference from classpath, invalid 
URI: "
-                            + classpathFilePath,
-                    e);
+    /** Create a reference from a file in the classpath. */
+    public static PlanReference fromResource(ClassLoader classLoader, String 
resourcePath) {
+        return new ResourcePlanReference(classLoader, resourcePath);
+    }
+
+    /** Plan reference to a file in the local filesystem. */
+    public static class FilePlanReference extends PlanReference {
+
+        private final File file;
+
+        private FilePlanReference(File file) {
+            this.file = file;
+        }
+
+        public File getFile() {
+            return file;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            FilePlanReference that = (FilePlanReference) o;
+            return Objects.equals(file, that.file);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(file);
+        }
+
+        @Override
+        public String toString() {
+            return "Plan from file '" + file + "'";
         }
     }
 
-    public Optional<File> getFile() {
-        return Optional.ofNullable(file);
+    /** Plan reference to a string containing the serialized persisted plan in 
JSON. */
+    public static class ContentPlanReference extends PlanReference {
+
+        private final String content;
+
+        public ContentPlanReference(String content) {
+            this.content = content;
+        }
+
+        public String getContent() {
+            return content;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ContentPlanReference that = (ContentPlanReference) o;
+            return Objects.equals(content, that.content);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(content);
+        }
+
+        @Override
+        public String toString() {
+            return "Plan '" + content + "'";

Review comment:
       This `toString` is not very useful. We should do `"Plan:\n" + content` instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to