This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new e730f847af Flink: remove the creation of default database in
FlinkCatalog open method (#7795) (#8039)
e730f847af is described below
commit e730f847af9691f79fb264653c4434a48500b4f5
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jul 12 07:06:52 2023 +0200
Flink: remove the creation of default database in FlinkCatalog open method
(#7795) (#8039)
---
.../org/apache/iceberg/flink/FlinkCatalog.java | 9 +---
.../iceberg/flink/TestFlinkCatalogDatabase.java | 51 ++++++++--------------
.../org/apache/iceberg/flink/FlinkCatalog.java | 9 +---
.../iceberg/flink/TestFlinkCatalogDatabase.java | 51 ++++++++--------------
.../org/apache/iceberg/flink/FlinkCatalog.java | 9 +---
.../iceberg/flink/TestFlinkCatalogDatabase.java | 27 ++----------
6 files changed, 41 insertions(+), 115 deletions(-)
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 3ecddaa05c..825816fdf4 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -124,14 +124,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
@Override
- public void open() throws CatalogException {
- // Create the default database if it does not exist.
- try {
- createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
- } catch (DatabaseAlreadyExistException e) {
- // Ignore the exception if it's already exist.
- }
- }
+ public void open() throws CatalogException {}
@Override
public void close() throws CatalogException {
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index a19e5f72bb..47b47cb626 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -24,12 +24,11 @@ import java.util.Map;
import java.util.Objects;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -74,19 +73,6 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
"Database should be created",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
}
- @Test
- public void testDefaultDatabase() {
- sql("USE CATALOG %s", catalogName);
- sql("SHOW TABLES");
-
- Assert.assertEquals(
- "Should use the current catalog", getTableEnv().getCurrentCatalog(),
catalogName);
- Assert.assertEquals(
- "Should use the configured default namespace",
- getTableEnv().getCurrentDatabase(),
- "default");
- }
-
@Test
public void testDropEmptyDatabase() {
Assert.assertFalse(
@@ -126,11 +112,11 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
"Table should exist",
validationCatalog.tableExists(TableIdentifier.of(icebergNamespace,
"tl")));
- AssertHelpers.assertThrowsCause(
- "Should fail if trying to delete a non-empty database",
- DatabaseNotEmptyException.class,
- String.format("Database %s in catalog %s is not empty.", DATABASE,
catalogName),
- () -> sql("DROP DATABASE %s", flinkDatabase));
+ Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
+ .cause()
+ .isInstanceOf(DatabaseNotEmptyException.class)
+ .hasMessage(
+ String.format("Database %s in catalog %s is not empty.", DATABASE,
catalogName));
sql("DROP TABLE %s.tl", flinkDatabase);
}
@@ -174,22 +160,17 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
List<Row> databases = sql("SHOW DATABASES");
if (isHadoopCatalog) {
- Assert.assertEquals("Should have 2 database", 2, databases.size());
- Assert.assertEquals(
- "Should have db and default database",
- Sets.newHashSet("default", "db"),
- Sets.newHashSet(databases.get(0).getField(0),
databases.get(1).getField(0)));
+ Assert.assertEquals("Should have 1 database", 1, databases.size());
+ Assert.assertEquals("Should have db database", "db",
databases.get(0).getField(0));
if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
- Assert.assertEquals("Should have 2 database", 2, databases.size());
+ Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals(
- "Should have db and default database",
- Sets.newHashSet("default", "db"),
- Sets.newHashSet(databases.get(0).getField(0),
databases.get(1).getField(0)));
+ "Should have db and default database", "db",
databases.get(0).getField(0));
}
} else {
// If there are multiple classes extends FlinkTestBase,
TestHiveMetastore may loose the
@@ -301,10 +282,12 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
"Namespace should not already exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
- AssertHelpers.assertThrowsCause(
- "Should fail if trying to create database with location in hadoop
catalog.",
- UnsupportedOperationException.class,
- String.format("Cannot create namespace %s: metadata is not supported",
icebergNamespace),
- () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
+ Assertions.assertThatThrownBy(
+ () -> sql("CREATE DATABASE %s WITH ('prop'='value')",
flinkDatabase))
+ .cause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ String.format(
+ "Cannot create namespace %s: metadata is not supported",
icebergNamespace));
}
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 3ecddaa05c..825816fdf4 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -124,14 +124,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
@Override
- public void open() throws CatalogException {
- // Create the default database if it does not exist.
- try {
- createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
- } catch (DatabaseAlreadyExistException e) {
- // Ignore the exception if it's already exist.
- }
- }
+ public void open() throws CatalogException {}
@Override
public void close() throws CatalogException {
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index a19e5f72bb..47b47cb626 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -24,12 +24,11 @@ import java.util.Map;
import java.util.Objects;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -74,19 +73,6 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
"Database should be created",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
}
- @Test
- public void testDefaultDatabase() {
- sql("USE CATALOG %s", catalogName);
- sql("SHOW TABLES");
-
- Assert.assertEquals(
- "Should use the current catalog", getTableEnv().getCurrentCatalog(),
catalogName);
- Assert.assertEquals(
- "Should use the configured default namespace",
- getTableEnv().getCurrentDatabase(),
- "default");
- }
-
@Test
public void testDropEmptyDatabase() {
Assert.assertFalse(
@@ -126,11 +112,11 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
"Table should exist",
validationCatalog.tableExists(TableIdentifier.of(icebergNamespace,
"tl")));
- AssertHelpers.assertThrowsCause(
- "Should fail if trying to delete a non-empty database",
- DatabaseNotEmptyException.class,
- String.format("Database %s in catalog %s is not empty.", DATABASE,
catalogName),
- () -> sql("DROP DATABASE %s", flinkDatabase));
+ Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
+ .cause()
+ .isInstanceOf(DatabaseNotEmptyException.class)
+ .hasMessage(
+ String.format("Database %s in catalog %s is not empty.", DATABASE,
catalogName));
sql("DROP TABLE %s.tl", flinkDatabase);
}
@@ -174,22 +160,17 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
List<Row> databases = sql("SHOW DATABASES");
if (isHadoopCatalog) {
- Assert.assertEquals("Should have 2 database", 2, databases.size());
- Assert.assertEquals(
- "Should have db and default database",
- Sets.newHashSet("default", "db"),
- Sets.newHashSet(databases.get(0).getField(0),
databases.get(1).getField(0)));
+ Assert.assertEquals("Should have 1 database", 1, databases.size());
+ Assert.assertEquals("Should have db database", "db",
databases.get(0).getField(0));
if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
- Assert.assertEquals("Should have 2 database", 2, databases.size());
+ Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals(
- "Should have db and default database",
- Sets.newHashSet("default", "db"),
- Sets.newHashSet(databases.get(0).getField(0),
databases.get(1).getField(0)));
+ "Should have db and default database", "db",
databases.get(0).getField(0));
}
} else {
// If there are multiple classes extends FlinkTestBase,
TestHiveMetastore may loose the
@@ -301,10 +282,12 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
"Namespace should not already exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
- AssertHelpers.assertThrowsCause(
- "Should fail if trying to create database with location in hadoop
catalog.",
- UnsupportedOperationException.class,
- String.format("Cannot create namespace %s: metadata is not supported",
icebergNamespace),
- () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
+ Assertions.assertThatThrownBy(
+ () -> sql("CREATE DATABASE %s WITH ('prop'='value')",
flinkDatabase))
+ .cause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ String.format(
+ "Cannot create namespace %s: metadata is not supported",
icebergNamespace));
}
}
diff --git
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 3ecddaa05c..825816fdf4 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -124,14 +124,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
@Override
- public void open() throws CatalogException {
- // Create the default database if it does not exist.
- try {
- createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
- } catch (DatabaseAlreadyExistException e) {
- // Ignore the exception if it's already exist.
- }
- }
+ public void open() throws CatalogException {}
@Override
public void close() throws CatalogException {
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index f97ace69f8..47b47cb626 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -27,7 +27,6 @@ import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
@@ -74,19 +73,6 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
"Database should be created",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
}
- @Test
- public void testDefaultDatabase() {
- sql("USE CATALOG %s", catalogName);
- sql("SHOW TABLES");
-
- Assert.assertEquals(
- "Should use the current catalog", getTableEnv().getCurrentCatalog(),
catalogName);
- Assert.assertEquals(
- "Should use the configured default namespace",
- getTableEnv().getCurrentDatabase(),
- "default");
- }
-
@Test
public void testDropEmptyDatabase() {
Assert.assertFalse(
@@ -174,22 +160,17 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
List<Row> databases = sql("SHOW DATABASES");
if (isHadoopCatalog) {
- Assert.assertEquals("Should have 2 database", 2, databases.size());
- Assert.assertEquals(
- "Should have db and default database",
- Sets.newHashSet("default", "db"),
- Sets.newHashSet(databases.get(0).getField(0),
databases.get(1).getField(0)));
+ Assert.assertEquals("Should have 1 database", 1, databases.size());
+ Assert.assertEquals("Should have db database", "db",
databases.get(0).getField(0));
if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
- Assert.assertEquals("Should have 2 database", 2, databases.size());
+ Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals(
- "Should have db and default database",
- Sets.newHashSet("default", "db"),
- Sets.newHashSet(databases.get(0).getField(0),
databases.get(1).getField(0)));
+ "Should have db and default database", "db",
databases.get(0).getField(0));
}
} else {
// If there are multiple classes extends FlinkTestBase,
TestHiveMetastore may loose the