This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1db13f5cfb3b81409d6f6fb079424f44ccc826e
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jul 24 11:21:00 2019 +0200

    [FLINK-13279][table-sql-client] Fully qualify sink name in sql-client
    
    This closes #9229.
---
 .../client/gateway/local/ExecutionContext.java     |  28 ++---
 .../table/client/gateway/local/LocalExecutor.java  |   7 +-
 .../client/gateway/local/LocalExecutorITCase.java  |  23 +++-
 .../client/gateway/utils/SimpleCatalogFactory.java | 118 +++++++++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../test/resources/test-sql-client-defaults.yaml   |   3 +
 .../flink/table/api/EnvironmentSettings.java       |   7 +-
 7 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0df7fba..ae7fb2c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -302,20 +302,6 @@ public class ExecutionContext<T> {
                        // register catalogs
                        catalogs.forEach(tableEnv::registerCatalog);
 
-                       // set current catalog
-                       if (sessionContext.getCurrentCatalog().isPresent()) {
-                               tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
-                       } else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
-                               tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
-                       }
-
-                       // set current database
-                       if (sessionContext.getCurrentDatabase().isPresent()) {
-                               tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
-                       } else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
-                               tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
-                       }
-
                        // create query config
                        queryConfig = createQueryConfig();
 
@@ -340,6 +326,20 @@ public class ExecutionContext<T> {
                                        registerTemporalTable(temporalTableEntry);
                                }
                        });
+
+                       // set current catalog
+                       if (sessionContext.getCurrentCatalog().isPresent()) {
+                               tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
+                       } else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
+                               tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
+                       }
+
+                       // set current database
+                       if (sessionContext.getCurrentDatabase().isPresent()) {
+                               tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
+                       } else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
+                               tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
+                       }
                }
 
                public QueryConfig getQueryConfig() {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 7e41f1f..101f72c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
@@ -473,7 +474,11 @@ public class LocalExecutor implements Executor {
                        // writing to a sink requires an optimization step that might reference UDFs during code compilation
                        context.wrapClassLoader(() -> {
                                envInst.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
-                               table.insertInto(jobName, envInst.getQueryConfig());
+                               table.insertInto(
+                                       envInst.getQueryConfig(),
+                                       EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+                                       EnvironmentSettings.DEFAULT_BUILTIN_DATABASE,
+                                       jobName);
                                return null;
                        });
                        jobGraph = envInst.createJobGraph(jobName);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index ac1a7ae..5dbec41 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+import org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
@@ -63,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -160,7 +162,8 @@ public class LocalExecutorITCase extends TestLogger {
 
                final List<String> expectedCatalogs = Arrays.asList(
                        "default_catalog",
-                       "catalog1");
+                       "catalog1",
+                       "simple-catalog");
                assertEquals(expectedCatalogs, actualCatalogs);
        }
 
@@ -402,7 +405,7 @@ public class LocalExecutorITCase extends TestLogger {
                final SessionContext session = new SessionContext("test-session", new Environment());
 
                try {
-                       // start job
+                       // Case 1: Registered sink
                        final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
                                session,
                                "INSERT INTO TableSourceSink SELECT IntegerField1 = 42, StringField1 FROM TableNumber1");
@@ -424,6 +427,22 @@ public class LocalExecutorITCase extends TestLogger {
                                                fail("Unexpected job status.");
                                }
                        }
+
+                       // Case 2: Temporary sink
+                       session.setCurrentCatalog("simple-catalog");
+                       session.setCurrentDatabase("default_database");
+                       // all queries are pipelined to an in-memory sink, check it is properly registered
+                       final ResultDescriptor otherCatalogDesc = executor.executeQuery(session, "SELECT * FROM `test-table`");
+
+                       final List<String> otherCatalogResults = retrieveTableResult(
+                               executor,
+                               session,
+                               otherCatalogDesc.getResultId());
+
+                       TestBaseUtils.compareResultCollections(
+                               SimpleCatalogFactory.TABLE_CONTENTS.stream().map(Row::toString).collect(Collectors.toList()),
+                               otherCatalogResults,
+                               Comparator.naturalOrder());
                } finally {
                        executor.stop(session);
                }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
new file mode 100644
index 0000000..5f533fb
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ConnectorCatalogTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Catalog factory for an in-memory catalog that contains a single non-empty table.
+ * The contents of the table are equal to {@link SimpleCatalogFactory#TABLE_CONTENTS}.
+ */
+public class SimpleCatalogFactory implements CatalogFactory {
+
+       public static final String CATALOG_TYPE_VALUE = "simple-catalog";
+
+       public static final String TEST_TABLE_NAME = "test-table";
+
+       public static final List<Row> TABLE_CONTENTS = Arrays.asList(
+               Row.of(1, "Hello"),
+               Row.of(2, "Hello world"),
+               Row.of(3, "Hello world! Hello!")
+       );
+
+       @Override
+       public Catalog createCatalog(String name, Map<String, String> properties) {
+               String database = properties.getOrDefault(
+                       CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
+                       "default_database");
+               GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database);
+
+               String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);
+               StreamTableSource<Row> tableSource = new StreamTableSource<Row>() {
+                       @Override
+                       public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+                               return execEnv.fromCollection(TABLE_CONTENTS)
+                                       .returns(new RowTypeInfo(
+                                               new TypeInformation[]{Types.INT(), Types.STRING()},
+                                               new String[]{"id", "string"}));
+                       }
+
+                       @Override
+                       public TableSchema getTableSchema() {
+                               return TableSchema.builder()
+                                       .field("id", DataTypes.INT())
+                                       .field("string", DataTypes.STRING())
+                                       .build();
+                       }
+
+                       @Override
+                       public DataType getProducedDataType() {
+                               return DataTypes.ROW(
+                                       DataTypes.FIELD("id", DataTypes.INT()),
+                                       DataTypes.FIELD("string", DataTypes.STRING())
+                               );
+                       }
+               };
+
+               try {
+                       genericInMemoryCatalog.createTable(
+                               new ObjectPath(database, tableName),
+                               ConnectorCatalogTable.source(tableSource, false),
+                               false
+                       );
+               } catch (Exception e) {
+                       throw new WrappingRuntimeException(e);
+               }
+
+               return genericInMemoryCatalog;
+       }
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new HashMap<>();
+               context.put(CatalogDescriptorValidator.CATALOG_TYPE, CATALOG_TYPE_VALUE);
+               return context;
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               return Arrays.asList(CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, TEST_TABLE_NAME);
+       }
+}
diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index b4e3095..5ba6b0b 100644
--- a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -15,5 +15,6 @@
 
 org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory
 org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory
+org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory
 org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory
 org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 9e0582b..9844d54 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -137,3 +137,6 @@ deployment:
 catalogs:
   - name: catalog1
     type: DependencyTest
+  - name: simple-catalog
+    type: simple-catalog
+    test-table: test-table
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index bfd9203..7eec4a3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -46,8 +46,11 @@ import java.util.Map;
  */
 @PublicEvolving
 public class EnvironmentSettings {
+
        public static final String STREAMING_MODE = "streaming-mode";
        public static final String CLASS_NAME = "class-name";
+       public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
+       public static final String DEFAULT_BUILTIN_DATABASE = "default_database";
 
        /**
         * Canonical name of the {@link Planner} class to use.
@@ -158,8 +161,8 @@ public class EnvironmentSettings {
 
                private String plannerClass = OLD_PLANNER_FACTORY;
                private String executorClass = OLD_EXECUTOR_FACTORY;
-               private String builtInCatalogName = "default_catalog";
-               private String builtInDatabaseName = "default_database";
+               private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
+               private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
                private boolean isStreamingMode = true;
 
                /**
