This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 943321ee6d Flink: Migrate tests to JUnit5 (#10130)
943321ee6d is described below

commit 943321ee6d1a8fd9da4632c456c4507e62b2898e
Author: Tom Tanaka <[email protected]>
AuthorDate: Mon Apr 15 15:45:45 2024 +0900

    Flink: Migrate tests to JUnit5 (#10130)
---
 .../java/org/apache/iceberg/data/DataTest.java     |   8 +-
 .../apache/iceberg/data/avro/TestGenericData.java  |   7 +-
 .../apache/iceberg/data/orc/TestGenericData.java   |  81 +++---
 .../iceberg/data/parquet/TestGenericData.java      |  20 +-
 .../TestParquetEncryptionWithWriteSupport.java     |  41 +--
 .../flink/data/TestFlinkAvroReaderWriter.java      |  21 +-
 .../flink/data/TestFlinkOrcReaderWriter.java       |  19 +-
 .../iceberg/flink/data/TestFlinkParquetReader.java |  24 +-
 .../iceberg/flink/data/TestFlinkParquetWriter.java |  17 +-
 .../flink/data/TestFlinkAvroReaderWriter.java      |  21 +-
 .../flink/data/TestFlinkOrcReaderWriter.java       |  19 +-
 .../iceberg/flink/data/TestFlinkParquetReader.java |  24 +-
 .../iceberg/flink/data/TestFlinkParquetWriter.java |  17 +-
 .../flink/AvroGenericRecordConverterBase.java      |   2 +-
 .../iceberg/flink/TestDataFileSerialization.java   |  14 +-
 .../iceberg/flink/TestFlinkCatalogFactory.java     |  20 +-
 .../org/apache/iceberg/flink/TestFlinkFilters.java |  87 +++----
 .../apache/iceberg/flink/TestFlinkSchemaUtil.java  |  40 +--
 .../flink/TestManifestFileSerialization.java       |  17 +-
 .../apache/iceberg/flink/TestRowDataWrapper.java   |  18 +-
 .../iceberg/flink/TestTableSerialization.java      |  18 +-
 .../flink/data/TestFlinkAvroReaderWriter.java      |  21 +-
 .../flink/data/TestFlinkOrcReaderWriter.java       |  19 +-
 .../iceberg/flink/data/TestFlinkParquetReader.java |  24 +-
 .../iceberg/flink/data/TestFlinkParquetWriter.java |  17 +-
 .../iceberg/flink/data/TestRowProjection.java      | 280 +++++++++++----------
 .../iceberg/flink/data/TestStructRowData.java      |   2 +-
 .../sink/TestAvroGenericRecordToRowDataMapper.java |   5 +-
 .../TestRowDataToAvroGenericRecordConverter.java   |   5 +-
 .../iceberg/flink/util/TestFlinkPackage.java       |  11 +-
 30 files changed, 483 insertions(+), 436 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java 
b/data/src/test/java/org/apache/iceberg/data/DataTest.java
index 7e32da4c6e..5ea742e451 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTest.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.TypeUtil;
@@ -30,9 +31,8 @@ import org.apache.iceberg.types.Types.ListType;
 import org.apache.iceberg.types.Types.LongType;
 import org.apache.iceberg.types.Types.MapType;
 import org.apache.iceberg.types.Types.StructType;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 public abstract class DataTest {
 
@@ -58,7 +58,7 @@ public abstract class DataTest {
           required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision
           required(117, "time", Types.TimeType.get()));
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir protected Path temp;
 
   @Test
   public void testSimpleStruct() throws IOException {
diff --git 
a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java 
b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java
index ef690919ae..83e8c09449 100644
--- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.data.avro;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -31,15 +33,14 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
 
 public class TestGenericData extends DataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expected = RandomGenericData.generate(schema, 100, 0L);
 
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         Avro.write(Files.localOutput(testFile))
diff --git 
a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java 
b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
index 180f7a6ad0..5147fd377c 100644
--- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.data.orc;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,8 +50,7 @@ import org.apache.orc.Writer;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestGenericData extends DataTest {
 
@@ -97,8 +97,8 @@ public class TestGenericData extends DataTest {
       record4.setField("tsTzCol", 
OffsetDateTime.parse("1935-05-16T17:10:34-08:00"));
       record4.setField("tsCol", LocalDateTime.parse("1935-05-01T00:01:00"));
 
-      File testFile = temp.newFile();
-      Assert.assertTrue("Delete should succeed", testFile.delete());
+      File testFile = File.createTempFile("junit", null, temp.toFile());
+      assertThat(testFile.delete()).isTrue();
 
       try (FileAppender<Record> writer =
           ORC.write(Files.localOutput(testFile))
@@ -123,22 +123,42 @@ public class TestGenericData extends DataTest {
         rows = Lists.newArrayList(reader);
       }
 
-      Assert.assertEquals(
-          OffsetDateTime.parse("2017-01-17T01:10:34Z"), 
rows.get(0).getField("tsTzCol"));
-      Assert.assertEquals(
-          LocalDateTime.parse("1970-01-01T00:01:00"), 
rows.get(0).getField("tsCol"));
-      Assert.assertEquals(
-          OffsetDateTime.parse("2017-05-17T01:10:34Z"), 
rows.get(1).getField("tsTzCol"));
-      Assert.assertEquals(
-          LocalDateTime.parse("1970-05-01T00:01:00"), 
rows.get(1).getField("tsCol"));
-      Assert.assertEquals(
-          OffsetDateTime.parse("1935-01-17T01:10:34Z"), 
rows.get(2).getField("tsTzCol"));
-      Assert.assertEquals(
-          LocalDateTime.parse("1935-01-01T00:01:00"), 
rows.get(2).getField("tsCol"));
-      Assert.assertEquals(
-          OffsetDateTime.parse("1935-05-17T01:10:34Z"), 
rows.get(3).getField("tsTzCol"));
-      Assert.assertEquals(
-          LocalDateTime.parse("1935-05-01T00:01:00"), 
rows.get(3).getField("tsCol"));
+      assertThat(rows)
+          .element(0)
+          .satisfies(
+              record -> {
+                assertThat(record.getField("tsTzCol"))
+                    .isEqualTo(OffsetDateTime.parse("2017-01-17T01:10:34Z"));
+                assertThat(record.getField("tsCol"))
+                    .isEqualTo(LocalDateTime.parse("1970-01-01T00:01:00"));
+              });
+      assertThat(rows)
+          .element(1)
+          .satisfies(
+              record -> {
+                assertThat(record.getField("tsTzCol"))
+                    .isEqualTo(OffsetDateTime.parse("2017-05-17T01:10:34Z"));
+                assertThat(record.getField("tsCol"))
+                    .isEqualTo(LocalDateTime.parse("1970-05-01T00:01:00"));
+              });
+      assertThat(rows)
+          .element(2)
+          .satisfies(
+              record -> {
+                assertThat(record.getField("tsTzCol"))
+                    .isEqualTo(OffsetDateTime.parse("1935-01-17T01:10:34Z"));
+                assertThat(record.getField("tsCol"))
+                    .isEqualTo(LocalDateTime.parse("1935-01-01T00:01:00"));
+              });
+      assertThat(rows)
+          .element(3)
+          .satisfies(
+              record -> {
+                assertThat(record.getField("tsTzCol"))
+                    .isEqualTo(OffsetDateTime.parse("1935-05-17T01:10:34Z"));
+                assertThat(record.getField("tsCol"))
+                    .isEqualTo(LocalDateTime.parse("1935-05-01T00:01:00"));
+              });
     } finally {
       TimeZone.setDefault(currentTz);
     }
@@ -146,8 +166,8 @@ public class TestGenericData extends DataTest {
 
   @Test
   public void writeAndValidateExternalData() throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     Configuration conf = new Configuration();
     TypeDescription writerSchema =
@@ -179,15 +199,20 @@ public class TestGenericData extends DataTest {
             .build()) {
       rows = Lists.newArrayList(reader);
     }
-    Assert.assertEquals(1, rows.get(0).getField("a"));
-    Assert.assertEquals(123, rows.get(0).getField("b"));
-    Assert.assertEquals("1", rows.get(0).getField("c"));
-    Assert.assertEquals("123", rows.get(0).getField("d"));
+    assertThat(rows)
+        .first()
+        .satisfies(
+            record -> {
+              assertThat(record.getField("a")).isEqualTo(1);
+              assertThat(record.getField("b")).isEqualTo(123);
+              assertThat(record.getField("c")).isEqualTo("1");
+              assertThat(record.getField("d")).isEqualTo("123");
+            });
   }
 
   private void writeAndValidateRecords(Schema schema, List<Record> expected) 
throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         ORC.write(Files.localOutput(testFile))
diff --git 
a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java 
b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java
index 71dd16d584..6de5657058 100644
--- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.data.parquet;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,16 +45,15 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestGenericData extends DataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expected = RandomGenericData.generate(schema, 100, 0L);
 
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> appender =
         Parquet.write(Files.localOutput(testFile))
@@ -101,8 +101,8 @@ public class TestGenericData extends DataTest {
             optional(2, "topbytes", Types.BinaryType.get()));
     org.apache.avro.Schema avroSchema = 
AvroSchemaUtil.convert(schema.asStruct());
 
-    File testFile = temp.newFile();
-    Assert.assertTrue(testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     ParquetWriter<org.apache.avro.generic.GenericRecord> writer =
         AvroParquetWriter.<org.apache.avro.generic.GenericRecord>builder(new 
Path(testFile.toURI()))
@@ -132,12 +132,12 @@ public class TestGenericData extends DataTest {
             .createReaderFunc(fileSchema -> 
GenericParquetReaders.buildReader(schema, fileSchema))
             .build()) {
       CloseableIterator it = reader.iterator();
-      Assert.assertTrue("Should have at least one row", it.hasNext());
+      assertThat(it).hasNext();
       while (it.hasNext()) {
         GenericRecord actualRecord = (GenericRecord) it.next();
-        Assert.assertEquals(actualRecord.get(0, ArrayList.class).get(0), 
expectedBinary);
-        Assert.assertEquals(actualRecord.get(1, ByteBuffer.class), 
expectedBinary);
-        Assert.assertFalse("Should not have more than one row", it.hasNext());
+        assertThat(actualRecord.get(0, 
ArrayList.class)).first().isEqualTo(expectedBinary);
+        assertThat(actualRecord.get(1, 
ByteBuffer.class)).isEqualTo(expectedBinary);
+        assertThat(it).isExhausted();
       }
     }
   }
diff --git 
a/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java
 
b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java
index ba382e1c8a..c6a5ed9f6d 100644
--- 
a/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java
+++ 
b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.data.parquet;
 
 import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
 import java.io.IOException;
@@ -48,8 +50,7 @@ import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestParquetEncryptionWithWriteSupport extends DataTest {
   private static final ByteBuffer fileDek = ByteBuffer.allocate(16);
@@ -59,8 +60,8 @@ public class TestParquetEncryptionWithWriteSupport extends 
DataTest {
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expected = RandomGenericData.generate(schema, 100, 0L);
 
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     SecureRandom rand = new SecureRandom();
     rand.nextBytes(fileDek.array());
@@ -76,16 +77,16 @@ public class TestParquetEncryptionWithWriteSupport extends 
DataTest {
       appender.addAll(expected);
     }
 
-    Assert.assertThrows(
-        "Decrypted without keys",
-        ParquetCryptoRuntimeException.class,
-        () ->
-            Parquet.read(localInput(testFile))
-                .project(schema)
-                .createReaderFunc(
-                    fileSchema -> GenericParquetReaders.buildReader(schema, 
fileSchema))
-                .build()
-                .iterator());
+    assertThatThrownBy(
+            () ->
+                Parquet.read(localInput(testFile))
+                    .project(schema)
+                    .createReaderFunc(
+                        fileSchema -> 
GenericParquetReaders.buildReader(schema, fileSchema))
+                    .build()
+                    .iterator())
+        .hasMessage("Trying to read file with encrypted footer. No keys available")
+        .isInstanceOf(ParquetCryptoRuntimeException.class);
 
     List<Record> rows;
     try (CloseableIterable<Record> reader =
@@ -129,8 +130,8 @@ public class TestParquetEncryptionWithWriteSupport extends 
DataTest {
             optional(2, "topbytes", Types.BinaryType.get()));
     org.apache.avro.Schema avroSchema = 
AvroSchemaUtil.convert(schema.asStruct());
 
-    File testFile = temp.newFile();
-    Assert.assertTrue(testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     SecureRandom rand = new SecureRandom();
     rand.nextBytes(fileDek.array());
@@ -169,12 +170,12 @@ public class TestParquetEncryptionWithWriteSupport 
extends DataTest {
             .createReaderFunc(fileSchema -> 
GenericParquetReaders.buildReader(schema, fileSchema))
             .build()) {
       CloseableIterator it = reader.iterator();
-      Assert.assertTrue("Should have at least one row", it.hasNext());
+      assertThat(it).hasNext();
       while (it.hasNext()) {
         GenericRecord actualRecord = (GenericRecord) it.next();
-        Assert.assertEquals(actualRecord.get(0, ArrayList.class).get(0), 
expectedBinary);
-        Assert.assertEquals(actualRecord.get(1, ByteBuffer.class), 
expectedBinary);
-        Assert.assertFalse("Should not have more than one row", it.hasNext());
+        assertThat(actualRecord.get(0, 
ArrayList.class)).first().isEqualTo(expectedBinary);
+        assertThat(actualRecord.get(1, 
ByteBuffer.class)).isEqualTo(expectedBinary);
+        assertThat(it).isExhausted();
       }
     }
   }
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
index e8aab824ea..a1039d27d8 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -44,8 +46,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkAvroReaderWriter extends DataTest {
 
@@ -74,8 +75,8 @@ public class TestFlinkAvroReaderWriter extends DataTest {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows = 
Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
 
-    File recordsFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", recordsFile.delete());
+    File recordsFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(recordsFile.delete()).isTrue();
 
     // Write the expected records into AVRO file, then read them into RowData and assert with the
     // expected Record list.
@@ -95,14 +96,14 @@ public class TestFlinkAvroReaderWriter extends DataTest {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecord; i++) {
-        Assert.assertTrue("Should have expected number of records", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
expected.next(), rows.next());
       }
-      Assert.assertFalse("Should not have extra records", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
 
-    File rowDataFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", rowDataFile.delete());
+    File rowDataFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(rowDataFile.delete()).isTrue();
 
     // Write the expected RowData into AVRO file, then read them into Record and assert with the
     // expected RowData list.
@@ -122,10 +123,10 @@ public class TestFlinkAvroReaderWriter extends DataTest {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
       for (int i = 0; i < numRecord; i += 1) {
-        Assert.assertTrue("Should have expected number of records", 
records.hasNext());
+        assertThat(records).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
records.next(), expected.next());
       }
-      Assert.assertFalse("Should not have extra records", records.hasNext());
+      assertThat(records).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index fdffc0e01c..72f2ce4f4b 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
@@ -37,7 +39,6 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
 
 public class TestFlinkOrcReaderWriter extends DataTest {
   private static final int NUM_RECORDS = 100;
@@ -48,8 +49,8 @@ public class TestFlinkOrcReaderWriter extends DataTest {
     List<Record> expectedRecords = RandomGenericData.generate(schema, 
NUM_RECORDS, 1990L);
     List<RowData> expectedRows = 
Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
 
-    File recordsFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", recordsFile.delete());
+    File recordsFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(recordsFile.delete()).isTrue();
 
     // Write the expected records into ORC file, then read them into RowData and assert with the
     // expected Record list.
@@ -69,14 +70,14 @@ public class TestFlinkOrcReaderWriter extends DataTest {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < NUM_RECORDS; i++) {
-        Assert.assertTrue("Should have expected number of records", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
expected.next(), rows.next());
       }
-      Assert.assertFalse("Should not have extra records", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
 
-    File rowDataFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", rowDataFile.delete());
+    File rowDataFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(rowDataFile.delete()).isTrue();
 
     // Write the expected RowData into ORC file, then read them into Record and assert with the
     // expected RowData list.
@@ -97,10 +98,10 @@ public class TestFlinkOrcReaderWriter extends DataTest {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of records", 
records.hasNext());
+        assertThat(records.hasNext()).isTrue();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
records.next(), expected.next());
       }
-      Assert.assertFalse("Should not have extra records", records.hasNext());
+      assertThat(records).isExhausted();
     }
   }
 }
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 30a2a7bb51..1fdc4cf838 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.data;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,8 +48,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;
@@ -61,8 +61,8 @@ public class TestFlinkParquetReader extends DataTest {
             optional(2, "topbytes", Types.BinaryType.get()));
     org.apache.avro.Schema avroSchema = 
AvroSchemaUtil.convert(schema.asStruct());
 
-    File testFile = temp.newFile();
-    Assert.assertTrue(testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     ParquetWriter<GenericRecord> writer =
         AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
@@ -90,17 +90,17 @@ public class TestFlinkParquetReader extends DataTest {
             .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, 
type))
             .build()) {
       Iterator<RowData> rows = reader.iterator();
-      Assert.assertTrue("Should have at least one row", rows.hasNext());
+      assertThat(rows).hasNext();
       RowData rowData = rows.next();
-      Assert.assertArrayEquals(rowData.getArray(0).getBinary(0), expectedByte);
-      Assert.assertArrayEquals(rowData.getBinary(1), expectedByte);
-      Assert.assertFalse("Should not have more than one row", rows.hasNext());
+      assertThat(rowData.getArray(0).getBinary(0)).isEqualTo(expectedByte);
+      assertThat(rowData.getBinary(1)).isEqualTo(expectedByte);
+      assertThat(rows).isExhausted();
     }
   }
 
   private void writeAndValidate(Iterable<Record> iterable, Schema schema) 
throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
@@ -119,10 +119,10 @@ public class TestFlinkParquetReader extends DataTest {
       Iterator<RowData> rows = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(schema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of rows", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), 
rows.next());
       }
-      Assert.assertFalse("Should not have extra rows", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index 7b868eafc3..b1e6f5aa00 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -18,8 +18,11 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Iterator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -34,18 +37,16 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.io.TempDir;
 
 public class TestFlinkParquetWriter extends DataTest {
   private static final int NUM_RECORDS = 100;
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private Path temp;
 
   private void writeAndValidate(Iterable<RowData> iterable, Schema schema) 
throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     LogicalType logicalType = FlinkSchemaUtil.convert(schema);
 
@@ -66,10 +67,10 @@ public class TestFlinkParquetWriter extends DataTest {
       Iterator<Record> actual = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(schema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of rows", 
actual.hasNext());
+        assertThat(actual).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), rowType, actual.next(), 
expected.next());
       }
-      Assert.assertFalse("Should not have extra rows", actual.hasNext());
+      assertThat(actual).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
index e8aab824ea..a1039d27d8 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -44,8 +46,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkAvroReaderWriter extends DataTest {
 
@@ -74,8 +75,8 @@ public class TestFlinkAvroReaderWriter extends DataTest {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows = 
Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
 
-    File recordsFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", recordsFile.delete());
+    File recordsFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(recordsFile.delete()).isTrue();
 
     // Write the expected records into AVRO file, then read them into RowData and assert with the
     // expected Record list.
@@ -95,14 +96,14 @@ public class TestFlinkAvroReaderWriter extends DataTest {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecord; i++) {
-        Assert.assertTrue("Should have expected number of records", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
expected.next(), rows.next());
       }
-      Assert.assertFalse("Should not have extra records", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
 
-    File rowDataFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", rowDataFile.delete());
+    File rowDataFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(rowDataFile.delete()).isTrue();
 
     // Write the expected RowData into AVRO file, then read them into Record 
and assert with the
     // expected RowData list.
@@ -122,10 +123,10 @@ public class TestFlinkAvroReaderWriter extends DataTest {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
       for (int i = 0; i < numRecord; i += 1) {
-        Assert.assertTrue("Should have expected number of records", 
records.hasNext());
+        assertThat(records).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
records.next(), expected.next());
       }
-      Assert.assertFalse("Should not have extra records", records.hasNext());
+      assertThat(records).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index fdffc0e01c..91ee017238 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
@@ -37,7 +39,6 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
 
 public class TestFlinkOrcReaderWriter extends DataTest {
   private static final int NUM_RECORDS = 100;
@@ -48,8 +49,8 @@ public class TestFlinkOrcReaderWriter extends DataTest {
     List<Record> expectedRecords = RandomGenericData.generate(schema, 
NUM_RECORDS, 1990L);
     List<RowData> expectedRows = 
Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
 
-    File recordsFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", recordsFile.delete());
+    File recordsFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(recordsFile.delete()).isTrue();
 
     // Write the expected records into ORC file, then read them into RowData 
and assert with the
     // expected Record list.
@@ -69,14 +70,14 @@ public class TestFlinkOrcReaderWriter extends DataTest {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < NUM_RECORDS; i++) {
-        Assert.assertTrue("Should have expected number of records", 
rows.hasNext());
+        assertThat(rows.hasNext()).isTrue();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
expected.next(), rows.next());
       }
-      Assert.assertFalse("Should not have extra records", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
 
-    File rowDataFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", rowDataFile.delete());
+    File rowDataFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(rowDataFile.delete()).isTrue();
 
     // Write the expected RowData into ORC file, then read them into Record 
and assert with the
     // expected RowData list.
@@ -97,10 +98,10 @@ public class TestFlinkOrcReaderWriter extends DataTest {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of records", 
records.hasNext());
+        assertThat(records).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
records.next(), expected.next());
       }
-      Assert.assertFalse("Should not have extra records", records.hasNext());
+      assertThat(records).isExhausted();
     }
   }
 }
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 30a2a7bb51..1fdc4cf838 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.data;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,8 +48,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;
@@ -61,8 +61,8 @@ public class TestFlinkParquetReader extends DataTest {
             optional(2, "topbytes", Types.BinaryType.get()));
     org.apache.avro.Schema avroSchema = 
AvroSchemaUtil.convert(schema.asStruct());
 
-    File testFile = temp.newFile();
-    Assert.assertTrue(testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     ParquetWriter<GenericRecord> writer =
         AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
@@ -90,17 +90,17 @@ public class TestFlinkParquetReader extends DataTest {
             .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, 
type))
             .build()) {
       Iterator<RowData> rows = reader.iterator();
-      Assert.assertTrue("Should have at least one row", rows.hasNext());
+      assertThat(rows).hasNext();
       RowData rowData = rows.next();
-      Assert.assertArrayEquals(rowData.getArray(0).getBinary(0), expectedByte);
-      Assert.assertArrayEquals(rowData.getBinary(1), expectedByte);
-      Assert.assertFalse("Should not have more than one row", rows.hasNext());
+      assertThat(rowData.getArray(0).getBinary(0)).isEqualTo(expectedByte);
+      assertThat(rowData.getBinary(1)).isEqualTo(expectedByte);
+      assertThat(rows).isExhausted();
     }
   }
 
   private void writeAndValidate(Iterable<Record> iterable, Schema schema) 
throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
@@ -119,10 +119,10 @@ public class TestFlinkParquetReader extends DataTest {
       Iterator<RowData> rows = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(schema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of rows", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), 
rows.next());
       }
-      Assert.assertFalse("Should not have extra rows", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index 7b868eafc3..b1e6f5aa00 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -18,8 +18,11 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Iterator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -34,18 +37,16 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.io.TempDir;
 
 public class TestFlinkParquetWriter extends DataTest {
   private static final int NUM_RECORDS = 100;
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private Path temp;
 
   private void writeAndValidate(Iterable<RowData> iterable, Schema schema) 
throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     LogicalType logicalType = FlinkSchemaUtil.convert(schema);
 
@@ -66,10 +67,10 @@ public class TestFlinkParquetWriter extends DataTest {
       Iterator<Record> actual = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(schema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of rows", 
actual.hasNext());
+        assertThat(actual).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), rowType, actual.next(), 
expected.next());
       }
-      Assert.assertFalse("Should not have extra rows", actual.hasNext());
+      assertThat(actual).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java
index 47319ec9bc..4184526a6a 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iceberg.flink;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public abstract class AvroGenericRecordConverterBase {
   protected abstract void testConverter(DataGenerator dataGenerator) throws 
Exception;
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
index e9372adda4..8992cbd751 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -44,8 +45,7 @@ import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assertions;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestDataFileSerialization {
 
@@ -135,23 +135,19 @@ public class TestDataFileSerialization {
         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
       for (int i = 0; i < 2; i += 1) {
         Object obj = in.readObject();
-        Assertions.assertThat(obj).as("Should be a 
DataFile").isInstanceOf(DataFile.class);
+        assertThat(obj).as("Should be a 
DataFile").isInstanceOf(DataFile.class);
         TestHelpers.assertEquals(DATA_FILE, (DataFile) obj);
       }
 
       for (int i = 0; i < 2; i += 1) {
         Object obj = in.readObject();
-        Assertions.assertThat(obj)
-            .as("Should be a position DeleteFile")
-            .isInstanceOf(DeleteFile.class);
+        assertThat(obj).as("Should be a position 
DeleteFile").isInstanceOf(DeleteFile.class);
         TestHelpers.assertEquals(POS_DELETE_FILE, (DeleteFile) obj);
       }
 
       for (int i = 0; i < 2; i += 1) {
         Object obj = in.readObject();
-        Assertions.assertThat(obj)
-            .as("Should be a equality DeleteFile")
-            .isInstanceOf(DeleteFile.class);
+        assertThat(obj).as("Should be a equality 
DeleteFile").isInstanceOf(DeleteFile.class);
         TestHelpers.assertEquals(EQ_DELETE_FILE, (DeleteFile) obj);
       }
     }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
index ba08b76dd5..4c9e95b8fa 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
@@ -26,15 +29,14 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.assertj.core.api.Assertions;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkCatalogFactory {
 
   private Map<String, String> props;
 
-  @Before
+  @BeforeEach
   public void before() {
     props = Maps.newHashMap();
     props.put("type", "iceberg");
@@ -51,7 +53,7 @@ public class TestFlinkCatalogFactory {
         FlinkCatalogFactory.createCatalogLoader(catalogName, props, new 
Configuration())
             .loadCatalog();
 
-    Assertions.assertThat(catalog).isNotNull().isInstanceOf(HiveCatalog.class);
+    assertThat(catalog).isNotNull().isInstanceOf(HiveCatalog.class);
   }
 
   @Test
@@ -64,7 +66,7 @@ public class TestFlinkCatalogFactory {
         FlinkCatalogFactory.createCatalogLoader(catalogName, props, new 
Configuration())
             .loadCatalog();
 
-    
Assertions.assertThat(catalog).isNotNull().isInstanceOf(HadoopCatalog.class);
+    assertThat(catalog).isNotNull().isInstanceOf(HadoopCatalog.class);
   }
 
   @Test
@@ -76,7 +78,7 @@ public class TestFlinkCatalogFactory {
         FlinkCatalogFactory.createCatalogLoader(catalogName, props, new 
Configuration())
             .loadCatalog();
 
-    
Assertions.assertThat(catalog).isNotNull().isInstanceOf(CustomHadoopCatalog.class);
+    assertThat(catalog).isNotNull().isInstanceOf(CustomHadoopCatalog.class);
   }
 
   @Test
@@ -86,7 +88,7 @@ public class TestFlinkCatalogFactory {
     props.put(
         FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, 
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, 
new Configuration()))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageStartingWith(
@@ -98,7 +100,7 @@ public class TestFlinkCatalogFactory {
     String catalogName = "unknownCatalog";
     props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "fooType");
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, 
new Configuration()))
         .isInstanceOf(UnsupportedOperationException.class)
         .hasMessageStartingWith("Unknown catalog-type: fooType");
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
index c89ea4f530..838b0ea0e1 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.Instant;
@@ -49,9 +51,7 @@ import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.Pair;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkFilters {
 
@@ -121,13 +121,13 @@ public class TestFlinkFilters {
       Optional<org.apache.iceberg.expressions.Expression> actual =
           FlinkFilters.convert(
               
resolve(Expressions.$(pair.first()).isEqual(Expressions.lit(pair.second()))));
-      Assert.assertTrue("Conversion should succeed", actual.isPresent());
+      assertThat(actual).isPresent();
       assertPredicatesMatch(expected, actual.get());
 
       Optional<org.apache.iceberg.expressions.Expression> actual1 =
           FlinkFilters.convert(
               
resolve(Expressions.lit(pair.second()).isEqual(Expressions.$(pair.first()))));
-      Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+      assertThat(actual1).isPresent();
       assertPredicatesMatch(expected, actual1.get());
     }
   }
@@ -138,12 +138,12 @@ public class TestFlinkFilters {
 
     Optional<org.apache.iceberg.expressions.Expression> actual =
         
FlinkFilters.convert(resolve(Expressions.$("field3").isEqual(Expressions.lit(Float.NaN))));
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     assertPredicatesMatch(expected, actual.get());
 
     Optional<org.apache.iceberg.expressions.Expression> actual1 =
         
FlinkFilters.convert(resolve(Expressions.lit(Float.NaN).isEqual(Expressions.$("field3"))));
-    Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+    assertThat(actual1).isPresent();
     assertPredicatesMatch(expected, actual1.get());
   }
 
@@ -156,13 +156,13 @@ public class TestFlinkFilters {
       Optional<org.apache.iceberg.expressions.Expression> actual =
           FlinkFilters.convert(
               
resolve(Expressions.$(pair.first()).isNotEqual(Expressions.lit(pair.second()))));
-      Assert.assertTrue("Conversion should succeed", actual.isPresent());
+      assertThat(actual).isPresent();
       assertPredicatesMatch(expected, actual.get());
 
       Optional<org.apache.iceberg.expressions.Expression> actual1 =
           FlinkFilters.convert(
               
resolve(Expressions.lit(pair.second()).isNotEqual(Expressions.$(pair.first()))));
-      Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+      assertThat(actual1).isPresent();
       assertPredicatesMatch(expected, actual1.get());
     }
   }
@@ -174,13 +174,13 @@ public class TestFlinkFilters {
     Optional<org.apache.iceberg.expressions.Expression> actual =
         FlinkFilters.convert(
             
resolve(Expressions.$("field3").isNotEqual(Expressions.lit(Float.NaN))));
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     assertPredicatesMatch(expected, actual.get());
 
     Optional<org.apache.iceberg.expressions.Expression> actual1 =
         FlinkFilters.convert(
             
resolve(Expressions.lit(Float.NaN).isNotEqual(Expressions.$("field3"))));
-    Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+    assertThat(actual1).isPresent();
     assertPredicatesMatch(expected, actual1.get());
   }
 
@@ -191,12 +191,12 @@ public class TestFlinkFilters {
 
     Optional<org.apache.iceberg.expressions.Expression> actual =
         
FlinkFilters.convert(resolve(Expressions.$("field1").isGreater(Expressions.lit(1))));
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     assertPredicatesMatch(expected, actual.get());
 
     Optional<org.apache.iceberg.expressions.Expression> actual1 =
         
FlinkFilters.convert(resolve(Expressions.lit(1).isLess(Expressions.$("field1"))));
-    Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+    assertThat(actual1).isPresent();
     assertPredicatesMatch(expected, actual1.get());
   }
 
@@ -207,12 +207,12 @@ public class TestFlinkFilters {
 
     Optional<org.apache.iceberg.expressions.Expression> actual =
         
FlinkFilters.convert(resolve(Expressions.$("field1").isGreaterOrEqual(Expressions.lit(1))));
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     assertPredicatesMatch(expected, actual.get());
 
     Optional<org.apache.iceberg.expressions.Expression> actual1 =
         
FlinkFilters.convert(resolve(Expressions.lit(1).isLessOrEqual(Expressions.$("field1"))));
-    Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+    assertThat(actual1).isPresent();
     assertPredicatesMatch(expected, actual1.get());
   }
 
@@ -223,12 +223,12 @@ public class TestFlinkFilters {
 
     Optional<org.apache.iceberg.expressions.Expression> actual =
         
FlinkFilters.convert(resolve(Expressions.$("field1").isLess(Expressions.lit(1))));
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     assertPredicatesMatch(expected, actual.get());
 
     Optional<org.apache.iceberg.expressions.Expression> actual1 =
         
FlinkFilters.convert(resolve(Expressions.lit(1).isGreater(Expressions.$("field1"))));
-    Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+    assertThat(actual1).isPresent();
     assertPredicatesMatch(expected, actual1.get());
   }
 
@@ -239,12 +239,12 @@ public class TestFlinkFilters {
 
     Optional<org.apache.iceberg.expressions.Expression> actual =
         
FlinkFilters.convert(resolve(Expressions.$("field1").isLessOrEqual(Expressions.lit(1))));
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     assertPredicatesMatch(expected, actual.get());
 
     Optional<org.apache.iceberg.expressions.Expression> actual1 =
         
FlinkFilters.convert(resolve(Expressions.lit(1).isGreaterOrEqual(Expressions.$("field1"))));
-    Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+    assertThat(actual1).isPresent();
     assertPredicatesMatch(expected, actual1.get());
   }
 
@@ -252,7 +252,7 @@ public class TestFlinkFilters {
   public void testIsNull() {
     Expression expr = resolve(Expressions.$("field1").isNull());
     Optional<org.apache.iceberg.expressions.Expression> actual = 
FlinkFilters.convert(expr);
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     UnboundPredicate<Object> expected = 
org.apache.iceberg.expressions.Expressions.isNull("field1");
     assertPredicatesMatch(expected, actual.get());
   }
@@ -261,7 +261,7 @@ public class TestFlinkFilters {
   public void testIsNotNull() {
     Expression expr = resolve(Expressions.$("field1").isNotNull());
     Optional<org.apache.iceberg.expressions.Expression> actual = 
FlinkFilters.convert(expr);
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     UnboundPredicate<Object> expected =
         org.apache.iceberg.expressions.Expressions.notNull("field1");
     assertPredicatesMatch(expected, actual.get());
@@ -275,7 +275,7 @@ public class TestFlinkFilters {
                 .isEqual(Expressions.lit(1))
                 .and(Expressions.$("field2").isEqual(Expressions.lit(2L))));
     Optional<org.apache.iceberg.expressions.Expression> actual = 
FlinkFilters.convert(expr);
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     And and = (And) actual.get();
     And expected =
         (And)
@@ -295,7 +295,7 @@ public class TestFlinkFilters {
                 .isEqual(Expressions.lit(1))
                 .or(Expressions.$("field2").isEqual(Expressions.lit(2L))));
     Optional<org.apache.iceberg.expressions.Expression> actual = 
FlinkFilters.convert(expr);
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     Or or = (Or) actual.get();
     Or expected =
         (Or)
@@ -315,14 +315,14 @@ public class TestFlinkFilters {
                 BuiltInFunctionDefinitions.NOT,
                 Expressions.$("field1").isEqual(Expressions.lit(1))));
     Optional<org.apache.iceberg.expressions.Expression> actual = 
FlinkFilters.convert(expr);
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     Not not = (Not) actual.get();
     Not expected =
         (Not)
             org.apache.iceberg.expressions.Expressions.not(
                 org.apache.iceberg.expressions.Expressions.equal("field1", 1));
 
-    Assert.assertEquals("Predicate operation should match", expected.op(), 
not.op());
+    assertThat(not.op()).as("Predicate operation should 
match").isEqualTo(expected.op());
     assertPredicatesMatch(expected.child(), not.child());
   }
 
@@ -335,7 +335,7 @@ public class TestFlinkFilters {
             ApiExpressionUtils.unresolvedCall(
                 BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"), 
Expressions.lit("abc%")));
     Optional<org.apache.iceberg.expressions.Expression> actual = 
FlinkFilters.convert(expr);
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     assertPredicatesMatch(expected, actual.get());
 
     expr =
@@ -343,7 +343,7 @@ public class TestFlinkFilters {
             ApiExpressionUtils.unresolvedCall(
                 BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"), 
Expressions.lit("%abc")));
     actual = FlinkFilters.convert(expr);
-    Assert.assertFalse("Conversion should failed", actual.isPresent());
+    assertThat(actual).isNotPresent();
 
     expr =
         resolve(
@@ -352,7 +352,7 @@ public class TestFlinkFilters {
                 Expressions.$("field5"),
                 Expressions.lit("%abc%")));
     actual = FlinkFilters.convert(expr);
-    Assert.assertFalse("Conversion should failed", actual.isPresent());
+    assertThat(actual).isNotPresent();
 
     expr =
         resolve(
@@ -361,49 +361,49 @@ public class TestFlinkFilters {
                 Expressions.$("field5"),
                 Expressions.lit("abc%d")));
     actual = FlinkFilters.convert(expr);
-    Assert.assertFalse("Conversion should failed", actual.isPresent());
+    assertThat(actual).isNotPresent();
 
     expr =
         resolve(
             ApiExpressionUtils.unresolvedCall(
                 BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"), 
Expressions.lit("%")));
     actual = FlinkFilters.convert(expr);
-    Assert.assertFalse("Conversion should failed", actual.isPresent());
+    assertThat(actual).isNotPresent();
 
     expr =
         resolve(
             ApiExpressionUtils.unresolvedCall(
                 BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"), 
Expressions.lit("a_")));
     actual = FlinkFilters.convert(expr);
-    Assert.assertFalse("Conversion should failed", actual.isPresent());
+    assertThat(actual).isNotPresent();
 
     expr =
         resolve(
             ApiExpressionUtils.unresolvedCall(
                 BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"), 
Expressions.lit("a%b")));
     actual = FlinkFilters.convert(expr);
-    Assert.assertFalse("Conversion should failed", actual.isPresent());
+    assertThat(actual).isNotPresent();
   }
 
   @SuppressWarnings("unchecked")
   private <T> void matchLiteral(String fieldName, Object flinkLiteral, T 
icebergLiteral) {
     Expression expr = 
resolve(Expressions.$(fieldName).isEqual(Expressions.lit(flinkLiteral)));
     Optional<org.apache.iceberg.expressions.Expression> actual = 
FlinkFilters.convert(expr);
-    Assert.assertTrue("Conversion should succeed", actual.isPresent());
+    assertThat(actual).isPresent();
     org.apache.iceberg.expressions.Expression expression = actual.get();
-    Assertions.assertThat(expression)
+    assertThat(expression)
         .as("The expression should be a UnboundPredicate")
         .isInstanceOf(UnboundPredicate.class);
     UnboundPredicate<T> unboundPredicate = (UnboundPredicate<T>) expression;
 
     org.apache.iceberg.expressions.Expression expression1 =
         
unboundPredicate.bind(FlinkSchemaUtil.convert(TABLE_SCHEMA).asStruct(), false);
-    Assertions.assertThat(expression1)
+    assertThat(expression1)
         .as("The expression should be a BoundLiteralPredicate")
         .isInstanceOf(BoundLiteralPredicate.class);
 
     BoundLiteralPredicate<T> predicate = (BoundLiteralPredicate<T>) 
expression1;
-    Assert.assertTrue("Should match the  literal", 
predicate.test(icebergLiteral));
+    assertThat(predicate.test(icebergLiteral)).isTrue();
   }
 
   private static Expression resolve(Expression originalExpression) {
@@ -447,21 +447,16 @@ public class TestFlinkFilters {
   private void assertPredicatesMatch(
       org.apache.iceberg.expressions.Expression expected,
       org.apache.iceberg.expressions.Expression actual) {
-    Assertions.assertThat(expected)
+    assertThat(expected)
         .as("The expected expression should be a UnboundPredicate")
         .isInstanceOf(UnboundPredicate.class);
-    Assertions.assertThat(actual)
+    assertThat(actual)
         .as("The actual expression should be a UnboundPredicate")
         .isInstanceOf(UnboundPredicate.class);
     UnboundPredicate<?> predicateExpected = (UnboundPredicate<?>) expected;
     UnboundPredicate<?> predicateActual = (UnboundPredicate<?>) actual;
-    Assert.assertEquals(
-        "Predicate operation should match", predicateExpected.op(), 
predicateActual.op());
-    Assert.assertEquals(
-        "Predicate literal should match", predicateExpected.literal(), 
predicateActual.literal());
-    Assert.assertEquals(
-        "Predicate name should match",
-        predicateExpected.ref().name(),
-        predicateActual.ref().name());
+    assertThat(predicateActual.op()).isEqualTo(predicateExpected.op());
+    
assertThat(predicateActual.literal()).isEqualTo(predicateExpected.literal());
+    
assertThat(predicateActual.ref().name()).isEqualTo(predicateExpected.ref().name());
   }
 }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
index 4ac32c08eb..eab60d886a 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
@@ -31,14 +34,11 @@ import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkSchemaUtil {
 
@@ -313,12 +313,13 @@ public class TestFlinkSchemaUtil {
   }
 
   private void checkSchema(TableSchema flinkSchema, Schema icebergSchema) {
-    Assert.assertEquals(icebergSchema.asStruct(), 
FlinkSchemaUtil.convert(flinkSchema).asStruct());
+    
assertThat(FlinkSchemaUtil.convert(flinkSchema).asStruct()).isEqualTo(icebergSchema.asStruct());
     // The conversion is not a 1:1 mapping, so we just check iceberg types.
-    Assert.assertEquals(
-        icebergSchema.asStruct(),
-        
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)))
-            .asStruct());
+    assertThat(
+            FlinkSchemaUtil.convert(
+                    
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)))
+                .asStruct())
+        .isEqualTo(icebergSchema.asStruct());
   }
 
   @Test
@@ -354,10 +355,9 @@ public class TestFlinkSchemaUtil {
       LogicalType flinkExpectedType,
       LogicalType flinkType,
       Type icebergExpectedType) {
-    Assert.assertEquals(flinkExpectedType, 
FlinkSchemaUtil.convert(icebergType));
-    Assert.assertEquals(
-        Types.StructType.of(Types.NestedField.optional(0, "f0", 
icebergExpectedType)),
-        
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct());
+    
assertThat(FlinkSchemaUtil.convert(icebergType)).isEqualTo(flinkExpectedType);
+    
assertThat(FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct())
+        .isEqualTo(Types.StructType.of(Types.NestedField.optional(0, "f0", 
icebergExpectedType)));
   }
 
   @Test
@@ -376,8 +376,8 @@ public class TestFlinkSchemaUtil {
             .primaryKey("int")
             .build();
     Schema convertedSchema = FlinkSchemaUtil.convert(baseSchema, flinkSchema);
-    Assert.assertEquals(baseSchema.asStruct(), convertedSchema.asStruct());
-    Assert.assertEquals(ImmutableSet.of(101), 
convertedSchema.identifierFieldIds());
+    assertThat(convertedSchema.asStruct()).isEqualTo(baseSchema.asStruct());
+    assertThat(convertedSchema.identifierFieldIds()).containsExactly(101);
   }
 
   @Test
@@ -390,10 +390,10 @@ public class TestFlinkSchemaUtil {
             Sets.newHashSet(1, 2));
 
     TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
-    Assert.assertTrue(tableSchema.getPrimaryKey().isPresent());
-    Assert.assertEquals(
-        ImmutableSet.of("int", "string"),
-        ImmutableSet.copyOf(tableSchema.getPrimaryKey().get().getColumns()));
+    assertThat(tableSchema.getPrimaryKey())
+        .isPresent()
+        .get()
+        .satisfies(k -> assertThat(k.getColumns()).containsExactly("int", 
"string"));
   }
 
   @Test
@@ -408,7 +408,7 @@ public class TestFlinkSchemaUtil {
                         Types.NestedField.required(2, "inner", 
Types.IntegerType.get())))),
             Sets.newHashSet(2));
 
-    Assertions.assertThatThrownBy(() -> 
FlinkSchemaUtil.toSchema(icebergSchema))
+    assertThatThrownBy(() -> FlinkSchemaUtil.toSchema(icebergSchema))
         .isInstanceOf(ValidationException.class)
         .hasMessageStartingWith("Could not create a PRIMARY KEY")
         .hasMessageContaining("Column 'struct.inner' does not exist.");
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java
index 6bd94e9ca6..8f1f129e18 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -29,6 +30,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.file.Path;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -48,11 +50,8 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 public class TestManifestFileSerialization {
 
@@ -104,7 +103,7 @@ public class TestManifestFileSerialization {
 
   private static final FileIO FILE_IO = new HadoopFileIO(new Configuration());
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private Path temp;
 
   @Test
   public void testKryoSerialization() throws IOException {
@@ -145,15 +144,15 @@ public class TestManifestFileSerialization {
         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
       for (int i = 0; i < 3; i += 1) {
         Object obj = in.readObject();
-        Assertions.assertThat(obj).as("Should be a 
ManifestFile").isInstanceOf(ManifestFile.class);
+        assertThat(obj).as("Should be a 
ManifestFile").isInstanceOf(ManifestFile.class);
         TestHelpers.assertEquals(manifest, (ManifestFile) obj);
       }
     }
   }
 
   private ManifestFile writeManifest(DataFile... files) throws IOException {
-    File manifestFile = temp.newFile("input.m0.avro");
-    Assert.assertTrue(manifestFile.delete());
+    File manifestFile = File.createTempFile("input", "m0.avro", temp.toFile());
+    assertThat(manifestFile.delete()).isTrue();
     OutputFile outputFile = 
FILE_IO.newOutputFile(manifestFile.getCanonicalPath());
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(SPEC, outputFile);
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
index c78fa51215..caefbb5a54 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.util.Iterator;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.RecordWrapperTest;
@@ -28,8 +30,6 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.data.RandomRowData;
 import org.apache.iceberg.util.StructLikeWrapper;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
 
 public class TestRowDataWrapper extends RecordWrapperTest {
 
@@ -49,12 +49,12 @@ public class TestRowDataWrapper extends RecordWrapperTest {
               return;
             }
 
-            Assertions.assertThat(actual).isNotNull();
-            Assertions.assertThat(expected).isNotNull();
+            assertThat(actual).isNotNull();
+            assertThat(expected).isNotNull();
 
             int expectedMilliseconds = (int) ((long) expected / 1000_000);
             int actualMilliseconds = (int) ((long) actual / 1000_000);
-            Assert.assertEquals(message, expectedMilliseconds, 
actualMilliseconds);
+            
assertThat(actualMilliseconds).as(message).isEqualTo(expectedMilliseconds);
           }
         });
   }
@@ -75,8 +75,8 @@ public class TestRowDataWrapper extends RecordWrapperTest {
     StructLikeWrapper actualWrapper = 
StructLikeWrapper.forType(schema.asStruct());
     StructLikeWrapper expectedWrapper = 
StructLikeWrapper.forType(schema.asStruct());
     for (int i = 0; i < numRecords; i++) {
-      Assert.assertTrue("Should have more records", actual.hasNext());
-      Assert.assertTrue("Should have more RowData", expected.hasNext());
+      assertThat(actual).hasNext();
+      assertThat(expected).hasNext();
 
       StructLike recordStructLike = recordWrapper.wrap(actual.next());
       StructLike rowDataStructLike = rowDataWrapper.wrap(expected.next());
@@ -87,7 +87,7 @@ public class TestRowDataWrapper extends RecordWrapperTest {
           expectedWrapper.set(rowDataStructLike));
     }
 
-    Assert.assertFalse("Shouldn't have more record", actual.hasNext());
-    Assert.assertFalse("Shouldn't have more RowData", expected.hasNext());
+    assertThat(actual).isExhausted();
+    assertThat(expected).isExhausted();
   }
 }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
index 27124d93fe..7f0e7acaa8 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.flink;
 import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Map;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
@@ -39,11 +41,9 @@ import org.apache.iceberg.Transaction;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 public class TestTableSerialization {
   private static final HadoopTables TABLES = new HadoopTables();
@@ -60,15 +60,15 @@ public class TestTableSerialization {
 
   private static final SortOrder SORT_ORDER = 
SortOrder.builderFor(SCHEMA).asc("id").build();
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private Path temp;
   private Table table;
 
-  @Before
+  @BeforeEach
   public void initTable() throws IOException {
     Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
 
-    File tableLocation = temp.newFolder();
-    Assert.assertTrue(tableLocation.delete());
+    File tableLocation = File.createTempFile("junit", null, temp.toFile());
+    assertThat(tableLocation.delete()).isTrue();
 
     this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props, 
tableLocation.toString());
   }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
index e8aab824ea..a1039d27d8 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -44,8 +46,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkAvroReaderWriter extends DataTest {
 
@@ -74,8 +75,8 @@ public class TestFlinkAvroReaderWriter extends DataTest {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows = 
Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
 
-    File recordsFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", recordsFile.delete());
+    File recordsFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(recordsFile.delete()).isTrue();
 
     // Write the expected records into AVRO file, then read them into RowData 
and assert with the
     // expected Record list.
@@ -95,14 +96,14 @@ public class TestFlinkAvroReaderWriter extends DataTest {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecord; i++) {
-        Assert.assertTrue("Should have expected number of records", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
expected.next(), rows.next());
       }
-      Assert.assertFalse("Should not have extra records", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
 
-    File rowDataFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", rowDataFile.delete());
+    File rowDataFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(rowDataFile.delete()).isTrue();
 
     // Write the expected RowData into AVRO file, then read them into Record 
and assert with the
     // expected RowData list.
@@ -122,10 +123,10 @@ public class TestFlinkAvroReaderWriter extends DataTest {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
       for (int i = 0; i < numRecord; i += 1) {
-        Assert.assertTrue("Should have expected number of records", 
records.hasNext());
+        assertThat(records).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
records.next(), expected.next());
       }
-      Assert.assertFalse("Should not have extra records", records.hasNext());
+      assertThat(records).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index fdffc0e01c..72f2ce4f4b 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
@@ -37,7 +39,6 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
 
 public class TestFlinkOrcReaderWriter extends DataTest {
   private static final int NUM_RECORDS = 100;
@@ -48,8 +49,8 @@ public class TestFlinkOrcReaderWriter extends DataTest {
     List<Record> expectedRecords = RandomGenericData.generate(schema, 
NUM_RECORDS, 1990L);
     List<RowData> expectedRows = 
Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
 
-    File recordsFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", recordsFile.delete());
+    File recordsFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(recordsFile.delete()).isTrue();
 
     // Write the expected records into ORC file, then read them into RowData 
and assert with the
     // expected Record list.
@@ -69,14 +70,14 @@ public class TestFlinkOrcReaderWriter extends DataTest {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < NUM_RECORDS; i++) {
-        Assert.assertTrue("Should have expected number of records", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
expected.next(), rows.next());
       }
-      Assert.assertFalse("Should not have extra records", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
 
-    File rowDataFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", rowDataFile.delete());
+    File rowDataFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(rowDataFile.delete()).isTrue();
 
     // Write the expected RowData into ORC file, then read them into Record 
and assert with the
     // expected RowData list.
@@ -97,10 +98,10 @@ public class TestFlinkOrcReaderWriter extends DataTest {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of records", 
records.hasNext());
+        assertThat(records.hasNext()).isTrue();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, 
records.next(), expected.next());
       }
-      Assert.assertFalse("Should not have extra records", records.hasNext());
+      assertThat(records).isExhausted();
     }
   }
 }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 30a2a7bb51..1fdc4cf838 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.data;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,8 +48,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;
@@ -61,8 +61,8 @@ public class TestFlinkParquetReader extends DataTest {
             optional(2, "topbytes", Types.BinaryType.get()));
     org.apache.avro.Schema avroSchema = 
AvroSchemaUtil.convert(schema.asStruct());
 
-    File testFile = temp.newFile();
-    Assert.assertTrue(testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     ParquetWriter<GenericRecord> writer =
         AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
@@ -90,17 +90,17 @@ public class TestFlinkParquetReader extends DataTest {
             .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, 
type))
             .build()) {
       Iterator<RowData> rows = reader.iterator();
-      Assert.assertTrue("Should have at least one row", rows.hasNext());
+      assertThat(rows).hasNext();
       RowData rowData = rows.next();
-      Assert.assertArrayEquals(rowData.getArray(0).getBinary(0), expectedByte);
-      Assert.assertArrayEquals(rowData.getBinary(1), expectedByte);
-      Assert.assertFalse("Should not have more than one row", rows.hasNext());
+      assertThat(rowData.getArray(0).getBinary(0)).isEqualTo(expectedByte);
+      assertThat(rowData.getBinary(1)).isEqualTo(expectedByte);
+      assertThat(rows).isExhausted();
     }
   }
 
   private void writeAndValidate(Iterable<Record> iterable, Schema schema) 
throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
@@ -119,10 +119,10 @@ public class TestFlinkParquetReader extends DataTest {
       Iterator<RowData> rows = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(schema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of rows", 
rows.hasNext());
+        assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), 
rows.next());
       }
-      Assert.assertFalse("Should not have extra rows", rows.hasNext());
+      assertThat(rows).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index 7b868eafc3..b1e6f5aa00 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -18,8 +18,11 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Iterator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -34,18 +37,16 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.io.TempDir;
 
 public class TestFlinkParquetWriter extends DataTest {
   private static final int NUM_RECORDS = 100;
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private Path temp;
 
   private void writeAndValidate(Iterable<RowData> iterable, Schema schema) 
throws IOException {
-    File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).isTrue();
 
     LogicalType logicalType = FlinkSchemaUtil.convert(schema);
 
@@ -66,10 +67,10 @@ public class TestFlinkParquetWriter extends DataTest {
       Iterator<Record> actual = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(schema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of rows", 
actual.hasNext());
+        assertThat(actual).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), rowType, actual.next(), 
expected.next());
       }
-      Assert.assertFalse("Should not have extra rows", actual.hasNext());
+      assertThat(actual).isExhausted();
     }
   }
 
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
index df2e6ae21c..7dd4e8759c 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -18,8 +18,12 @@
  */
 package org.apache.iceberg.flink.data;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.withPrecision;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Map;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -37,19 +41,17 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 public class TestRowProjection {
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private Path temp;
 
   private RowData writeAndRead(String desc, Schema writeSchema, Schema 
readSchema, RowData row)
       throws IOException {
-    File file = temp.newFile(desc + ".avro");
-    Assert.assertTrue(file.delete());
+    File file = File.createTempFile("junit", desc + ".avro", temp.toFile());
+    assertThat(file.delete()).isTrue();
 
     try (FileAppender<RowData> appender =
         Avro.write(Files.localOutput(file))
@@ -79,10 +81,8 @@ public class TestRowProjection {
 
     RowData projected = writeAndRead("full_projection", schema, schema, row);
 
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
-
-    int cmp = Comparators.charSequences().compare("test", 
projected.getString(1).toString());
-    Assert.assertEquals("Should contain the correct data value", cmp, 0);
+    assertThat(projected.getLong(0)).isEqualTo(34);
+    assertThat(projected.getString(1)).asString().isEqualTo("test");
   }
 
   @Test
@@ -96,19 +96,13 @@ public class TestRowProjection {
 
     RowData full = writeAndRead("special_chars", schema, schema, row);
 
-    Assert.assertEquals("Should contain the correct id value", 34L, 
full.getLong(0));
-    Assert.assertEquals(
-        "Should contain the correct data value",
-        0,
-        Comparators.charSequences().compare("test", 
full.getString(1).toString()));
+    assertThat(full.getLong(0)).isEqualTo(34L);
+    assertThat(full.getString(1)).asString().isEqualTo("test");
 
     RowData projected = writeAndRead("special_characters", schema, 
schema.select("data%0"), full);
 
-    Assert.assertEquals("Should not contain id value", 1, 
projected.getArity());
-    Assert.assertEquals(
-        "Should contain the correct data value",
-        0,
-        Comparators.charSequences().compare("test", 
projected.getString(0).toString()));
+    assertThat(projected.getArity()).isEqualTo(1);
+    assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
   @Test
@@ -127,9 +121,8 @@ public class TestRowProjection {
 
     RowData projected = writeAndRead("full_projection", schema, reordered, 
row);
 
-    Assert.assertEquals(
-        "Should contain the correct 0 value", "test", 
projected.getString(0).toString());
-    Assert.assertEquals("Should contain the correct 1 value", 34L, 
projected.getLong(1));
+    assertThat(projected.getString(0)).asString().isEqualTo("test");
+    assertThat(projected.getLong(1)).isEqualTo(34);
   }
 
   @Test
@@ -149,10 +142,9 @@ public class TestRowProjection {
 
     RowData projected = writeAndRead("full_projection", schema, reordered, 
row);
 
-    Assert.assertTrue("Should contain the correct 0 value", 
projected.isNullAt(0));
-    Assert.assertEquals(
-        "Should contain the correct 1 value", "test", 
projected.getString(1).toString());
-    Assert.assertTrue("Should contain the correct 2 value", 
projected.isNullAt(2));
+    assertThat(projected.isNullAt(0)).isTrue();
+    assertThat(projected.getString(1)).asString().isEqualTo("test");
+    assertThat(projected.isNullAt(2)).isTrue();
   }
 
   @Test
@@ -173,10 +165,16 @@ public class TestRowProjection {
             Types.NestedField.optional(4, "d", Types.LongType.get()));
 
     RowData projected = writeAndRead("rename_and_add_column_projection", 
schema, renamedAdded, row);
-    Assert.assertEquals("Should contain the correct value in column 1", 
projected.getLong(0), 100L);
-    Assert.assertEquals("Should contain the correct value in column 2", 
projected.getLong(1), 200L);
-    Assert.assertEquals("Should contain the correct value in column 3", 
projected.getLong(2), 300L);
-    Assert.assertTrue("Should contain empty value on new column 4", 
projected.isNullAt(3));
+    assertThat(projected.getLong(0))
+        .as("Should contain the correct value in column 1")
+        .isEqualTo(100L);
+    assertThat(projected.getLong(1))
+        .as("Should contain the correct value in column 2")
+        .isEqualTo(200L);
+    assertThat(projected.getLong(2))
+        .as("Should contain the correct value in column 3")
+        .isEqualTo(300L);
+    assertThat(projected.isNullAt(3)).as("Should contain empty value on new 
column 4").isTrue();
   }
 
   @Test
@@ -190,8 +188,8 @@ public class TestRowProjection {
 
     RowData projected = writeAndRead("empty_projection", schema, 
schema.select(), row);
 
-    Assert.assertNotNull("Should read a non-null record", projected);
-    Assert.assertEquals(0, projected.getArity());
+    assertThat(projected).isNotNull();
+    assertThat(projected.getArity()).isEqualTo(0);
   }
 
   @Test
@@ -206,16 +204,16 @@ public class TestRowProjection {
     Schema idOnly = new Schema(Types.NestedField.required(0, "id", 
Types.LongType.get()));
 
     RowData projected = writeAndRead("basic_projection_id", writeSchema, 
idOnly, row);
-    Assert.assertEquals("Should not project data", 1, projected.getArity());
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
+    assertThat(projected.getArity()).as("Should not project 
data").isEqualTo(1);
+    assertThat(projected.getLong(0)).isEqualTo(34L);
 
     Schema dataOnly = new Schema(Types.NestedField.optional(1, "data", 
Types.StringType.get()));
 
     projected = writeAndRead("basic_projection_data", writeSchema, dataOnly, 
row);
 
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
     int cmp = Comparators.charSequences().compare("test", 
projected.getString(0).toString());
-    Assert.assertEquals("Should contain the correct data value", 0, cmp);
+    assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
   @Test
@@ -234,9 +232,11 @@ public class TestRowProjection {
 
     RowData projected = writeAndRead("project_and_rename", writeSchema, 
readSchema, row);
 
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
-    int cmp = Comparators.charSequences().compare("test", 
projected.getString(1).toString());
-    Assert.assertEquals("Should contain the correct data/renamed value", 0, 
cmp);
+    assertThat(projected.getLong(0)).isEqualTo(34L);
+    assertThat(projected.getString(1))
+        .as("Should contain the correct data/renamed value")
+        .asString()
+        .isEqualTo("test");
   }
 
   @Test
@@ -257,8 +257,8 @@ public class TestRowProjection {
     Schema idOnly = new Schema(Types.NestedField.required(0, "id", 
Types.LongType.get()));
 
     RowData projected = writeAndRead("id_only", writeSchema, idOnly, record);
-    Assert.assertEquals("Should not project location", 1, 
projected.getArity());
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
+    assertThat(projected.getArity()).isEqualTo(1);
+    assertThat(projected.getLong(0)).as("Should contain the correct id 
value").isEqualTo(34L);
 
     Schema latOnly =
         new Schema(
@@ -269,11 +269,12 @@ public class TestRowProjection {
 
     projected = writeAndRead("latitude_only", writeSchema, latOnly, record);
     RowData projectedLocation = projected.getRow(0, 1);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertFalse("Should project location", projected.isNullAt(0));
-    Assert.assertEquals("Should not project longitude", 1, 
projectedLocation.getArity());
-    Assert.assertEquals(
-        "Should project latitude", 52.995143f, projectedLocation.getFloat(0), 
0.000001f);
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.isNullAt(0)).as("Should project location").isFalse();
+    assertThat(projectedLocation.getArity()).as("Should not project 
longitude").isEqualTo(1);
+    assertThat(projectedLocation.getFloat(0))
+        .as("Should project latitude")
+        .isEqualTo(52.995143f, withPrecision(0.000001f));
 
     Schema longOnly =
         new Schema(
@@ -284,21 +285,24 @@ public class TestRowProjection {
 
     projected = writeAndRead("longitude_only", writeSchema, longOnly, record);
     projectedLocation = projected.getRow(0, 1);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertFalse("Should project location", projected.isNullAt(0));
-    Assert.assertEquals("Should not project latitutde", 1, 
projectedLocation.getArity());
-    Assert.assertEquals(
-        "Should project longitude", -1.539054f, projectedLocation.getFloat(0), 
0.000001f);
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.isNullAt(0)).as("Should project location").isFalse();
+    assertThat(projectedLocation.getArity()).as("Should not project 
latitutde").isEqualTo(1);
+    assertThat(projectedLocation.getFloat(0))
+        .as("Should project longitude")
+        .isEqualTo(-1.539054f, withPrecision(0.000001f));
 
     Schema locationOnly = writeSchema.select("location");
     projected = writeAndRead("location_only", writeSchema, locationOnly, 
record);
     projectedLocation = projected.getRow(0, 1);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertFalse("Should project location", projected.isNullAt(0));
-    Assert.assertEquals(
-        "Should project latitude", 52.995143f, projectedLocation.getFloat(0), 
0.000001f);
-    Assert.assertEquals(
-        "Should project longitude", -1.539054f, projectedLocation.getFloat(1), 
0.000001f);
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.isNullAt(0)).as("Should project location").isFalse();
+    assertThat(projectedLocation.getFloat(0))
+        .as("Should project latitude")
+        .isEqualTo(52.995143f, withPrecision(0.000001f));
+    assertThat(projectedLocation.getFloat(1))
+        .as("Should project longitude")
+        .isEqualTo(-1.539054f, withPrecision(0.000001f));
   }
 
   @Test
@@ -324,23 +328,23 @@ public class TestRowProjection {
     Schema idOnly = new Schema(Types.NestedField.required(0, "id", 
Types.LongType.get()));
 
     RowData projected = writeAndRead("id_only", writeSchema, idOnly, row);
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
-    Assert.assertEquals("Should not project properties map", 1, 
projected.getArity());
+    assertThat(projected.getLong(0)).isEqualTo(34L);
+    assertThat(projected.getArity()).as("Should not project properties 
map").isEqualTo(1);
 
     Schema keyOnly = writeSchema.select("properties.key");
     projected = writeAndRead("key_only", writeSchema, keyOnly, row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertEquals("Should project entire map", properties, 
projected.getMap(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.getMap(0)).isEqualTo(properties);
 
     Schema valueOnly = writeSchema.select("properties.value");
     projected = writeAndRead("value_only", writeSchema, valueOnly, row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertEquals("Should project entire map", properties, 
projected.getMap(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.getMap(0)).isEqualTo(properties);
 
     Schema mapOnly = writeSchema.select("properties");
     projected = writeAndRead("map_only", writeSchema, mapOnly, row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertEquals("Should project entire map", properties, 
projected.getMap(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.getMap(0)).isEqualTo(properties);
   }
 
   private Map<String, ?> toStringMap(Map<?, ?> map) {
@@ -381,42 +385,50 @@ public class TestRowProjection {
     Schema idOnly = new Schema(Types.NestedField.required(0, "id", 
Types.LongType.get()));
 
     RowData projected = writeAndRead("id_only", writeSchema, idOnly, row);
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
-    Assert.assertEquals("Should not project locations map", 1, 
projected.getArity());
+    assertThat(projected.getLong(0)).isEqualTo(34L);
+    assertThat(projected.getArity()).as("Should not project locations 
map").isEqualTo(1);
 
     projected = writeAndRead("all_locations", writeSchema, 
writeSchema.select("locations"), row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertEquals("Should project locations map", row.getMap(1), 
projected.getMap(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.getMap(0)).isEqualTo(row.getMap(1));
 
     projected = writeAndRead("lat_only", writeSchema, 
writeSchema.select("locations.lat"), row);
     GenericMapData locations = (GenericMapData) projected.getMap(0);
-    Assert.assertNotNull("Should project locations map", locations);
+    assertThat(locations).isNotNull();
     GenericArrayData l1l2Array =
         new GenericArrayData(
             new Object[] {StringData.fromString("L2"), 
StringData.fromString("L1")});
-    Assert.assertEquals("Should contain L1 and L2", l1l2Array, 
locations.keyArray());
+    assertThat(locations.keyArray()).isEqualTo(l1l2Array);
     RowData projectedL1 = (RowData) locations.get(StringData.fromString("L1"));
-    Assert.assertNotNull("L1 should not be null", projectedL1);
-    Assert.assertEquals("L1 should contain lat", 53.992811f, 
projectedL1.getFloat(0), 0.000001);
-    Assert.assertEquals("L1 should not contain long", 1, 
projectedL1.getArity());
+    assertThat(projectedL1).isNotNull();
+    assertThat(projectedL1.getFloat(0))
+        .as("L1 should contain lat")
+        .isEqualTo(53.992811f, withPrecision(0.000001f));
+    assertThat(projectedL1.getArity()).as("L1 should not contain 
long").isEqualTo(1);
     RowData projectedL2 = (RowData) locations.get(StringData.fromString("L2"));
-    Assert.assertNotNull("L2 should not be null", projectedL2);
-    Assert.assertEquals("L2 should contain lat", 52.995143f, 
projectedL2.getFloat(0), 0.000001);
-    Assert.assertEquals("L2 should not contain long", 1, 
projectedL2.getArity());
+    assertThat(projectedL2).isNotNull();
+    assertThat(projectedL2.getFloat(0))
+        .as("L2 should contain lat")
+        .isEqualTo(52.995143f, withPrecision(0.000001f));
+    assertThat(projectedL2.getArity()).as("L2 should not contain 
long").isEqualTo(1);
 
     projected = writeAndRead("long_only", writeSchema, 
writeSchema.select("locations.long"), row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
     locations = (GenericMapData) projected.getMap(0);
-    Assert.assertNotNull("Should project locations map", locations);
-    Assert.assertEquals("Should contain L1 and L2", l1l2Array, 
locations.keyArray());
+    assertThat(locations).isNotNull();
+    assertThat(locations.keyArray()).isEqualTo(l1l2Array);
     projectedL1 = (RowData) locations.get(StringData.fromString("L1"));
-    Assert.assertNotNull("L1 should not be null", projectedL1);
-    Assert.assertEquals("L1 should not contain lat", 1, 
projectedL1.getArity());
-    Assert.assertEquals("L1 should contain long", -1.542616f, 
projectedL1.getFloat(0), 0.000001);
+    assertThat(projectedL1).isNotNull();
+    assertThat(projectedL1.getArity()).as("L1 should not contain 
lat").isEqualTo(1);
+    assertThat(projectedL1.getFloat(0))
+        .as("L1 should contain long")
+        .isEqualTo(-1.542616f, withPrecision(0.000001f));
     projectedL2 = (RowData) locations.get(StringData.fromString("L2"));
-    Assert.assertNotNull("L2 should not be null", projectedL2);
-    Assert.assertEquals("L2 should not contain lat", 1, 
projectedL2.getArity());
-    Assert.assertEquals("L2 should contain long", -1.539054f, 
projectedL2.getFloat(0), 0.000001);
+    assertThat(projectedL2).isNotNull();
+    assertThat(projectedL2.getArity()).as("L2 should not contain 
lat").isEqualTo(1);
+    assertThat(projectedL2.getFloat(0))
+        .as("L2 should contain long")
+        .isEqualTo(-1.539054f, withPrecision(0.000001f));
 
     Schema latitiudeRenamed =
         new Schema(
@@ -431,18 +443,20 @@ public class TestRowProjection {
                         Types.NestedField.required(1, "latitude", 
Types.FloatType.get())))));
 
     projected = writeAndRead("latitude_renamed", writeSchema, 
latitiudeRenamed, row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
     locations = (GenericMapData) projected.getMap(0);
-    Assert.assertNotNull("Should project locations map", locations);
-    Assert.assertEquals("Should contain L1 and L2", l1l2Array, 
locations.keyArray());
+    assertThat(locations).isNotNull();
+    assertThat(locations.keyArray()).isEqualTo(l1l2Array);
     projectedL1 = (RowData) locations.get(StringData.fromString("L1"));
-    Assert.assertNotNull("L1 should not be null", projectedL1);
-    Assert.assertEquals(
-        "L1 should contain latitude", 53.992811f, projectedL1.getFloat(0), 
0.000001);
+    assertThat(projectedL1).isNotNull();
+    assertThat(projectedL1.getFloat(0))
+        .as("L1 should contain latitude")
+        .isEqualTo(53.992811f, withPrecision(0.000001f));
     projectedL2 = (RowData) locations.get(StringData.fromString("L2"));
-    Assert.assertNotNull("L2 should not be null", projectedL2);
-    Assert.assertEquals(
-        "L2 should contain latitude", 52.995143f, projectedL2.getFloat(0), 
0.000001);
+    assertThat(projectedL2).isNotNull();
+    assertThat(projectedL2.getFloat(0))
+        .as("L2 should contain latitude")
+        .isEqualTo(52.995143f, withPrecision(0.000001f));
   }
 
   @Test
@@ -460,18 +474,18 @@ public class TestRowProjection {
     Schema idOnly = new Schema(Types.NestedField.required(0, "id", 
Types.LongType.get()));
 
     RowData projected = writeAndRead("id_only", writeSchema, idOnly, row);
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
-    Assert.assertEquals("Should not project values list", 1, 
projected.getArity());
+    assertThat(projected.getLong(0)).isEqualTo(34L);
+    assertThat(projected.getArity()).as("Should not project values 
list").isEqualTo(1);
 
     Schema elementOnly = writeSchema.select("values.element");
     projected = writeAndRead("element_only", writeSchema, elementOnly, row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertEquals("Should project entire list", values, 
projected.getArray(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.getArray(0)).isEqualTo(values);
 
     Schema listOnly = writeSchema.select("values");
     projected = writeAndRead("list_only", writeSchema, listOnly, row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertEquals("Should project entire list", values, 
projected.getArray(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.getArray(0)).isEqualTo(values);
   }
 
   @Test
@@ -497,36 +511,36 @@ public class TestRowProjection {
     Schema idOnly = new Schema(Types.NestedField.required(0, "id", 
Types.LongType.get()));
 
     RowData projected = writeAndRead("id_only", writeSchema, idOnly, row);
-    Assert.assertEquals("Should contain the correct id value", 34L, 
projected.getLong(0));
-    Assert.assertEquals("Should not project points list", 1, 
projected.getArity());
+    assertThat(projected.getLong(0)).isEqualTo(34L);
+    assertThat(projected.getArity()).isEqualTo(1);
 
     projected = writeAndRead("all_points", writeSchema, 
writeSchema.select("points"), row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertEquals("Should project points list", row.getArray(1), 
projected.getArray(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.getArray(0)).isEqualTo(row.getArray(1));
 
     projected = writeAndRead("x_only", writeSchema, 
writeSchema.select("points.x"), row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertFalse("Should project points list", projected.isNullAt(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.isNullAt(0)).isFalse();
     ArrayData points = projected.getArray(0);
-    Assert.assertEquals("Should read 2 points", 2, points.size());
+    assertThat(points.size()).isEqualTo(2);
     RowData projectedP1 = points.getRow(0, 2);
-    Assert.assertEquals("Should project x", 1, projectedP1.getInt(0));
-    Assert.assertEquals("Should not project y", 1, projectedP1.getArity());
+    assertThat(projectedP1.getInt(0)).as("Should project x").isEqualTo(1);
+    assertThat(projectedP1.getArity()).as("Should not project y").isEqualTo(1);
     RowData projectedP2 = points.getRow(1, 2);
-    Assert.assertEquals("Should not project y", 1, projectedP2.getArity());
-    Assert.assertEquals("Should project x", 3, projectedP2.getInt(0));
+    assertThat(projectedP2.getArity()).as("Should not project y").isEqualTo(1);
+    assertThat(projectedP2.getInt(0)).as("Should project x").isEqualTo(3);
 
     projected = writeAndRead("y_only", writeSchema, 
writeSchema.select("points.y"), row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertFalse("Should project points list", projected.isNullAt(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.isNullAt(0)).isFalse();
     points = projected.getArray(0);
-    Assert.assertEquals("Should read 2 points", 2, points.size());
+    assertThat(points.size()).isEqualTo(2);
     projectedP1 = points.getRow(0, 2);
-    Assert.assertEquals("Should not project x", 1, projectedP1.getArity());
-    Assert.assertEquals("Should project y", 2, projectedP1.getInt(0));
+    assertThat(projectedP1.getArity()).as("Should not project x").isEqualTo(1);
+    assertThat(projectedP1.getInt(0)).as("Should project y").isEqualTo(2);
     projectedP2 = points.getRow(1, 2);
-    Assert.assertEquals("Should not project x", 1, projectedP2.getArity());
-    Assert.assertTrue("Should project null y", projectedP2.isNullAt(0));
+    assertThat(projectedP2.getArity()).as("Should not project x").isEqualTo(1);
+    assertThat(projectedP2.isNullAt(0)).as("Should project null y").isTrue();
 
     Schema yRenamed =
         new Schema(
@@ -539,16 +553,16 @@ public class TestRowProjection {
                         Types.NestedField.optional(18, "z", 
Types.IntegerType.get())))));
 
     projected = writeAndRead("y_renamed", writeSchema, yRenamed, row);
-    Assert.assertEquals("Should not project id", 1, projected.getArity());
-    Assert.assertFalse("Should project points list", projected.isNullAt(0));
+    assertThat(projected.getArity()).as("Should not project id").isEqualTo(1);
+    assertThat(projected.isNullAt(0)).isFalse();
     points = projected.getArray(0);
-    Assert.assertEquals("Should read 2 points", 2, points.size());
+    assertThat(points.size()).isEqualTo(2);
     projectedP1 = points.getRow(0, 2);
-    Assert.assertEquals("Should not project x and y", 1, 
projectedP1.getArity());
-    Assert.assertEquals("Should project z", 2, projectedP1.getInt(0));
+    assertThat(projectedP1.getArity()).as("Should not project x and 
y").isEqualTo(1);
+    assertThat(projectedP1.getInt(0)).as("Should project z").isEqualTo(2);
     projectedP2 = points.getRow(1, 2);
-    Assert.assertEquals("Should not project x and y", 1, 
projectedP2.getArity());
-    Assert.assertTrue("Should project null z", projectedP2.isNullAt(0));
+    assertThat(projectedP2.getArity()).as("Should not project x and 
y").isEqualTo(1);
+    assertThat(projectedP2.isNullAt(0)).as("Should project null z").isTrue();
   }
 
   @Test
@@ -572,9 +586,11 @@ public class TestRowProjection {
 
     RowData projected =
         writeAndRead("add_fields_with_required_children_projection", schema, 
addedFields, row);
-    Assert.assertEquals("Should contain the correct value in column 1", 
projected.getLong(0), 100L);
-    Assert.assertTrue("Should contain empty value in new column 2", 
projected.isNullAt(1));
-    Assert.assertTrue("Should contain empty value in new column 4", 
projected.isNullAt(2));
-    Assert.assertTrue("Should contain empty value in new column 6", 
projected.isNullAt(3));
+    assertThat(projected.getLong(0))
+        .as("Should contain the correct value in column 1")
+        .isEqualTo(100L);
+    assertThat(projected.isNullAt(1)).as("Should contain empty value in new 
column 2").isTrue();
+    assertThat(projected.isNullAt(2)).as("Should contain empty value in new 
column 4").isTrue();
+    assertThat(projected.isNullAt(3)).as("Should contain empty value in new 
column 6").isTrue();
   }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestStructRowData.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestStructRowData.java
index e0340e0743..eccab20e04 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestStructRowData.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestStructRowData.java
@@ -22,7 +22,7 @@ import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.flink.DataGenerator;
 import org.apache.iceberg.flink.DataGenerators;
 import org.apache.iceberg.flink.TestHelpers;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestStructRowData {
 
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java
index 6a493692c2..44eb907a17 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java
@@ -18,10 +18,11 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.flink.AvroGenericRecordConverterBase;
 import org.apache.iceberg.flink.DataGenerator;
-import org.junit.Assert;
 
 public class TestAvroGenericRecordToRowDataMapper extends 
AvroGenericRecordConverterBase {
   @Override
@@ -32,6 +33,6 @@ public class TestAvroGenericRecordToRowDataMapper extends AvroGenericRecordConve
         
AvroGenericRecordToRowDataMapper.forAvroSchema(dataGenerator.avroSchema());
     RowData expected = dataGenerator.generateFlinkRowData();
     RowData actual = mapper.map(dataGenerator.generateAvroGenericRecord());
-    Assert.assertEquals(expected, actual);
+    assertThat(actual).isEqualTo(expected);
   }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java
index 485035787d..6ef4069382 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java
@@ -18,10 +18,11 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import org.apache.avro.generic.GenericRecord;
 import org.apache.iceberg.flink.AvroGenericRecordConverterBase;
 import org.apache.iceberg.flink.DataGenerator;
-import org.junit.Assert;
 
 public class TestRowDataToAvroGenericRecordConverter extends 
AvroGenericRecordConverterBase {
   @Override
@@ -30,6 +31,6 @@ public class TestRowDataToAvroGenericRecordConverter extends AvroGenericRecordCo
         
RowDataToAvroGenericRecordConverter.fromAvroSchema(dataGenerator.avroSchema());
     GenericRecord expected = dataGenerator.generateAvroGenericRecord();
     GenericRecord actual = 
converter.apply(dataGenerator.generateFlinkRowData());
-    Assert.assertEquals(expected, actual);
+    assertThat(actual).isEqualTo(expected);
   }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
index 6cef0d1228..08bbc4fc80 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
@@ -18,8 +18,9 @@
  */
 package org.apache.iceberg.flink.util;
 
-import org.junit.Assert;
-import org.junit.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
@@ -28,7 +29,7 @@ public class TestFlinkPackage {
   /** This unit test would need to be adjusted as new Flink version is 
supported. */
   @Test
   public void testVersion() {
-    Assert.assertEquals("1.18.1", FlinkPackage.version());
+    assertThat(FlinkPackage.version()).isEqualTo("1.18.1");
   }
 
   @Test
@@ -41,14 +42,14 @@ public class TestFlinkPackage {
     try (MockedStatic<FlinkPackage> mockedStatic = 
Mockito.mockStatic(FlinkPackage.class)) {
       
mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class);
       mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
-      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, 
FlinkPackage.version());
+      
assertThat(FlinkPackage.version()).isEqualTo(FlinkPackage.FLINK_UNKNOWN_VERSION);
     }
     FlinkPackage.setVersion(null);
     try (MockedStatic<FlinkPackage> mockedStatic = 
Mockito.mockStatic(FlinkPackage.class)) {
       mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null);
       mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
       FlinkPackage.setVersion(null);
-      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, 
FlinkPackage.version());
+      
assertThat(FlinkPackage.version()).isEqualTo(FlinkPackage.FLINK_UNKNOWN_VERSION);
     }
   }
 }

Reply via email to