This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 340bf0010e380ee32125fe4a4d135a4c4cdb6609 Author: JingsongLi <[email protected]> AuthorDate: Sat Mar 18 11:06:45 2023 +0800 [core] Rename paimon: hive meta should be compatible --- .../apache/paimon/tests/FileStoreBatchE2eTest.java | 12 ++--- .../tests/FileStoreBuiltInFormatE2eTest.java | 6 +-- .../java/org/apache/paimon/tests/HiveE2eTest.java | 11 ++--- .../apache/flink/table/hive/LegacyHiveClasses.java | 36 +++++++++++++++ .../java/org/apache/paimon/hive/HiveCatalog.java | 51 ++++++++++++---------- .../store/hive/TableStoreHiveStorageHandler.java | 25 +++++++++++ .../flink/table/store/hive/TableStoreSerDe.java | 25 +++++++++++ .../table/store/mapred/TableStoreInputFormat.java | 25 +++++++++++ .../table/store/mapred/TableStoreOutputFormat.java | 25 +++++++++++ 9 files changed, 176 insertions(+), 40 deletions(-) diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java index aed5b0686..5bb1ff085 100644 --- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java +++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java @@ -78,7 +78,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase { String useCatalogCmd = "USE CATALOG ts_catalog;"; - String tableStoreDdl = + String paimonDdl = "CREATE TABLE IF NOT EXISTS ts_table (\n" + " dt VARCHAR,\n" + " hr VARCHAR,\n" @@ -98,14 +98,14 @@ public class FileStoreBatchE2eTest extends E2eTestBase { catalogDdl, useCatalogCmd, testDataSourceDdl, - tableStoreDdl); + paimonDdl); // test #1: read all data from paimon runSql( "INSERT INTO result1 SELECT * FROM ts_table;", catalogDdl, useCatalogCmd, - tableStoreDdl, + paimonDdl, createResultSink( "result1", "dt VARCHAR, hr VARCHAR, person VARCHAR, category VARCHAR, price INT")); @@ -133,7 +133,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase { "INSERT INTO result2 SELECT * FROM ts_table WHERE dt > '20211110' AND hr < '09';", catalogDdl, useCatalogCmd, - tableStoreDdl, + paimonDdl, createResultSink( "result2", "dt VARCHAR, hr VARCHAR, person VARCHAR, category VARCHAR, price INT")); @@ -149,7 +149,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase { "INSERT INTO result3 SELECT * FROM ts_table WHERE person = 'Alice' AND category = 'Food';", catalogDdl, useCatalogCmd, - tableStoreDdl, + paimonDdl, createResultSink( "result3", "dt VARCHAR, hr VARCHAR, person VARCHAR, category VARCHAR, price INT")); @@ -166,7 +166,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase { + "INSERT INTO result4 SELECT dt, category, sum(price) AS total FROM ts_table GROUP BY dt, category;", catalogDdl, useCatalogCmd, - tableStoreDdl, + paimonDdl, createResultSink("result4", "dt VARCHAR, hr VARCHAR, total INT")); checkResult( "20211110, Drink, 200", diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java index 3f171737f..9425c3042 100644 --- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java +++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java @@ -51,7 +51,7 @@ public class FileStoreBuiltInFormatE2eTest extends E2eTestBase { TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store"); String useCatalogCmd = "USE CATALOG ts_catalog;"; - String tableStoreDdl = + String paimonDdl = "CREATE TABLE IF NOT EXISTS ts_table (\n" + schema + ") WITH (\n" @@ -92,13 +92,13 @@ public class FileStoreBuiltInFormatE2eTest extends E2eTestBase { + "DATE '2022-05-23'" + ")"; - runSql(insertDml, catalogDdl, useCatalogCmd, tableStoreDdl); + runSql(insertDml, catalogDdl, useCatalogCmd, paimonDdl); runSql( "INSERT INTO result1 SELECT * FROM ts_table where id > 1;", catalogDdl, useCatalogCmd, - tableStoreDdl, + paimonDdl, createResultSink("result1", schema)); checkResult( "2, " diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java index 8155306ca..20b139d26 100644 --- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java +++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java @@ -57,8 +57,8 @@ public class HiveE2eTest extends E2eReaderTestBase { @Test public void testReadExternalTable() throws Exception { final String table = "table_store_pk"; - String tableStorePkPath = HDFS_ROOT + "/" + UUID.randomUUID() + ".store"; - String tableStorePkDdl = + String paimonPkPath = HDFS_ROOT + "/" + UUID.randomUUID() + ".store"; + String paimonPkDdl = String.format( "CREATE TABLE IF NOT EXISTS %s (\n" + " a int,\n" @@ -69,17 +69,14 @@ public class HiveE2eTest extends E2eReaderTestBase { + " 'bucket' = '2'\n" + ");", table); - runSql( - createInsertSql(table), - createCatalogSql("table_store", tableStorePkPath), - tableStorePkDdl); + runSql(createInsertSql(table), createCatalogSql("table_store", paimonPkPath), paimonPkDdl); String externalTablePkDdl = String.format( "CREATE EXTERNAL TABLE IF NOT EXISTS %s\n" + "STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'\n" + "LOCATION '%s/default.db/%s';\n", - table, tableStorePkPath, table); + table, paimonPkPath, table); checkQueryResults(table, this::executeQuery, externalTablePkDdl); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/flink/table/hive/LegacyHiveClasses.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/flink/table/hive/LegacyHiveClasses.java new file mode 100644 index 000000000..0316f3f1d --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/flink/table/hive/LegacyHiveClasses.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.hive; + +import org.apache.hadoop.hive.metastore.api.Table; + +/** Legacy hive classes of table store 0.3. */ +@Deprecated +public class LegacyHiveClasses { + + private static final String LEGACY_INPUT_FORMAT_CLASS_NAME = + "org.apache.flink.table.store.mapred.TableStoreInputFormat"; + private static final String LEGACY_OUTPUT_FORMAT_CLASS_NAME = + "org.apache.flink.table.store.mapred.TableStoreOutputFormat"; + + public static boolean isPaimonTable(Table table) { + return LEGACY_INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) + && LEGACY_OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 286d9908d..c7888860e 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -18,6 +18,8 @@ package org.apache.paimon.hive; +import org.apache.flink.table.hive.LegacyHiveClasses; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -166,7 +168,7 @@ public class HiveCatalog extends AbstractCatalog { // tables. // so we just check the schema file first return schemaFileExists(identifier) - && tableStoreTableExists(identifier, false); + && paimonTableExists(identifier, false); }) .collect(Collectors.toList()); } catch (UnknownDBException e) { @@ -178,7 +180,7 @@ public class HiveCatalog extends AbstractCatalog { @Override public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { - if (!tableStoreTableExists(identifier)) { + if (!paimonTableExists(identifier)) { throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); @@ -191,7 +193,7 @@ public class HiveCatalog extends AbstractCatalog { public void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException { checkNotSystemTable(identifier, "dropTable"); - if (!tableStoreTableExists(identifier)) { + if (!paimonTableExists(identifier)) { if (ignoreIfNotExists) { return; } else { @@ -215,7 +217,7 @@ public class HiveCatalog extends AbstractCatalog { if (!databaseExists(databaseName)) { throw new DatabaseNotExistException(databaseName); } - if (tableStoreTableExists(identifier)) { + if (paimonTableExists(identifier)) { if (ignoreIfExists) { return; } else { @@ -256,7 +258,7 @@ public class HiveCatalog extends AbstractCatalog { throws TableNotExistException, TableAlreadyExistException { checkNotSystemTable(fromTable, "renameTable"); checkNotSystemTable(toTable, "renameTable"); - if (!tableStoreTableExists(fromTable)) { + if (!paimonTableExists(fromTable)) { if (ignoreIfNotExists) { return; } else { @@ -264,7 +266,7 @@ public class HiveCatalog extends AbstractCatalog { } } - if (tableStoreTableExists(toTable)) { + if (paimonTableExists(toTable)) { throw new TableAlreadyExistException(toTable); } @@ -285,7 +287,7 @@ public class HiveCatalog extends AbstractCatalog { Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists) throws TableNotExistException { checkNotSystemTable(identifier, "alterTable"); - if (!tableStoreTableExists(identifier)) { + if (!paimonTableExists(identifier)) { if (ignoreIfNotExists) { return; } else { @@ -414,15 +416,15 @@ public class HiveCatalog extends AbstractCatalog { dataField.description()); } - private boolean tableStoreTableExists(Identifier identifier) { - return tableStoreTableExists(identifier, true); + private boolean paimonTableExists(Identifier identifier) { + return paimonTableExists(identifier, true); } private boolean schemaFileExists(Identifier identifier) { return new SchemaManager(fileIO, getDataTableLocation(identifier)).latest().isPresent(); } - private boolean tableStoreTableExists(Identifier identifier, boolean throwException) { + private boolean paimonTableExists(Identifier identifier, boolean throwException) { Table table; try { table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); @@ -434,21 +436,22 @@ public class HiveCatalog extends AbstractCatalog { e); } - if (!INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) - || !OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat())) { - if (throwException) { - throw new IllegalArgumentException( - "Table " - + identifier.getFullName() - + " is not a paimon table. It's input format is " - + table.getSd().getInputFormat() - + " and its output format is " - + table.getSd().getOutputFormat()); - } else { - return false; - } + boolean isPaimonTable = isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table); + if (!isPaimonTable && throwException) { + throw new IllegalArgumentException( + "Table " + + identifier.getFullName() + + " is not a paimon table. It's input format is " + + table.getSd().getInputFormat() + + " and its output format is " + + table.getSd().getOutputFormat()); } - return true; + return isPaimonTable; + } + + private static boolean isPaimonTable(Table table) { + return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) + && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); } private SchemaManager schemaManager(Identifier identifier) { diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java new file mode 100644 index 000000000..b1336d536 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.hive; + +import org.apache.paimon.hive.PaimonStorageHandler; + +/** A {@link PaimonStorageHandler} to be compatible to table store 0.3. */ +@Deprecated +public class TableStoreHiveStorageHandler extends PaimonStorageHandler {} diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java new file mode 100644 index 000000000..c2fe9b98e --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.hive; + +import org.apache.paimon.hive.PaimonSerDe; + +/** A {@link PaimonSerDe} to be compatible to table store 0.3. */ +@Deprecated +public class TableStoreSerDe extends PaimonSerDe {} diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java new file mode 100644 index 000000000..28d83ff84 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.mapred; + +import org.apache.paimon.hive.mapred.PaimonInputFormat; + +/** A {@link PaimonInputFormat} to be compatible to table store 0.3. */ +@Deprecated +public class TableStoreInputFormat extends PaimonInputFormat {} diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java new file mode 100644 index 000000000..052196841 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.mapred; + +import org.apache.paimon.hive.mapred.PaimonOutputFormat; + +/** A {@link PaimonOutputFormat} to be compatible to table store 0.3. */ +@Deprecated +public class TableStoreOutputFormat extends PaimonOutputFormat {}
