This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new fa51956e [FLINK-31309] Delete DFS schema when create table failed in 
HiveCatalog
fa51956e is described below

commit fa51956e4fd1df18c7cfa87ae0f0042bab2a209b
Author: Shammon FY <[email protected]>
AuthorDate: Mon Mar 6 17:21:46 2023 +0800

    [FLINK-31309] Delete DFS schema when create table failed in HiveCatalog
    
    This closes #575
---
 .../org/apache/flink/table/store/fs/FileIO.java    | 14 ++++++
 .../apache/flink/table/store/hive/HiveCatalog.java |  9 ++++
 .../table/store/hive/FailHiveMetaStoreClient.java  | 55 ++++++++++++++++++++++
 .../flink/table/store/hive/HiveCatalogITCase.java  | 35 ++++++++++++++
 4 files changed, 113 insertions(+)

diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
index 74d3a20f..8e2f0374 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/fs/FileIO.java
@@ -146,6 +146,20 @@ public interface FileIO extends Serializable {
         }
     }
 
+    default void deleteDirectoryQuietly(Path directory) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to delete " + directory.toString());
+        }
+
+        try {
+            if (!delete(directory, true) && exists(directory)) {
+                LOG.warn("Failed to delete directory " + directory);
+            }
+        } catch (IOException e) {
+            LOG.warn("Exception occurs when deleting directory " + directory, 
e);
+        }
+    }
+
     default long getFileSize(Path path) throws IOException {
         return getFileStatus(path).getLen();
     }
diff --git 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 6771740e..9a4d1640 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,6 +68,7 @@ import static 
org.apache.flink.table.store.utils.Preconditions.checkState;
 
 /** A catalog implementation for Hive. */
 public class HiveCatalog extends AbstractCatalog {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
 
     // we don't include flink-table-store-hive-connector as dependencies 
because it depends on
     // hive-exec
@@ -241,6 +244,12 @@ public class HiveCatalog extends AbstractCatalog {
         try {
             client.createTable(table);
         } catch (TException e) {
+            Path path = getDataTableLocation(identifier);
+            try {
+                fileIO.deleteDirectoryQuietly(path);
+            } catch (Exception ee) {
+                LOG.error("Delete directory[{}] fail for table {}", path, 
identifier, ee);
+            }
             throw new RuntimeException("Failed to create table " + 
identifier.getFullName(), e);
         }
     }
diff --git 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/FailHiveMetaStoreClient.java
 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/FailHiveMetaStoreClient.java
new file mode 100644
index 00000000..85cc933e
--- /dev/null
+++ 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/FailHiveMetaStoreClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+/** A {@link HiveMetaStoreClient} to test accessing exception in Hive 
metastore client. */
+public class FailHiveMetaStoreClient extends HiveMetaStoreClient implements 
IMetaStoreClient {
+    public FailHiveMetaStoreClient(HiveConf conf) throws MetaException {
+        super(conf);
+    }
+
+    public FailHiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader 
hookLoader)
+            throws MetaException {
+        super(conf, hookLoader);
+    }
+
+    public FailHiveMetaStoreClient(
+            HiveConf conf, HiveMetaHookLoader hookLoader, Boolean 
allowEmbedded)
+            throws MetaException {
+        super(conf, hookLoader, allowEmbedded);
+    }
+
+    @Override
+    public void createTable(Table tbl)
+            throws AlreadyExistsException, InvalidObjectException, 
MetaException,
+                    NoSuchObjectException, TException {
+        throw new TException();
+    }
+}
diff --git 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index 47388fc8..f12695cb 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -22,12 +22,14 @@ import 
org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.store.connector.FlinkCatalog;
 import org.apache.flink.table.store.file.catalog.AbstractCatalog;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.catalog.Identifier;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.fs.local.LocalFileIO;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -60,6 +62,7 @@ import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURR
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** IT cases for {@link HiveCatalog}. */
 @RunWith(FlinkEmbeddedHiveRunner.class)
@@ -527,6 +530,38 @@ public class HiveCatalogITCase {
         Assert.assertEquals("[]", tables.toString());
     }
 
+    @Test
+    public void testCreateExistTableInHive() throws Exception {
+        tEnv.executeSql(
+                String.join(
+                        "\n",
+                        "CREATE CATALOG my_hive_custom_client WITH (",
+                        "  'type' = 'table-store',",
+                        "  'metastore' = 'hive',",
+                        "  'uri' = '',",
+                        "  'warehouse' = '" + path + "',",
+                        "  'metastore.client.class' = '"
+                                + FailHiveMetaStoreClient.class.getName()
+                                + "'",
+                        ")"));
+        tEnv.executeSql("USE CATALOG my_hive_custom_client");
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                "CREATE TABLE hive_table(a 
INT, b INT, c INT, d INT)")
+                                        .await())
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "Could not execute CreateTable in path 
`my_hive_custom_client`.`default`.`hive_table`");
+        assertTrue(
+                new SchemaManager(
+                                LocalFileIO.create(),
+                                new org.apache.flink.table.store.fs.Path(
+                                        path, "default.db/hive_table"))
+                        .listAllIds()
+                        .isEmpty());
+    }
+
     private List<Row> collect(String sql) throws Exception {
         List<Row> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

Reply via email to