This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2632c8fe96c [FLINK-25537][JUnit5 Migration] Module: flink-core (#24939)
2632c8fe96c is described below
commit 2632c8fe96c48894784fba22842de17a19562599
Author: gongzhongqiang <[email protected]>
AuthorDate: Fri Jul 12 16:32:37 2024 +0800
[FLINK-25537][JUnit5 Migration] Module: flink-core (#24939)
* [FLINK-25537] [JUnit5 Migration] Module: flink-core
* Revert "[FLINK-35601][test] Revert the junit5 migration of InitOutputPathTest"
This reverts commit 3a15d1ce69ac21d619f60033ec45cae303489c8f.
* Re-apply the junit5 migration of InitOutputPathTest
---
flink-core/pom.xml | 12 --
.../apache/flink/core/fs/InitOutputPathTest.java | 147 ++++++++++++++-------
.../apache/flink/management/jmx/JMXServerTest.java | 36 +++--
.../flink/management/jmx/JMXServiceTest.java | 15 +--
.../apache/flink/testutils/ClassLoaderUtils.java | 3 +-
.../serialization/types/SerializationTestType.java | 4 +-
.../flink/testutils/serialization/types/Util.java | 2 +-
7 files changed, 123 insertions(+), 96 deletions(-)
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 16b87896fa5..7373463b307 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -179,18 +179,6 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
diff --git
a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
index 891cd123fb2..8043d45b046 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
@@ -23,31 +23,29 @@ import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.nio.file.FileAlreadyExistsException;
import java.util.concurrent.locks.ReentrantLock;
-import static org.junit.Assert.fail;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** A test validating that the initialization of local output paths is
properly synchronized. */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(LocalFileSystem.class)
-public class InitOutputPathTest {
+class InitOutputPathTest {
- @Rule public final TemporaryFolder tempDir = new TemporaryFolder();
+ @TempDir private static java.nio.file.Path tempFolder;
/**
* This test validates that this test case makes sense - that the error
can be produced in the
@@ -55,27 +53,24 @@ public class InitOutputPathTest {
* latches.
*/
@Test
- public void testErrorOccursUnSynchronized() throws Exception {
+ void testErrorOccursUnSynchronized() throws Exception {
// deactivate the lock to produce the original un-synchronized state
Field lock =
FileSystem.class.getDeclaredField("OUTPUT_DIRECTORY_INIT_LOCK");
lock.setAccessible(true);
- lock.set(null, new NoOpLock());
- try {
- // in the original un-synchronized state, we can force the race to
occur by using
- // the proper latch order to control the process of the concurrent
threads
- runTest(true);
- fail("should fail with an exception");
- } catch (FileNotFoundException e) {
- // expected
- } finally {
- // reset the proper value
- lock.set(null, new ReentrantLock(true));
- }
+ Field modifiers = getModifiersField();
+ modifiers.setAccessible(true);
+ modifiers.setInt(lock, lock.getModifiers() & ~Modifier.FINAL);
+
+ lock.set(null, new NoOpLock());
+ // in the original un-synchronized state, we can force the race to
occur by using
+ // the proper latch order to control the process of the concurrent
threads
+ assertThatThrownBy(() ->
runTest(true)).isInstanceOf(FileNotFoundException.class);
+ lock.set(null, new ReentrantLock(true));
}
@Test
- public void testProperSynchronized() throws Exception {
+ void testProperSynchronized() throws Exception {
// in the synchronized variant, we cannot use the "await latches"
because not
// both threads can make process interleaved (due to the
synchronization)
// the test uses sleeps (rather than latches) to produce the same
interleaving.
@@ -86,8 +81,39 @@ public class InitOutputPathTest {
runTest(false);
}
+ private Field getModifiersField() throws IllegalAccessException,
NoSuchFieldException {
+ // this is copied from
https://github.com/powermock/powermock/pull/1010/files to work around
+ // JDK 12+
+ Field modifiersField = null;
+ try {
+ modifiersField = Field.class.getDeclaredField("modifiers");
+ } catch (NoSuchFieldException e) {
+ try {
+ Method getDeclaredFields0 =
+ Class.class.getDeclaredMethod("getDeclaredFields0",
boolean.class);
+ boolean accessibleBeforeSet =
getDeclaredFields0.isAccessible();
+ getDeclaredFields0.setAccessible(true);
+ Field[] fields = (Field[])
getDeclaredFields0.invoke(Field.class, false);
+ getDeclaredFields0.setAccessible(accessibleBeforeSet);
+ for (Field field : fields) {
+ if ("modifiers".equals(field.getName())) {
+ modifiersField = field;
+ break;
+ }
+ }
+ if (modifiersField == null) {
+ throw e;
+ }
+ } catch (NoSuchMethodException | InvocationTargetException ex) {
+ e.addSuppressed(ex);
+ throw e;
+ }
+ }
+ return modifiersField;
+ }
+
private void runTest(final boolean useAwaits) throws Exception {
- final File tempFile = tempDir.newFile();
+ final File tempFile = TempDirUtils.newFile(tempFolder);
final Path path1 = new Path(tempFile.getAbsolutePath(), "1");
final Path path2 = new Path(tempFile.getAbsolutePath(), "2");
@@ -104,32 +130,23 @@ public class InitOutputPathTest {
final OneShotLatch createAwaitLatch = new OneShotLatch();
final OneShotLatch createTriggerLatch = new OneShotLatch();
- // this "new LocalDataOutputStream()" is in the end called by the
async threads
- whenNew(LocalDataOutputStream.class)
- .withAnyArguments()
- .thenAnswer(
- new Answer<LocalDataOutputStream>() {
-
- @Override
- public LocalDataOutputStream
answer(InvocationOnMock invocation)
- throws Throwable {
- createAwaitLatch.trigger();
- createTriggerLatch.await();
-
- final File file = (File)
invocation.getArguments()[0];
- return new LocalDataOutputStream(file);
- }
- });
-
final LocalFileSystem fs1 =
new SyncedFileSystem(
- deleteAwaitLatch1, mkdirsAwaitLatch1,
- deleteTriggerLatch1, mkdirsTriggerLatch1);
+ deleteAwaitLatch1,
+ mkdirsAwaitLatch1,
+ deleteTriggerLatch1,
+ mkdirsTriggerLatch1,
+ createAwaitLatch,
+ createTriggerLatch);
final LocalFileSystem fs2 =
new SyncedFileSystem(
- deleteAwaitLatch2, mkdirsAwaitLatch2,
- deletetriggerLatch2, mkdirsTriggerLatch2);
+ deleteAwaitLatch2,
+ mkdirsAwaitLatch2,
+ deletetriggerLatch2,
+ mkdirsTriggerLatch2,
+ createAwaitLatch,
+ createTriggerLatch);
// start the concurrent file creators
FileCreator thread1 = new FileCreator(fs1, path1);
@@ -211,16 +228,44 @@ public class InitOutputPathTest {
private final OneShotLatch deleteAwaitLatch;
private final OneShotLatch mkdirsAwaitLatch;
+ private final OneShotLatch createAwaitLatch;
+ private final OneShotLatch createTriggerLatch;
+
SyncedFileSystem(
OneShotLatch deleteTriggerLatch,
OneShotLatch mkdirsTriggerLatch,
OneShotLatch deleteAwaitLatch,
- OneShotLatch mkdirsAwaitLatch) {
+ OneShotLatch mkdirsAwaitLatch,
+ OneShotLatch createAwaitLatch,
+ OneShotLatch createTriggerLatch) {
this.deleteTriggerLatch = deleteTriggerLatch;
this.mkdirsTriggerLatch = mkdirsTriggerLatch;
this.deleteAwaitLatch = deleteAwaitLatch;
this.mkdirsAwaitLatch = mkdirsAwaitLatch;
+ this.createAwaitLatch = createAwaitLatch;
+ this.createTriggerLatch = createTriggerLatch;
+ }
+
+ @Override
+ @SneakyThrows
+ public FSDataOutputStream create(final Path filePath, final WriteMode
overwrite)
+ throws IOException {
+ checkNotNull(filePath, "filePath");
+
+ if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
+ throw new FileAlreadyExistsException("File already exists: " +
filePath);
+ }
+
+ final Path parent = filePath.getParent();
+ if (parent != null && !mkdirs(parent)) {
+ throw new IOException("Mkdirs failed to create " + parent);
+ }
+
+ final File file = pathToFile(filePath);
+ createAwaitLatch.trigger();
+ createTriggerLatch.await();
+ return new LocalDataOutputStream(file);
}
@Override
diff --git
a/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServerTest.java
b/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServerTest.java
index 5778178ba19..445838c3ab9 100644
---
a/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServerTest.java
+++
b/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServerTest.java
@@ -18,9 +18,9 @@
package org.apache.flink.management.jmx;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
@@ -33,32 +33,32 @@ import javax.management.remote.JMXServiceURL;
import java.lang.management.ManagementFactory;
import java.util.Optional;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link JMXServer} functionality. */
-public class JMXServerTest {
+class JMXServerTest {
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() {
JMXService.startInstance("23456-23466");
}
- @After
- public void tearDown() throws Exception {
+ @AfterEach
+ void tearDown() throws Exception {
JMXService.stopInstance();
}
/** Verifies initialize, registered mBean and retrieval via attribute. */
@Test
- public void testJMXServiceRegisterMBean() throws Exception {
+ void testJMXServiceRegisterMBean() throws Exception {
TestObject testObject = new TestObject();
ObjectName testObjectName = new
ObjectName("org.apache.flink.management", "key", "value");
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
Optional<JMXServer> server = JMXService.getInstance();
- assertTrue(server.isPresent());
+ assertThat(server).isPresent();
mBeanServer.registerMBean(testObject, testObjectName);
JMXServiceURL url =
@@ -71,14 +71,10 @@ public class JMXServerTest {
JMXConnector jmxConn = JMXConnectorFactory.connect(url);
MBeanServerConnection mbeanConnConn =
jmxConn.getMBeanServerConnection();
- assertEquals(1, mbeanConnConn.getAttribute(testObjectName, "Foo"));
+ assertThat((int) mbeanConnConn.getAttribute(testObjectName,
"Foo")).isOne();
mBeanServer.unregisterMBean(testObjectName);
- try {
- mbeanConnConn.getAttribute(testObjectName, "Foo");
- } catch (Exception e) {
- // expected for unregistered objects.
- assertTrue(e instanceof InstanceNotFoundException);
- }
+ assertThatThrownBy(() ->
mbeanConnConn.getAttribute(testObjectName, "Foo"))
+ .isInstanceOf(InstanceNotFoundException.class);
} finally {
JMXService.stopInstance();
}
@@ -91,7 +87,7 @@ public class JMXServerTest {
/** Test MBean Object. */
public static class TestObject implements TestObjectMBean {
- private int foo = 1;
+ private final int foo = 1;
@Override
public int getFoo() {
diff --git
a/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServiceTest.java
b/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServiceTest.java
index 04bc85e1f04..9a5fd50b582 100644
---
a/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServiceTest.java
+++
b/flink-core/src/test/java/org/apache/flink/management/jmx/JMXServiceTest.java
@@ -18,22 +18,21 @@
package org.apache.flink.management.jmx;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.net.ServerSocket;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the singleton usage via {@link JMXService}. */
-public class JMXServiceTest {
+class JMXServiceTest {
/** Verifies initialize with port range. */
@Test
- public void testJMXServiceInit() throws Exception {
+ void testJMXServiceInit() throws Exception {
try {
JMXService.startInstance("23456-23466");
- assertTrue(JMXService.getPort().isPresent());
+ assertThat(JMXService.getPort()).isPresent();
} finally {
JMXService.stopInstance();
}
@@ -41,10 +40,10 @@ public class JMXServiceTest {
/** Verifies initialize failure with occupied port. */
@Test
- public void testJMXServiceInitWithOccupiedPort() throws Exception {
+ void testJMXServiceInitWithOccupiedPort() throws Exception {
try (ServerSocket socket = new ServerSocket(0)) {
JMXService.startInstance(String.valueOf(socket.getLocalPort()));
- assertFalse(JMXService.getInstance().isPresent());
+ assertThat(JMXService.getInstance()).isNotPresent();
}
}
}
diff --git
a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
index c4ca8a2b529..78e9e7418a1 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -260,8 +260,7 @@ public class ClassLoaderUtils {
public static ObjectAndClassLoader<Serializable>
createSerializableObjectFromNewClassLoader() {
final String classSource =
- ""
- + "import java.io.Serializable;"
+ "import java.io.Serializable;"
+ "import java.util.Random;"
+ "public class TestSerializable implements
Serializable {"
+ " private static final long serialVersionUID = -3L;"
diff --git
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
index bb359e7e2cf..5425b1e178b 100644
---
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
+++
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
@@ -24,7 +24,7 @@ import java.util.Random;
public interface SerializationTestType extends IOReadableWritable {
- public SerializationTestType getRandom(Random rnd);
+ SerializationTestType getRandom(Random rnd);
- public int length();
+ int length();
}
diff --git
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
index 5d428b3cabf..07db45de724 100644
---
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
+++
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
@@ -27,7 +27,7 @@ public final class Util {
private static final long SEED = 64871654635745873L;
- private static Random random = new Random(SEED);
+ private static final Random random = new Random(SEED);
public static SerializationTestType
randomRecord(SerializationTestTypeFactory type) {
return type.factory().getRandom(Util.random);