mateczagany commented on code in PR #27669:
URL: https://github.com/apache/flink/pull/27669#discussion_r2883753754
##########
flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java:
##########
@@ -29,160 +28,140 @@
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Value;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.IOException;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Test for proper error messages in case user-defined serialization is broken
and detected in the
* network stack.
*/
-@SuppressWarnings("serial")
-public class CustomSerializationITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class CustomSerializationITCase {
- private static final int PARLLELISM = 5;
+ private static final int PARALLELISM = 5;
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(PARLLELISM)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
.build());
- public static Configuration getConfiguration() {
+ private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE,
MemorySize.parse("30m"));
return config;
}
@Test
- public void testIncorrectSerializer1() throws Exception {
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARLLELISM);
-
- env.fromSequence(1, 10 * PARLLELISM)
- .map(
- new MapFunction<Long, ConsumesTooMuch>() {
- @Override
- public ConsumesTooMuch map(Long value) throws
Exception {
- return new ConsumesTooMuch();
- }
- })
- .rebalance()
- .sinkTo(new DiscardingSink<>());
-
- env.execute();
- } catch (JobExecutionException e) {
- assertTrue(
- findThrowable(
- e,
- candidate ->
- candidate
- .getMessage()
- .contains("broken
serialization."))
- .isPresent());
- }
+ @Disabled("TODO: This needs to be investigated why no exception is
thrown.")
Review Comment:
While rewriting this test, I found that the old test cases don't even check
whether the exception is thrown: the assertion sits inside the catch block, so
a run that throws nothing still passes. Only the 2nd test passes, so I've
disabled the others.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]