LadyForest commented on code in PR #22818:
URL: https://github.com/apache/flink/pull/22818#discussion_r1236665067
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -731,38 +731,46 @@ public TableResultInternal executePlan(InternalPlan plan) {
}
private CompiledPlan compilePlanAndWrite(
- String filePath, boolean ifNotExists, Operation operation) {
- File file = Paths.get(filePath).toFile();
- if (file.exists()) {
- if (ifNotExists) {
- return loadPlan(PlanReference.fromFile(filePath));
+ String pathString, boolean ignoreIfExists, Operation operation) {
+ try {
+ ResourceUri planResource = new ResourceUri(ResourceType.FILE, pathString);
+ Path planPath = new Path(pathString);
+ if (resourceManager.exists(planPath)) {
+ if (ignoreIfExists) {
+ return loadPlan(
+ PlanReference.fromFile(
+ resourceManager.registerFileResource(planResource)));
+ }
+
+ if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) {
+ throw new TableException(
+ String.format(
+ "Cannot overwrite the plan file '%s'. "
+ + "Either manually remove the file or, "
+ + "if you're debugging your job, "
+ + "set the option '%s' to true.",
+ pathString, TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
+ }
}
- if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) {
+ CompiledPlan compiledPlan;
+ if (operation instanceof StatementSetOperation) {
+ compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations());
+ } else if (operation instanceof ModifyOperation) {
+ compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation));
+ } else {
throw new TableException(
- String.format(
- "Cannot overwrite the plan file '%s'. "
- + "Either manually remove the file or, "
- + "if you're debugging your job, "
- + "set the option '%s' to true.",
- filePath, TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
+ "Unsupported operation to compile: "
+ + operation.getClass()
+ + ". This is a bug, please file an issue.");
}
- }
-
- CompiledPlan compiledPlan;
- if (operation instanceof StatementSetOperation) {
- compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations());
- } else if (operation instanceof ModifyOperation) {
- compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation));
- } else {
+ resourceManager.syncFileResource(
+ planResource, path -> compiledPlan.writeToFile(path, false));
+ return compiledPlan;
+ } catch (IOException e) {
throw new TableException(
- "Unsupported operation to compile: "
- + operation.getClass()
- + ". This is a bug, please file an issue.");
+ String.format("Cannot execute operation %s ", operation.asSummaryString()), e);
Review Comment:
Here's my understanding.
The entire code block is wrapped in a single `try ... catch`, so the
`IOException` might be thrown at any of these points:
- L#738 (checking the file status)
- L#742 (downloading the remote file to local)
- L#767 (syncing the local file to remote)
As a result, I think `Fail to compile plan and write to xxx for xx` might be
too general to fit all of these cases. In addition, this method is called by
both `CompilePlanOperation` and `CompileAndExecutePlanOperation`, so I think
the top-level exception message should distinguish whether the failure came
from a `COMPILE PLAN` statement or a `COMPILE AND EXECUTE PLAN` statement.
WDYT?
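
To make the idea concrete, here is a rough sketch in plain Java rather than actual Flink code. `StatementKind`, `IoStep`, and `runStep` are hypothetical names I made up for illustration, the message wording is only a suggestion, and `IllegalStateException` stands in for `TableException` so the snippet compiles on its own. Each I/O stage gets its own wrapper, so the message names both the statement and the stage that failed.

```java
import java.io.IOException;

class PlanErrorMessages {

    /** Hypothetical: which SQL statement triggered compilePlanAndWrite. */
    enum StatementKind {
        COMPILE_PLAN("COMPILE PLAN"),
        COMPILE_AND_EXECUTE_PLAN("COMPILE AND EXECUTE PLAN");

        final String label;

        StatementKind(String label) {
            this.label = label;
        }
    }

    /** Hypothetical: one I/O stage that may fail with an IOException. */
    interface IoStep {
        void run() throws IOException;
    }

    /**
     * Runs a single stage and, on failure, reports the statement kind and the
     * stage in the message. IllegalStateException is a stand-in for TableException.
     */
    static void runStep(StatementKind kind, String stage, String planPath, IoStep step) {
        try {
            step.run();
        } catch (IOException e) {
            throw new IllegalStateException(
                    String.format(
                            "Failed to execute %s statement: could not %s '%s'.",
                            kind.label, stage, planPath),
                    e);
        }
    }

    public static void main(String[] args) {
        try {
            // Simulate a failure while checking whether the plan file exists.
            runStep(
                    StatementKind.COMPILE_AND_EXECUTE_PLAN,
                    "check the status of plan file",
                    "/tmp/plan.json",
                    () -> {
                        throw new IOException("simulated failure");
                    });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real method, the three stages (exists check, register/download, sync) would each go through something like `runStep`, and the caller would pass down which statement it is handling. Whether that is done with an enum, a boolean, or the operation's own summary string is open.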