luoyuxia commented on code in PR #22818:
URL: https://github.com/apache/flink/pull/22818#discussion_r1235201385


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -731,38 +731,46 @@ public TableResultInternal executePlan(InternalPlan plan) {
     }
 
     private CompiledPlan compilePlanAndWrite(
-            String filePath, boolean ifNotExists, Operation operation) {
-        File file = Paths.get(filePath).toFile();
-        if (file.exists()) {
-            if (ifNotExists) {
-                return loadPlan(PlanReference.fromFile(filePath));
+            String pathString, boolean ignoreIfExists, Operation operation) {
+        try {
+            ResourceUri planResource = new ResourceUri(ResourceType.FILE, pathString);
+            Path planPath = new Path(pathString);
+            if (resourceManager.exists(planPath)) {
+                if (ignoreIfExists) {
+                    return loadPlan(
+                            PlanReference.fromFile(
+                                    resourceManager.registerFileResource(planResource)));
+                }
+
+                if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) {
+                    throw new TableException(
+                            String.format(
+                                    "Cannot overwrite the plan file '%s'. "
+                                            + "Either manually remove the file or, "
+                                            + "if you're debugging your job, "
+                                            + "set the option '%s' to true.",
+                                    pathString, TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
+                }
             }
 
-            if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) {
+            CompiledPlan compiledPlan;
+            if (operation instanceof StatementSetOperation) {
+                compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations());
+            } else if (operation instanceof ModifyOperation) {
+                compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation));
+            } else {
                 throw new TableException(
-                        String.format(
-                                "Cannot overwrite the plan file '%s'. "
-                                        + "Either manually remove the file or, "
-                                        + "if you're debugging your job, "
-                                        + "set the option '%s' to true.",
-                                filePath, TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
+                        "Unsupported operation to compile: "
+                                + operation.getClass()
+                                + ". This is a bug, please file an issue.");
             }
-        }
-
-        CompiledPlan compiledPlan;
-        if (operation instanceof StatementSetOperation) {
-            compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations());
-        } else if (operation instanceof ModifyOperation) {
-            compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation));
-        } else {
+            resourceManager.syncFileResource(
+                    planResource, path -> compiledPlan.writeToFile(path, false));
+            return compiledPlan;
+        } catch (IOException e) {
             throw new TableException(
-                    "Unsupported operation to compile: "
-                            + operation.getClass()
-                            + ". This is a bug, please file an issue.");
+                    String.format("Cannot execute operation %s ", operation.asSummaryString()), e);

Review Comment:
   Nit:
   Would it be better if the error message were something like:
   `Failed to compile plan and write to xxx for xx`?
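
To make the suggested wording concrete, here is a minimal sketch of how such a message could be assembled from the plan path and the operation summary. The class `PlanErrorMessages` and its method are hypothetical names used only for illustration; they are not part of Flink, and the exact wording is just one option.

```java
/** Hypothetical helper for illustration only; not part of Flink. */
final class PlanErrorMessages {
    private PlanErrorMessages() {}

    /** Builds a message that names both the target path and the failed operation. */
    static String compileAndWriteFailure(String planPath, String operationSummary) {
        return String.format(
                "Failed to compile plan and write to '%s' for operation: %s.",
                planPath, operationSummary);
    }

    public static void main(String[] args) {
        // Example values are made up; in the PR they would come from
        // pathString and operation.asSummaryString().
        System.out.println(
                compileAndWriteFailure("/tmp/plan.json", "INSERT INTO sink SELECT * FROM src"));
    }
}
```

Including both the path and the operation in the message lets a user tell from the log alone which file could not be written and for which statement.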



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -296,4 +319,85 @@ private Path getResourceLocalPath(Path remotePath) {
         }
         return new Path(localResourceDir, fileNameWithUUID);
     }
+
+    private void checkResources(List<ResourceUri> resourceUris, ResourceType expectedType)
+            throws IOException {
+        // check the resource type
+        if (resourceUris.stream()
+                .anyMatch(resourceUri -> expectedType != resourceUri.getResourceType())) {
+            throw new ValidationException(
+                    String.format(
+                            "Only support to register %s resource, resource info:\n %s.",

Review Comment:
   I got confused by this method and its error message.
   What is the purpose of this method?
   Is it to check that all `resourceUris` belong to `expectedType`?
   If so, the error message should change to something like `Expect the resource type to be xxx, but got a resource with resource type xxx`.
   
   If it is to check that all `resourceUris` belong to the supported resource types, I think `expectedType` won't be needed; we can maintain the supported resource types as a static field in this class.
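
A rough sketch of the first interpretation (every resource must have the expected type), with a message that names both the expected and the actual type. `Kind` and `Res` are stand-ins invented for this example so that it compiles on its own; they are not Flink's `ResourceType` and `ResourceUri`, and `IllegalArgumentException` stands in for `ValidationException`.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; the nested types are stand-ins, not Flink classes. */
final class ResourceTypeCheck {
    private ResourceTypeCheck() {}

    /** Stand-in for a resource type enum. */
    enum Kind { JAR, FILE }

    /** Stand-in for a (type, uri) pair. */
    static final class Res {
        final Kind kind;
        final String uri;

        Res(Kind kind, String uri) {
            this.kind = kind;
            this.uri = uri;
        }
    }

    /** Fails on the first resource whose type differs from the expected one. */
    static void checkAllOfType(List<Res> resources, Kind expected) {
        for (Res r : resources) {
            if (r.kind != expected) {
                throw new IllegalArgumentException(
                        String.format(
                                "Expected the resource type to be %s, but got resource [%s] of type %s.",
                                expected, r.uri, r.kind));
            }
        }
    }

    public static void main(String[] args) {
        List<Res> resources =
                Arrays.asList(new Res(Kind.JAR, "a.jar"), new Res(Kind.FILE, "/tmp/plan.json"));
        try {
            checkAllOfType(resources, Kind.JAR);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Reporting the first offending resource keeps the message short; listing all mismatches, as the current code does, would be an equally valid choice.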



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -203,54 +193,75 @@ public void close() throws IOException {
         }
     }
 
-    private void checkJarResources(List<ResourceUri> resourceUris) throws IOException {
-        // only support register jar resource
-        if (resourceUris.stream()
-                .anyMatch(resourceUri -> !ResourceType.JAR.equals(resourceUri.getResourceType()))) {
-            throw new ValidationException(
-                    String.format(
-                            "Only support to register jar resource, resource info:\n %s.",
-                            resourceUris.stream()
-                                    .map(ResourceUri::getUri)
-                                    .collect(Collectors.joining(",\n"))));
-        }
+    /** Check whether the {@link Path} exists. */
+    public boolean exists(Path filePath) throws IOException {
+        return filePath.getFileSystem().exists(filePath);
+    }
 
-        for (ResourceUri resourceUri : resourceUris) {
-            checkJarPath(new Path(resourceUri.getUri()));
+    /**
+     * Synchronize a file resource identified by the given {@link ResourceUri} with a local copy
+     * generated by the given resource generator.
+     *
+     * @param resourceUri the file resource to synchronize

Review Comment:
   nit
   ```suggestion
        * @param resourceUri the file resource uri to synchronize to
   ```
   ?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -938,8 +946,19 @@ public TableResultInternal executeInternal(Operation operation) {
             return executeQueryOperation((QueryOperation) operation);
         } else if (operation instanceof ExecutePlanOperation) {
             ExecutePlanOperation executePlanOperation = (ExecutePlanOperation) operation;
-            return (TableResultInternal)
-                    executePlan(PlanReference.fromFile(executePlanOperation.getFilePath()));
+            try {
+                return (TableResultInternal)
+                        executePlan(
+                                PlanReference.fromFile(
+                                        resourceManager.registerFileResource(
+                                                new ResourceUri(
+                                                        ResourceType.FILE,
+                                                        executePlanOperation.getFilePath()))));
+            } catch (IOException e) {
+                throw new TableException(
+                        String.format("Cannot execute operation %s", operation.asSummaryString()),

Review Comment:
   nit: how about
   `Can't execute the plan for the operation %s`
   ?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -89,52 +90,41 @@ public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoa
      * register them into the {@link ResourceManager}.
      */
     public void registerJarResources(List<ResourceUri> resourceUris) throws IOException {
-        // check jar resource before register
-        checkJarResources(resourceUris);
-
-        Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>();
-        for (ResourceUri resourceUri : resourceUris) {
-            // check whether the resource has been registered
-            if (resourceInfos.containsKey(resourceUri) && resourceInfos.get(resourceUri) != null) {
-                LOG.info(
-                        "Resource [{}] has been registered, overwriting of registered resource is not supported "
-                                + "in the current version, skipping.",
-                        resourceUri.getUri());
-                continue;
-            }
-
-            // here can check whether the resource path is valid
-            Path path = new Path(resourceUri.getUri());
-            URL localUrl;
-            // check resource scheme
-            String scheme = StringUtils.lowerCase(path.toUri().getScheme());
-            // download resource to local path firstly if in remote
-            if (scheme != null && !FILE_SCHEME.equals(scheme)) {
-                localUrl = downloadResource(path);
-            } else {
-                localUrl = getURLFromPath(path);
-                // if the local jar resource is a relative path, here convert it to absolute path
-                // before register
-                resourceUri = new ResourceUri(ResourceType.JAR, localUrl.getPath());
-            }
-
-            // check the local jar file
-            JarUtils.checkJarFile(localUrl);
-
-            // add it to staging map
-            stagingResourceLocalURLs.put(resourceUri, localUrl);
-        }
-
-        // register resource in batch
-        stagingResourceLocalURLs.forEach(
-                (resourceUri, url) -> {
-                    // jar resource need add to classloader
-                    userClassLoader.addURL(url);
-                    LOG.info("Added jar resource [{}] to class path.", url);
+        registerResources(
+                prepareStagingResources(
+                        resourceUris,
+                        ResourceType.JAR,
+                        true,
+                        url -> {
+                            try {
+                                JarUtils.checkJarFile(url);
+                            } catch (IOException e) {
+                                throw new ValidationException(e.getMessage(), e);

Review Comment:
   nit:
   ```suggestion
                                  throw new ValidationException(
                                           String.format("Fail to register jar: %s.", url), e);
   ```
   ?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -203,54 +193,75 @@ public void close() throws IOException {
         }
     }
 
-    private void checkJarResources(List<ResourceUri> resourceUris) throws IOException {
-        // only support register jar resource
-        if (resourceUris.stream()
-                .anyMatch(resourceUri -> !ResourceType.JAR.equals(resourceUri.getResourceType()))) {
-            throw new ValidationException(
-                    String.format(
-                            "Only support to register jar resource, resource info:\n %s.",
-                            resourceUris.stream()
-                                    .map(ResourceUri::getUri)
-                                    .collect(Collectors.joining(",\n"))));
-        }
+    /** Check whether the {@link Path} exists. */
+    public boolean exists(Path filePath) throws IOException {
+        return filePath.getFileSystem().exists(filePath);
+    }
 
-        for (ResourceUri resourceUri : resourceUris) {
-            checkJarPath(new Path(resourceUri.getUri()));
+    /**
+     * Synchronize a file resource identified by the given {@link ResourceUri} with a local copy

Review Comment:
   I spent some time trying to understand this method; the doc may be hard to understand.
   IIUC, this method does not synchronize a file resource, but synchronizes a generated local file to the given {@link ResourceUri}.
   
   It should be:
   Generate a local file resource by the given resource generator and then synchronize it to the path identified by the given {@link ResourceUri}. The path passed to `resourceGenerator` will be a local path retrieved from the given {@link ResourceUri}.
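
The "generate locally, then synchronize" semantics described above can be sketched with plain `java.nio.file`, purely as an illustration. `SyncSketch`, `Generator`, and `generateAndSync` are hypothetical names, and the sketch uses a temp file as the local path, whereas the method under review derives the local path from the `ResourceUri`; this is not the actual `ResourceManager` implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Illustrative sketch only; not the actual ResourceManager code. */
final class SyncSketch {
    private SyncSketch() {}

    /** Callback that writes the resource content to the given local path. */
    @FunctionalInterface
    interface Generator {
        void generate(Path localPath) throws IOException;
    }

    /** Generates the resource into a local file, then copies it to the target path. */
    static void generateAndSync(Path target, Generator generator) throws IOException {
        // Assumption: a temp file plays the role of the local copy.
        Path local = Files.createTempFile("resource-", ".tmp");
        try {
            generator.generate(local);
            Files.copy(local, target, StandardCopyOption.REPLACE_EXISTING);
        } finally {
            Files.deleteIfExists(local);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sync-sketch");
        Path target = dir.resolve("plan.json");
        generateAndSync(target, p -> Files.write(p, "{}".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is the direction of the data flow: the generator only ever sees a local path, and the copy to the target happens afterwards, which is what the reworded Javadoc tries to convey.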
   


