tsreaper commented on a change in pull request #17672:
URL: https://github.com/apache/flink/pull/17672#discussion_r743594661
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -18,12 +18,19 @@
package org.apache.flink.table.planner.runtime.batch.sql
+import java.io.File
+import java.net.URL
import java.util
+import org.apache.flink.client.ClientUtils
+import org.apache.flink.configuration.Configuration
import org.apache.flink.table.catalog.{CatalogPartitionImpl,
CatalogPartitionSpec, ObjectPath}
import org.apache.flink.table.planner.factories.{TestValuesCatalog,
TestValuesTableFactory}
+import
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.TemporaryClassLoaderContext
import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
Review comment:
Clean up the import order, although import orders are currently not checked
for Scala files.
```suggestion
import org.apache.flink.client.ClientUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.catalog.{CatalogPartitionImpl,
CatalogPartitionSpec, ObjectPath}
import org.apache.flink.table.planner.factories.{TestValuesCatalog,
TestValuesTableFactory}
import
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.utils.TestUserClassLoaderJar
import org.apache.flink.util.TemporaryClassLoaderContext
import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.io.File
import java.net.URL
import java.util
```
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
)
)
}
+
+ @Test
+ def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends
org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+ tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+ val jars: util.List[URL] = new util.ArrayList[URL]()
+ jars.add(udfJarFile.toURI.toURL)
+ val cl = ClientUtils.buildUserCodeClassLoader(jars,
java.util.Collections.emptyList(),
+ getClass.getClassLoader, new Configuration());
+ val ctx = TemporaryClassLoaderContext.of(cl)
+ tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");
+ checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 >
1",
+ Seq(
+ row(3, "Jack", "A", 2, 3)
+ )
+ )
+ ctx.close()
Review comment:
Wrap this with `try... finally...` and close the class loader context in
`finally`. If the test fails, the class loader will remain changed and might
cause bugs in other tests.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
##########
@@ -34,12 +34,13 @@
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
-/** Mainly used for testing classloading of UDF dependencies. */
+/** Mainly used for testing classloading. */
public class TestUserClassLoaderJar {
- private static final String GENERATED_UDF_CLASS = "LowerUDF";
+ /** Legacy code. */
+ public static final String GENERATED_UDF_CLASS = "LowerUDF";
- private static final String GENERATED_UDF_CODE =
+ public static final String GENERATED_UDF_CODE =
Review comment:
Move these back to `flink-sql-client`. They are used only by tests in the
SQL client.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
)
)
}
+
+ @Test
+ def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends
org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+ tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+ val jars: util.List[URL] = new util.ArrayList[URL]()
+ jars.add(udfJarFile.toURI.toURL)
+ val cl = ClientUtils.buildUserCodeClassLoader(jars,
java.util.Collections.emptyList(),
+ getClass.getClassLoader, new Configuration());
+ val ctx = TemporaryClassLoaderContext.of(cl)
+ tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");
Review comment:
No semicolons in Scala code.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
)
)
}
+
+ @Test
+ def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends
org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+ tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+ val jars: util.List[URL] = new util.ArrayList[URL]()
+ jars.add(udfJarFile.toURI.toURL)
+ val cl = ClientUtils.buildUserCodeClassLoader(jars,
java.util.Collections.emptyList(),
Review comment:
nit: For lists with a single constant element, use
`java.util.Collections.singletonList`.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -18,12 +18,19 @@
package org.apache.flink.table.planner.runtime.batch.sql
+import java.io.File
+import java.net.URL
import java.util
+import org.apache.flink.client.ClientUtils
+import org.apache.flink.configuration.Configuration
import org.apache.flink.table.catalog.{CatalogPartitionImpl,
CatalogPartitionSpec, ObjectPath}
import org.apache.flink.table.planner.factories.{TestValuesCatalog,
TestValuesTableFactory}
+import
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.TemporaryClassLoaderContext
import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
Review comment:
Clean up the import order, although import orders are currently not checked
for Scala files.
```suggestion
import org.apache.flink.client.ClientUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.catalog.{CatalogPartitionImpl,
CatalogPartitionSpec, ObjectPath}
import org.apache.flink.table.planner.factories.{TestValuesCatalog,
TestValuesTableFactory}
import
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.utils.TestUserClassLoaderJar
import org.apache.flink.util.TemporaryClassLoaderContext
import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.io.File
import java.net.URL
import java.util
```
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
)
)
}
+
+ @Test
+ def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends
org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+ tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+ val jars: util.List[URL] = new util.ArrayList[URL]()
+ jars.add(udfJarFile.toURI.toURL)
+ val cl = ClientUtils.buildUserCodeClassLoader(jars,
java.util.Collections.emptyList(),
+ getClass.getClassLoader, new Configuration());
+ val ctx = TemporaryClassLoaderContext.of(cl)
+ tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");
+ checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 >
1",
+ Seq(
+ row(3, "Jack", "A", 2, 3)
+ )
+ )
+ ctx.close()
Review comment:
Wrap this with `try... finally...` and close the class loader context in
`finally`. If the test fails, the class loader will remain changed and might
cause bugs in other tests.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
##########
@@ -34,12 +34,13 @@
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
-/** Mainly used for testing classloading of UDF dependencies. */
+/** Mainly used for testing classloading. */
public class TestUserClassLoaderJar {
- private static final String GENERATED_UDF_CLASS = "LowerUDF";
+ /** Legacy code. */
+ public static final String GENERATED_UDF_CLASS = "LowerUDF";
- private static final String GENERATED_UDF_CODE =
+ public static final String GENERATED_UDF_CODE =
Review comment:
Move these back to `flink-sql-client`. They are used only by tests in the
SQL client.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
)
)
}
+
+ @Test
+ def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends
org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+ tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+ val jars: util.List[URL] = new util.ArrayList[URL]()
+ jars.add(udfJarFile.toURI.toURL)
+ val cl = ClientUtils.buildUserCodeClassLoader(jars,
java.util.Collections.emptyList(),
+ getClass.getClassLoader, new Configuration());
+ val ctx = TemporaryClassLoaderContext.of(cl)
+ tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");
Review comment:
No semicolons in Scala code.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
)
)
}
+
+ @Test
+ def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends
org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+ tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+ val jars: util.List[URL] = new util.ArrayList[URL]()
+ jars.add(udfJarFile.toURI.toURL)
+ val cl = ClientUtils.buildUserCodeClassLoader(jars,
java.util.Collections.emptyList(),
Review comment:
nit: For lists with a single constant element, use
`java.util.Collections.singletonList`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]