zentol commented on a change in pull request #19039:
URL: https://github.com/apache/flink/pull/19039#discussion_r823748910



##########
File path: 
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
##########
@@ -169,23 +169,23 @@ public void testSource() throws Exception {
         List<RowData> results = runGenerator(SCHEMA, descriptor);
         final long end = System.currentTimeMillis();
 
-        Assert.assertEquals(11, results.size());
+        assertThat(results.size()).isEqualTo(11);
         for (int i = 0; i < results.size(); i++) {
             RowData row = results.get(i);
-            Assert.assertEquals(20, row.getString(0).toString().length());
+            assertThat(row.getString(0).toString().length()).isEqualTo(20);
             long f1 = row.getLong(1);
-            Assert.assertTrue(f1 >= 10 && f1 <= 100);
-            Assert.assertEquals(i + 50, row.getLong(2));
+            assertThat(f1 >= 10 && f1 <= 100).isTrue();

Review comment:
       this could be improved

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableDescriptorTest.java
##########
@@ -56,22 +54,22 @@ public void testBasic() {
                         .comment("Test Comment")
                         .build();
 
-        assertTrue(descriptor.getSchema().isPresent());
-        assertEquals(schema, descriptor.getSchema().get());
+        assertThat(descriptor.getSchema().isPresent()).isTrue();

Review comment:
       ```suggestion
           assertThat(descriptor.getSchema()).isPresent();
   ```
   There are also other instances

##########
File path: 
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
##########
@@ -169,23 +169,23 @@ public void testSource() throws Exception {
         List<RowData> results = runGenerator(SCHEMA, descriptor);
         final long end = System.currentTimeMillis();
 
-        Assert.assertEquals(11, results.size());
+        assertThat(results.size()).isEqualTo(11);
         for (int i = 0; i < results.size(); i++) {
             RowData row = results.get(i);
-            Assert.assertEquals(20, row.getString(0).toString().length());
+            assertThat(row.getString(0).toString().length()).isEqualTo(20);
             long f1 = row.getLong(1);
-            Assert.assertTrue(f1 >= 10 && f1 <= 100);
-            Assert.assertEquals(i + 50, row.getLong(2));
+            assertThat(f1 >= 10 && f1 <= 100).isTrue();
+            assertThat(row.getLong(2)).isEqualTo(i + 50);
             final TimestampData f3 = row.getTimestamp(3, 3);
-            Assert.assertTrue(f3.getMillisecond() >= begin - 5000 && 
f3.getMillisecond() <= end);
+            assertThat(f3.getMillisecond() >= begin - 5000 && 
f3.getMillisecond() <= end).isTrue();

Review comment:
       same here

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/SortUtilTest.java
##########
@@ -52,8 +53,8 @@ public void testNormalizedKey() {
                 byte[] rndBytes = new byte[20];
                 random.nextBytes(rndBytes);
                 segments[2].put(0, rndBytes);
-                Assert.assertTrue(segments[0].compare(segments[2], 0, 0, 20) 
<= 0);
-                Assert.assertTrue(segments[1].compare(segments[2], 0, 0, 20) 
>= 0);
+                assertThat(segments[0].compare(segments[2], 0, 0, 20) <= 
0).isTrue();
+                assertThat(segments[1].compare(segments[2], 0, 0, 20) >= 
0).isTrue();

Review comment:
       isNegative/IsPositive?

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
##########
@@ -258,10 +259,10 @@ public void testResetAndOutput() throws Exception {
             actualKeys.add(keySerializer.copy(iter.getKey()));
             actualValues.add(iter.getValue().copy());
         }
-        Assert.assertEquals(NUM_ENTRIES, expected.size());
-        Assert.assertEquals(NUM_ENTRIES, actualKeys.size());
-        Assert.assertEquals(NUM_ENTRIES, actualValues.size());
-        Assert.assertEquals(expected, actualValues);
+        assertThat(expected.size()).isEqualTo(NUM_ENTRIES);
+        assertThat(actualKeys.size()).isEqualTo(NUM_ENTRIES);

Review comment:
       hasSize

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
##########
@@ -105,12 +105,12 @@ public void test() throws Exception {
     }
 
     private void shouldEmitNothing(OneInputStreamOperatorTestHarness<RowData, 
RowData> harness) {
-        assertEquals(Collections.emptyList(), getEmittedRows(harness));
+        assertThat(getEmittedRows(harness)).isEqualTo(Collections.emptyList());

Review comment:
       ```suggestion
           assertThat(getEmittedRows(harness)).isEmpty();
   ```

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableDescriptorTest.java
##########
@@ -99,9 +97,9 @@ public void testFormatBasic() {
                         .format("json")
                         .build();
 
-        assertEquals(2, descriptor.getOptions().size());
-        assertEquals("test-connector", 
descriptor.getOptions().get("connector"));
-        assertEquals("json", descriptor.getOptions().get("format"));
+        assertThat(descriptor.getOptions().size()).isEqualTo(2);
+        
assertThat(descriptor.getOptions().get("connector")).isEqualTo("test-connector");

Review comment:
       This can be expressed in a nicer way by matching the map as a whole

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
##########
@@ -125,9 +127,9 @@ public void test() throws Exception {
 
         // assert that each expected match was seen
         for (Map.Entry<Integer, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) {
-            Assert.assertTrue(
-                    "Collection for key " + entry.getKey() + " is not empty",
-                    entry.getValue().isEmpty());
+            assertThat(entry.getValue().isEmpty())
+                    .as("Collection for key " + entry.getKey() + " is not 
empty")
+                    .isTrue();

Review comment:
       ```suggestion
               assertThat(entry.getValue())
                       .as("Collection for key " + entry.getKey() + " is not 
empty")
                       .isEmpty();
   ```
   

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
##########
@@ -62,8 +60,8 @@ public void testCreateCatalog() {
 
         tableEnv.executeSql(ddl);
 
-        assertTrue(tableEnv.getCatalog(name).isPresent());
-        assertTrue(tableEnv.getCatalog(name).get() instanceof 
GenericInMemoryCatalog);
+        assertThat(tableEnv.getCatalog(name).isPresent()).isTrue();
+        
assertThat(tableEnv.getCatalog(name).get()).isInstanceOf(GenericInMemoryCatalog.class);

Review comment:
       ```suggestion
           
assertThat(tableEnv.getCatalog(name)).containsInstanceOf(GenericInMemoryCatalog.class);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
##########
@@ -326,7 +327,7 @@ public void invoke(RowData value, Context context) throws 
Exception {
             RowKind kind = value.getRowKind();
             if (value.getRowKind() == RowKind.INSERT) {
                 Row row = (Row) converter.toExternal(value);
-                assert row != null;
+                assertThat(row != null).isTrue();

Review comment:
       ```suggestion
                   assertThat(row).isNotNull();
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/calls/BuiltInMethodsTest.java
##########
@@ -43,12 +43,9 @@
     @ParameterizedTest
     @MethodSource
     void testMethodsAreAvailable(Method m) throws Exception {
-        assertDoesNotThrow(
-                () ->
-                        // Note that this null is because the method is 
static, hence no instance
-                        // should be supplied when calling the reflection
-                        m.invoke(null),
-                "Method " + m.getName() + " throws an exception, perhaps a bad 
definition?");
-        assertNotNull(m.invoke(null));
+        assertThatThrownBy(() -> m.invoke(null))
+                .as("Method " + m.getName() + " throws an exception, perhaps a 
bad definition?")
+                .isNull();

Review comment:
       this just seems...unnecessary? It would've failed anyway below.

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
##########
@@ -1692,7 +1692,7 @@ public void testReset() {
     // ~ Tool Methods 
----------------------------------------------------------
 
     private static TestItem createTestItem(Object... args) {
-        assert args.length == 2;
+        assertThat(args.length).isEqualTo(2);

Review comment:
       ```suggestion
           assertThat(args).hasSize(2);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -864,28 +861,28 @@ private void createTableFromElements(
     }
 
     private static void testSchema(Table table, Column... expectedColumns) {
-        assertEquals(ResolvedSchema.of(expectedColumns), 
table.getResolvedSchema());
+        
assertThat(table.getResolvedSchema()).isEqualTo(ResolvedSchema.of(expectedColumns));
     }
 
     private static void testSchema(Table table, ResolvedSchema expectedSchema) 
{
-        assertEquals(expectedSchema, table.getResolvedSchema());
+        assertThat(table.getResolvedSchema()).isEqualTo(expectedSchema);
     }
 
     private static void testSchema(TableResult result, Column... 
expectedColumns) {
-        assertEquals(ResolvedSchema.of(expectedColumns), 
result.getResolvedSchema());
+        
assertThat(result.getResolvedSchema()).isEqualTo(ResolvedSchema.of(expectedColumns));
     }
 
     private static void testResult(TableResult result, Row... expectedRows) {
         final List<Row> actualRows = 
CollectionUtil.iteratorToList(result.collect());
-        assertThat(actualRows, containsInAnyOrder(expectedRows));
+        
assertThat(actualRows).satisfies(matching(containsInAnyOrder(expectedRows)));
     }
 
     @SafeVarargs
     private static <T> void testResult(DataStream<T> dataStream, T... 
expectedResult)
             throws Exception {
         try (CloseableIterator<T> iterator = dataStream.executeAndCollect()) {
             final List<T> list = CollectionUtil.iteratorToList(iterator);
-            assertThat(list, containsInAnyOrder(expectedResult));
+            
assertThat(list).satisfies(matching(containsInAnyOrder(expectedResult)));

Review comment:
       this could be directly migrated to assertj; containsExactlyInAnyOrder?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
##########
@@ -151,11 +149,11 @@ private void assertFiles(File[] files, boolean 
containSuccess) {
             if (containSuccess && file.getName().equals("_SUCCESS")) {
                 successFile = file;
             } else {
-                assertTrue(file.getName(), 
file.getName().startsWith(COMPACTED_PREFIX));
+                
assertThat(file.getName().startsWith(COMPACTED_PREFIX)).as(file.getName()).isTrue();

Review comment:
       ```suggestion
                   assertThat(file.getName()).startsWith(COMPACTED_PREFIX);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -900,7 +897,7 @@ private static void testMaterializedResult(
                         switch (kind) {
                             case UPDATE_AFTER:
                                 final Object primaryKeyValue = 
row.getField(primaryKeyPos);
-                                assert primaryKeyValue != null;
+                                assertThat(primaryKeyValue != null).isTrue();

Review comment:
       ```suggestion
                                   assertThat(primaryKeyValue).isNotNull();
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityGraphGeneratorTest.java
##########
@@ -65,9 +66,9 @@ public void testCalculatePipelinedAncestors() {
                         Collections.emptySet(),
                         InputProperty.DamBehavior.END_INPUT);
         List<ExecNode<?>> ancestors = 
resolver.calculatePipelinedAncestors(nodes[2]);
-        Assert.assertEquals(2, ancestors.size());
-        Assert.assertTrue(ancestors.contains(nodes[0]));
-        Assert.assertTrue(ancestors.contains(nodes[5]));
+        assertThat(ancestors.size()).isEqualTo(2);
+        assertThat(ancestors.contains(nodes[0])).isTrue();

Review comment:
       ```suggestion
           assertThat(ancestors).contains(nodes[0]);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
##########
@@ -127,7 +124,7 @@ public void testCreateFunctionCatalogNotExists() {
         try {
             tEnv().executeSql(ddl1);
         } catch (Exception e) {
-            assertEquals("Catalog catalog1 does not exist", e.getMessage());
+            assertThat(e.getMessage()).isEqualTo("Catalog catalog1 does not 
exist");

Review comment:
       assertThatThrownBy

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
##########
@@ -183,7 +182,7 @@ public void testTypeConversions() throws Exception {
                                         })));
 
         List<Row> actual = TestCollectionTableFactory.getResult();
-        assertThat(new HashSet<>(actual), equalTo(new HashSet<>(expected)));
+        assertThat(new HashSet<>(actual)).isEqualTo(new HashSet<>(expected));

Review comment:
       this looks like a workaround to ignore the order?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
##########
@@ -244,27 +241,25 @@ public void testAlterFunctionNonExists() {
 
         try {
             tEnv().executeSql(alterUndefinedFunction);
-            fail();
+            fail("unknown failure");

Review comment:
       assertThatThrownBy would remove the need for this

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
##########
@@ -148,14 +144,14 @@ public void testWatermarkAssignerOperator() throws 
Exception {
             while (lastWatermark < 7) {
                 if (output.size() > 0) {
                     Object next = output.poll();
-                    assertNotNull(next);
+                    assertThat(next).isNotNull();
                     Tuple2<Long, Long> update =
                             validateElement(next, nextElementValue, 
lastWatermark);
                     nextElementValue = update.f0;
                     lastWatermark = update.f1;
 
                     // check the invariant
-                    assertTrue(lastWatermark < nextElementValue);
+                    assertThat(lastWatermark < nextElementValue).isTrue();

Review comment:
       isLessThan

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
##########
@@ -396,9 +396,8 @@ public void testConversions() {
                         
simulateSerialization(DataStructureConverters.getConverter(toDataType));
                 
toConverter.open(DataStructureConvertersTest.class.getClassLoader());
 
-                assertArrayEquals(
-                        new Object[] {to.getValue()},
-                        new Object[] 
{toConverter.toExternalOrNull(internalValue)});
+                assertThat(new Object[] 
{toConverter.toExternalOrNull(internalValue)})
+                        .isEqualTo(new Object[] {to.getValue()});

Review comment:
       ```suggestion
                   assertThat(toConverter.toExternalOrNull(internalValue)}
                           .isEqualTo(to.getValue());
   ```

##########
File path: 
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
##########
@@ -259,15 +259,16 @@ public void testLackStartForSequence() {
                     descriptor.asMap());
         } catch (ValidationException e) {
             Throwable cause = e.getCause();
-            Assert.assertTrue(cause.toString(), cause instanceof 
ValidationException);
-            Assert.assertTrue(
-                    cause.getMessage(),
-                    cause.getMessage()
-                            .contains(
-                                    "Could not find required property 
'fields.f0.start' for sequence generator."));
+            
assertThat(cause).as(cause.toString()).isInstanceOf(ValidationException.class);
+            assertThat(
+                            cause.getMessage()
+                                    .contains(
+                                            "Could not find required property 
'fields.f0.start' for sequence generator."))
+                    .as(cause.getMessage())
+                    .isTrue();

Review comment:
       this whole blocks seems a bit weird and should probably use 
assertThatThrownBy()

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
##########
@@ -131,11 +132,11 @@ public void test() throws Exception {
                 new Tuple2<>(keySerializer.createInstance(), 
valueSerializer.createInstance());
         int count = 0;
         while ((kv = iterator.next(kv)) != null) {
-            Assert.assertEquals((int) expected.get(count), kv.f0.getInt(0));
-            Assert.assertEquals(expected.get(count) * -3 + 177, 
kv.f1.getInt(0));
+            assertThat(kv.f0.getInt(0)).isEqualTo((int) expected.get(count));
+            assertThat(kv.f1.getInt(0)).isEqualTo(expected.get(count) * -3 + 
177);

Review comment:
       good god what are these magic numbers 😕 

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.java
##########
@@ -100,7 +100,7 @@ public void 
testKeepLastRowWithGenerateUpdateBeforeAndWithGenerateInsert() throw
         testHarness.processElement(insertRecord("book", 11, 2L));
         testHarness.processElement(insertRecord("book", 13, 1L));
         // output is empty because bundle not trigger yet.
-        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        assertThat(testHarness.getOutput().isEmpty()).isTrue();

Review comment:
       ```suggestion
           assertThat(testHarness.getOutput()).isEmpty();
   ```

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommitter.java
##########
@@ -74,7 +76,7 @@ public TestManagedCommittable 
combine(List<TestManagedCommittable> committables)
         for (final TestManagedCommittable combinedCommittable : committables) {
             AtomicReference<Map<CatalogPartitionSpec, List<Path>>> reference =
                     
TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.get(tableIdentifier);
-            assert reference != null;
+            assertThat(reference != null).isTrue();

Review comment:
       ```suggestion
               assertThat(reference).isNotNull();
   ```

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableDescriptorTest.java
##########
@@ -56,22 +54,22 @@ public void testBasic() {
                         .comment("Test Comment")
                         .build();
 
-        assertTrue(descriptor.getSchema().isPresent());
-        assertEquals(schema, descriptor.getSchema().get());
+        assertThat(descriptor.getSchema().isPresent()).isTrue();
+        assertThat(descriptor.getSchema().get()).isEqualTo(schema);
 
-        assertEquals(1, descriptor.getPartitionKeys().size());
-        assertEquals("f0", descriptor.getPartitionKeys().get(0));
+        assertThat(descriptor.getPartitionKeys().size()).isEqualTo(1);

Review comment:
       ```suggestion
           assertThat(descriptor.getPartitionKeys).hasSize(1);
   ```

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTableImpTest.java
##########
@@ -69,8 +69,8 @@ public void testNullComment() {
         Map<String, String> prop = createProperties();
         CatalogTable table = new CatalogTableImpl(schema, 
createPartitionKeys(), prop, null);
 
-        assertEquals("", table.getComment());
-        assertEquals(Optional.of(""), table.getDescription());
+        assertThat(table.getComment()).isEqualTo("");
+        assertThat(table.getDescription()).isEqualTo(Optional.of(""));

Review comment:
       this looks a bit odd.

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/api/TableSchemaTest.java
##########
@@ -212,13 +206,14 @@ public void testWatermarkOnDifferentFields() {
                     builder.watermark(t.f0, WATERMARK_EXPRESSION, 
WATERMARK_DATATYPE);
                     if (t.f2.equals("PASS")) {
                         TableSchema schema = builder.build();
-                        assertEquals(1, schema.getWatermarkSpecs().size());
-                        assertEquals(t.f0, 
schema.getWatermarkSpecs().get(0).getRowtimeAttribute());
+                        
assertThat(schema.getWatermarkSpecs().size()).isEqualTo(1);
+                        
assertThat(schema.getWatermarkSpecs().get(0).getRowtimeAttribute())
+                                .isEqualTo(t.f0);
                     } else {
                         try {
                             builder.build();
                         } catch (Exception e) {
-                            assertTrue(e.getMessage().contains(t.f2));
+                            assertThat(e.getMessage().contains(t.f2)).isTrue();

Review comment:
       ```suggestion
                               assertThat(e.getMessage()).contains(t.f2);
   ```

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTestBase.java
##########
@@ -40,11 +39,11 @@
         if (element instanceof StreamRecord) {
             @SuppressWarnings("unchecked")
             StreamRecord<RowData> record = (StreamRecord<RowData>) element;
-            assertEquals(nextElementValue, record.getValue().getLong(0));
+            
assertThat(record.getValue().getLong(0)).isEqualTo(nextElementValue);
             return new Tuple2<>(nextElementValue + 1, currentWatermark);
         } else if (element instanceof Watermark) {
             long wt = ((Watermark) element).getTimestamp();
-            assertTrue(wt > currentWatermark);
+            assertThat(wt > currentWatermark).isTrue();

Review comment:
       isGreaterThan

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -281,7 +280,7 @@ public void 
testMetadataProjectionWithoutProjectionPushDownWhenSupported() {
         util().tableEnv().createTable("T1", sourceDescriptor);
 
         util().verifyRelPlan("SELECT m1, metadata FROM T1");
-        assertThat(appliedKeys.get(), contains("m1", "m2"));
+        assertThat(appliedKeys.get()).satisfies(matching(contains("m1", 
"m2")));

Review comment:
       ```suggestion
           assertThat(appliedKeys.get()).contains("m1", "m2");
   ```

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/SortUtilTest.java
##########
@@ -124,7 +125,7 @@ public void testNormalizedKey() {
 
             Arrays.sort(segments, (o1, o2) -> o1.compare(o2, 0, 0, 8));
             for (int i = 0; i < len; i++) {
-                Assert.assertTrue(compareSegs[i].equalTo(segments[i], 0, 0, 
8));
+                assertThat(compareSegs[i].equalTo(segments[i], 0, 0, 
8)).isTrue();

Review comment:
       ```suggestion
                   assertThat(compareSegs[i]).isEqualTo(segments[i], 0, 0, 8);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java
##########
@@ -61,10 +61,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(
         final Table table = tEnv().from("T");
 
         // Sanity check: without our module loaded, the factory discovery 
process is used.
-        assertThrows(
-                "Discovered factory should not be used",
-                UnsupportedOperationException.class,
-                table::explain);
+        assertThatThrownBy(UnsupportedOperationException.class)

Review comment:
       runnable was dropped

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java
##########
@@ -389,7 +389,7 @@ static void joinAndAssert(
 
         Queue<Object> actual = testHarness.getOutput();
 
-        Assert.assertEquals("Output was not correct.", expectOutSize, 
actual.size());
+        assertThat(actual.size()).as("Output was not 
correct.").isEqualTo(expectOutSize);

Review comment:
       hasSize

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/columnar/vector/ColumnVectorTest.java
##########
@@ -351,16 +348,16 @@ public void testTimestamp() {
                                 .toArray()));
         setRangeDictIds(vector);
         for (int i = 0; i < SIZE; i++) {
-            assertEquals(TimestampData.fromEpochMillis(i, i), 
vector.getTimestamp(i, 9));
+            assertThat(vector.getTimestamp(i, 
9)).isEqualTo(TimestampData.fromEpochMillis(i, i));
         }
     }
 
     @Test
     public void testReserveDictIds() {
         HeapIntVector vector = new HeapIntVector(SIZE);
-        assertTrue(vector.reserveDictionaryIds(2).vector.length >= 2);
-        assertTrue(vector.reserveDictionaryIds(5).vector.length >= 5);
-        assertTrue(vector.reserveDictionaryIds(2).vector.length >= 2);
+        assertThat(vector.reserveDictionaryIds(2).vector.length >= 2).isTrue();

Review comment:
       isGreatThanOrEqualTo

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -1674,7 +1675,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
                 }
             } else {
                 // we don't support OutputFormat for updating query in the 
TestValues connector
-                assert runtimeSink.equals("SinkFunction");
+                assertThat(runtimeSink.equals("SinkFunction")).isTrue();

Review comment:
       ```suggestion
                   assertThat(runtimeSink).isEqualTo("SinkFunction");
   ```

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
##########
@@ -141,18 +137,16 @@ public void testCreateDb_DatabaseAlreadyExist_ignored() 
throws Exception {
         List<String> dbs = catalog.listDatabases();
 
         CatalogTestUtil.checkEquals(cd1, catalog.getDatabase(db1));
-        assertEquals(2, dbs.size());
-        assertEquals(
-                new HashSet<>(Arrays.asList(db1, 
catalog.getDefaultDatabase())),
-                new HashSet<>(dbs));
+        assertThat(dbs.size()).isEqualTo(2);
+        assertThat(new HashSet<>(dbs))
+                .isEqualTo(new HashSet<>(Arrays.asList(db1, 
catalog.getDefaultDatabase())));

Review comment:
       containsExactlyInAnyOrder?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
##########
@@ -111,14 +109,11 @@ public void testParseLegalStatements() {
         for (TestSpec spec : TEST_SPECS) {
             if (spec.expectedSummary != null) {
                 Operation op = parser.parse(spec.statement).get(0);
-                assertEquals(spec.expectedSummary, op.asSummaryString());
+                
assertThat(op.asSummaryString()).isEqualTo(spec.expectedSummary);
             }
 
             if (spec.expectedError != null) {
-                assertThrows(
-                        spec.expectedError,
-                        SqlParserException.class,
-                        () -> parser.parse(spec.statement));
+                
assertThatThrownBy(SqlParserException.class).isInstanceOf(spec.expectedError);

Review comment:
       This looks like a bug in the converter; the runnable was dropped

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -202,8 +201,8 @@ public void 
testUnifiedSinksAreUsableWithDataStreamSinkProvider()
         tableEnv.executeSql(sqlStmt).await();
         final List<Integer> fetchedRows =
                 fetched.get().stream().map(r -> 
r.getInt(0)).sorted().collect(Collectors.toList());
-        assertEquals(fetchedRows.get(0).intValue(), 1);
-        assertEquals(fetchedRows.get(1).intValue(), 2);
+        assertThat(1).isEqualTo(fetchedRows.get(0).intValue());
+        assertThat(2).isEqualTo(fetchedRows.get(1).intValue());

Review comment:
       switch order

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
##########
@@ -89,10 +86,10 @@
     public void testCreateCatalogFunctionInDefaultCatalog() {
         String ddl1 = "create function f1 as 
'org.apache.flink.function.TestFunction'";
         tEnv().executeSql(ddl1);
-        assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f1"));
+        
assertThat(Arrays.asList(tEnv().listFunctions()).contains("f1")).isTrue();

Review comment:
       ```suggestion
           assertThat(Arrays.asList(tEnv().listFunctions())).contains("f1");
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableFactoryTest.java
##########
@@ -98,10 +97,10 @@ public void testLackOptionSink() {
             createTableSink(SCHEMA, descriptor.asMap());
         } catch (ValidationException e) {
             Throwable cause = e.getCause();
-            assertTrue(cause.toString(), cause instanceof ValidationException);
-            assertTrue(
-                    cause.getMessage(),
-                    cause.getMessage().contains("Missing required options 
are:\n\nformat"));
+            
assertThat(cause).as(cause.toString()).isInstanceOf(ValidationException.class);
+            assertThat(cause.getMessage().contains("Missing required options 
are:\n\nformat"))
+                    .as(cause.getMessage())
+                    .isTrue();

Review comment:
       ```suggestion
               assertThat(cause).hasMessageContaining("Missing required options 
are:\n\nformat");
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
##########
@@ -927,13 +926,14 @@ public void testInvalidUseOfSystemScalarFunction() {
 
         try {
             tEnv().explainSql("INSERT INTO SinkTable " + "SELECT * FROM 
TABLE(MD5('3'))");
-            fail();
+            fail("unknown failure");
         } catch (ValidationException e) {
-            assertThat(
-                    e,
-                    hasMessage(
-                            containsString(
-                                    "Currently, only table functions can be 
used in a correlate operation.")));
+            assertThat(e)
+                    .satisfies(
+                            matching(
+                                    hasMessage(
+                                            containsString(
+                                                    "Currently, only table 
functions can be used in a correlate operation."))));

Review comment:
       ```suggestion
               assertThat(e)
                       .hasMessageContaining("Currently, only table functions 
can be used in a correlate operation.");
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
##########
@@ -77,6 +77,6 @@ public void 
testTranslateExecNodeGraphWithInternalTemporalConf() {
         // cleanup them after translate finished
         List<Transformation<?>> transformation = 
planner.translateToPlan(execNodeGraph);
         // check the translation success
-        Assert.assertEquals(1, transformation.size());
+        assertThat(transformation.size()).isEqualTo(1);

Review comment:
       ```suggestion
           assertThat(transformation).hasSize(1);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
##########
@@ -317,7 +317,7 @@ public synchronized String expand(String testCaseName, 
String tag, String text)
      * @param value Value of the resource
      */
     public synchronized void set(String testCaseName, String resourceName, 
String value) {
-        assert resourceName != null;
+        assertThat(resourceName != null).isTrue();

Review comment:
       ```suggestion
           assertThat(resourceName).isNotNull();
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
##########
@@ -69,6 +70,6 @@ public void testTypes() throws Exception {
             }
         }
 
-        Assert.assertEquals("Unexpected number of results", 10, 
results.size());
+        assertThat(results.size()).as("Unexpected number of 
results").isEqualTo(10);

Review comment:
       hasSize

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/assigners/SlidingWindowAssignerTest.java
##########
@@ -50,58 +48,63 @@ public void testWindowAssignment() {
         SlidingWindowAssigner assigner =
                 SlidingWindowAssigner.of(Duration.ofMillis(5000), 
Duration.ofMillis(1000));
 
-        assertThat(
-                assigner.assignWindows(ELEMENT, 0L),
-                containsInAnyOrder(
-                        timeWindow(-4000, 1000),
-                        timeWindow(-3000, 2000),
-                        timeWindow(-2000, 3000),
-                        timeWindow(-1000, 4000),
-                        timeWindow(0, 5000)));
-        assertThat(
-                assigner.assignWindows(ELEMENT, 4999L),
-                containsInAnyOrder(
-                        timeWindow(0, 5000),
-                        timeWindow(1000, 6000),
-                        timeWindow(2000, 7000),
-                        timeWindow(3000, 8000),
-                        timeWindow(4000, 9000)));
-        assertThat(
-                assigner.assignWindows(ELEMENT, 5000L),
-                containsInAnyOrder(
-                        timeWindow(1000, 6000),
-                        timeWindow(2000, 7000),
-                        timeWindow(3000, 8000),
-                        timeWindow(4000, 9000),
-                        timeWindow(5000, 10000)));
+        assertThat(assigner.assignWindows(ELEMENT, 0L))
+                .satisfies(
+                        matching(
+                                containsInAnyOrder(
+                                        timeWindow(-4000, 1000),
+                                        timeWindow(-3000, 2000),
+                                        timeWindow(-2000, 3000),
+                                        timeWindow(-1000, 4000),
+                                        timeWindow(0, 5000))));
+        assertThat(assigner.assignWindows(ELEMENT, 4999L))
+                .satisfies(
+                        matching(
+                                containsInAnyOrder(
+                                        timeWindow(0, 5000),
+                                        timeWindow(1000, 6000),
+                                        timeWindow(2000, 7000),
+                                        timeWindow(3000, 8000),
+                                        timeWindow(4000, 9000))));
+        assertThat(assigner.assignWindows(ELEMENT, 5000L))
+                .satisfies(
+                        matching(

Review comment:
       should be unnecessary

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java
##########
@@ -175,7 +175,7 @@ public void testMergeWindowsIsCalled() throws Exception {
         when(mockAssigner.assignWindows(anyGenericRow(), anyLong()))
                 .thenReturn(Arrays.asList(new TimeWindow(2, 4), new 
TimeWindow(0, 2)));
 
-        assertEquals(0, testHarness.getOutput().size());
+        assertThat(testHarness.getOutput().size()).isEqualTo(0);

Review comment:
       ```suggestion
           assertThat(testHarness.getOutput()).hasSize(0);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/RowTypeUtilsTest.java
##########
@@ -18,24 +18,25 @@
 
 package org.apache.flink.table.planner.typeutils;
 
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Tests for {@link RowTypeUtils}. */
 public class RowTypeUtilsTest {
 
     @Test
     public void testGetUniqueName() {
-        Assert.assertEquals(
-                Arrays.asList("Dave", "Evan"),
-                RowTypeUtils.getUniqueName(
-                        Arrays.asList("Dave", "Evan"), Arrays.asList("Alice", 
"Bob")));
-        Assert.assertEquals(
-                Arrays.asList("Bob_0", "Bob_1", "Dave", "Alice_0"),
-                RowTypeUtils.getUniqueName(
-                        Arrays.asList("Bob", "Bob", "Dave", "Alice"),
-                        Arrays.asList("Alice", "Bob")));
+        assertThat(
+                        RowTypeUtils.getUniqueName(
+                                Arrays.asList("Dave", "Evan"), 
Arrays.asList("Alice", "Bob")))
+                .isEqualTo(Arrays.asList("Dave", "Evan"));

Review comment:
       containsExactlyInAnyOrder

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
##########
@@ -287,7 +287,7 @@ public synchronized String expand(String testCaseName, 
String tag, String text)
             if (tag == null) {
                 tag = token;
             }
-            assert token.startsWith(tag) : "token '" + token + "' does not 
match tag '" + tag + "'";
+            assertThat(token.startsWith(tag)).isTrue();

Review comment:
       ```suggestion
               assertThat(token).startsWith(tag);
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java
##########
@@ -89,10 +87,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(
                                 .build());
 
         // Sanity check: without our module loaded, the factory discovery 
process is used.
-        assertThrows(
-                "Discovered factory should not be used",
-                UnsupportedOperationException.class,
-                () -> tEnv().explainSql("INSERT INTO T SELECT 1"));

Review comment:
       runnable was dropped

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
##########
@@ -389,15 +383,15 @@ public void testSingleSegmentBinaryRowHashCode() {
             writer.complete();
             hashCodes.add(row.hashCode());
         }
-        Assert.assertTrue(hashCodes.size() > count * 0.997);
+        assertThat(hashCodes.size() > count * 0.997).isTrue();

Review comment:
       isGreaterThan

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
##########
@@ -220,92 +223,99 @@ public void testLateMerging() throws Exception {
         // add several non-overlapping initial windoww
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 
3), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(0, 3), windowSet.getStateWindow(new 
TimeWindow(0, 3)));
+        assertThat(windowSet.addWindow(new TimeWindow(0, 3), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 3));
+        assertThat(mergeFunction.hasMerged()).isFalse();
+        assertThat(windowSet.getStateWindow(new TimeWindow(0, 
3))).isEqualTo(new TimeWindow(0, 3));
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(5, 8), windowSet.addWindow(new TimeWindow(5, 
8), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new 
TimeWindow(5, 8)));
+        assertThat(windowSet.addWindow(new TimeWindow(5, 8), mergeFunction))
+                .isEqualTo(new TimeWindow(5, 8));
+        assertThat(mergeFunction.hasMerged()).isFalse();
+        assertThat(windowSet.getStateWindow(new TimeWindow(5, 
8))).isEqualTo(new TimeWindow(5, 8));
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(10, 13), windowSet.addWindow(new TimeWindow(10, 
13), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(10, 13), windowSet.getStateWindow(new 
TimeWindow(10, 13)));
+        assertThat(windowSet.addWindow(new TimeWindow(10, 13), mergeFunction))
+                .isEqualTo(new TimeWindow(10, 13));
+        assertThat(mergeFunction.hasMerged()).isFalse();
+        assertThat(windowSet.getStateWindow(new TimeWindow(10, 13)))
+                .isEqualTo(new TimeWindow(10, 13));
 
         // add a window that merges the later two windows
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(5, 13), windowSet.addWindow(new TimeWindow(8, 
10), mergeFunction));
-        assertTrue(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(5, 13), mergeFunction.mergeTarget());
-        assertThat(
-                mergeFunction.stateWindow(),
-                anyOf(Is.is(new TimeWindow(5, 8)), Is.is(new TimeWindow(10, 
13))));
-        assertThat(
-                mergeFunction.mergeSources(),
-                containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 
13)));
-        assertThat(
-                mergeFunction.mergedStateWindows(),
-                anyOf(
-                        containsInAnyOrder(new TimeWindow(10, 13)),
-                        containsInAnyOrder(new TimeWindow(5, 8))));
-        assertThat(mergeFunction.mergedStateWindows(), 
not(hasItem(mergeFunction.mergeTarget())));
+        assertThat(windowSet.addWindow(new TimeWindow(8, 10), mergeFunction))
+                .isEqualTo(new TimeWindow(5, 13));
+        assertThat(mergeFunction.hasMerged()).isTrue();
+        assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(5, 
13));
+        assertThat(mergeFunction.stateWindow())
+                .satisfies(
+                        matching(
+                                anyOf(Is.is(new TimeWindow(5, 8)), Is.is(new 
TimeWindow(10, 13)))));
+        assertThat(mergeFunction.mergeSources())
+                .satisfies(
+                        matching(containsInAnyOrder(new TimeWindow(5, 8), new 
TimeWindow(10, 13))));
+        assertThat(mergeFunction.mergedStateWindows())
+                .satisfies(
+                        matching(

Review comment:
       this should also be possible without hamcrest

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
##########
@@ -792,10 +791,11 @@ public void testInvalidWindows() {
                     .build();
             fail("should fail");
         } catch (Exception e) {
-            assertThat(
-                    e,
-                    containsMessage(
-                            "Hopping window requires a COUNT(*) in the 
aggregate functions."));
+            assertThat(e)
+                    .satisfies(

Review comment:
       hasMessageContaining

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperTest.java
##########
@@ -81,14 +78,14 @@ public void testCreateOperator() throws Exception {
         StreamOperatorParameters<RowData> parameters = 
createStreamOperatorParameters();
         wrapper.createOperator(parameters);
 
-        assertEquals(operator, wrapper.getStreamOperator());
+        assertThat(wrapper.getStreamOperator()).isEqualTo(operator);
 
         // create operator again, will throw exception
         try {
             wrapper.createOperator(parameters);
             fail("This should not happen");
         } catch (Exception e) {
-            assertTrue(e.getMessage().contains("This operator has been 
initialized"));
+            assertThat(e.getMessage().contains("This operator has been 
initialized")).isTrue();

Review comment:
       hasMessageContaining

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
##########
@@ -186,13 +189,14 @@ public void testInvalidFieldTypes() {
             createDeserializationSchema(
                     ResolvedSchema.of(Column.physical("field1", 
DataTypes.TIMESTAMP(3))),
                     getBasicOptions());
-            fail();
+            fail("unknown failure");
         } catch (Exception e) {
-            assertThat(
-                    e,
-                    hasMessage(
-                            equalTo(
-                                    "The 'raw' format doesn't supports 
'TIMESTAMP(3)' as column type.")));
+            assertThat(e)
+                    .satisfies(

Review comment:
       hasMessageContaining

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
##########
@@ -104,106 +99,114 @@ public void testIncrementalMerging() throws Exception {
 
         // add initial window
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 4), windowSet.addWindow(new TimeWindow(0, 
4), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(0, 4), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 4));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
-        assertTrue(windowSet.getStateWindow(new TimeWindow(0, 4)).equals(new 
TimeWindow(0, 4)));
+        assertThat(windowSet.getStateWindow(new TimeWindow(0, 4)).equals(new 
TimeWindow(0, 4)))
+                .isTrue();
 
         // add some more windows
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 4), windowSet.addWindow(new TimeWindow(0, 
4), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(0, 4), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 4));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 5), windowSet.addWindow(new TimeWindow(3, 
5), mergeFunction));
-        assertTrue(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(0, 5), mergeFunction.mergeTarget());
-        assertEquals(new TimeWindow(0, 4), mergeFunction.stateWindow());
-        assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new 
TimeWindow(0, 4)));
-        assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+        assertThat(windowSet.addWindow(new TimeWindow(3, 5), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 5));
+        assertThat(mergeFunction.hasMerged()).isTrue();
+        assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(0, 
5));
+        assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(0, 
4));
+        assertThat(mergeFunction.mergeSources())
+                .satisfies(matching(containsInAnyOrder(new TimeWindow(0, 4))));
+        assertThat(mergeFunction.mergedStateWindows().isEmpty()).isTrue();
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(4, 
6), mergeFunction));
-        assertTrue(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(0, 6), mergeFunction.mergeTarget());
-        assertEquals(new TimeWindow(0, 4), mergeFunction.stateWindow());
-        assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new 
TimeWindow(0, 5)));
-        assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+        assertThat(windowSet.addWindow(new TimeWindow(4, 6), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 6));
+        assertThat(mergeFunction.hasMerged()).isTrue();
+        assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(0, 
6));
+        assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(0, 
4));
+        assertThat(mergeFunction.mergeSources())
+                .satisfies(matching(containsInAnyOrder(new TimeWindow(0, 5))));
+        assertThat(mergeFunction.mergedStateWindows().isEmpty()).isTrue();
 
-        assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new 
TimeWindow(0, 6)));
+        assertThat(windowSet.getStateWindow(new TimeWindow(0, 
6))).isEqualTo(new TimeWindow(0, 4));
 
         // add some windows that falls into the already merged region
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(1, 
4), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(1, 4), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 6));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(0, 
4), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(0, 4), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 6));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(3, 
5), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(3, 5), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 6));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(0, 6), windowSet.addWindow(new TimeWindow(4, 
6), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(4, 6), mergeFunction))
+                .isEqualTo(new TimeWindow(0, 6));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
-        assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new 
TimeWindow(0, 6)));
+        assertThat(windowSet.getStateWindow(new TimeWindow(0, 
6))).isEqualTo(new TimeWindow(0, 4));
 
         // add some more windows that don't merge with the first bunch
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(11, 14), windowSet.addWindow(new TimeWindow(11, 
14), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(11, 14), mergeFunction))
+                .isEqualTo(new TimeWindow(11, 14));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
-        assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new 
TimeWindow(0, 6)));
+        assertThat(windowSet.getStateWindow(new TimeWindow(0, 
6))).isEqualTo(new TimeWindow(0, 4));
 
-        assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new 
TimeWindow(11, 14)));
+        assertThat(windowSet.getStateWindow(new TimeWindow(11, 14)))
+                .isEqualTo(new TimeWindow(11, 14));
 
         // add some more windows that merge with the second bunch
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(10, 14), windowSet.addWindow(new TimeWindow(10, 
13), mergeFunction));
-        assertTrue(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(10, 14), mergeFunction.mergeTarget());
-        assertEquals(new TimeWindow(11, 14), mergeFunction.stateWindow());
-        assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new 
TimeWindow(11, 14)));
-        assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+        assertThat(windowSet.addWindow(new TimeWindow(10, 13), mergeFunction))
+                .isEqualTo(new TimeWindow(10, 14));
+        assertThat(mergeFunction.hasMerged()).isTrue();
+        assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(10, 
14));
+        assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(11, 
14));
+        assertThat(mergeFunction.mergeSources())
+                .satisfies(matching(containsInAnyOrder(new TimeWindow(11, 
14))));
+        assertThat(mergeFunction.mergedStateWindows().isEmpty()).isTrue();
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(10, 15), windowSet.addWindow(new TimeWindow(12, 
15), mergeFunction));
-        assertTrue(mergeFunction.hasMerged());
-        assertEquals(new TimeWindow(10, 15), mergeFunction.mergeTarget());
-        assertEquals(new TimeWindow(11, 14), mergeFunction.stateWindow());
-        assertThat(mergeFunction.mergeSources(), containsInAnyOrder(new 
TimeWindow(10, 14)));
-        assertTrue(mergeFunction.mergedStateWindows().isEmpty());
+        assertThat(windowSet.addWindow(new TimeWindow(12, 15), mergeFunction))
+                .isEqualTo(new TimeWindow(10, 15));
+        assertThat(mergeFunction.hasMerged()).isTrue();
+        assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(10, 
15));
+        assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(11, 
14));
+        assertThat(mergeFunction.mergeSources())
+                .satisfies(matching(containsInAnyOrder(new TimeWindow(10, 
14))));
+        assertThat(mergeFunction.mergedStateWindows().isEmpty()).isTrue();
 
         mergeFunction.reset();
-        assertEquals(
-                new TimeWindow(10, 15), windowSet.addWindow(new TimeWindow(11, 
14), mergeFunction));
-        assertFalse(mergeFunction.hasMerged());
+        assertThat(windowSet.addWindow(new TimeWindow(11, 14), mergeFunction))
+                .isEqualTo(new TimeWindow(10, 15));
+        assertThat(mergeFunction.hasMerged()).isFalse();
 
-        assertEquals(new TimeWindow(0, 4), windowSet.getStateWindow(new 
TimeWindow(0, 6)));
+        assertThat(windowSet.getStateWindow(new TimeWindow(0, 
6))).isEqualTo(new TimeWindow(0, 4));
 
-        assertEquals(new TimeWindow(11, 14), windowSet.getStateWindow(new 
TimeWindow(10, 15)));
+        assertThat(windowSet.getStateWindow(new TimeWindow(10, 15)))
+                .isEqualTo(new TimeWindow(11, 14));
 
         // retire the first batch of windows
         windowSet.retireWindow(new TimeWindow(0, 6));
 
-        assertTrue(windowSet.getStateWindow(new TimeWindow(0, 6)) == null);
+        assertThat(windowSet.getStateWindow(new TimeWindow(0, 6)) == 
null).isTrue();

Review comment:
       isNull

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
##########
@@ -176,7 +175,7 @@ public void testEventTimeHoppingWindows() throws Exception {
         OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
         testHarness.close();
 
-        assertTrue("Close was not called.", aggsFunction.closeCalled.get() > 
0);
+        assertThat(aggsFunction.closeCalled.get() > 0).as("Close was not 
called.").isTrue();

Review comment:
       isGreaterThan

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
##########
@@ -53,18 +54,20 @@ public void testInvalidRecords() throws Exception {
         try {
             operator.processElement(new 
StreamRecord<>(Row.ofKind(RowKind.INSERT)));
         } catch (FlinkRuntimeException e) {
-            assertThat(
-                    e,
-                    containsMessage(
-                            "Error during input conversion from external 
DataStream "
-                                    + "API to internal Table API data 
structures"));
+            assertThat(e)
+                    .satisfies(

Review comment:
       hasMessageContaining

##########
File path: 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
##########
@@ -164,9 +164,11 @@ private static void test(TypeInformation typeInfo, Object 
value, Object anotherV
             converter.toInternal(anotherValue);
         }
 
-        Assert.assertTrue(
-                Arrays.deepEquals(
-                        new Object[] {converter.toExternal(innerValue)}, new 
Object[] {value}));
+        assertThat(
+                        Arrays.deepEquals(
+                                new Object[] 
{converter.toExternal(innerValue)},
+                                new Object[] {value}))
+                .isTrue();

Review comment:
       ```suggestion
           assertThat(converter.toExternal(innerValue)).isEqualTo(value);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to