This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 61532a042a Flink: Backport #9364 to 1.16 and 1.17 for Create 
CatalogTestBase for migration to JUnit5 (#9601)
61532a042a is described below

commit 61532a042adcd2c916b62ace533129e9e7ad3128
Author: Rodrigo <[email protected]>
AuthorDate: Thu Feb 1 00:07:56 2024 -0800

    Flink: Backport #9364 to 1.16 and 1.17 for Create CatalogTestBase for 
migration to JUnit5 (#9601)
---
 .../org/apache/iceberg/flink/CatalogTestBase.java  | 143 +++++++++++
 .../iceberg/flink/TestFlinkCatalogDatabase.java    | 267 +++++++++------------
 .../flink/TestFlinkCatalogTablePartitions.java     |  46 ++--
 .../source/TestMetadataTableReadableMetrics.java   |  48 ++--
 .../org/apache/iceberg/flink/CatalogTestBase.java  | 143 +++++++++++
 .../iceberg/flink/TestFlinkCatalogDatabase.java    | 267 +++++++++------------
 .../flink/TestFlinkCatalogTablePartitions.java     |  46 ++--
 .../source/TestMetadataTableReadableMetrics.java   |  48 ++--
 8 files changed, 604 insertions(+), 404 deletions(-)

diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
new file mode 100644
index 0000000000..91ed3c4ade
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.util.ArrayUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class CatalogTestBase extends TestBase {
+
+  protected static final String DATABASE = "db";
+  @TempDir protected File hiveWarehouse;
+  @TempDir protected File hadoopWarehouse;
+
+  @Parameter(index = 0)
+  protected String catalogName;
+
+  @Parameter(index = 1)
+  protected Namespace baseNamespace;
+
+  protected Catalog validationCatalog;
+  protected SupportsNamespaces validationNamespaceCatalog;
+  protected Map<String, String> config = Maps.newHashMap();
+
+  protected String flinkDatabase;
+  protected Namespace icebergNamespace;
+  protected boolean isHadoopCatalog;
+
+  @Parameters(name = "catalogName={0}, baseNamespace={1}")
+  protected static List<Object[]> parameters() {
+    return Arrays.asList(
+        new Object[] {"testhive", Namespace.empty()},
+        new Object[] {"testhadoop", Namespace.empty()},
+        new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
+  }
+
+  @BeforeEach
+  public void before() {
+    this.isHadoopCatalog = catalogName.startsWith("testhadoop");
+    this.validationCatalog =
+        isHadoopCatalog
+            ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getPath())
+            : catalog;
+    this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+
+    config.put("type", "iceberg");
+    if (!baseNamespace.isEmpty()) {
+      config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString());
+    }
+    if (isHadoopCatalog) {
+      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
+    } else {
+      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+      config.put(CatalogProperties.URI, getURI(hiveConf));
+    }
+    config.put(CatalogProperties.WAREHOUSE_LOCATION, 
String.format("file://%s", warehouseRoot()));
+
+    this.flinkDatabase = catalogName + "." + DATABASE;
+    this.icebergNamespace =
+        Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] 
{DATABASE}));
+    sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
+  }
+
+  @AfterEach
+  public void clean() {
+    dropCatalog(catalogName, true);
+  }
+
+  protected String warehouseRoot() {
+    if (isHadoopCatalog) {
+      return hadoopWarehouse.getAbsolutePath();
+    } else {
+      return hiveWarehouse.getAbsolutePath();
+    }
+  }
+
+  protected String getFullQualifiedTableName(String tableName) {
+    final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
+    levels.add(tableName);
+    return Joiner.on('.').join(levels);
+  }
+
+  static String getURI(HiveConf conf) {
+    return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
+  }
+
+  static String toWithClause(Map<String, String> props) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("(");
+    int propCount = 0;
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      if (propCount > 0) {
+        builder.append(",");
+      }
+      builder
+          .append("'")
+          .append(entry.getKey())
+          .append("'")
+          .append("=")
+          .append("'")
+          .append(entry.getValue())
+          .append("'");
+      propCount++;
+    }
+    builder.append(")");
+    return builder.toString();
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index 47b47cb626..f46d50a5f0 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -18,7 +18,10 @@
  */
 package org.apache.iceberg.flink;
 
-import java.io.File;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -29,18 +32,12 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Test;
-
-public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
 
-  public TestFlinkCatalogDatabase(String catalogName, Namespace baseNamespace) 
{
-    super(catalogName, baseNamespace);
-  }
+public class TestFlinkCatalogDatabase extends CatalogTestBase {
 
-  @After
+  @AfterEach
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
@@ -48,240 +45,204 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespace() {
-    Assert.assertFalse(
-        "Database should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should not already exist")
+        .isFalse();
 
     sql("CREATE DATABASE %s", flinkDatabase);
-
-    Assert.assertTrue(
-        "Database should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should exist")
+        .isTrue();
 
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
-    Assert.assertTrue(
-        "Database should still exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should still exist")
+        .isTrue();
 
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
-    Assert.assertFalse(
-        "Database should be dropped", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should be dropped")
+        .isFalse();
 
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
-    Assert.assertTrue(
-        "Database should be created", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should be created")
+        .isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testDropEmptyDatabase() {
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     sql("DROP DATABASE %s", flinkDatabase);
-
-    Assert.assertFalse(
-        "Namespace should have been dropped",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should have been dropped")
+        .isFalse();
   }
 
-  @Test
+  @TestTemplate
   public void testDropNonEmptyNamespace() {
-    Assume.assumeFalse(
-        "Hadoop catalog throws IOException: Directory is not empty.", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assumeThat(isHadoopCatalog)
+        .as("Hadoop catalog throws IOException: Directory is not empty.")
+        .isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
-
     validationCatalog.createTable(
         TableIdentifier.of(icebergNamespace, "tl"),
         new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-    Assert.assertTrue(
-        "Table should exist",
-        validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, 
"tl")));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
+    
assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, 
"tl")))
+        .as("Table should exist")
+        .isTrue();
     Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
         .cause()
         .isInstanceOf(DatabaseNotEmptyException.class)
         .hasMessage(
             String.format("Database %s in catalog %s is not empty.", DATABASE, 
catalogName));
-
     sql("DROP TABLE %s.tl", flinkDatabase);
   }
 
-  @Test
+  @TestTemplate
   public void testListTables() {
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
-    Assert.assertEquals("Should not list any tables", 0, sql("SHOW 
TABLES").size());
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
+    assertThat(sql("SHOW TABLES")).isEmpty();
     validationCatalog.createTable(
         TableIdentifier.of(icebergNamespace, "tl"),
         new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
 
     List<Row> tables = sql("SHOW TABLES");
-    Assert.assertEquals("Only 1 table", 1, tables.size());
-    Assert.assertEquals("Table name should match", "tl", 
tables.get(0).getField(0));
+    assertThat(tables).hasSize(1);
+    assertThat("tl").as("Table name should 
match").isEqualTo(tables.get(0).getField(0));
   }
 
-  @Test
+  @TestTemplate
   public void testListNamespace() {
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
 
     List<Row> databases = sql("SHOW DATABASES");
 
     if (isHadoopCatalog) {
-      Assert.assertEquals("Should have 1 database", 1, databases.size());
-      Assert.assertEquals("Should have db database", "db", 
databases.get(0).getField(0));
-
+      assertThat(databases).hasSize(1);
+      assertThat(databases.get(0).getField(0)).as("Should have db 
database").isEqualTo("db");
       if (!baseNamespace.isEmpty()) {
         // test namespace not belongs to this catalog
         validationNamespaceCatalog.createNamespace(
             Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
         databases = sql("SHOW DATABASES");
-        Assert.assertEquals("Should have 1 database", 1, databases.size());
-        Assert.assertEquals(
-            "Should have db and default database", "db", 
databases.get(0).getField(0));
+        assertThat(databases).hasSize(1);
+        assertThat(databases.get(0).getField(0)).as("Should have db 
database").isEqualTo("db");
       }
     } else {
       // If there are multiple classes extends FlinkTestBase, 
TestHiveMetastore may loose the
       // creation for default
       // database. See HiveMetaStore.HMSHandler.init.
-      Assert.assertTrue(
-          "Should have db database",
-          databases.stream().anyMatch(d -> Objects.equals(d.getField(0), 
"db")));
+      assertThat(databases)
+          .as("Should have db database")
+          .anyMatch(d -> Objects.equals(d.getField(0), "db"));
     }
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespaceWithMetadata() {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected prop value", "value", 
nsMetadata.get("prop"));
+    assertThat(nsMetadata).containsEntry("prop", "value");
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespaceWithComment() {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
 
     sql("CREATE DATABASE %s COMMENT 'namespace doc'", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected comment", "namespace doc", 
nsMetadata.get("comment"));
+    assertThat(nsMetadata).containsEntry("comment", "namespace doc");
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespaceWithLocation() throws Exception {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
-    File location = TEMPORARY_FOLDER.newFile();
-    Assert.assertTrue(location.delete());
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
 
+    Path location = temporaryDirectory.getRoot();
     sql("CREATE DATABASE %s WITH ('location'='%s')", flinkDatabase, location);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected location",
-        "file:" + location.getPath(),
-        nsMetadata.get("location"));
+    assertThat(nsMetadata).containsEntry("location", "file:" + 
location.getRoot());
   }
 
-  @Test
+  @TestTemplate
   public void testSetProperties() {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
 
     sql("CREATE DATABASE %s", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
 
     Map<String, String> defaultMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-    Assert.assertFalse(
-        "Default metadata should not have custom property", 
defaultMetadata.containsKey("prop"));
-
+    assertThat(defaultMetadata).doesNotContainKey("prop");
     sql("ALTER DATABASE %s SET ('prop'='value')", flinkDatabase);
-
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected prop value", "value", 
nsMetadata.get("prop"));
+    assertThat(nsMetadata).containsEntry("prop", "value");
   }
 
-  @Test
+  @TestTemplate
   public void testHadoopNotSupportMeta() {
-    Assume.assumeTrue("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isTrue();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     Assertions.assertThatThrownBy(
             () -> sql("CREATE DATABASE %s WITH ('prop'='value')", 
flinkDatabase))
         .cause()
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index fad65f4c63..5716abf2e1 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.util.List;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -25,25 +27,28 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
 
-public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
+public class TestFlinkCatalogTablePartitions extends CatalogTestBase {
 
   private String tableName = "test_table";
 
-  private final FileFormat format;
+  @Parameter(index = 2)
+  private FileFormat format;
+
+  @Parameter(index = 3)
+  private Boolean cacheEnabled;
 
-  @Parameterized.Parameters(
-      name = "catalogName={0}, baseNamespace={1}, format={2}, 
cacheEnabled={3}")
-  public static Iterable<Object[]> parameters() {
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, 
cacheEnabled={3}")
+  protected static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, 
FileFormat.PARQUET}) {
@@ -58,30 +63,24 @@ public class TestFlinkCatalogTablePartitions extends 
FlinkCatalogTestBase {
     return parameters;
   }
 
-  public TestFlinkCatalogTablePartitions(
-      String catalogName, Namespace baseNamespace, FileFormat format, boolean 
cacheEnabled) {
-    super(catalogName, baseNamespace);
-    this.format = format;
-    config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
-  }
-
   @Override
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
+    config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
   }
 
-  @After
+  @AfterEach
   public void cleanNamespaces() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testListPartitionsWithUnpartitionedTable() {
     sql(
         "CREATE TABLE %s (id INT, data VARCHAR) with 
('write.format.default'='%s')",
@@ -95,7 +94,7 @@ public class TestFlinkCatalogTablePartitions extends 
FlinkCatalogTestBase {
         .hasMessage("Table " + objectPath + " in catalog " + catalogName + " 
is not partitioned.");
   }
 
-  @Test
+  @TestTemplate
   public void testListPartitionsWithPartitionedTable()
       throws TableNotExistException, TableNotPartitionedException {
     sql(
@@ -108,13 +107,12 @@ public class TestFlinkCatalogTablePartitions extends 
FlinkCatalogTestBase {
     ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
     FlinkCatalog flinkCatalog = (FlinkCatalog) 
getTableEnv().getCatalog(catalogName).get();
     List<CatalogPartitionSpec> list = flinkCatalog.listPartitions(objectPath);
-    Assert.assertEquals("Should have 2 partition", 2, list.size());
-
+    assertThat(list).hasSize(2);
     List<CatalogPartitionSpec> expected = Lists.newArrayList();
     CatalogPartitionSpec partitionSpec1 = new 
CatalogPartitionSpec(ImmutableMap.of("data", "a"));
     CatalogPartitionSpec partitionSpec2 = new 
CatalogPartitionSpec(ImmutableMap.of("data", "b"));
     expected.add(partitionSpec1);
     expected.add(partitionSpec2);
-    Assert.assertEquals("Should produce the expected catalog partition 
specs.", list, expected);
+    assertThat(list).as("Should produce the expected catalog partition 
specs.").isEqualTo(expected);
   }
 }
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
index f05bf2fcd9..40dfda7237 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.flink.source;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
+import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.Base64;
 import java.util.List;
 import org.apache.flink.configuration.Configuration;
@@ -32,6 +34,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -40,28 +43,22 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
 
-public class TestMetadataTableReadableMetrics extends FlinkCatalogTestBase {
+public class TestMetadataTableReadableMetrics extends CatalogTestBase {
   private static final String TABLE_NAME = "test_table";
 
-  public TestMetadataTableReadableMetrics(String catalogName, Namespace 
baseNamespace) {
-    super(catalogName, baseNamespace);
-  }
-
-  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}")
-  public static Iterable<Object[]> parameters() {
+  @Parameters(name = "catalogName={0}, baseNamespace={1}")
+  protected static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     String catalogName = "testhive";
     Namespace baseNamespace = Namespace.empty();
@@ -76,7 +73,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
     return super.getTableEnv();
   }
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  private @TempDir Path temp;
 
   private static final Types.StructType LEAF_STRUCT_TYPE =
       Types.StructType.of(
@@ -134,8 +131,8 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
             createPrimitiveRecord(
                 false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2", 
null, null));
 
-    DataFile dataFile =
-        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), 
records);
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    DataFile dataFile = FileHelpers.writeDataFile(table, 
Files.localOutput(testFile), records);
     table.newAppend().appendFile(dataFile).commit();
     return table;
   }
@@ -153,12 +150,13 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
             createNestedRecord(0L, 0.0),
             createNestedRecord(1L, Double.NaN),
             createNestedRecord(null, null));
-    DataFile dataFile =
-        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), 
records);
+
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    DataFile dataFile = FileHelpers.writeDataFile(table, 
Files.localOutput(testFile), records);
     table.newAppend().appendFile(dataFile).commit();
   }
 
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("USE CATALOG %s", catalogName);
@@ -167,7 +165,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
   }
 
   @Override
-  @After
+  @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -212,7 +210,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
     return values;
   }
 
-  @Test
+  @TestTemplate
   public void testPrimitiveColumns() throws Exception {
     createPrimitiveTable();
     List<Row> result = sql("SELECT readable_metrics FROM %s$files", 
TABLE_NAME);
@@ -257,7 +255,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
     TestHelpers.assertRows(result, expected);
   }
 
-  @Test
+  @TestTemplate
   public void testSelectPrimitiveValues() throws Exception {
     createPrimitiveTable();
 
@@ -276,7 +274,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
         ImmutableList.of(Row.of(4L, 0)));
   }
 
-  @Test
+  @TestTemplate
   public void testSelectNestedValues() throws Exception {
     createNestedTable();
     TestHelpers.assertRows(
@@ -287,7 +285,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
         ImmutableList.of(Row.of(0L, 3L)));
   }
 
-  @Test
+  @TestTemplate
   public void testNestedValues() throws Exception {
     createNestedTable();
 
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
new file mode 100644
index 0000000000..91ed3c4ade
--- /dev/null
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.util.ArrayUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class CatalogTestBase extends TestBase {
+
+  protected static final String DATABASE = "db";
+  @TempDir protected File hiveWarehouse;
+  @TempDir protected File hadoopWarehouse;
+
+  @Parameter(index = 0)
+  protected String catalogName;
+
+  @Parameter(index = 1)
+  protected Namespace baseNamespace;
+
+  protected Catalog validationCatalog;
+  protected SupportsNamespaces validationNamespaceCatalog;
+  protected Map<String, String> config = Maps.newHashMap();
+
+  protected String flinkDatabase;
+  protected Namespace icebergNamespace;
+  protected boolean isHadoopCatalog;
+
+  @Parameters(name = "catalogName={0}, baseNamespace={1}")
+  protected static List<Object[]> parameters() {
+    return Arrays.asList(
+        new Object[] {"testhive", Namespace.empty()},
+        new Object[] {"testhadoop", Namespace.empty()},
+        new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
+  }
+
+  @BeforeEach
+  public void before() {
+    this.isHadoopCatalog = catalogName.startsWith("testhadoop");
+    this.validationCatalog =
+        isHadoopCatalog
+            ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getPath())
+            : catalog;
+    this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+
+    config.put("type", "iceberg");
+    if (!baseNamespace.isEmpty()) {
+      config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString());
+    }
+    if (isHadoopCatalog) {
+      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
+    } else {
+      config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+      config.put(CatalogProperties.URI, getURI(hiveConf));
+    }
+    config.put(CatalogProperties.WAREHOUSE_LOCATION, 
String.format("file://%s", warehouseRoot()));
+
+    this.flinkDatabase = catalogName + "." + DATABASE;
+    this.icebergNamespace =
+        Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] 
{DATABASE}));
+    sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
+  }
+
+  @AfterEach
+  public void clean() {
+    dropCatalog(catalogName, true);
+  }
+
+  protected String warehouseRoot() {
+    if (isHadoopCatalog) {
+      return hadoopWarehouse.getAbsolutePath();
+    } else {
+      return hiveWarehouse.getAbsolutePath();
+    }
+  }
+
+  protected String getFullQualifiedTableName(String tableName) {
+    final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
+    levels.add(tableName);
+    return Joiner.on('.').join(levels);
+  }
+
+  static String getURI(HiveConf conf) {
+    return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
+  }
+
+  static String toWithClause(Map<String, String> props) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("(");
+    int propCount = 0;
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      if (propCount > 0) {
+        builder.append(",");
+      }
+      builder
+          .append("'")
+          .append(entry.getKey())
+          .append("'")
+          .append("=")
+          .append("'")
+          .append(entry.getValue())
+          .append("'");
+      propCount++;
+    }
+    builder.append(")");
+    return builder.toString();
+  }
+}
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index 47b47cb626..f46d50a5f0 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -18,7 +18,10 @@
  */
 package org.apache.iceberg.flink;
 
-import java.io.File;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -29,18 +32,12 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Test;
-
-public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
 
-  public TestFlinkCatalogDatabase(String catalogName, Namespace baseNamespace) 
{
-    super(catalogName, baseNamespace);
-  }
+public class TestFlinkCatalogDatabase extends CatalogTestBase {
 
-  @After
+  @AfterEach
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
@@ -48,240 +45,204 @@ public class TestFlinkCatalogDatabase extends 
FlinkCatalogTestBase {
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespace() {
-    Assert.assertFalse(
-        "Database should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should not already exist")
+        .isFalse();
 
     sql("CREATE DATABASE %s", flinkDatabase);
-
-    Assert.assertTrue(
-        "Database should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should exist")
+        .isTrue();
 
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
-    Assert.assertTrue(
-        "Database should still exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should still exist")
+        .isTrue();
 
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
-    Assert.assertFalse(
-        "Database should be dropped", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should be dropped")
+        .isFalse();
 
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
-    Assert.assertTrue(
-        "Database should be created", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Database should be created")
+        .isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testDropEmptyDatabase() {
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     sql("DROP DATABASE %s", flinkDatabase);
-
-    Assert.assertFalse(
-        "Namespace should have been dropped",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should have been dropped")
+        .isFalse();
   }
 
-  @Test
+  @TestTemplate
   public void testDropNonEmptyNamespace() {
-    Assume.assumeFalse(
-        "Hadoop catalog throws IOException: Directory is not empty.", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assumeThat(isHadoopCatalog)
+        .as("Hadoop catalog throws IOException: Directory is not empty.")
+        .isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
-
     validationCatalog.createTable(
         TableIdentifier.of(icebergNamespace, "tl"),
         new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-    Assert.assertTrue(
-        "Table should exist",
-        validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, 
"tl")));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
+    
assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, 
"tl")))
+        .as("Table should exist")
+        .isTrue();
     Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
         .cause()
         .isInstanceOf(DatabaseNotEmptyException.class)
         .hasMessage(
             String.format("Database %s in catalog %s is not empty.", DATABASE, 
catalogName));
-
     sql("DROP TABLE %s.tl", flinkDatabase);
   }
 
-  @Test
+  @TestTemplate
   public void testListTables() {
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
-    Assert.assertEquals("Should not list any tables", 0, sql("SHOW 
TABLES").size());
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
+    assertThat(sql("SHOW TABLES")).isEmpty();
     validationCatalog.createTable(
         TableIdentifier.of(icebergNamespace, "tl"),
         new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
 
     List<Row> tables = sql("SHOW TABLES");
-    Assert.assertEquals("Only 1 table", 1, tables.size());
-    Assert.assertEquals("Table name should match", "tl", 
tables.get(0).getField(0));
+    assertThat(tables).hasSize(1);
+    assertThat("tl").as("Table name should 
match").isEqualTo(tables.get(0).getField(0));
   }
 
-  @Test
+  @TestTemplate
   public void testListNamespace() {
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
 
     List<Row> databases = sql("SHOW DATABASES");
 
     if (isHadoopCatalog) {
-      Assert.assertEquals("Should have 1 database", 1, databases.size());
-      Assert.assertEquals("Should have db database", "db", 
databases.get(0).getField(0));
-
+      assertThat(databases).hasSize(1);
+      assertThat(databases.get(0).getField(0)).as("Should have db 
database").isEqualTo("db");
       if (!baseNamespace.isEmpty()) {
         // test namespace not belongs to this catalog
         validationNamespaceCatalog.createNamespace(
             Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
         databases = sql("SHOW DATABASES");
-        Assert.assertEquals("Should have 1 database", 1, databases.size());
-        Assert.assertEquals(
-            "Should have db and default database", "db", 
databases.get(0).getField(0));
+        assertThat(databases).hasSize(1);
+        assertThat(databases.get(0).getField(0)).as("Should have db 
database").isEqualTo("db");
       }
     } else {
       // If there are multiple classes extends FlinkTestBase, 
TestHiveMetastore may loose the
       // creation for default
       // database. See HiveMetaStore.HMSHandler.init.
-      Assert.assertTrue(
-          "Should have db database",
-          databases.stream().anyMatch(d -> Objects.equals(d.getField(0), 
"db")));
+      assertThat(databases)
+          .as("Should have db database")
+          .anyMatch(d -> Objects.equals(d.getField(0), "db"));
     }
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespaceWithMetadata() {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected prop value", "value", 
nsMetadata.get("prop"));
+    assertThat(nsMetadata).containsEntry("prop", "value");
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespaceWithComment() {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
 
     sql("CREATE DATABASE %s COMMENT 'namespace doc'", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected comment", "namespace doc", 
nsMetadata.get("comment"));
+    assertThat(nsMetadata).containsEntry("comment", "namespace doc");
   }
 
-  @Test
+  @TestTemplate
   public void testCreateNamespaceWithLocation() throws Exception {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
-    File location = TEMPORARY_FOLDER.newFile();
-    Assert.assertTrue(location.delete());
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
 
+    Path location = temporaryDirectory.getRoot();
     sql("CREATE DATABASE %s WITH ('location'='%s')", flinkDatabase, location);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected location",
-        "file:" + location.getPath(),
-        nsMetadata.get("location"));
+    assertThat(nsMetadata).containsEntry("location", "file:" + 
location.getRoot());
   }
 
-  @Test
+  @TestTemplate
   public void testSetProperties() {
-    Assume.assumeFalse("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isFalse();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
 
     sql("CREATE DATABASE %s", flinkDatabase);
-
-    Assert.assertTrue(
-        "Namespace should exist", 
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should exist")
+        .isTrue();
 
     Map<String, String> defaultMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-    Assert.assertFalse(
-        "Default metadata should not have custom property", 
defaultMetadata.containsKey("prop"));
-
+    assertThat(defaultMetadata).doesNotContainKey("prop");
     sql("ALTER DATABASE %s SET ('prop'='value')", flinkDatabase);
-
     Map<String, String> nsMetadata =
         validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
-    Assert.assertEquals(
-        "Namespace should have expected prop value", "value", 
nsMetadata.get("prop"));
+    assertThat(nsMetadata).containsEntry("prop", "value");
   }
 
-  @Test
+  @TestTemplate
   public void testHadoopNotSupportMeta() {
-    Assume.assumeTrue("HadoopCatalog does not support namespace metadata", 
isHadoopCatalog);
-
-    Assert.assertFalse(
-        "Namespace should not already exist",
-        validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+    assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace 
metadata").isTrue();
+    assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+        .as("Namespace should not already exist")
+        .isFalse();
     Assertions.assertThatThrownBy(
             () -> sql("CREATE DATABASE %s WITH ('prop'='value')", 
flinkDatabase))
         .cause()
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 0008e4320c..05fd1bad1d 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.util.List;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -25,25 +27,28 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
 
-public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
+public class TestFlinkCatalogTablePartitions extends CatalogTestBase {
 
   private String tableName = "test_table";
 
-  private final FileFormat format;
+  @Parameter(index = 2)
+  private FileFormat format;
+
+  @Parameter(index = 3)
+  private Boolean cacheEnabled;
 
-  @Parameterized.Parameters(
-      name = "catalogName={0}, baseNamespace={1}, format={2}, 
cacheEnabled={3}")
-  public static Iterable<Object[]> parameters() {
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, 
cacheEnabled={3}")
+  protected static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, 
FileFormat.PARQUET}) {
@@ -58,30 +63,24 @@ public class TestFlinkCatalogTablePartitions extends 
FlinkCatalogTestBase {
     return parameters;
   }
 
-  public TestFlinkCatalogTablePartitions(
-      String catalogName, Namespace baseNamespace, FileFormat format, boolean 
cacheEnabled) {
-    super(catalogName, baseNamespace);
-    this.format = format;
-    config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
-  }
-
   @Override
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
+    config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
     sql("CREATE DATABASE %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
   }
 
-  @After
+  @AfterEach
   public void cleanNamespaces() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
     super.clean();
   }
 
-  @Test
+  @TestTemplate
   public void testListPartitionsWithUnpartitionedTable() {
     sql(
         "CREATE TABLE %s (id INT, data VARCHAR) with 
('write.format.default'='%s')",
@@ -96,7 +95,7 @@ public class TestFlinkCatalogTablePartitions extends 
FlinkCatalogTestBase {
         .hasMessageEndingWith("is not partitioned.");
   }
 
-  @Test
+  @TestTemplate
   public void testListPartitionsWithPartitionedTable()
       throws TableNotExistException, TableNotPartitionedException {
     sql(
@@ -109,13 +108,12 @@ public class TestFlinkCatalogTablePartitions extends 
FlinkCatalogTestBase {
     ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
     FlinkCatalog flinkCatalog = (FlinkCatalog) 
getTableEnv().getCatalog(catalogName).get();
     List<CatalogPartitionSpec> list = flinkCatalog.listPartitions(objectPath);
-    Assert.assertEquals("Should have 2 partition", 2, list.size());
-
+    assertThat(list).hasSize(2);
     List<CatalogPartitionSpec> expected = Lists.newArrayList();
     CatalogPartitionSpec partitionSpec1 = new 
CatalogPartitionSpec(ImmutableMap.of("data", "a"));
     CatalogPartitionSpec partitionSpec2 = new 
CatalogPartitionSpec(ImmutableMap.of("data", "b"));
     expected.add(partitionSpec1);
     expected.add(partitionSpec2);
-    Assert.assertEquals("Should produce the expected catalog partition 
specs.", list, expected);
+    assertThat(list).as("Should produce the expected catalog partition 
specs.").isEqualTo(expected);
   }
 }
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
index f05bf2fcd9..40dfda7237 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.flink.source;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
+import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.Base64;
 import java.util.List;
 import org.apache.flink.configuration.Configuration;
@@ -32,6 +34,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -40,28 +43,22 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
 
-public class TestMetadataTableReadableMetrics extends FlinkCatalogTestBase {
+public class TestMetadataTableReadableMetrics extends CatalogTestBase {
   private static final String TABLE_NAME = "test_table";
 
-  public TestMetadataTableReadableMetrics(String catalogName, Namespace 
baseNamespace) {
-    super(catalogName, baseNamespace);
-  }
-
-  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}")
-  public static Iterable<Object[]> parameters() {
+  @Parameters(name = "catalogName={0}, baseNamespace={1}")
+  protected static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     String catalogName = "testhive";
     Namespace baseNamespace = Namespace.empty();
@@ -76,7 +73,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
     return super.getTableEnv();
   }
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  private @TempDir Path temp;
 
   private static final Types.StructType LEAF_STRUCT_TYPE =
       Types.StructType.of(
@@ -134,8 +131,8 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
             createPrimitiveRecord(
                 false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2", 
null, null));
 
-    DataFile dataFile =
-        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), 
records);
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    DataFile dataFile = FileHelpers.writeDataFile(table, 
Files.localOutput(testFile), records);
     table.newAppend().appendFile(dataFile).commit();
     return table;
   }
@@ -153,12 +150,13 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
             createNestedRecord(0L, 0.0),
             createNestedRecord(1L, Double.NaN),
             createNestedRecord(null, null));
-    DataFile dataFile =
-        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), 
records);
+
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    DataFile dataFile = FileHelpers.writeDataFile(table, 
Files.localOutput(testFile), records);
     table.newAppend().appendFile(dataFile).commit();
   }
 
-  @Before
+  @BeforeEach
   public void before() {
     super.before();
     sql("USE CATALOG %s", catalogName);
@@ -167,7 +165,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
   }
 
   @Override
-  @After
+  @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -212,7 +210,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
     return values;
   }
 
-  @Test
+  @TestTemplate
   public void testPrimitiveColumns() throws Exception {
     createPrimitiveTable();
     List<Row> result = sql("SELECT readable_metrics FROM %s$files", 
TABLE_NAME);
@@ -257,7 +255,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
     TestHelpers.assertRows(result, expected);
   }
 
-  @Test
+  @TestTemplate
   public void testSelectPrimitiveValues() throws Exception {
     createPrimitiveTable();
 
@@ -276,7 +274,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
         ImmutableList.of(Row.of(4L, 0)));
   }
 
-  @Test
+  @TestTemplate
   public void testSelectNestedValues() throws Exception {
     createNestedTable();
     TestHelpers.assertRows(
@@ -287,7 +285,7 @@ public class TestMetadataTableReadableMetrics extends 
FlinkCatalogTestBase {
         ImmutableList.of(Row.of(0L, 3L)));
   }
 
-  @Test
+  @TestTemplate
   public void testNestedValues() throws Exception {
     createNestedTable();
 

Reply via email to