This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit da1ccb9cfae8be0abda7e85d15db3d38304336d2
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 25 14:22:50 2020 +0800

    [FLINK-17896][hive] HiveCatalog can work with new table factory because of is_generic

    This closes #12316
---
 .../connectors/hive/HiveDynamicTableFactory.java   | 88 ++++++++++++++++++++++
 .../flink/table/catalog/hive/HiveCatalog.java      |  7 ++
 .../table/catalog/hive/HiveCatalogITCase.java      | 44 ++++++++++-
 3 files changed, 135 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
new file mode 100644
index 0000000..56086fd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.table.catalog.config.CatalogConfig.IS_GENERIC;
+
+/**
+ * A dynamic table factory implementation for Hive catalog. Now it only support generic tables.
+ * Hive tables should be resolved by {@link HiveTableFactory}.
+ */
+public class HiveDynamicTableFactory implements
+		DynamicTableSourceFactory,
+		DynamicTableSinkFactory {
+
+	@Override
+	public String factoryIdentifier() {
+		throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		throw new UnsupportedOperationException("Hive factory is only work for catalog.");
+	}
+
+	private static CatalogTable removeIsGenericFlag(CatalogTable table) {
+		Map<String, String> newOptions = new HashMap<>(table.getOptions());
+		boolean isGeneric = Boolean.parseBoolean(newOptions.remove(IS_GENERIC));
+		if (!isGeneric) {
+			throw new ValidationException(
+					"Hive dynamic table factory now only work for generic table.");
+		}
+		return table.copy(newOptions);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		return FactoryUtil.createTableSink(
+				null, // we already in the factory of catalog
+				context.getObjectIdentifier(),
+				removeIsGenericFlag(context.getCatalogTable()),
+				context.getConfiguration(),
+				context.getClassLoader());
+	}
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		return FactoryUtil.createTableSource(
+				null, // we already in the factory of catalog
+				context.getObjectIdentifier(),
+				removeIsGenericFlag(context.getCatalogTable()),
+				context.getConfiguration(),
+				context.getClassLoader());
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 5bc469f..d10127f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
 import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
@@ -69,6 +70,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.Preconditions;
@@ -238,6 +240,11 @@ public class HiveCatalog extends AbstractCatalog {
 	}
 
 	@Override
+	public Optional<Factory> getFactory() {
+		return Optional.of(new HiveDynamicTableFactory());
+	}
+
+	@Override
 	public Optional<TableFactory> getTableFactory() {
 		return Optional.of(new HiveTableFactory(hiveConf));
 	}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 4064057..2817361 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -52,8 +51,10 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileReader;
+import java.io.PrintStream;
 import java.net.URI;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -63,6 +64,7 @@
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -237,7 +239,7 @@ public class HiveCatalogITCase {
 		// similar to CatalogTableITCase::testReadWriteCsvUsingDDL but uses HiveCatalog
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
-		tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 		tableEnv.registerCatalog("myhive", hiveCatalog);
 		tableEnv.useCatalog("myhive");
 
@@ -282,8 +284,7 @@ public class HiveCatalogITCase {
 		}
 		EnvironmentSettings settings = builder.build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
-		tableEnv.getConfig().getConfiguration().setInteger(
-				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 		tableEnv.registerCatalog("myhive", hiveCatalog);
 		tableEnv.useCatalog("myhive");
 
@@ -307,4 +308,39 @@ public class HiveCatalogITCase {
 		Assert.assertEquals(5, rows.size());
 		tableEnv.executeSql("DROP TABLE proctime_src");
 	}
+
+	@Test
+	public void testNewTableFactory() {
+		TableEnvironment tEnv = TableEnvironment.create(
+				EnvironmentSettings.newInstance().inBatchMode().build());
+		tEnv.registerCatalog("myhive", hiveCatalog);
+		tEnv.useCatalog("myhive");
+		tEnv.getConfig().getConfiguration().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+		String path = this.getClass().getResource("/csv/test.csv").getPath();
+
+		PrintStream originalSystemOut = System.out;
+		try {
+			ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+			System.setOut(new PrintStream(arrayOutputStream));
+
+			tEnv.executeSql("create table csv_table (name String, age Int) with (" +
+					"'connector.type' = 'filesystem'," +
+					"'connector.path' = 'file://" + path + "'," +
+					"'format.type' = 'csv')");
+			tEnv.executeSql("create table print_table (name String, age Int) with ('connector' = 'print')");
+
+			TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into print_table select * from csv_table");
+
+			// assert query result
+			assertEquals("+I(1,1)\n+I(2,2)\n+I(3,3)\n", arrayOutputStream.toString());
+		} finally {
+			if (System.out != originalSystemOut) {
+				System.out.close();
+			}
+			System.setOut(originalSystemOut);
+			tEnv.executeSql("DROP TABLE csv_table");
+			tEnv.executeSql("DROP TABLE print_table");
+		}
+	}
 }
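
For context, a minimal usage sketch of the path this commit enables; it is not part of the change itself. The catalog name "myhive", the class name, and the Hive conf directory below are illustrative placeholders, and the table mirrors the 'print' connector table used in the new test. A table created this way is persisted by HiveCatalog with is_generic=true; with this commit its source/sink are resolved through HiveCatalog#getFactory() -> HiveDynamicTableFactory, which strips the is_generic flag and delegates to FactoryUtil's regular connector lookup, while actual Hive tables keep resolving through getTableFactory() / HiveTableFactory.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class GenericTableViaHiveCatalogSketch {
        public static void main(String[] args) {
            // Placeholder setup: catalog name, default database and conf dir are examples only.
            HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf-dir");

            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());
            tEnv.registerCatalog("myhive", hiveCatalog);
            tEnv.useCatalog("myhive");

            // A generic (non-Hive) table: HiveCatalog stores it with is_generic=true, and
            // after this commit the planner obtains its sink via getFactory() ->
            // HiveDynamicTableFactory, which drops the flag and hands the table to
            // FactoryUtil, where the 'print' connector is discovered as usual.
            tEnv.executeSql("create table print_table (name String, age Int) with ('connector' = 'print')");
        }
    }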
