This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 61532a042a Flink: Backport #9364 to 1.16 and 1.17 for Create
CatalogTestBase for migration to JUnit5 (#9601)
61532a042a is described below
commit 61532a042adcd2c916b62ace533129e9e7ad3128
Author: Rodrigo <[email protected]>
AuthorDate: Thu Feb 1 00:07:56 2024 -0800
Flink: Backport #9364 to 1.16 and 1.17 for Create CatalogTestBase for
migration to JUnit5 (#9601)
---
.../org/apache/iceberg/flink/CatalogTestBase.java | 143 +++++++++++
.../iceberg/flink/TestFlinkCatalogDatabase.java | 267 +++++++++------------
.../flink/TestFlinkCatalogTablePartitions.java | 46 ++--
.../source/TestMetadataTableReadableMetrics.java | 48 ++--
.../org/apache/iceberg/flink/CatalogTestBase.java | 143 +++++++++++
.../iceberg/flink/TestFlinkCatalogDatabase.java | 267 +++++++++------------
.../flink/TestFlinkCatalogTablePartitions.java | 46 ++--
.../source/TestMetadataTableReadableMetrics.java | 48 ++--
8 files changed, 604 insertions(+), 404 deletions(-)
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
new file mode 100644
index 0000000000..91ed3c4ade
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.util.ArrayUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class CatalogTestBase extends TestBase {
+
+ protected static final String DATABASE = "db";
+ @TempDir protected File hiveWarehouse;
+ @TempDir protected File hadoopWarehouse;
+
+ @Parameter(index = 0)
+ protected String catalogName;
+
+ @Parameter(index = 1)
+ protected Namespace baseNamespace;
+
+ protected Catalog validationCatalog;
+ protected SupportsNamespaces validationNamespaceCatalog;
+ protected Map<String, String> config = Maps.newHashMap();
+
+ protected String flinkDatabase;
+ protected Namespace icebergNamespace;
+ protected boolean isHadoopCatalog;
+
+ @Parameters(name = "catalogName={0}, baseNamespace={1}")
+ protected static List<Object[]> parameters() {
+ return Arrays.asList(
+ new Object[] {"testhive", Namespace.empty()},
+ new Object[] {"testhadoop", Namespace.empty()},
+ new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
+ }
+
+ @BeforeEach
+ public void before() {
+ this.isHadoopCatalog = catalogName.startsWith("testhadoop");
+ this.validationCatalog =
+ isHadoopCatalog
+ ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getPath())
+ : catalog;
+ this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+
+ config.put("type", "iceberg");
+ if (!baseNamespace.isEmpty()) {
+ config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString());
+ }
+ if (isHadoopCatalog) {
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
+ } else {
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+ config.put(CatalogProperties.URI, getURI(hiveConf));
+ }
+ config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));
+
+ this.flinkDatabase = catalogName + "." + DATABASE;
+ this.icebergNamespace =
+ Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE}));
+ sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
+ }
+
+ @AfterEach
+ public void clean() {
+ dropCatalog(catalogName, true);
+ }
+
+ protected String warehouseRoot() {
+ if (isHadoopCatalog) {
+ return hadoopWarehouse.getAbsolutePath();
+ } else {
+ return hiveWarehouse.getAbsolutePath();
+ }
+ }
+
+ protected String getFullQualifiedTableName(String tableName) {
+ final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
+ levels.add(tableName);
+ return Joiner.on('.').join(levels);
+ }
+
+ static String getURI(HiveConf conf) {
+ return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
+ }
+
+ static String toWithClause(Map<String, String> props) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("(");
+ int propCount = 0;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (propCount > 0) {
+ builder.append(",");
+ }
+ builder
+ .append("'")
+ .append(entry.getKey())
+ .append("'")
+ .append("=")
+ .append("'")
+ .append(entry.getValue())
+ .append("'");
+ propCount++;
+ }
+ builder.append(")");
+ return builder.toString();
+ }
+}
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index 47b47cb626..f46d50a5f0 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -18,7 +18,10 @@
*/
package org.apache.iceberg.flink;
-import java.io.File;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -29,18 +32,12 @@ import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Test;
-
-public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
- public TestFlinkCatalogDatabase(String catalogName, Namespace baseNamespace)
{
- super(catalogName, baseNamespace);
- }
+public class TestFlinkCatalogDatabase extends CatalogTestBase {
- @After
+ @AfterEach
@Override
public void clean() {
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
@@ -48,240 +45,204 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
super.clean();
}
- @Test
+ @TestTemplate
public void testCreateNamespace() {
- Assert.assertFalse(
- "Database should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
- Assert.assertTrue(
- "Database should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should exist")
+ .isTrue();
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
- Assert.assertTrue(
- "Database should still exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should still exist")
+ .isTrue();
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
- Assert.assertFalse(
- "Database should be dropped",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should be dropped")
+ .isFalse();
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
- Assert.assertTrue(
- "Database should be created",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should be created")
+ .isTrue();
}
- @Test
+ @TestTemplate
public void testDropEmptyDatabase() {
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
sql("DROP DATABASE %s", flinkDatabase);
-
- Assert.assertFalse(
- "Namespace should have been dropped",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should have been dropped")
+ .isFalse();
}
- @Test
+ @TestTemplate
public void testDropNonEmptyNamespace() {
- Assume.assumeFalse(
- "Hadoop catalog throws IOException: Directory is not empty.",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assumeThat(isHadoopCatalog)
+ .as("Hadoop catalog throws IOException: Directory is not empty.")
+ .isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, "tl"),
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
- Assert.assertTrue(
- "Table should exist",
- validationCatalog.tableExists(TableIdentifier.of(icebergNamespace,
"tl")));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
+ assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl")))
+ .as("Table should exist")
+ .isTrue();
Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
.cause()
.isInstanceOf(DatabaseNotEmptyException.class)
.hasMessage(
String.format("Database %s in catalog %s is not empty.", DATABASE,
catalogName));
-
sql("DROP TABLE %s.tl", flinkDatabase);
}
- @Test
+ @TestTemplate
public void testListTables() {
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
- Assert.assertEquals("Should not list any tables", 0, sql("SHOW
TABLES").size());
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
+ assertThat(sql("SHOW TABLES")).isEmpty();
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, "tl"),
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
List<Row> tables = sql("SHOW TABLES");
- Assert.assertEquals("Only 1 table", 1, tables.size());
- Assert.assertEquals("Table name should match", "tl",
tables.get(0).getField(0));
+ assertThat(tables).hasSize(1);
+ assertThat("tl").as("Table name should match").isEqualTo(tables.get(0).getField(0));
}
- @Test
+ @TestTemplate
public void testListNamespace() {
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
List<Row> databases = sql("SHOW DATABASES");
if (isHadoopCatalog) {
- Assert.assertEquals("Should have 1 database", 1, databases.size());
- Assert.assertEquals("Should have db database", "db",
databases.get(0).getField(0));
-
+ assertThat(databases).hasSize(1);
+ assertThat(databases.get(0).getField(0)).as("Should have db database").isEqualTo("db");
if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
- Assert.assertEquals("Should have 1 database", 1, databases.size());
- Assert.assertEquals(
- "Should have db and default database", "db",
databases.get(0).getField(0));
+ assertThat(databases).hasSize(1);
+ assertThat(databases.get(0).getField(0)).as("Should have db database").isEqualTo("db");
}
} else {
// If there are multiple classes extends FlinkTestBase,
TestHiveMetastore may loose the
// creation for default
// database. See HiveMetaStore.HMSHandler.init.
- Assert.assertTrue(
- "Should have db database",
- databases.stream().anyMatch(d -> Objects.equals(d.getField(0),
"db")));
+ assertThat(databases)
+ .as("Should have db database")
+ .anyMatch(d -> Objects.equals(d.getField(0), "db"));
}
}
- @Test
+ @TestTemplate
public void testCreateNamespaceWithMetadata() {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected prop value", "value",
nsMetadata.get("prop"));
+ assertThat(nsMetadata).containsEntry("prop", "value");
}
- @Test
+ @TestTemplate
public void testCreateNamespaceWithComment() {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s COMMENT 'namespace doc'", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected comment", "namespace doc",
nsMetadata.get("comment"));
+ assertThat(nsMetadata).containsEntry("comment", "namespace doc");
}
- @Test
+ @TestTemplate
public void testCreateNamespaceWithLocation() throws Exception {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
- File location = TEMPORARY_FOLDER.newFile();
- Assert.assertTrue(location.delete());
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
+ Path location = temporaryDirectory.getRoot();
sql("CREATE DATABASE %s WITH ('location'='%s')", flinkDatabase, location);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected location",
- "file:" + location.getPath(),
- nsMetadata.get("location"));
+ assertThat(nsMetadata).containsEntry("location", "file:" + location.getRoot());
}
- @Test
+ @TestTemplate
public void testSetProperties() {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> defaultMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
- Assert.assertFalse(
- "Default metadata should not have custom property",
defaultMetadata.containsKey("prop"));
-
+ assertThat(defaultMetadata).doesNotContainKey("prop");
sql("ALTER DATABASE %s SET ('prop'='value')", flinkDatabase);
-
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected prop value", "value",
nsMetadata.get("prop"));
+ assertThat(nsMetadata).containsEntry("prop", "value");
}
- @Test
+ @TestTemplate
public void testHadoopNotSupportMeta() {
- Assume.assumeTrue("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace metadata").isTrue();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
Assertions.assertThatThrownBy(
() -> sql("CREATE DATABASE %s WITH ('prop'='value')",
flinkDatabase))
.cause()
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index fad65f4c63..5716abf2e1 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.List;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
@@ -25,25 +27,28 @@ import
org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
-public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
+public class TestFlinkCatalogTablePartitions extends CatalogTestBase {
private String tableName = "test_table";
- private final FileFormat format;
+ @Parameter(index = 2)
+ private FileFormat format;
+
+ @Parameter(index = 3)
+ private Boolean cacheEnabled;
- @Parameterized.Parameters(
- name = "catalogName={0}, baseNamespace={1}, format={2},
cacheEnabled={3}")
- public static Iterable<Object[]> parameters() {
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}")
+ protected static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.ORC, FileFormat.AVRO,
FileFormat.PARQUET}) {
@@ -58,30 +63,24 @@ public class TestFlinkCatalogTablePartitions extends
FlinkCatalogTestBase {
return parameters;
}
- public TestFlinkCatalogTablePartitions(
- String catalogName, Namespace baseNamespace, FileFormat format, boolean
cacheEnabled) {
- super(catalogName, baseNamespace);
- this.format = format;
- config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
- }
-
@Override
- @Before
+ @BeforeEach
public void before() {
super.before();
+ config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
}
- @After
+ @AfterEach
public void cleanNamespaces() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
super.clean();
}
- @Test
+ @TestTemplate
public void testListPartitionsWithUnpartitionedTable() {
sql(
"CREATE TABLE %s (id INT, data VARCHAR) with
('write.format.default'='%s')",
@@ -95,7 +94,7 @@ public class TestFlinkCatalogTablePartitions extends
FlinkCatalogTestBase {
.hasMessage("Table " + objectPath + " in catalog " + catalogName + "
is not partitioned.");
}
- @Test
+ @TestTemplate
public void testListPartitionsWithPartitionedTable()
throws TableNotExistException, TableNotPartitionedException {
sql(
@@ -108,13 +107,12 @@ public class TestFlinkCatalogTablePartitions extends
FlinkCatalogTestBase {
ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
FlinkCatalog flinkCatalog = (FlinkCatalog)
getTableEnv().getCatalog(catalogName).get();
List<CatalogPartitionSpec> list = flinkCatalog.listPartitions(objectPath);
- Assert.assertEquals("Should have 2 partition", 2, list.size());
-
+ assertThat(list).hasSize(2);
List<CatalogPartitionSpec> expected = Lists.newArrayList();
CatalogPartitionSpec partitionSpec1 = new
CatalogPartitionSpec(ImmutableMap.of("data", "a"));
CatalogPartitionSpec partitionSpec2 = new
CatalogPartitionSpec(ImmutableMap.of("data", "b"));
expected.add(partitionSpec1);
expected.add(partitionSpec2);
- Assert.assertEquals("Should produce the expected catalog partition
specs.", list, expected);
+ assertThat(list).as("Should produce the expected catalog partition specs.").isEqualTo(expected);
}
}
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
index f05bf2fcd9..40dfda7237 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.util.Base64;
import java.util.List;
import org.apache.flink.configuration.Configuration;
@@ -32,6 +34,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -40,28 +43,22 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
-public class TestMetadataTableReadableMetrics extends FlinkCatalogTestBase {
+public class TestMetadataTableReadableMetrics extends CatalogTestBase {
private static final String TABLE_NAME = "test_table";
- public TestMetadataTableReadableMetrics(String catalogName, Namespace
baseNamespace) {
- super(catalogName, baseNamespace);
- }
-
- @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}")
- public static Iterable<Object[]> parameters() {
+ @Parameters(name = "catalogName={0}, baseNamespace={1}")
+ protected static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
String catalogName = "testhive";
Namespace baseNamespace = Namespace.empty();
@@ -76,7 +73,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
return super.getTableEnv();
}
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ private @TempDir Path temp;
private static final Types.StructType LEAF_STRUCT_TYPE =
Types.StructType.of(
@@ -134,8 +131,8 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
createPrimitiveRecord(
false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2",
null, null));
- DataFile dataFile =
- FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()),
records);
+ File testFile = File.createTempFile("junit", null, temp.toFile());
+ DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records);
table.newAppend().appendFile(dataFile).commit();
return table;
}
@@ -153,12 +150,13 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
createNestedRecord(0L, 0.0),
createNestedRecord(1L, Double.NaN),
createNestedRecord(null, null));
- DataFile dataFile =
- FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()),
records);
+
+ File testFile = File.createTempFile("junit", null, temp.toFile());
+ DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records);
table.newAppend().appendFile(dataFile).commit();
}
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("USE CATALOG %s", catalogName);
@@ -167,7 +165,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
}
@Override
- @After
+ @AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -212,7 +210,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
return values;
}
- @Test
+ @TestTemplate
public void testPrimitiveColumns() throws Exception {
createPrimitiveTable();
List<Row> result = sql("SELECT readable_metrics FROM %s$files",
TABLE_NAME);
@@ -257,7 +255,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
TestHelpers.assertRows(result, expected);
}
- @Test
+ @TestTemplate
public void testSelectPrimitiveValues() throws Exception {
createPrimitiveTable();
@@ -276,7 +274,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
ImmutableList.of(Row.of(4L, 0)));
}
- @Test
+ @TestTemplate
public void testSelectNestedValues() throws Exception {
createNestedTable();
TestHelpers.assertRows(
@@ -287,7 +285,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
ImmutableList.of(Row.of(0L, 3L)));
}
- @Test
+ @TestTemplate
public void testNestedValues() throws Exception {
createNestedTable();
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
new file mode 100644
index 0000000000..91ed3c4ade
--- /dev/null
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.util.ArrayUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Abstract JUnit 5 base class for Flink catalog tests, run once per parameter row returned by
+ * {@link #parameters()} via {@link ParameterizedTestExtension}.
+ *
+ * <p>{@link #before()} registers a Flink catalog named {@code catalogName} through SQL and
+ * prepares a validation catalog for direct checks; {@link #clean()} drops the Flink catalog.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class CatalogTestBase extends TestBase {
+
+ // Database name appended to the catalog name and to the base namespace in before().
+ protected static final String DATABASE = "db";
+ // Temporary directories; warehouseRoot() picks one of them based on isHadoopCatalog.
+ @TempDir protected File hiveWarehouse;
+ @TempDir protected File hadoopWarehouse;
+
+ // First element of each parameter row: the name of the Flink catalog to create.
+ @Parameter(index = 0)
+ protected String catalogName;
+
+ // Second element of each parameter row; when non-empty it is passed as the base-namespace option.
+ @Parameter(index = 1)
+ protected Namespace baseNamespace;
+
+ // Catalog used to validate results directly; assigned in before().
+ protected Catalog validationCatalog;
+ // Same object as validationCatalog, cast to SupportsNamespaces in before().
+ protected SupportsNamespaces validationNamespaceCatalog;
+ // Options rendered into the WITH clause of CREATE CATALOG in before().
+ protected Map<String, String> config = Maps.newHashMap();
+
+ // "<catalogName>.db", for use in SQL statements.
+ protected String flinkDatabase;
+ // baseNamespace levels followed by "db".
+ protected Namespace icebergNamespace;
+ // True when catalogName starts with "testhadoop".
+ protected boolean isHadoopCatalog;
+
+ /**
+ * Supplies the default parameter rows: a Hive catalog and a Hadoop catalog with an empty base
+ * namespace, and a Hadoop catalog with base namespace {@code l0.l1}.
+ *
+ * @return rows of {@code {catalogName, baseNamespace}}
+ */
+ @Parameters(name = "catalogName={0}, baseNamespace={1}")
+ protected static List<Object[]> parameters() {
+ return Arrays.asList(
+ new Object[] {"testhive", Namespace.empty()},
+ new Object[] {"testhadoop", Namespace.empty()},
+ new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
+ }
+
+ /**
+ * Prepares the validation catalog, fills {@link #config} and creates the Flink catalog.
+ *
+ * <p>Subclasses that add entries to {@link #config} must do so before calling this method,
+ * because the CREATE CATALOG statement at the end is rendered from the map's current contents.
+ */
+ @BeforeEach
+ public void before() {
+ this.isHadoopCatalog = catalogName.startsWith("testhadoop");
+ // NOTE(review): hiveConf and catalog are not declared in this class; presumably inherited
+ // from TestBase - confirm.
+ this.validationCatalog =
+ isHadoopCatalog
+ ? new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getPath())
+ : catalog;
+ this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+
+ config.put("type", "iceberg");
+ if (!baseNamespace.isEmpty()) {
+ config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString());
+ }
+ if (isHadoopCatalog) {
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
+ } else {
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
+ config.put(CatalogProperties.URI, getURI(hiveConf));
+ }
+ config.put(CatalogProperties.WAREHOUSE_LOCATION,
String.format("file://%s", warehouseRoot()));
+
+ this.flinkDatabase = catalogName + "." + DATABASE;
+ this.icebergNamespace =
+ Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[]
{DATABASE}));
+ sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config));
+ }
+
+ /**
+ * Drops the Flink catalog created in {@link #before()}.
+ *
+ * <p>NOTE(review): dropCatalog is defined outside this class; the boolean argument is presumably
+ * an "if exists" flag - confirm.
+ */
+ @AfterEach
+ public void clean() {
+ dropCatalog(catalogName, true);
+ }
+
+ /**
+ * Returns the absolute path of the warehouse directory for the current catalog type.
+ *
+ * @return the Hadoop temp directory when {@link #isHadoopCatalog} is set, else the Hive one
+ */
+ protected String warehouseRoot() {
+ if (isHadoopCatalog) {
+ return hadoopWarehouse.getAbsolutePath();
+ } else {
+ return hiveWarehouse.getAbsolutePath();
+ }
+ }
+
+ /**
+ * Joins the levels of {@link #icebergNamespace} and the given table name with dots.
+ *
+ * @param tableName table name to append as the last segment
+ * @return the dot-separated name
+ */
+ protected String getFullQualifiedTableName(String tableName) {
+ final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
+ levels.add(tableName);
+ return Joiner.on('.').join(levels);
+ }
+
+ /**
+ * Reads the value stored under {@code HiveConf.ConfVars.METASTOREURIS} from the configuration.
+ *
+ * @param conf configuration to read from
+ * @return the configured value, as returned by {@code conf.get}
+ */
+ static String getURI(HiveConf conf) {
+ return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
+ }
+
+ /**
+ * Renders a property map as {@code ('k1'='v1','k2'='v2')}, in the map's iteration order.
+ *
+ * <p>Keys and values are inserted verbatim; single quotes inside them are not escaped.
+ *
+ * @param props properties to render
+ * @return the parenthesised, comma-separated list; {@code ()} for an empty map
+ */
+ static String toWithClause(Map<String, String> props) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("(");
+ int propCount = 0;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ // Separator goes before every entry except the first.
+ if (propCount > 0) {
+ builder.append(",");
+ }
+ builder
+ .append("'")
+ .append(entry.getKey())
+ .append("'")
+ .append("=")
+ .append("'")
+ .append(entry.getValue())
+ .append("'");
+ propCount++;
+ }
+ builder.append(")");
+ return builder.toString();
+ }
+}
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index 47b47cb626..f46d50a5f0 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -18,7 +18,10 @@
*/
package org.apache.iceberg.flink;
-import java.io.File;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -29,18 +32,12 @@ import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Test;
-
-public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
- public TestFlinkCatalogDatabase(String catalogName, Namespace baseNamespace)
{
- super(catalogName, baseNamespace);
- }
+public class TestFlinkCatalogDatabase extends CatalogTestBase {
- @After
+ @AfterEach
@Override
public void clean() {
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
@@ -48,240 +45,204 @@ public class TestFlinkCatalogDatabase extends
FlinkCatalogTestBase {
super.clean();
}
- @Test
+ @TestTemplate
public void testCreateNamespace() {
- Assert.assertFalse(
- "Database should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
- Assert.assertTrue(
- "Database should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should exist")
+ .isTrue();
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
- Assert.assertTrue(
- "Database should still exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should still exist")
+ .isTrue();
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
- Assert.assertFalse(
- "Database should be dropped",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should be dropped")
+ .isFalse();
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
- Assert.assertTrue(
- "Database should be created",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Database should be created")
+ .isTrue();
}
- @Test
+ @TestTemplate
public void testDropEmptyDatabase() {
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
sql("DROP DATABASE %s", flinkDatabase);
-
- Assert.assertFalse(
- "Namespace should have been dropped",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should have been dropped")
+ .isFalse();
}
- @Test
+ @TestTemplate
public void testDropNonEmptyNamespace() {
- Assume.assumeFalse(
- "Hadoop catalog throws IOException: Directory is not empty.",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assumeThat(isHadoopCatalog)
+ .as("Hadoop catalog throws IOException: Directory is not empty.")
+ .isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, "tl"),
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
- Assert.assertTrue(
- "Table should exist",
- validationCatalog.tableExists(TableIdentifier.of(icebergNamespace,
"tl")));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
+
assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace,
"tl")))
+ .as("Table should exist")
+ .isTrue();
Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
.cause()
.isInstanceOf(DatabaseNotEmptyException.class)
.hasMessage(
String.format("Database %s in catalog %s is not empty.", DATABASE,
catalogName));
-
sql("DROP TABLE %s.tl", flinkDatabase);
}
- @Test
+ @TestTemplate
public void testListTables() {
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
- Assert.assertEquals("Should not list any tables", 0, sql("SHOW
TABLES").size());
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
+ assertThat(sql("SHOW TABLES")).isEmpty();
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, "tl"),
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
List<Row> tables = sql("SHOW TABLES");
- Assert.assertEquals("Only 1 table", 1, tables.size());
- Assert.assertEquals("Table name should match", "tl",
tables.get(0).getField(0));
+ assertThat(tables).hasSize(1);
+ // Value under test goes into assertThat(), the literal into isEqualTo(), so a failure
+ // reports the listed name as "actual" (same shape as the checks in testListNamespace).
+ assertThat(tables.get(0).getField(0)).as("Table name should match").isEqualTo("tl");
}
- @Test
+ @TestTemplate
public void testListNamespace() {
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
List<Row> databases = sql("SHOW DATABASES");
if (isHadoopCatalog) {
- Assert.assertEquals("Should have 1 database", 1, databases.size());
- Assert.assertEquals("Should have db database", "db",
databases.get(0).getField(0));
-
+ assertThat(databases).hasSize(1);
+ assertThat(databases.get(0).getField(0)).as("Should have db
database").isEqualTo("db");
if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
- Assert.assertEquals("Should have 1 database", 1, databases.size());
- Assert.assertEquals(
- "Should have db and default database", "db",
databases.get(0).getField(0));
+ assertThat(databases).hasSize(1);
+ assertThat(databases.get(0).getField(0)).as("Should have db
database").isEqualTo("db");
}
} else {
// If there are multiple classes extends FlinkTestBase,
TestHiveMetastore may loose the
// creation for default
// database. See HiveMetaStore.HMSHandler.init.
- Assert.assertTrue(
- "Should have db database",
- databases.stream().anyMatch(d -> Objects.equals(d.getField(0),
"db")));
+ assertThat(databases)
+ .as("Should have db database")
+ .anyMatch(d -> Objects.equals(d.getField(0), "db"));
}
}
- @Test
+ @TestTemplate
public void testCreateNamespaceWithMetadata() {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace
metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected prop value", "value",
nsMetadata.get("prop"));
+ assertThat(nsMetadata).containsEntry("prop", "value");
}
- @Test
+ @TestTemplate
public void testCreateNamespaceWithComment() {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace
metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s COMMENT 'namespace doc'", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected comment", "namespace doc",
nsMetadata.get("comment"));
+ assertThat(nsMetadata).containsEntry("comment", "namespace doc");
}
- @Test
+ @TestTemplate
public void testCreateNamespaceWithLocation() throws Exception {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
- File location = TEMPORARY_FOLDER.newFile();
- Assert.assertTrue(location.delete());
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace
metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
+ // Use a fresh, not-yet-existing path under the per-test temp dir, as the JUnit4 version did.
+ // Path.getRoot() would return the filesystem root ("/") rather than a path inside it.
+ // NOTE(review): assumes temporaryDirectory is the java.nio.file.Path temp dir - confirm.
+ Path location = temporaryDirectory.resolve("namespace_location");
sql("CREATE DATABASE %s WITH ('location'='%s')", flinkDatabase, location);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected location",
- "file:" + location.getPath(),
- nsMetadata.get("location"));
+ // Expect the same path that was passed in the DDL, not its root component.
+ assertThat(nsMetadata).containsEntry("location", "file:" + location);
}
- @Test
+ @TestTemplate
public void testSetProperties() {
- Assume.assumeFalse("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace
metadata").isFalse();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
sql("CREATE DATABASE %s", flinkDatabase);
-
- Assert.assertTrue(
- "Namespace should exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should exist")
+ .isTrue();
Map<String, String> defaultMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
- Assert.assertFalse(
- "Default metadata should not have custom property",
defaultMetadata.containsKey("prop"));
-
+ assertThat(defaultMetadata).doesNotContainKey("prop");
sql("ALTER DATABASE %s SET ('prop'='value')", flinkDatabase);
-
Map<String, String> nsMetadata =
validationNamespaceCatalog.loadNamespaceMetadata(icebergNamespace);
-
- Assert.assertEquals(
- "Namespace should have expected prop value", "value",
nsMetadata.get("prop"));
+ assertThat(nsMetadata).containsEntry("prop", "value");
}
- @Test
+ @TestTemplate
public void testHadoopNotSupportMeta() {
- Assume.assumeTrue("HadoopCatalog does not support namespace metadata",
isHadoopCatalog);
-
- Assert.assertFalse(
- "Namespace should not already exist",
- validationNamespaceCatalog.namespaceExists(icebergNamespace));
-
+ assumeThat(isHadoopCatalog).as("HadoopCatalog does not support namespace
metadata").isTrue();
+ assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
+ .as("Namespace should not already exist")
+ .isFalse();
Assertions.assertThatThrownBy(
() -> sql("CREATE DATABASE %s WITH ('prop'='value')",
flinkDatabase))
.cause()
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 0008e4320c..05fd1bad1d 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.List;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
@@ -25,25 +27,28 @@ import
org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
-public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
+public class TestFlinkCatalogTablePartitions extends CatalogTestBase {
private String tableName = "test_table";
- private final FileFormat format;
+ @Parameter(index = 2)
+ private FileFormat format;
+
+ @Parameter(index = 3)
+ private Boolean cacheEnabled;
- @Parameterized.Parameters(
- name = "catalogName={0}, baseNamespace={1}, format={2},
cacheEnabled={3}")
- public static Iterable<Object[]> parameters() {
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2},
cacheEnabled={3}")
+ protected static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.ORC, FileFormat.AVRO,
FileFormat.PARQUET}) {
@@ -58,30 +63,24 @@ public class TestFlinkCatalogTablePartitions extends
FlinkCatalogTestBase {
return parameters;
}
- public TestFlinkCatalogTablePartitions(
- String catalogName, Namespace baseNamespace, FileFormat format, boolean
cacheEnabled) {
- super(catalogName, baseNamespace);
- this.format = format;
- config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
- }
-
@Override
- @Before
+ @BeforeEach
public void before() {
+ // Set the cache flag first: super.before() issues CREATE CATALOG from `config`, so an
+ // entry added afterwards would never reach the catalog under test.
+ config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
}
- @After
+ @AfterEach
public void cleanNamespaces() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
super.clean();
}
- @Test
+ @TestTemplate
public void testListPartitionsWithUnpartitionedTable() {
sql(
"CREATE TABLE %s (id INT, data VARCHAR) with
('write.format.default'='%s')",
@@ -96,7 +95,7 @@ public class TestFlinkCatalogTablePartitions extends
FlinkCatalogTestBase {
.hasMessageEndingWith("is not partitioned.");
}
- @Test
+ @TestTemplate
public void testListPartitionsWithPartitionedTable()
throws TableNotExistException, TableNotPartitionedException {
sql(
@@ -109,13 +108,12 @@ public class TestFlinkCatalogTablePartitions extends
FlinkCatalogTestBase {
ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
FlinkCatalog flinkCatalog = (FlinkCatalog)
getTableEnv().getCatalog(catalogName).get();
List<CatalogPartitionSpec> list = flinkCatalog.listPartitions(objectPath);
- Assert.assertEquals("Should have 2 partition", 2, list.size());
-
+ assertThat(list).hasSize(2);
List<CatalogPartitionSpec> expected = Lists.newArrayList();
CatalogPartitionSpec partitionSpec1 = new
CatalogPartitionSpec(ImmutableMap.of("data", "a"));
CatalogPartitionSpec partitionSpec2 = new
CatalogPartitionSpec(ImmutableMap.of("data", "b"));
expected.add(partitionSpec1);
expected.add(partitionSpec2);
- Assert.assertEquals("Should produce the expected catalog partition
specs.", list, expected);
+ assertThat(list).as("Should produce the expected catalog partition
specs.").isEqualTo(expected);
}
}
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
index f05bf2fcd9..40dfda7237 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.util.Base64;
import java.util.List;
import org.apache.flink.configuration.Configuration;
@@ -32,6 +34,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -40,28 +43,22 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.CatalogTestBase;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
-public class TestMetadataTableReadableMetrics extends FlinkCatalogTestBase {
+public class TestMetadataTableReadableMetrics extends CatalogTestBase {
private static final String TABLE_NAME = "test_table";
- public TestMetadataTableReadableMetrics(String catalogName, Namespace
baseNamespace) {
- super(catalogName, baseNamespace);
- }
-
- @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}")
- public static Iterable<Object[]> parameters() {
+ @Parameters(name = "catalogName={0}, baseNamespace={1}")
+ protected static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
String catalogName = "testhive";
Namespace baseNamespace = Namespace.empty();
@@ -76,7 +73,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
return super.getTableEnv();
}
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ private @TempDir Path temp;
private static final Types.StructType LEAF_STRUCT_TYPE =
Types.StructType.of(
@@ -134,8 +131,8 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
createPrimitiveRecord(
false, 2, 2L, Float.NaN, 2.0D, new BigDecimal("2.00"), "2",
null, null));
- DataFile dataFile =
- FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()),
records);
+ File testFile = File.createTempFile("junit", null, temp.toFile());
+ DataFile dataFile = FileHelpers.writeDataFile(table,
Files.localOutput(testFile), records);
table.newAppend().appendFile(dataFile).commit();
return table;
}
@@ -153,12 +150,13 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
createNestedRecord(0L, 0.0),
createNestedRecord(1L, Double.NaN),
createNestedRecord(null, null));
- DataFile dataFile =
- FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()),
records);
+
+ File testFile = File.createTempFile("junit", null, temp.toFile());
+ DataFile dataFile = FileHelpers.writeDataFile(table,
Files.localOutput(testFile), records);
table.newAppend().appendFile(dataFile).commit();
}
- @Before
+ @BeforeEach
public void before() {
super.before();
sql("USE CATALOG %s", catalogName);
@@ -167,7 +165,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
}
@Override
- @After
+ @AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
@@ -212,7 +210,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
return values;
}
- @Test
+ @TestTemplate
public void testPrimitiveColumns() throws Exception {
createPrimitiveTable();
List<Row> result = sql("SELECT readable_metrics FROM %s$files",
TABLE_NAME);
@@ -257,7 +255,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
TestHelpers.assertRows(result, expected);
}
- @Test
+ @TestTemplate
public void testSelectPrimitiveValues() throws Exception {
createPrimitiveTable();
@@ -276,7 +274,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
ImmutableList.of(Row.of(4L, 0)));
}
- @Test
+ @TestTemplate
public void testSelectNestedValues() throws Exception {
createNestedTable();
TestHelpers.assertRows(
@@ -287,7 +285,7 @@ public class TestMetadataTableReadableMetrics extends
FlinkCatalogTestBase {
ImmutableList.of(Row.of(0L, 3L)));
}
- @Test
+ @TestTemplate
public void testNestedValues() throws Exception {
createNestedTable();