This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new e730f847af Flink: remove the creation of default database in FlinkCatalog open method (#7795) (#8039)
e730f847af is described below

commit e730f847af9691f79fb264653c4434a48500b4f5
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jul 12 07:06:52 2023 +0200

    Flink: remove the creation of default database in FlinkCatalog open method (#7795) (#8039)
---
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  9 +---
 .../iceberg/flink/TestFlinkCatalogDatabase.java    | 51 ++++++++--------------
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  9 +---
 .../iceberg/flink/TestFlinkCatalogDatabase.java    | 51 ++++++++--------------
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  9 +---
 .../iceberg/flink/TestFlinkCatalogDatabase.java    | 27 ++----------
 6 files changed, 41 insertions(+), 115 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 3ecddaa05c..825816fdf4 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -124,14 +124,7 @@ public class FlinkCatalog extends AbstractCatalog {
   }
 
   @Override
-  public void open() throws CatalogException {
-    // Create the default database if it does not exist.
-    try {
-      createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
-    } catch (DatabaseAlreadyExistException e) {
-      // Ignore the exception if it's already exist.
-    }
-  }
+  public void open() throws CatalogException {}
 
   @Override
   public void close() throws CatalogException {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index a19e5f72bb..47b47cb626 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -24,12 +24,11 @@ import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -74,19 +73,6 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
         "Database should be created", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
   }
 
-  @Test
-  public void testDefaultDatabase() {
-    sql("USE CATALOG %s", catalogName);
-    sql("SHOW TABLES");
-
-    Assert.assertEquals(
-        "Should use the current catalog", getTableEnv().getCurrentCatalog(), 
catalogName);
-    Assert.assertEquals(
-        "Should use the configured default namespace",
-        getTableEnv().getCurrentDatabase(),
-        "default");
-  }
-
   @Test
   public void testDropEmptyDatabase() {
     Assert.assertFalse(
@@ -126,11 +112,11 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
         "Table should exist",
         validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, 
"tl")));
 
-    AssertHelpers.assertThrowsCause(
-        "Should fail if trying to delete a non-empty database",
-        DatabaseNotEmptyException.class,
-        String.format("Database %s in catalog %s is not empty.", DATABASE, 
catalogName),
-        () -> sql("DROP DATABASE %s", flinkDatabase));
+    Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
+        .cause()
+        .isInstanceOf(DatabaseNotEmptyException.class)
+        .hasMessage(
+            String.format("Database %s in catalog %s is not empty.", DATABASE, 
catalogName));
 
     sql("DROP TABLE %s.tl", flinkDatabase);
   }
@@ -174,22 +160,17 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
     List<Row> databases = sql("SHOW DATABASES");
 
     if (isHadoopCatalog) {
-      Assert.assertEquals("Should have 2 database", 2, databases.size());
-      Assert.assertEquals(
-          "Should have db and default database",
-          Sets.newHashSet("default", "db"),
-          Sets.newHashSet(databases.get(0).getField(0), 
databases.get(1).getField(0)));
+      Assert.assertEquals("Should have 1 database", 1, databases.size());
+      Assert.assertEquals("Should have db database", "db", 
databases.get(0).getField(0));
 
       if (!baseNamespace.isEmpty()) {
         // test namespace not belongs to this catalog
         validationNamespaceCatalog.createNamespace(
             Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
         databases = sql("SHOW DATABASES");
-        Assert.assertEquals("Should have 2 database", 2, databases.size());
+        Assert.assertEquals("Should have 1 database", 1, databases.size());
         Assert.assertEquals(
-            "Should have db and default database",
-            Sets.newHashSet("default", "db"),
-            Sets.newHashSet(databases.get(0).getField(0), 
databases.get(1).getField(0)));
+            "Should have db and default database", "db", 
databases.get(0).getField(0));
       }
     } else {
       // If there are multiple classes extends FlinkTestBase, 
TestHiveMetastore may loose the
@@ -301,10 +282,12 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
         "Namespace should not already exist",
         validationNamespaceCatalog.namespaceExists(icebergNamespace));
 
-    AssertHelpers.assertThrowsCause(
-        "Should fail if trying to create database with location in hadoop 
catalog.",
-        UnsupportedOperationException.class,
-        String.format("Cannot create namespace %s: metadata is not supported", 
icebergNamespace),
-        () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
+    Assertions.assertThatThrownBy(
+            () -> sql("CREATE DATABASE %s WITH ('prop'='value')", 
flinkDatabase))
+        .cause()
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage(
+            String.format(
+                "Cannot create namespace %s: metadata is not supported", 
icebergNamespace));
   }
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 3ecddaa05c..825816fdf4 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -124,14 +124,7 @@ public class FlinkCatalog extends AbstractCatalog {
   }
 
   @Override
-  public void open() throws CatalogException {
-    // Create the default database if it does not exist.
-    try {
-      createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
-    } catch (DatabaseAlreadyExistException e) {
-      // Ignore the exception if it's already exist.
-    }
-  }
+  public void open() throws CatalogException {}
 
   @Override
   public void close() throws CatalogException {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index a19e5f72bb..47b47cb626 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -24,12 +24,11 @@ import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -74,19 +73,6 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
         "Database should be created", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
   }
 
-  @Test
-  public void testDefaultDatabase() {
-    sql("USE CATALOG %s", catalogName);
-    sql("SHOW TABLES");
-
-    Assert.assertEquals(
-        "Should use the current catalog", getTableEnv().getCurrentCatalog(), 
catalogName);
-    Assert.assertEquals(
-        "Should use the configured default namespace",
-        getTableEnv().getCurrentDatabase(),
-        "default");
-  }
-
   @Test
   public void testDropEmptyDatabase() {
     Assert.assertFalse(
@@ -126,11 +112,11 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
         "Table should exist",
         validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, 
"tl")));
 
-    AssertHelpers.assertThrowsCause(
-        "Should fail if trying to delete a non-empty database",
-        DatabaseNotEmptyException.class,
-        String.format("Database %s in catalog %s is not empty.", DATABASE, 
catalogName),
-        () -> sql("DROP DATABASE %s", flinkDatabase));
+    Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
+        .cause()
+        .isInstanceOf(DatabaseNotEmptyException.class)
+        .hasMessage(
+            String.format("Database %s in catalog %s is not empty.", DATABASE, 
catalogName));
 
     sql("DROP TABLE %s.tl", flinkDatabase);
   }
@@ -174,22 +160,17 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
     List<Row> databases = sql("SHOW DATABASES");
 
     if (isHadoopCatalog) {
-      Assert.assertEquals("Should have 2 database", 2, databases.size());
-      Assert.assertEquals(
-          "Should have db and default database",
-          Sets.newHashSet("default", "db"),
-          Sets.newHashSet(databases.get(0).getField(0), 
databases.get(1).getField(0)));
+      Assert.assertEquals("Should have 1 database", 1, databases.size());
+      Assert.assertEquals("Should have db database", "db", 
databases.get(0).getField(0));
 
       if (!baseNamespace.isEmpty()) {
         // test namespace not belongs to this catalog
         validationNamespaceCatalog.createNamespace(
             Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
         databases = sql("SHOW DATABASES");
-        Assert.assertEquals("Should have 2 database", 2, databases.size());
+        Assert.assertEquals("Should have 1 database", 1, databases.size());
         Assert.assertEquals(
-            "Should have db and default database",
-            Sets.newHashSet("default", "db"),
-            Sets.newHashSet(databases.get(0).getField(0), 
databases.get(1).getField(0)));
+            "Should have db and default database", "db", 
databases.get(0).getField(0));
       }
     } else {
       // If there are multiple classes extends FlinkTestBase, 
TestHiveMetastore may loose the
@@ -301,10 +282,12 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
         "Namespace should not already exist",
         validationNamespaceCatalog.namespaceExists(icebergNamespace));
 
-    AssertHelpers.assertThrowsCause(
-        "Should fail if trying to create database with location in hadoop 
catalog.",
-        UnsupportedOperationException.class,
-        String.format("Cannot create namespace %s: metadata is not supported", 
icebergNamespace),
-        () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
+    Assertions.assertThatThrownBy(
+            () -> sql("CREATE DATABASE %s WITH ('prop'='value')", 
flinkDatabase))
+        .cause()
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage(
+            String.format(
+                "Cannot create namespace %s: metadata is not supported", 
icebergNamespace));
   }
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 3ecddaa05c..825816fdf4 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -124,14 +124,7 @@ public class FlinkCatalog extends AbstractCatalog {
   }
 
   @Override
-  public void open() throws CatalogException {
-    // Create the default database if it does not exist.
-    try {
-      createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
-    } catch (DatabaseAlreadyExistException e) {
-      // Ignore the exception if it's already exist.
-    }
-  }
+  public void open() throws CatalogException {}
 
   @Override
   public void close() throws CatalogException {
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index f97ace69f8..47b47cb626 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -27,7 +27,6 @@ import org.apache.flink.types.Row;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
@@ -74,19 +73,6 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
         "Database should be created", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
   }
 
-  @Test
-  public void testDefaultDatabase() {
-    sql("USE CATALOG %s", catalogName);
-    sql("SHOW TABLES");
-
-    Assert.assertEquals(
-        "Should use the current catalog", getTableEnv().getCurrentCatalog(), 
catalogName);
-    Assert.assertEquals(
-        "Should use the configured default namespace",
-        getTableEnv().getCurrentDatabase(),
-        "default");
-  }
-
   @Test
   public void testDropEmptyDatabase() {
     Assert.assertFalse(
@@ -174,22 +160,17 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
     List<Row> databases = sql("SHOW DATABASES");
 
     if (isHadoopCatalog) {
-      Assert.assertEquals("Should have 2 database", 2, databases.size());
-      Assert.assertEquals(
-          "Should have db and default database",
-          Sets.newHashSet("default", "db"),
-          Sets.newHashSet(databases.get(0).getField(0), 
databases.get(1).getField(0)));
+      Assert.assertEquals("Should have 1 database", 1, databases.size());
+      Assert.assertEquals("Should have db database", "db", 
databases.get(0).getField(0));
 
       if (!baseNamespace.isEmpty()) {
         // test namespace not belongs to this catalog
         validationNamespaceCatalog.createNamespace(
             Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
         databases = sql("SHOW DATABASES");
-        Assert.assertEquals("Should have 2 database", 2, databases.size());
+        Assert.assertEquals("Should have 1 database", 1, databases.size());
         Assert.assertEquals(
-            "Should have db and default database",
-            Sets.newHashSet("default", "db"),
-            Sets.newHashSet(databases.get(0).getField(0), 
databases.get(1).getField(0)));
+            "Should have db and default database", "db", 
databases.get(0).getField(0));
       }
     } else {
       // If there are multiple classes extends FlinkTestBase, 
TestHiveMetastore may loose the

Reply via email to