[ https://issues.apache.org/jira/browse/FLINK-10263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627711#comment-16627711 ]

ASF GitHub Bot commented on FLINK-10263:
----------------------------------------

asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client
URL: https://github.com/apache/flink/pull/6725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b5830725db1..ca0251365e6 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
         type: VARCHAR
       - name: duplicate_count
         type: BIGINT
+      - name: constant
+        type: VARCHAR
     connector:
       type: filesystem
       path: $RESULT
@@ -226,6 +228,8 @@ tables:
           type: VARCHAR
         - name: duplicate_count
           type: BIGINT
+        - name: constant
+          type: VARCHAR
 
 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
   FROM AvroBothTable
 EOF
 
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e9265a8..552d0b37dca 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Context for executing table programs. This class caches everything that can be cached across
@@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() {
                return tableSinks;
        }
 
+       /**
+        * Executes the given supplier using the execution context's classloader as thread classloader.
+        */
+       public <R> R wrapClassLoader(Supplier<R> supplier) {
+               final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader();
+               Thread.currentThread().setContextClassLoader(classLoader);
+               try {
+                       return supplier.get();
+               } finally {
+                       Thread.currentThread().setContextClassLoader(previousClassloader);
+               }
+       }
+
        // --------------------------------------------------------------------------------------------
 
        private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e99b82..1318043faf1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, String name) throws Sq
 
        @Override
        public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
-               final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+               final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+               final TableEnvironment tableEnv = context
                        .createEnvironmentInstance()
                        .getTableEnvironment();
 
                // translate
                try {
                        final Table table = createTable(tableEnv, statement);
-                       return tableEnv.explain(table);
+                       // explanation requires an optimization step that might reference UDFs during code compilation
+                       return context.wrapClassLoader(() -> tableEnv.explain(table));
                } catch (Throwable t) {
                        // catch everything such that the query does not crash the executor
                        throw new SqlExecutionException("Invalid SQL statement.", t);
@@ -242,7 +244,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
        @Override
        public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session,
                        String resultId) throws SqlExecutionException {
-               final DynamicResult result = resultStore.getResult(resultId);
+               final DynamicResult<?> result = resultStore.getResult(resultId);
                if (result == null) {
                        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
                }
@@ -254,7 +256,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
 
        @Override
        public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException {
-               final DynamicResult result = resultStore.getResult(resultId);
+               final DynamicResult<?> result = resultStore.getResult(resultId);
                if (result == null) {
                        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
                }
@@ -266,7 +268,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
 
        @Override
        public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
-               final DynamicResult result = resultStore.getResult(resultId);
+               final DynamicResult<?> result = resultStore.getResult(resultId);
                if (result == null) {
                        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
                }
@@ -350,7 +352,7 @@ public void stop(SessionContext session) {
        private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
                final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
 
-               applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
+               applyUpdate(context, envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
 
                // create job graph with dependencies
                final String jobName = context.getSessionContext().getName() + ": " + statement;
@@ -392,7 +394,11 @@ public void stop(SessionContext session) {
                final String jobName = context.getSessionContext().getName() + ": " + query;
                final JobGraph jobGraph;
                try {
-                       table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+                       // writing to a sink requires an optimization step that might reference UDFs during code compilation
+                       context.wrapClassLoader(() -> {
+                               table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
+                               return null;
+                       });
                        jobGraph = envInst.createJobGraph(jobName);
                } catch (Throwable t) {
                        // the result needs to be closed as long as
@@ -435,10 +441,14 @@ private Table createTable(TableEnvironment tableEnv, String selectQuery) {
        /**
         * Applies the given update statement to the given table environment with query configuration.
         */
-       private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
+       private <C> void applyUpdate(ExecutionContext<C> context, TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
                // parse and validate statement
                try {
-                       tableEnv.sqlUpdate(updateStatement, queryConfig);
+                       // update statement requires an optimization step that might reference UDFs during code compilation
+                       context.wrapClassLoader(() -> {
+                               tableEnv.sqlUpdate(updateStatement, queryConfig);
+                               return null;
+                       });
                } catch (Throwable t) {
                        // catch everything such that the statement does not crash the executor
                        throw new SqlExecutionException("Invalid SQL update statement.", t);
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index f58e12cfea2..2b50bb92c9a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -23,7 +23,6 @@ import java.util
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -101,7 +100,10 @@ class ExpressionReducer(config: TableConfig)
         |""".stripMargin,
       resultType)
 
-    val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
+    val clazz = compile(
+      Thread.currentThread().getContextClassLoader,
+      generatedFunction.name,
+      generatedFunction.code)
     val function = clazz.newInstance()
 
     // execute
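
For readers skimming the diff: the heart of the fix is the wrapClassLoader
method added to ExecutionContext, which temporarily installs the SQL Client's
user-code classloader as the thread context classloader while the planner
optimizes and compiles generated code. A minimal standalone sketch of that
pattern follows (the class name ClassLoaderScope and its constructor are
illustrative, not Flink API):

{code}
import java.util.function.Supplier;

// Sketch of the context-classloader wrapping pattern used by the fix.
public class ClassLoaderScope {

	private final ClassLoader userCodeClassLoader;

	public ClassLoaderScope(ClassLoader userCodeClassLoader) {
		this.userCodeClassLoader = userCodeClassLoader;
	}

	public <R> R wrap(Supplier<R> supplier) {
		final ClassLoader previous = Thread.currentThread().getContextClassLoader();
		// Install the user-code classloader so that code generated while the
		// supplier runs can resolve user-defined classes such as UDFs.
		Thread.currentThread().setContextClassLoader(userCodeClassLoader);
		try {
			return supplier.get();
		} finally {
			// Always restore the previous classloader, even if the supplier throws.
			Thread.currentThread().setContextClassLoader(previous);
		}
	}
}
{code}

Callers that produce no value simply return null from the lambda, as
executeQueryInternal and applyUpdate do in the diff above.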


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User-defined function with LITERAL parameters yields CompileException
> ---------------------------------------------------------------------
>
>                 Key: FLINK-10263
>                 URL: https://issues.apache.org/jira/browse/FLINK-10263
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.7.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> When using a user-defined scalar function only with literal parameters, a 
> {{CompileException}} is thrown. For example
> {code}
> SELECT myFunc(CAST(40.750444 AS FLOAT), CAST(-73.993475 AS FLOAT))
>
> public class MyFunc extends ScalarFunction {
>       public int eval(float lon, float lat) {
>               // do something
>               return 0;
>       }
> }
> {code}
> results in 
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 5, Column 10: Cannot determine simple type name "com"
> {code}
> The problem is probably caused by the expression reducer, because the error
> disappears if a regular attribute is added to the parameter expression.
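
The fix above targets exactly this: ExpressionReducer now hands the thread
context classloader to the code compiler instead of the classloader that
loaded flink-table, and the SQL Client installs that context classloader via
wrapClassLoader. As a hypothetical illustration of why the parent classloader
matters, here is a sketch using Janino (the compiler behind the
CompileException in the report); it assumes a UDF class MyFunc that lives
only on the user-code classpath:

{code}
import org.codehaus.janino.SimpleCompiler;

// Hypothetical sketch: generated code referencing a user-defined class only
// compiles if that class is visible through the compiler's parent classloader.
// Assumes MyFunc is reachable via the thread context classloader.
public class ReducerClassLoaderDemo {
	public static void main(String[] args) throws Exception {
		SimpleCompiler compiler = new SimpleCompiler();
		// With the classloader that loaded the planner (getClass().getClassLoader()
		// before the fix), MyFunc is invisible and Janino fails with
		// "Cannot determine simple type name ...". With the thread context
		// classloader (after the fix), the class resolves during constant folding.
		compiler.setParentClassLoader(Thread.currentThread().getContextClassLoader());
		compiler.cook("public class Generated { public Object create() { return new MyFunc(); } }");
		Class<?> generated = compiler.getClassLoader().loadClass("Generated");
		System.out.println("compiled and loaded: " + generated.getName());
	}
}
{code}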



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
