This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da1ccb9cfae8be0abda7e85d15db3d38304336d2
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 25 14:22:50 2020 +0800

    [FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic
    
    This closes #12316
---
 .../connectors/hive/HiveDynamicTableFactory.java   | 88 ++++++++++++++++++++++
 .../flink/table/catalog/hive/HiveCatalog.java      |  7 ++
 .../table/catalog/hive/HiveCatalogITCase.java      | 44 ++++++++++-
 3 files changed, 135 insertions(+), 4 deletions(-)
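
For context: generic (non-Hive) tables persisted by HiveCatalog carry an is_generic flag in their
options. With this change the planner can obtain a HiveDynamicTableFactory from
HiveCatalog#getFactory(); the factory strips the flag and hands the table to FactoryUtil, which
resolves the actual connector (e.g. 'print') from the remaining options. A minimal sketch of the
user-facing flow, assuming an already configured HiveCatalog instance named hiveCatalog (the name
is illustrative; the sketch mirrors the new test below):

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
    tEnv.registerCatalog("myhive", hiveCatalog); // hiveCatalog: assumed pre-built
    tEnv.useCatalog("myhive");
    // Stored in the Hive metastore with is_generic=true; reads and writes now go
    // through HiveDynamicTableFactory, which delegates to the 'print' connector.
    tEnv.executeSql("create table print_table (name STRING, age INT) with ('connector' = 'print')");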

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
new file mode 100644
index 0000000..56086fd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.table.catalog.config.CatalogConfig.IS_GENERIC;
+
+/**
+ * A dynamic table factory implementation for the Hive catalog. Now it only supports generic tables.
+ * Hive tables should be resolved by {@link HiveTableFactory}.
+ */
+public class HiveDynamicTableFactory implements
+               DynamicTableSourceFactory,
+               DynamicTableSinkFactory {
+
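+       // These SPI discovery/validation hooks are not expected to be called:
+       // HiveCatalog returns this factory directly from getFactory(), so they
+       // fail fast instead of advertising an identifier or options.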
+       @Override
+       public String factoryIdentifier() {
+               throw new UnsupportedOperationException("Hive factory only works through a catalog.");
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               throw new UnsupportedOperationException("Hive factory only works through a catalog.");
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               throw new UnsupportedOperationException("Hive factory only works through a catalog.");
+       }
+
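+       /**
+        * Removes the is_generic flag that HiveCatalog stores in the table options
+        * and validates that the table really is a generic (non-Hive) table.
+        */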
+       private static CatalogTable removeIsGenericFlag(CatalogTable table) {
+               Map<String, String> newOptions = new HashMap<>(table.getOptions());
+               boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC));
+               if (!isGeneric) {
+                       throw new ValidationException(
+                                       "Hive dynamic table factory now only works for generic tables.");
+               }
+               return table.copy(newOptions);
+       }
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               return FactoryUtil.createTableSink(
+                               null, // we are already inside the catalog's factory
+                               context.getObjectIdentifier(),
+                               removeIsGenericFlag(context.getCatalogTable()),
+                               context.getConfiguration(),
+                               context.getClassLoader());
+       }
+
+       @Override
+       public DynamicTableSource createDynamicTableSource(Context context) {
+               return FactoryUtil.createTableSource(
+                               null, // we are already inside the catalog's factory
+                               context.getObjectIdentifier(),
+                               removeIsGenericFlag(context.getCatalogTable()),
+                               context.getConfiguration(),
+                               context.getClassLoader());
+       }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 5bc469f..d10127f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
 import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
@@ -69,6 +70,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.Preconditions;
@@ -238,6 +240,11 @@ public class HiveCatalog extends AbstractCatalog {
        }
 
        @Override
+       public Optional<Factory> getFactory() {
+               return Optional.of(new HiveDynamicTableFactory());
+       }
+
+       @Override
        public Optional<TableFactory> getTableFactory() {
                return Optional.of(new HiveTableFactory(hiveConf));
        }
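
Note that the two accessors coexist: getFactory() serves the new DynamicTableSource/DynamicTableSink
stack, while getTableFactory() keeps the legacy TableFactory path working. A hedged caller-side
sketch (illustrative only, not the planner's actual dispatch code):

    Optional<Factory> dynamicFactory = hiveCatalog.getFactory();          // now HiveDynamicTableFactory
    Optional<TableFactory> legacyFactory = hiveCatalog.getTableFactory(); // still HiveTableFactory
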
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 4064057..2817361 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -52,8 +51,10 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileReader;
+import java.io.PrintStream;
 import java.net.URI;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -63,6 +64,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -237,7 +239,7 @@ public class HiveCatalogITCase {
                // similar to CatalogTableITCase::testReadWriteCsvUsingDDL but uses HiveCatalog
                EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                TableEnvironment tableEnv = TableEnvironment.create(settings);
-               tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+               tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
                tableEnv.registerCatalog("myhive", hiveCatalog);
                tableEnv.useCatalog("myhive");
@@ -282,8 +284,7 @@ public class HiveCatalogITCase {
                }
                EnvironmentSettings settings = builder.build();
                TableEnvironment tableEnv = TableEnvironment.create(settings);
-               tableEnv.getConfig().getConfiguration().setInteger(
-                               ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+               tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
                tableEnv.registerCatalog("myhive", hiveCatalog);
                tableEnv.useCatalog("myhive");
@@ -307,4 +308,39 @@ public class HiveCatalogITCase {
                Assert.assertEquals(5, rows.size());
                tableEnv.executeSql("DROP TABLE proctime_src");
        }
+
+       @Test
+       public void testNewTableFactory() {
+               TableEnvironment tEnv = TableEnvironment.create(
+                               EnvironmentSettings.newInstance().inBatchMode().build());
+               tEnv.registerCatalog("myhive", hiveCatalog);
+               tEnv.useCatalog("myhive");
+               tEnv.getConfig().getConfiguration().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+               String path = this.getClass().getResource("/csv/test.csv").getPath();
+
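+               // The 'print' connector writes rows to System.out, so capture
+               // stdout in order to assert what the sink emitted.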
+               PrintStream originalSystemOut = System.out;
+               try {
+                       ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+                       System.setOut(new PrintStream(arrayOutputStream));
+
+                       tEnv.executeSql("create table csv_table (name String, age Int) with (" +
+                                       "'connector.type' = 'filesystem'," +
+                                       "'connector.path' = 'file://" + path + "'," +
+                                       "'format.type' = 'csv')");
+                       tEnv.executeSql("create table print_table (name String, age Int) with ('connector' = 'print')");
+
+                       TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into print_table select * from csv_table");
+
+                       // assert the rows the print sink emitted (+I marks insert-only changelog rows)
+                       assertEquals("+I(1,1)\n+I(2,2)\n+I(3,3)\n", arrayOutputStream.toString());
+               } finally {
+                       if (System.out != originalSystemOut) {
+                               System.out.close();
+                       }
+                       System.setOut(originalSystemOut);
+                       tEnv.executeSql("DROP TABLE csv_table");
+                       tEnv.executeSql("DROP TABLE print_table");
+               }
+       }
 }
