LadyForest commented on code in PR #22539:
URL: https://github.com/apache/flink/pull/22539#discussion_r1230718029
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CompileAndExecutePlanOperation.java:
##########
@@ -19,25 +19,26 @@
package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
/** Operation to describe an {@code COMPILE AND EXECUTE PLAN} statement. */
@Internal
public class CompileAndExecutePlanOperation implements Operation {
- private final String filePath;
+ private final Path filePath;
private final Operation operation;
public CompileAndExecutePlanOperation(String filePath, Operation operation) {
    Preconditions.checkArgument(
        operation instanceof StatementSetOperation || operation instanceof ModifyOperation,
        "Child operation of CompileAndExecuteOperation must be either a "
            + "ModifyOperation or a StatementSetOperation.");
-   this.filePath = filePath;
+   this.filePath = new Path(filePath);
Review Comment:
Perhaps we don't need to change this.
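For illustration, a minimal sketch of the alternative this comment appears to suggest: keep the field as a plain `String` and convert to a Flink `Path` only where the filesystem is actually touched (the call site shown is hypothetical, not code from the PR):

    // Sketch: leave the operation class untouched.
    private final String filePath;

    public String getFilePath() {
        return filePath;
    }

    // ...and convert at the point of use (hypothetical call site):
    // Path planPath = new org.apache.flink.core.fs.Path(operation.getFilePath());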
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -2155,6 +2158,47 @@ class TableEnvironmentTest {
testUnsupportedExplain("explain plan as json for select * from MyTable")
}
+ @Test
+ def testCompileAndExecutePlan(): Unit = {
+ val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ execEnv.setParallelism(1)
+ val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+ val tableEnv = StreamTableEnvironment.create(execEnv, settings)
+ tableEnv.getConfig.set(TableConfigOptions.PLAN_FORCE_RECOMPILE, Boolean.box(true))
+
+ val srcTableDdl =
+   "CREATE TABLE MyTable (\n" + " a bigint,\n" + " b int,\n" + " c varchar\n" + ") with (\n" + " 'connector' = 'values',\n" + " 'bounded' = 'false')"
+ tableEnv.executeSql(srcTableDdl)
+
+ val sinkTableDdl =
+   "CREATE TABLE MySink (\n" + " a bigint,\n" + " b int,\n" + " c varchar\n" + ") with (\n" + " 'connector' = 'values',\n" + " 'table-sink-class' = 'DEFAULT')"
+ tableEnv.executeSql(sinkTableDdl)
+
+ val planfile = tempFolder.newFile("compile1.json")
+ var file = "file://" + planfile.toPath
+
+ var sql = String.format("COMPILE PLAN '%s' FOR INSERT INTO MySink SELECT * FROM MyTable", file)
+ tableEnv.executeSql(sql)
+
+ sql = String.format("EXECUTE PLAN '%s'", file)
+ tableEnv.executeSql(sql)
+
+ val planPath = tempFolder.getRoot.getPath
+ var path = "file://" + planPath + "/compile2.json"
+
+ sql = String.format("COMPILE PLAN '%s' FOR INSERT INTO MySink SELECT * FROM MyTable", path)
+ tableEnv.executeSql(sql)
+
+ sql = String.format("EXECUTE PLAN '%s'", path)
+ tableEnv.executeSql(sql)
+
+ path = "file://" + planPath + "/compile3.json"
+ sql = String.format(
+   "COMPILE and EXECUTE plan '%s' FOR INSERT INTO MySink SELECT * FROM MyTable",
+   path)
+ tableEnv.executeSql(sql)
Review Comment:
What's the purpose of this test? It does not assert anything.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -2155,6 +2158,47 @@ class TableEnvironmentTest {
testUnsupportedExplain("explain plan as json for select * from MyTable")
}
+ @Test
+ def testCompileAndExecutePlan(): Unit = {
+ val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ execEnv.setParallelism(1)
+ val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+ val tableEnv = StreamTableEnvironment.create(execEnv, settings)
Review Comment:
What's the purpose of re-creating a `tableEnv`?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -83,6 +83,56 @@ public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoa
        this.userClassLoader = userClassLoader;
    }
+    public boolean exists(Path filePath) throws IOException {
+        return FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath);
+    }
+
+    /**
+     * Registers the given path with the Flink filesystem. If it points to a remote filesystem
+     * and the file exists, downloads the file to a local copy and registers the mapping from
+     * the path to the local URL.
+     */
+    public void registerFsResources(Path filePath) throws IOException {
Review Comment:
Meanwhile, this method lacks test coverage.
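As a rough shape for that coverage, a JUnit 5 style sketch against a local temp file (the `resourceManager` fixture and the AssertJ usage are assumptions, not code from the PR):

    // Illustrative test sketch: exists() and registerFsResources() on a local file.
    @Test
    void testRegisterFsResourcesWithLocalFile(@TempDir java.nio.file.Path tmp) throws Exception {
        java.nio.file.Path plan = tmp.resolve("plan.json");
        java.nio.file.Files.write(plan, "{}".getBytes(java.nio.charset.StandardCharsets.UTF_8));

        Path filePath = new Path(plan.toUri()); // org.apache.flink.core.fs.Path
        assertThat(resourceManager.exists(filePath)).isTrue();

        // Should complete without a download for a local file; the remote case
        // would need an HDFS-backed variant (e.g. MiniDFSCluster).
        resourceManager.registerFsResources(filePath);
    }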
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -2165,6 +2168,25 @@ public void testCreateViewWithDynamicTableOptions() {
assertThat(operation).isInstanceOf(CreateViewOperation.class);
}
+ @ParameterizedTest
Review Comment:
We could add the test at the syntax level since the operation level requires
no change.
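A sketch of such a syntax-level test, assuming the `sql(...).ok(...)` fixture that Flink's `FlinkSqlParserImplTest` inherits from Calcite's `SqlParserTest` (the expected unparse string below is a guess, not taken from the PR):

    // Illustrative parser-level test; the expected output is assumed.
    @Test
    void testCompilePlan() {
        sql("compile plan 'file:///tmp/plan.json' for insert into sink select * from src")
                .ok(
                        "COMPILE PLAN 'file:///tmp/plan.json' FOR INSERT INTO `SINK`\n"
                                + "SELECT *\n"
                                + "FROM `SRC`");
    }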
##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java:
##########
@@ -113,6 +113,13 @@ public void runAndCheckSQL(
}
}
+ public void runSQL(String sqlPath, Map<String, String> varsMap) throws Exception {
+ try (ClusterController clusterController = flink.startCluster(1)) {
+ List<String> sqlLines = initializeSqlLines(sqlPath, varsMap);
+ executeSqlStatements(clusterController, sqlLines);
Review Comment:
Why duplicate the method and assert nothing?
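One way to address both points is to fold the new path into the existing checking flow instead of copying it; the overload below is hypothetical and `checkResultFile` stands in for whatever verification helper this class already has:

    // Hypothetical: delegate to one shared body so every run verifies output.
    public void runAndCheckSQL(
            String sqlPath, Map<String, String> varsMap, List<String> expectedResults)
            throws Exception {
        try (ClusterController clusterController = flink.startCluster(1)) {
            List<String> sqlLines = initializeSqlLines(sqlPath, varsMap);
            executeSqlStatements(clusterController, sqlLines);
            checkResultFile(expectedResults); // assumed existing helper in SqlITCaseBase
        }
    }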
##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CompileAndExecuteRemoteFileITCase.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.flink.ClusterController;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+/** End-to-End tests for compile and execute remote file. */
+public class CompileAndExecuteRemoteFileITCase extends SqlITCaseBase {
Review Comment:
This test shares the same HDFS configuration with `UsingRemoteJarITCase`. Maybe it
deserves a common base class.
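A possible shape for that base class, sketched on the assumption that both tests only need the MiniDFSCluster lifecycle (all names below are illustrative):

    // Illustrative common base hoisting the shared MiniDFSCluster setup/teardown.
    public abstract class HdfsBackedSqlITCaseBase extends SqlITCaseBase {

        protected MiniDFSCluster hdfsCluster;

        protected HdfsBackedSqlITCaseBase(String executionMode) {
            super(executionMode);
        }

        @Before
        public void before() throws Exception {
            super.before();
            Configuration hdConf = new Configuration();
            File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
            FileUtil.fullyDelete(baseDir);
            hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
            hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
        }

        @After
        public void destroyHDFS() throws Exception {
            if (hdfsCluster != null) {
                hdfsCluster.shutdown();
            }
        }
    }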
##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CompileAndExecuteRemoteFileITCase.java:
##########
@@ -0,0 +1,112 @@
+/** End-to-End tests for compile and execute remote file. */
+public class CompileAndExecuteRemoteFileITCase extends SqlITCaseBase {
+ private static final Path HADOOP_CLASSPATH =
+ ResourceTestUtils.getResource(".*hadoop.classpath");
+
+ private MiniDFSCluster hdfsCluster;
+ private org.apache.hadoop.fs.Path hdPath;
+ private org.apache.hadoop.fs.FileSystem hdfs;
+
+ public CompileAndExecuteRemoteFileITCase(String executionMode) {
+ super(executionMode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ createHDFS();
+ }
+
+ private void createHDFS() {
+ try {
+ Configuration hdConf = new Configuration();
+
+ File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+ FileUtil.fullyDelete(baseDir);
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+
+ hdPath = new org.apache.hadoop.fs.Path("/test.json");
+ hdfs = hdPath.getFileSystem(hdConf);
+
+ } catch (Throwable e) {
+ e.printStackTrace();
Review Comment:
I don't think it's a good practice to do so...
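A common cleanup here is to drop the try/catch entirely and let JUnit surface the failure, for example:

    // Sketch: propagate setup failures instead of printStackTrace + Assert.fail.
    private void createHDFS() throws IOException {
        Configuration hdConf = new Configuration();
        File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
        FileUtil.fullyDelete(baseDir);
        hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
        hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
        hdPath = new org.apache.hadoop.fs.Path("/test.json");
        hdfs = hdPath.getFileSystem(hdConf);
    }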
##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CompileAndExecuteRemoteFileITCase.java:
##########
@@ -0,0 +1,112 @@
+/** End-to-End tests for compile and execute remote file. */
+public class CompileAndExecuteRemoteFileITCase extends SqlITCaseBase {
+ private static final Path HADOOP_CLASSPATH =
+ ResourceTestUtils.getResource(".*hadoop.classpath");
+
+ private MiniDFSCluster hdfsCluster;
+ private org.apache.hadoop.fs.Path hdPath;
+ private org.apache.hadoop.fs.FileSystem hdfs;
+
+ public CompileAndExecuteRemoteFileITCase(String executionMode) {
+ super(executionMode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ createHDFS();
+ }
+
+ private void createHDFS() {
+ try {
+ Configuration hdConf = new Configuration();
+
+ File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+ FileUtil.fullyDelete(baseDir);
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+
+ hdPath = new org.apache.hadoop.fs.Path("/test.json");
Review Comment:
If the purpose of declaring this variable is to delete this path in the
after method, then it can be turned into a local variable in `after`.
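If so, the sketch is a one-liner (assuming nothing else reads the path; note that `generateReplaceVars` below does reference it, so the field may still be justified):

    // Illustrative: inline the path where it is used for cleanup.
    @After
    public void destroyHDFS() throws IOException {
        hdfs.delete(new org.apache.hadoop.fs.Path("/test.json"), false);
        hdfsCluster.shutdown();
    }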
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -83,6 +83,56 @@ public ResourceManager(ReadableConfig config, MutableURLClassLoader userClassLoa
        this.userClassLoader = userClassLoader;
    }
+    public boolean exists(Path filePath) throws IOException {
+        return FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath);
+    }
+
+    /**
+     * Registers the given path with the Flink filesystem. If it points to a remote filesystem
+     * and the file exists, downloads the file to a local copy and registers the mapping from
+     * the path to the local URL.
+     */
+    public void registerFsResources(Path filePath) throws IOException {
Review Comment:
File resources share almost the same logic as jar resources; maybe we can
extract common utils to avoid repetition.
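The extraction might look roughly like this; `getOrDownloadResource`, `isRemotePath`, and `downloadResource` are hypothetical names standing in for whatever the jar-registration path already does:

    // Hypothetical shared helper: jar and plan-file registration both reduce to
    // "ensure a local copy of a possibly-remote path and remember the mapping".
    private URL getOrDownloadResource(Path filePath) throws IOException {
        if (isRemotePath(filePath)) { // assumed existing predicate
            Path localPath = downloadResource(filePath); // assumed existing download helper
            return localPath.toUri().toURL();
        }
        return filePath.toUri().toURL();
    }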
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java:
##########
@@ -75,6 +75,9 @@ public void writeToFile(File file, boolean ignoreIfExists, boolean failIfExists)
        }
    }
    try {
+       if (!file.getParentFile().exists()) {
Review Comment:
I don't think it's appropriate to make parent dirs here. The logic spreads
across multiple classes...
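If parent directories must be created at all, one option in the spirit of this comment is a single helper owned by whichever class resolves the target file (the name below is made up):

    // Hypothetical single choke point for preparing a plan-file target path.
    static File prepareTargetFile(File file) throws IOException {
        File parent = file.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            throw new IOException("Could not create parent directory: " + parent);
        }
        return file;
    }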
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ExecutePlanOperation.java:
##########
@@ -19,19 +19,20 @@
package org.apache.flink.table.operations.command;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.operations.Operation;
/** Operation to describe an EXECUTE PLAN statement. */
@Internal
public class ExecutePlanOperation implements Operation {
- private final String filePath;
+ private final Path filePath;
Review Comment:
ditto
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java:
##########
@@ -321,4 +326,23 @@ private void assertShowFunctions(
assertThat(showFunctionsOperation.getFunctionScope()).isEqualTo(expectedScope);
assertThat(showFunctionsOperation.asSummaryString()).isEqualTo(expectedSummary);
}
+
+ @ParameterizedTest
Review Comment:
ditto
##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CompileAndExecuteRemoteFileITCase.java:
##########
@@ -0,0 +1,112 @@
+/** End-to-End tests for compile and execute remote file. */
+public class CompileAndExecuteRemoteFileITCase extends SqlITCaseBase {
+ private static final Path HADOOP_CLASSPATH =
+ ResourceTestUtils.getResource(".*hadoop.classpath");
+
+ private MiniDFSCluster hdfsCluster;
+ private org.apache.hadoop.fs.Path hdPath;
+ private org.apache.hadoop.fs.FileSystem hdfs;
+
+ public CompileAndExecuteRemoteFileITCase(String executionMode) {
+ super(executionMode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ createHDFS();
+ }
+
+ private void createHDFS() {
+ try {
+ Configuration hdConf = new Configuration();
+
+ File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+ FileUtil.fullyDelete(baseDir);
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+
+ hdPath = new org.apache.hadoop.fs.Path("/test.json");
+ hdfs = hdPath.getFileSystem(hdConf);
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Test failed " + e.getMessage());
+ }
+ }
+
+ @After
+ public void destroyHDFS() {
+ try {
+ hdfs.delete(hdPath, false);
+ hdfsCluster.shutdown();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected Map<String, String> generateReplaceVars() {
+ String remoteJsonPath =
+     String.format(
+         "hdfs://%s:%s/%s",
+         hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort(), hdPath);
+
+ Map<String, String> map = super.generateReplaceVars();
+ map.put("$HDFS_Json_Plan_PATH", remoteJsonPath);
+ return map;
+ }
+
+ @Test
+ public void testCompilePlanRemoteFile() throws Exception {
+ runSQL("compile_plan_use_remote_file_e2e.sql", generateReplaceVars());
Review Comment:
The test does not assert anything...
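Assuming the e2e script produces rows a checking helper can read, the smallest fix may be to call the verifying variant instead (the overload and the expected rows are illustrative only):

    // Illustrative: submit the job and verify its output.
    @Test
    public void testCompilePlanRemoteFile() throws Exception {
        runAndCheckSQL(
                "compile_plan_use_remote_file_e2e.sql",
                generateReplaceVars(),
                Arrays.asList("+I[1, 1, Hello]", "+I[2, 2, Hi]")); // assumed expected rows
    }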
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CompilePlanOperation.java:
##########
@@ -26,20 +27,20 @@
/** Operation to describe an {@code COMPILE PLAN} statement. */
public class CompilePlanOperation implements Operation {
- private final String filePath;
+ private final Path filePath;
private final boolean ifNotExists;
private final Operation operation;
public CompilePlanOperation(String filePath, boolean ifNotExists, Operation operation) {
    Preconditions.checkArgument(
        operation instanceof StatementSetOperation || operation instanceof ModifyOperation,
        "child operation of CompileOperation must be either a ModifyOperation or a StatementSetOperation");
-   this.filePath = filePath;
+   this.filePath = new Path(filePath);
Review Comment:
ditto