This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c41c599fe8 Flink: Backport Flink 1.18 JUnit5 migration to Flink 1.17 (#10163)
c41c599fe8 is described below
commit c41c599fe877b676628ce05ff2653401d306c9c6
Author: Tom Tanaka <[email protected]>
AuthorDate: Wed Apr 17 20:43:23 2024 +0900
Flink: Backport Flink 1.18 JUnit5 migration to Flink 1.17 (#10163)
---
.../flink/AvroGenericRecordConverterBase.java | 2 +-
.../iceberg/flink/TestDataFileSerialization.java | 14 ++--
.../iceberg/flink/TestFlinkCatalogFactory.java | 20 ++---
.../org/apache/iceberg/flink/TestFlinkFilters.java | 87 ++++++++++------------
.../apache/iceberg/flink/TestFlinkSchemaUtil.java | 40 +++++-----
.../flink/TestManifestFileSerialization.java | 17 ++---
.../apache/iceberg/flink/TestRowDataWrapper.java | 18 ++---
.../iceberg/flink/TestTableSerialization.java | 18 ++---
.../sink/TestAvroGenericRecordToRowDataMapper.java | 5 +-
.../TestRowDataToAvroGenericRecordConverter.java | 5 +-
.../iceberg/flink/util/TestFlinkPackage.java | 11 +--
11 files changed, 116 insertions(+), 121 deletions(-)
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java
index 47319ec9bc..4184526a6a 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/AvroGenericRecordConverterBase.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
public abstract class AvroGenericRecordConverterBase {
protected abstract void testConverter(DataGenerator dataGenerator) throws
Exception;
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
index e9372adda4..8992cbd751 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -44,8 +45,7 @@ import org.apache.iceberg.SortOrder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assertions;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
public class TestDataFileSerialization {
@@ -135,23 +135,19 @@ public class TestDataFileSerialization {
new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
for (int i = 0; i < 2; i += 1) {
Object obj = in.readObject();
- Assertions.assertThat(obj).as("Should be a
DataFile").isInstanceOf(DataFile.class);
+ assertThat(obj).as("Should be a
DataFile").isInstanceOf(DataFile.class);
TestHelpers.assertEquals(DATA_FILE, (DataFile) obj);
}
for (int i = 0; i < 2; i += 1) {
Object obj = in.readObject();
- Assertions.assertThat(obj)
- .as("Should be a position DeleteFile")
- .isInstanceOf(DeleteFile.class);
+ assertThat(obj).as("Should be a position
DeleteFile").isInstanceOf(DeleteFile.class);
TestHelpers.assertEquals(POS_DELETE_FILE, (DeleteFile) obj);
}
for (int i = 0; i < 2; i += 1) {
Object obj = in.readObject();
- Assertions.assertThat(obj)
- .as("Should be a equality DeleteFile")
- .isInstanceOf(DeleteFile.class);
+ assertThat(obj).as("Should be a equality
DeleteFile").isInstanceOf(DeleteFile.class);
TestHelpers.assertEquals(EQ_DELETE_FILE, (DeleteFile) obj);
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
index ba08b76dd5..4c9e95b8fa 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
@@ -26,15 +29,14 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.assertj.core.api.Assertions;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
public class TestFlinkCatalogFactory {
private Map<String, String> props;
- @Before
+ @BeforeEach
public void before() {
props = Maps.newHashMap();
props.put("type", "iceberg");
@@ -51,7 +53,7 @@ public class TestFlinkCatalogFactory {
FlinkCatalogFactory.createCatalogLoader(catalogName, props, new
Configuration())
.loadCatalog();
- Assertions.assertThat(catalog).isNotNull().isInstanceOf(HiveCatalog.class);
+ assertThat(catalog).isNotNull().isInstanceOf(HiveCatalog.class);
}
@Test
@@ -64,7 +66,7 @@ public class TestFlinkCatalogFactory {
FlinkCatalogFactory.createCatalogLoader(catalogName, props, new
Configuration())
.loadCatalog();
-
Assertions.assertThat(catalog).isNotNull().isInstanceOf(HadoopCatalog.class);
+ assertThat(catalog).isNotNull().isInstanceOf(HadoopCatalog.class);
}
@Test
@@ -76,7 +78,7 @@ public class TestFlinkCatalogFactory {
FlinkCatalogFactory.createCatalogLoader(catalogName, props, new
Configuration())
.loadCatalog();
-
Assertions.assertThat(catalog).isNotNull().isInstanceOf(CustomHadoopCatalog.class);
+ assertThat(catalog).isNotNull().isInstanceOf(CustomHadoopCatalog.class);
}
@Test
@@ -86,7 +88,7 @@ public class TestFlinkCatalogFactory {
props.put(
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE,
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() -> FlinkCatalogFactory.createCatalogLoader(catalogName, props,
new Configuration()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith(
@@ -98,7 +100,7 @@ public class TestFlinkCatalogFactory {
String catalogName = "unknownCatalog";
props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "fooType");
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() -> FlinkCatalogFactory.createCatalogLoader(catalogName, props,
new Configuration()))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageStartingWith("Unknown catalog-type: fooType");
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
index c89ea4f530..838b0ea0e1 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
@@ -49,9 +51,7 @@ import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.Pair;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
public class TestFlinkFilters {
@@ -121,13 +121,13 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(
resolve(Expressions.$(pair.first()).isEqual(Expressions.lit(pair.second()))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(
resolve(Expressions.lit(pair.second()).isEqual(Expressions.$(pair.first()))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
}
@@ -138,12 +138,12 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(resolve(Expressions.$("field3").isEqual(Expressions.lit(Float.NaN))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(resolve(Expressions.lit(Float.NaN).isEqual(Expressions.$("field3"))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
@@ -156,13 +156,13 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(
resolve(Expressions.$(pair.first()).isNotEqual(Expressions.lit(pair.second()))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(
resolve(Expressions.lit(pair.second()).isNotEqual(Expressions.$(pair.first()))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
}
@@ -174,13 +174,13 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(
resolve(Expressions.$("field3").isNotEqual(Expressions.lit(Float.NaN))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(
resolve(Expressions.lit(Float.NaN).isNotEqual(Expressions.$("field3"))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
@@ -191,12 +191,12 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(resolve(Expressions.$("field1").isGreater(Expressions.lit(1))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(resolve(Expressions.lit(1).isLess(Expressions.$("field1"))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
@@ -207,12 +207,12 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(resolve(Expressions.$("field1").isGreaterOrEqual(Expressions.lit(1))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(resolve(Expressions.lit(1).isLessOrEqual(Expressions.$("field1"))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
@@ -223,12 +223,12 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(resolve(Expressions.$("field1").isLess(Expressions.lit(1))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(resolve(Expressions.lit(1).isGreater(Expressions.$("field1"))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
@@ -239,12 +239,12 @@ public class TestFlinkFilters {
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(resolve(Expressions.$("field1").isLessOrEqual(Expressions.lit(1))));
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
Optional<org.apache.iceberg.expressions.Expression> actual1 =
FlinkFilters.convert(resolve(Expressions.lit(1).isGreaterOrEqual(Expressions.$("field1"))));
- Assert.assertTrue("Conversion should succeed", actual1.isPresent());
+ assertThat(actual1).isPresent();
assertPredicatesMatch(expected, actual1.get());
}
@@ -252,7 +252,7 @@ public class TestFlinkFilters {
public void testIsNull() {
Expression expr = resolve(Expressions.$("field1").isNull());
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(expr);
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
UnboundPredicate<Object> expected =
org.apache.iceberg.expressions.Expressions.isNull("field1");
assertPredicatesMatch(expected, actual.get());
}
@@ -261,7 +261,7 @@ public class TestFlinkFilters {
public void testIsNotNull() {
Expression expr = resolve(Expressions.$("field1").isNotNull());
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(expr);
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
UnboundPredicate<Object> expected =
org.apache.iceberg.expressions.Expressions.notNull("field1");
assertPredicatesMatch(expected, actual.get());
@@ -275,7 +275,7 @@ public class TestFlinkFilters {
.isEqual(Expressions.lit(1))
.and(Expressions.$("field2").isEqual(Expressions.lit(2L))));
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(expr);
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
And and = (And) actual.get();
And expected =
(And)
@@ -295,7 +295,7 @@ public class TestFlinkFilters {
.isEqual(Expressions.lit(1))
.or(Expressions.$("field2").isEqual(Expressions.lit(2L))));
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(expr);
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
Or or = (Or) actual.get();
Or expected =
(Or)
@@ -315,14 +315,14 @@ public class TestFlinkFilters {
BuiltInFunctionDefinitions.NOT,
Expressions.$("field1").isEqual(Expressions.lit(1))));
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(expr);
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
Not not = (Not) actual.get();
Not expected =
(Not)
org.apache.iceberg.expressions.Expressions.not(
org.apache.iceberg.expressions.Expressions.equal("field1", 1));
- Assert.assertEquals("Predicate operation should match", expected.op(),
not.op());
+ assertThat(not.op()).as("Predicate operation should
match").isEqualTo(expected.op());
assertPredicatesMatch(expected.child(), not.child());
}
@@ -335,7 +335,7 @@ public class TestFlinkFilters {
ApiExpressionUtils.unresolvedCall(
BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"),
Expressions.lit("abc%")));
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(expr);
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
assertPredicatesMatch(expected, actual.get());
expr =
@@ -343,7 +343,7 @@ public class TestFlinkFilters {
ApiExpressionUtils.unresolvedCall(
BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"),
Expressions.lit("%abc")));
actual = FlinkFilters.convert(expr);
- Assert.assertFalse("Conversion should failed", actual.isPresent());
+ assertThat(actual).isNotPresent();
expr =
resolve(
@@ -352,7 +352,7 @@ public class TestFlinkFilters {
Expressions.$("field5"),
Expressions.lit("%abc%")));
actual = FlinkFilters.convert(expr);
- Assert.assertFalse("Conversion should failed", actual.isPresent());
+ assertThat(actual).isNotPresent();
expr =
resolve(
@@ -361,49 +361,49 @@ public class TestFlinkFilters {
Expressions.$("field5"),
Expressions.lit("abc%d")));
actual = FlinkFilters.convert(expr);
- Assert.assertFalse("Conversion should failed", actual.isPresent());
+ assertThat(actual).isNotPresent();
expr =
resolve(
ApiExpressionUtils.unresolvedCall(
BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"),
Expressions.lit("%")));
actual = FlinkFilters.convert(expr);
- Assert.assertFalse("Conversion should failed", actual.isPresent());
+ assertThat(actual).isNotPresent();
expr =
resolve(
ApiExpressionUtils.unresolvedCall(
BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"),
Expressions.lit("a_")));
actual = FlinkFilters.convert(expr);
- Assert.assertFalse("Conversion should failed", actual.isPresent());
+ assertThat(actual).isNotPresent();
expr =
resolve(
ApiExpressionUtils.unresolvedCall(
BuiltInFunctionDefinitions.LIKE, Expressions.$("field5"),
Expressions.lit("a%b")));
actual = FlinkFilters.convert(expr);
- Assert.assertFalse("Conversion should failed", actual.isPresent());
+ assertThat(actual).isNotPresent();
}
@SuppressWarnings("unchecked")
private <T> void matchLiteral(String fieldName, Object flinkLiteral, T
icebergLiteral) {
Expression expr =
resolve(Expressions.$(fieldName).isEqual(Expressions.lit(flinkLiteral)));
Optional<org.apache.iceberg.expressions.Expression> actual =
FlinkFilters.convert(expr);
- Assert.assertTrue("Conversion should succeed", actual.isPresent());
+ assertThat(actual).isPresent();
org.apache.iceberg.expressions.Expression expression = actual.get();
- Assertions.assertThat(expression)
+ assertThat(expression)
.as("The expression should be a UnboundPredicate")
.isInstanceOf(UnboundPredicate.class);
UnboundPredicate<T> unboundPredicate = (UnboundPredicate<T>) expression;
org.apache.iceberg.expressions.Expression expression1 =
unboundPredicate.bind(FlinkSchemaUtil.convert(TABLE_SCHEMA).asStruct(), false);
- Assertions.assertThat(expression1)
+ assertThat(expression1)
.as("The expression should be a BoundLiteralPredicate")
.isInstanceOf(BoundLiteralPredicate.class);
BoundLiteralPredicate<T> predicate = (BoundLiteralPredicate<T>)
expression1;
- Assert.assertTrue("Should match the literal",
predicate.test(icebergLiteral));
+ assertThat(predicate.test(icebergLiteral)).isTrue();
}
private static Expression resolve(Expression originalExpression) {
@@ -447,21 +447,16 @@ public class TestFlinkFilters {
private void assertPredicatesMatch(
org.apache.iceberg.expressions.Expression expected,
org.apache.iceberg.expressions.Expression actual) {
- Assertions.assertThat(expected)
+ assertThat(expected)
.as("The expected expression should be a UnboundPredicate")
.isInstanceOf(UnboundPredicate.class);
- Assertions.assertThat(actual)
+ assertThat(actual)
.as("The actual expression should be a UnboundPredicate")
.isInstanceOf(UnboundPredicate.class);
UnboundPredicate<?> predicateExpected = (UnboundPredicate<?>) expected;
UnboundPredicate<?> predicateActual = (UnboundPredicate<?>) actual;
- Assert.assertEquals(
- "Predicate operation should match", predicateExpected.op(),
predicateActual.op());
- Assert.assertEquals(
- "Predicate literal should match", predicateExpected.literal(),
predicateActual.literal());
- Assert.assertEquals(
- "Predicate name should match",
- predicateExpected.ref().name(),
- predicateActual.ref().name());
+ assertThat(predicateActual.op()).isEqualTo(predicateExpected.op());
+
assertThat(predicateActual.literal()).isEqualTo(predicateExpected.literal());
+
assertThat(predicateActual.ref().name()).isEqualTo(predicateExpected.ref().name());
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
index 4ac32c08eb..eab60d886a 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
@@ -31,14 +34,11 @@ import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
public class TestFlinkSchemaUtil {
@@ -313,12 +313,13 @@ public class TestFlinkSchemaUtil {
}
private void checkSchema(TableSchema flinkSchema, Schema icebergSchema) {
- Assert.assertEquals(icebergSchema.asStruct(),
FlinkSchemaUtil.convert(flinkSchema).asStruct());
+
assertThat(FlinkSchemaUtil.convert(flinkSchema).asStruct()).isEqualTo(icebergSchema.asStruct());
// The conversion is not a 1:1 mapping, so we just check iceberg types.
- Assert.assertEquals(
- icebergSchema.asStruct(),
-
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)))
- .asStruct());
+ assertThat(
+ FlinkSchemaUtil.convert(
+
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)))
+ .asStruct())
+ .isEqualTo(icebergSchema.asStruct());
}
@Test
@@ -354,10 +355,9 @@ public class TestFlinkSchemaUtil {
LogicalType flinkExpectedType,
LogicalType flinkType,
Type icebergExpectedType) {
- Assert.assertEquals(flinkExpectedType,
FlinkSchemaUtil.convert(icebergType));
- Assert.assertEquals(
- Types.StructType.of(Types.NestedField.optional(0, "f0",
icebergExpectedType)),
-
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct());
+
assertThat(FlinkSchemaUtil.convert(icebergType)).isEqualTo(flinkExpectedType);
+
assertThat(FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct())
+ .isEqualTo(Types.StructType.of(Types.NestedField.optional(0, "f0",
icebergExpectedType)));
}
@Test
@@ -376,8 +376,8 @@ public class TestFlinkSchemaUtil {
.primaryKey("int")
.build();
Schema convertedSchema = FlinkSchemaUtil.convert(baseSchema, flinkSchema);
- Assert.assertEquals(baseSchema.asStruct(), convertedSchema.asStruct());
- Assert.assertEquals(ImmutableSet.of(101),
convertedSchema.identifierFieldIds());
+ assertThat(convertedSchema.asStruct()).isEqualTo(baseSchema.asStruct());
+ assertThat(convertedSchema.identifierFieldIds()).containsExactly(101);
}
@Test
@@ -390,10 +390,10 @@ public class TestFlinkSchemaUtil {
Sets.newHashSet(1, 2));
TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
- Assert.assertTrue(tableSchema.getPrimaryKey().isPresent());
- Assert.assertEquals(
- ImmutableSet.of("int", "string"),
- ImmutableSet.copyOf(tableSchema.getPrimaryKey().get().getColumns()));
+ assertThat(tableSchema.getPrimaryKey())
+ .isPresent()
+ .get()
+ .satisfies(k -> assertThat(k.getColumns()).containsExactly("int",
"string"));
}
@Test
@@ -408,7 +408,7 @@ public class TestFlinkSchemaUtil {
Types.NestedField.required(2, "inner",
Types.IntegerType.get())))),
Sets.newHashSet(2));
- Assertions.assertThatThrownBy(() ->
FlinkSchemaUtil.toSchema(icebergSchema))
+ assertThatThrownBy(() -> FlinkSchemaUtil.toSchema(icebergSchema))
.isInstanceOf(ValidationException.class)
.hasMessageStartingWith("Could not create a PRIMARY KEY")
.hasMessageContaining("Column 'struct.inner' does not exist.");
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java
index 6bd94e9ca6..8f1f129e18 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestManifestFileSerialization.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -29,6 +30,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.file.Path;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -48,11 +50,8 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
public class TestManifestFileSerialization {
@@ -104,7 +103,7 @@ public class TestManifestFileSerialization {
private static final FileIO FILE_IO = new HadoopFileIO(new Configuration());
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ @TempDir private Path temp;
@Test
public void testKryoSerialization() throws IOException {
@@ -145,15 +144,15 @@ public class TestManifestFileSerialization {
new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
for (int i = 0; i < 3; i += 1) {
Object obj = in.readObject();
- Assertions.assertThat(obj).as("Should be a
ManifestFile").isInstanceOf(ManifestFile.class);
+ assertThat(obj).as("Should be a
ManifestFile").isInstanceOf(ManifestFile.class);
TestHelpers.assertEquals(manifest, (ManifestFile) obj);
}
}
}
private ManifestFile writeManifest(DataFile... files) throws IOException {
- File manifestFile = temp.newFile("input.m0.avro");
- Assert.assertTrue(manifestFile.delete());
+ File manifestFile = File.createTempFile("input", "m0.avro", temp.toFile());
+ assertThat(manifestFile.delete()).isTrue();
OutputFile outputFile =
FILE_IO.newOutputFile(manifestFile.getCanonicalPath());
ManifestWriter<DataFile> writer = ManifestFiles.write(SPEC, outputFile);
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
index c78fa51215..caefbb5a54 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.Iterator;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.RecordWrapperTest;
@@ -28,8 +30,6 @@ import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.data.RandomRowData;
import org.apache.iceberg.util.StructLikeWrapper;
-import org.assertj.core.api.Assertions;
-import org.junit.Assert;
public class TestRowDataWrapper extends RecordWrapperTest {
@@ -49,12 +49,12 @@ public class TestRowDataWrapper extends RecordWrapperTest {
return;
}
- Assertions.assertThat(actual).isNotNull();
- Assertions.assertThat(expected).isNotNull();
+ assertThat(actual).isNotNull();
+ assertThat(expected).isNotNull();
int expectedMilliseconds = (int) ((long) expected / 1000_000);
int actualMilliseconds = (int) ((long) actual / 1000_000);
- Assert.assertEquals(message, expectedMilliseconds,
actualMilliseconds);
+
assertThat(actualMilliseconds).as(message).isEqualTo(expectedMilliseconds);
}
});
}
@@ -75,8 +75,8 @@ public class TestRowDataWrapper extends RecordWrapperTest {
StructLikeWrapper actualWrapper =
StructLikeWrapper.forType(schema.asStruct());
StructLikeWrapper expectedWrapper =
StructLikeWrapper.forType(schema.asStruct());
for (int i = 0; i < numRecords; i++) {
- Assert.assertTrue("Should have more records", actual.hasNext());
- Assert.assertTrue("Should have more RowData", expected.hasNext());
+ assertThat(actual).hasNext();
+ assertThat(expected).hasNext();
StructLike recordStructLike = recordWrapper.wrap(actual.next());
StructLike rowDataStructLike = rowDataWrapper.wrap(expected.next());
@@ -87,7 +87,7 @@ public class TestRowDataWrapper extends RecordWrapperTest {
expectedWrapper.set(rowDataStructLike));
}
- Assert.assertFalse("Shouldn't have more record", actual.hasNext());
- Assert.assertFalse("Shouldn't have more RowData", expected.hasNext());
+ assertThat(actual).isExhausted();
+ assertThat(expected).isExhausted();
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
index 27124d93fe..7f0e7acaa8 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestTableSerialization.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.flink;
import static org.apache.iceberg.flink.TestHelpers.roundTripKryoSerialize;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Map;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
@@ -39,11 +41,9 @@ import org.apache.iceberg.Transaction;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
public class TestTableSerialization {
private static final HadoopTables TABLES = new HadoopTables();
@@ -60,15 +60,15 @@ public class TestTableSerialization {
private static final SortOrder SORT_ORDER =
SortOrder.builderFor(SCHEMA).asc("id").build();
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ @TempDir private Path temp;
private Table table;
- @Before
+ @BeforeEach
public void initTable() throws IOException {
Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
- File tableLocation = temp.newFolder();
- Assert.assertTrue(tableLocation.delete());
+ File tableLocation = File.createTempFile("junit", null, temp.toFile());
+ assertThat(tableLocation.delete()).isTrue();
this.table = TABLES.create(SCHEMA, SPEC, SORT_ORDER, props,
tableLocation.toString());
}
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java
index 6a493692c2..44eb907a17 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestAvroGenericRecordToRowDataMapper.java
@@ -18,10 +18,11 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.assertj.core.api.Assertions.assertThat;
+
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.AvroGenericRecordConverterBase;
import org.apache.iceberg.flink.DataGenerator;
-import org.junit.Assert;
public class TestAvroGenericRecordToRowDataMapper extends
AvroGenericRecordConverterBase {
@Override
@@ -32,6 +33,6 @@ public class TestAvroGenericRecordToRowDataMapper extends
AvroGenericRecordConve
AvroGenericRecordToRowDataMapper.forAvroSchema(dataGenerator.avroSchema());
RowData expected = dataGenerator.generateFlinkRowData();
RowData actual = mapper.map(dataGenerator.generateAvroGenericRecord());
- Assert.assertEquals(expected, actual);
+ assertThat(actual).isEqualTo(expected);
}
}
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java
index 485035787d..6ef4069382 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestRowDataToAvroGenericRecordConverter.java
@@ -18,10 +18,11 @@
*/
package org.apache.iceberg.flink.source;
+import static org.assertj.core.api.Assertions.assertThat;
+
import org.apache.avro.generic.GenericRecord;
import org.apache.iceberg.flink.AvroGenericRecordConverterBase;
import org.apache.iceberg.flink.DataGenerator;
-import org.junit.Assert;
public class TestRowDataToAvroGenericRecordConverter extends
AvroGenericRecordConverterBase {
@Override
@@ -30,6 +31,6 @@ public class TestRowDataToAvroGenericRecordConverter extends
AvroGenericRecordCo
RowDataToAvroGenericRecordConverter.fromAvroSchema(dataGenerator.avroSchema());
GenericRecord expected = dataGenerator.generateAvroGenericRecord();
GenericRecord actual =
converter.apply(dataGenerator.generateFlinkRowData());
- Assert.assertEquals(expected, actual);
+ assertThat(actual).isEqualTo(expected);
}
}
diff --git
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
index eda340deda..079c70bae0 100644
---
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
+++
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
@@ -18,8 +18,9 @@
*/
package org.apache.iceberg.flink.util;
-import org.junit.Assert;
-import org.junit.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -28,7 +29,7 @@ public class TestFlinkPackage {
/** This unit test would need to be adjusted as new Flink version is
supported. */
@Test
public void testVersion() {
- Assert.assertEquals("1.17.2", FlinkPackage.version());
+ assertThat(FlinkPackage.version()).isEqualTo("1.17.2");
}
@Test
@@ -41,14 +42,14 @@ public class TestFlinkPackage {
try (MockedStatic<FlinkPackage> mockedStatic =
Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
- Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION,
FlinkPackage.version());
+
assertThat(FlinkPackage.version()).isEqualTo(FlinkPackage.FLINK_UNKNOWN_VERSION);
}
FlinkPackage.setVersion(null);
try (MockedStatic<FlinkPackage> mockedStatic =
Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
FlinkPackage.setVersion(null);
- Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION,
FlinkPackage.version());
+
assertThat(FlinkPackage.version()).isEqualTo(FlinkPackage.FLINK_UNKNOWN_VERSION);
}
}
}