This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new fa51956e [FLINK-31309] Delete DFS schema when create table failed in
HiveCatalog
fa51956e is described below
commit fa51956e4fd1df18c7cfa87ae0f0042bab2a209b
Author: Shammon FY <[email protected]>
AuthorDate: Mon Mar 6 17:21:46 2023 +0800
[FLINK-31309] Delete DFS schema when create table failed in HiveCatalog
This closes #575
---
.../org/apache/flink/table/store/fs/FileIO.java | 14 ++++++
.../apache/flink/table/store/hive/HiveCatalog.java | 9 ++++
.../table/store/hive/FailHiveMetaStoreClient.java | 55 ++++++++++++++++++++++
.../flink/table/store/hive/HiveCatalogITCase.java | 35 ++++++++++++++
4 files changed, 113 insertions(+)
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
index 74d3a20f..8e2f0374 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
@@ -146,6 +146,20 @@ public interface FileIO extends Serializable {
}
}
+ default void deleteDirectoryQuietly(Path directory) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ready to delete " + directory.toString());
+ }
+
+ try {
+ if (!delete(directory, true) && exists(directory)) {
+ LOG.warn("Failed to delete directory " + directory);
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception occurs when deleting directory " + directory,
e);
+ }
+ }
+
default long getFileSize(Path path) throws IOException {
return getFileStatus(path).getLen();
}
diff --git
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 6771740e..9a4d1640 100644
---
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@@ -66,6 +68,7 @@ import static org.apache.flink.table.store.utils.Preconditions.checkState;
/** A catalog implementation for Hive. */
public class HiveCatalog extends AbstractCatalog {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
     // we don't include flink-table-store-hive-connector as dependencies because it depends on
     // hive-exec
@@ -241,6 +244,12 @@ public class HiveCatalog extends AbstractCatalog {
try {
client.createTable(table);
} catch (TException e) {
+ Path path = getDataTableLocation(identifier);
+ try {
+ fileIO.deleteDirectoryQuietly(path);
+ } catch (Exception ee) {
+                LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
+ }
             throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
}
}
diff --git
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/FailHiveMetaStoreClient.java
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/FailHiveMetaStoreClient.java
new file mode 100644
index 00000000..85cc933e
--- /dev/null
+++
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/FailHiveMetaStoreClient.java
@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.store.hive;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

/**
 * A {@link HiveMetaStoreClient} to test accessing exception in Hive metastore client.
 *
 * <p>Every {@link #createTable(Table)} call fails with a {@link TException}, letting tests
 * exercise the catalog's cleanup behavior when the metastore rejects a table.
 * NOTE(review): presumably instantiated via the {@code metastore.client.class} catalog option
 * (see HiveCatalogITCase) — confirm which constructor overload is actually used.
 */
public class FailHiveMetaStoreClient extends HiveMetaStoreClient implements IMetaStoreClient {

    // The three constructors simply mirror the superclass overloads, so the class
    // can be created regardless of which signature the instantiating code picks.
    public FailHiveMetaStoreClient(HiveConf conf) throws MetaException {
        super(conf);
    }

    public FailHiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
            throws MetaException {
        super(conf, hookLoader);
    }

    public FailHiveMetaStoreClient(
            HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
            throws MetaException {
        super(conf, hookLoader, allowEmbedded);
    }

    /** Always fails: simulates a metastore-side error during table creation. */
    @Override
    public void createTable(Table tbl)
            throws AlreadyExistsException, InvalidObjectException, MetaException,
                    NoSuchObjectException, TException {
        throw new TException();
    }
}
diff --git
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index 47388fc8..f12695cb 100644
---
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -22,12 +22,14 @@ import
org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.store.connector.FlinkCatalog;
import org.apache.flink.table.store.file.catalog.AbstractCatalog;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.catalog.Identifier;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.fs.local.LocalFileIO;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -60,6 +62,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** IT cases for {@link HiveCatalog}. */
@RunWith(FlinkEmbeddedHiveRunner.class)
@@ -527,6 +530,38 @@ public class HiveCatalogITCase {
Assert.assertEquals("[]", tables.toString());
}
    /**
     * Verifies FLINK-31309: when the Hive metastore rejects a CREATE TABLE, the schema files
     * already written under the warehouse directory must be cleaned up again, so no schema ids
     * remain on disk for the failed table.
     */
    @Test
    public void testCreateExistTableInHive() throws Exception {
        // Register a catalog whose metastore client always throws on createTable.
        tEnv.executeSql(
                String.join(
                        "\n",
                        "CREATE CATALOG my_hive_custom_client WITH (",
                        " 'type' = 'table-store',",
                        " 'metastore' = 'hive',",
                        " 'uri' = '',",
                        " 'warehouse' = '" + path + "',",
                        " 'metastore.client.class' = '"
                                + FailHiveMetaStoreClient.class.getName()
                                + "'",
                        ")"));
        tEnv.executeSql("USE CATALOG my_hive_custom_client");
        // The metastore failure must surface to the user as a TableException...
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                                "CREATE TABLE hive_table(a INT, b INT, c INT, d INT)")
                                        .await())
                .isInstanceOf(TableException.class)
                .hasMessage(
                        "Could not execute CreateTable in path `my_hive_custom_client`.`default`.`hive_table`");
        // ...and the schema directory for the failed table must be empty afterwards.
        // NOTE(review): this class runs under JUnit4's @RunWith and otherwise uses
        // org.junit.Assert / AssertJ; the JUnit5 (jupiter) assertTrue used here is
        // inconsistent — consider AssertJ's assertThat(...).isTrue() instead.
        assertTrue(
                new SchemaManager(
                                LocalFileIO.create(),
                                new org.apache.flink.table.store.fs.Path(
                                        path, "default.db/hive_table"))
                        .listAllIds()
                        .isEmpty());
    }
+
private List<Row> collect(String sql) throws Exception {
List<Row> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {