gaoyunhaii commented on a change in pull request #18978:
URL: https://github.com/apache/flink/pull/18978#discussion_r830747758



##########
File path: 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
##########
@@ -236,41 +233,47 @@ public void testRetryOnTimeout() throws Exception {
             }
         }
 
-        assertTrue(upload.finished.get());
-        assertEquals(
-                upload.changeSets.stream()
-                        .map(StateChangeSet::getSequenceNumber)
-                        .collect(Collectors.toSet()),
-                succeeded.get().stream()
-                        .map(UploadResult::getSequenceNumber)
-                        .collect(Collectors.toSet()));
-        assertTrue(failed.get().isEmpty());
+        assertThat(upload.finished.get()).isTrue();
+        assertThat(
+                        upload.changeSets.stream()
+                                .map(StateChangeSet::getSequenceNumber)
+                                .collect(Collectors.toSet()))
+                .isEqualTo(
+                        succeeded.get().stream()
+                                .map(UploadResult::getSequenceNumber)
+                                .collect(Collectors.toSet()));
+        assertThat(failed.get()).isEmpty();
     }
 
-    @Test(expected = RejectedExecutionException.class)
-    public void testErrorHandling() throws Exception {
-        TestingStateChangeUploader probe = new TestingStateChangeUploader();
-        DirectScheduledExecutorService scheduler = new 
DirectScheduledExecutorService();
-        try (BatchingStateChangeUploadScheduler store =
-                new BatchingStateChangeUploadScheduler(
-                        Integer.MAX_VALUE,
-                        MAX_BYTES_IN_FLIGHT,
-                        MAX_BYTES_IN_FLIGHT,
-                        RetryPolicy.NONE,
-                        probe,
-                        scheduler,
-                        new RetryingExecutor(
-                                5,
-                                createUnregisteredChangelogStorageMetricGroup()
-                                        .getAttemptsPerUpload()),
-                        createUnregisteredChangelogStorageMetricGroup())) {
-            scheduler.shutdown();
-            upload(store, getChanges(4));
-        }
+    @Test
+    void testErrorHandling() {
+        assertThatThrownBy(

Review comment:
       I think we might narrow down the scope wrappered by the 
`assertThatThrownBy`? Like
   
   ```
           TestingStateChangeUploader probe = new TestingStateChangeUploader();
           DirectScheduledExecutorService scheduler = new 
DirectScheduledExecutorService();
           try (BatchingStateChangeUploadScheduler store =
                   new BatchingStateChangeUploadScheduler(
                           Integer.MAX_VALUE,
                           MAX_BYTES_IN_FLIGHT,
                           MAX_BYTES_IN_FLIGHT,
                           RetryPolicy.NONE,
                           probe,
                           scheduler,
                           new RetryingExecutor(
                                   5,
                                   
createUnregisteredChangelogStorageMetricGroup()
                                           .getAttemptsPerUpload()),
                           createUnregisteredChangelogStorageMetricGroup())) {
               scheduler.shutdown();
               assertThatThrownBy(() -> upload(store, getChanges(4)))
                       .isInstanceOf(RejectedExecutionException.class);
           }
   
   ```
   
   (Might need a double check if the above wrapped scope is right)

##########
File path: 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
##########
@@ -21,27 +21,24 @@
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import 
org.apache.flink.runtime.state.changelog.inmemory.StateChangelogStorageTest;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
+import java.io.File;
 import java.io.IOException;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
 
 /** {@link FsStateChangelogStorage} test. */
-@RunWith(Parameterized.class)
 public class FsStateChangelogStorageTest extends StateChangelogStorageTest {

Review comment:
       Might complement the parameter type? 
   
   ```
   public class FsStateChangelogStorageTest
           extends StateChangelogStorageTest<ChangelogStateHandleStreamImpl> {
   ...
   protected StateChangelogStorage<ChangelogStateHandleStreamImpl> getFactory(
   
   ```

##########
File path: 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
##########
@@ -128,20 +127,18 @@ public void testDelay() throws Exception {
                     scheduler.triggerAll();
                     List<StateChangeSet> changeSets = getChanges(4);
                     upload(store, changeSets);
-                    assertTrue(probe.getUploaded().isEmpty());
-                    assertTrue(
-                            scheduler.getAllNonPeriodicScheduledTask().stream()
-                                    .anyMatch(
-                                            scheduled ->
-                                                    
scheduled.getDelay(MILLISECONDS) == delayMs));
+                    assertThat(probe.getUploaded()).isEmpty();
+                    assertThat(scheduler.getAllNonPeriodicScheduledTask())
+                            .anyMatch(scheduled -> 
scheduled.getDelay(MILLISECONDS) == delayMs)
+                            .hasSizeGreaterThan(0);

Review comment:
       `hasSizeGreaterThan(0)` seems unnecessary~?
   
   I tried `assertThat(Arrays.asList(1, 2)).anyMatch(i -> i == 3);` would fail

##########
File path: 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
##########
@@ -72,14 +68,9 @@
                         .expectIncrement(true));
     }
 
-    private final WriterSqnTestSettings test;
-
-    public FsStateChangelogWriterSqnTest(WriterSqnTestSettings test) {
-        this.test = test;
-    }
-
-    @Test
-    public void runTest() throws IOException {
+    @MethodSource("getSettings")
+    @ParameterizedTest

Review comment:
       Might complement the display name with `@ParameterizedTest(name = "name 
= {0}")`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
##########
@@ -44,31 +45,43 @@
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.StreamSupport.stream;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link InMemoryStateChangelogStorage} test. */
 public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
 
     private final Random random = new Random();
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir public File temporaryFolder;
 
-    @Test(expected = IllegalStateException.class)
-    public void testNoAppendAfterClose() throws IOException {
-        StateChangelogWriter<?> writer =
-                getFactory().createWriter(new OperatorID().toString(), 
KeyGroupRange.of(0, 0));
-        writer.close();
-        writer.append(0, new byte[0]);
+    public static Stream<Boolean> parameters() {
+        return Stream.of(true);
     }
 
-    @Test
-    public void testWriteAndRead() throws Exception {
+    @MethodSource("parameters")
+    @ParameterizedTest
+    public void testNoAppendAfterClose(boolean compression) throws IOException 
{
+        assertThatThrownBy(

Review comment:
       Narrow down the scope of `assertThatThrownBy` if possible?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
##########
@@ -44,31 +45,43 @@
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.StreamSupport.stream;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link InMemoryStateChangelogStorage} test. */
 public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
 
     private final Random random = new Random();
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir public File temporaryFolder;
 
-    @Test(expected = IllegalStateException.class)
-    public void testNoAppendAfterClose() throws IOException {
-        StateChangelogWriter<?> writer =
-                getFactory().createWriter(new OperatorID().toString(), 
KeyGroupRange.of(0, 0));
-        writer.close();
-        writer.append(0, new byte[0]);
+    public static Stream<Boolean> parameters() {
+        return Stream.of(true);
     }
 
-    @Test
-    public void testWriteAndRead() throws Exception {
+    @MethodSource("parameters")
+    @ParameterizedTest
+    public void testNoAppendAfterClose(boolean compression) throws IOException 
{
+        assertThatThrownBy(
+                        () -> {
+                            StateChangelogWriter<?> writer =
+                                    getFactory(compression, temporaryFolder)
+                                            .createWriter(
+                                                    new 
OperatorID().toString(),
+                                                    KeyGroupRange.of(0, 0));
+                            writer.close();
+                            writer.append(0, new byte[0]);
+                        })
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @MethodSource("parameters")
+    @ParameterizedTest

Review comment:
       Keeps the original description like `@ParameterizedTest(name = ...)` ?

##########
File path: 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
##########
@@ -91,24 +82,14 @@ public void runTest() throws IOException {
                 append(writer);
             }
             test.action.accept(writer);
-            assertEquals(
-                    getMessage(),
-                    test.expectIncrement
-                            ? writer.initialSequenceNumber().next()
-                            : writer.initialSequenceNumber(),
-                    writer.lastAppendedSqnUnsafe());
+            assertThat(writer.lastAppendedSqnUnsafe())
+                    .isEqualTo(

Review comment:
       Might keeps the customized error message with 
   ```
   assertThat(...).as(test.getMessage()).isEqualTo(...)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to