This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new e7902bb  [FLINK-18076][table sql / client] Use correct classloader 
when parsing queries
e7902bb is described below

commit e7902bb4d1329833870ee53c782c6431cfc8cb80
Author: Leonard Xu <[email protected]>
AuthorDate: Thu Jun 4 23:53:07 2020 +0800

    [FLINK-18076][table sql / client] Use correct classloader when parsing 
queries
    
    This closes #12475
---
 .../table/client/gateway/local/LocalExecutor.java  | 15 ++++++-
 .../table/client/gateway/local/DependencyTest.java | 50 ++++++++++++++++------
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index fd78712..604734e 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
@@ -56,6 +57,7 @@ import 
org.apache.flink.table.client.gateway.local.result.ChangelogResult;
 import org.apache.flink.table.client.gateway.local.result.DynamicResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -460,7 +462,18 @@ public class LocalExecutor implements Executor {
        public Parser getSqlParser(String sessionId) {
                final ExecutionContext<?> context = 
getExecutionContext(sessionId);
                final TableEnvironment tableEnv = context.getTableEnvironment();
-               return ((TableEnvironmentInternal) tableEnv).getParser();
+               final Parser parser = ((TableEnvironmentInternal) 
tableEnv).getParser();
+               return new Parser() {
+                       @Override
+                       public List<Operation> parse(String statement) {
+                               return context.wrapClassLoader(() -> 
parser.parse(statement));
+                       }
+
+                       @Override
+                       public UnresolvedIdentifier parseIdentifier(String 
identifier) {
+                               return context.wrapClassLoader(() -> 
parser.parseIdentifier(identifier));
+                       }
+               };
        }
 
        @Override
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 33633f3..87f750d 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -44,10 +44,12 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
 import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
+import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.module.Module;
+import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.types.DataType;
 
 import org.junit.Test;
@@ -65,6 +67,7 @@ import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATA
 import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
 import static 
org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Dependency tests for {@link LocalExecutor}. Mainly for testing classloading 
of dependencies.
@@ -82,6 +85,39 @@ public class DependencyTest {
 
        @Test
        public void testTableFactoryDiscovery() throws Exception {
+               final LocalExecutor executor = createExecutor();
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+               String sessionId = executor.openSession(session);
+               try {
+                       final TableSchema result = 
executor.getTableSchema(sessionId, "TableNumber1");
+                       final TableSchema expected = TableSchema.builder()
+                               .field("IntegerField1", Types.INT())
+                               .field("StringField1", Types.STRING())
+                               .field("rowtimeField", Types.SQL_TIMESTAMP())
+                               .build();
+
+                       assertEquals(expected, result);
+               } finally {
+                       executor.closeSession(sessionId);
+               }
+       }
+
+       @Test
+       public void testSqlParseWithUserClassLoader() throws Exception {
+               final LocalExecutor executor = createExecutor();
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+               String sessionId = executor.openSession(session);
+               try {
+                       final Parser sqlParser = 
executor.getSqlParser(sessionId);
+                       List<Operation> operations = sqlParser.parse("SELECT 
IntegerField1, StringField1 FROM TableNumber1");
+
+                       assertTrue(operations != null && operations.size() == 
1);
+               } finally {
+                       executor.closeSession(sessionId);
+               }
+       }
+
+       private LocalExecutor createExecutor() throws Exception {
                // create environment
                final Map<String, String> replaceVars = new HashMap<>();
                replaceVars.put("$VAR_CONNECTOR_TYPE", CONNECTOR_TYPE_VALUE);
@@ -91,24 +127,12 @@ public class DependencyTest {
 
                // create executor with dependencies
                final URL dependency = Paths.get("target", 
TABLE_FACTORY_JAR_FILE).toUri().toURL();
-               final LocalExecutor executor = new LocalExecutor(
+               return new LocalExecutor(
                        env,
                        Collections.singletonList(dependency),
                        new Configuration(),
                        new DefaultCLI(new Configuration()),
                        new DefaultClusterClientServiceLoader());
-
-               final SessionContext session = new 
SessionContext("test-session", new Environment());
-               String sessionId = executor.openSession(session);
-
-               final TableSchema result = executor.getTableSchema(sessionId, 
"TableNumber1");
-               final TableSchema expected = TableSchema.builder()
-                       .field("IntegerField1", Types.INT())
-                       .field("StringField1", Types.STRING())
-                       .field("rowtimeField", Types.SQL_TIMESTAMP())
-                       .build();
-
-               assertEquals(expected, result);
        }
 
        // 
--------------------------------------------------------------------------------------------

Reply via email to