Jiabao-Sun commented on code in PR #23960:
URL: https://github.com/apache/flink/pull/23960#discussion_r1475422173
##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -603,55 +587,52 @@ public void
testGetStatisticsMultipleOneFileWithCachedVersion() throws IOExcepti
* split has to start from the beginning.
*/
@Test
- public void testFileInputFormatWithCompression() {
- try {
- String tempFile =
- TestFileUtils.createTempFileDirForProvidedFormats(
- temporaryFolder.newFolder(),
- FileInputFormat.getSupportedCompressionFormats());
- final DummyFileInputFormat format = new DummyFileInputFormat();
- format.setFilePath(tempFile);
- format.configure(new Configuration());
- FileInputSplit[] splits = format.createInputSplits(2);
- final Set<String> supportedCompressionFormats =
- FileInputFormat.getSupportedCompressionFormats();
- Assert.assertEquals(supportedCompressionFormats.size(),
splits.length);
- for (FileInputSplit split : splits) {
- Assert.assertEquals(
- FileInputFormat.READ_WHOLE_SPLIT_FLAG,
- split.getLength()); // unsplittable compressed files
have this size as a
- // flag for "read whole file"
- Assert.assertEquals(0L, split.getStart()); // always read from
the beginning.
- }
+ void testFileInputFormatWithCompression() throws IOException {
- // test if this also works for "mixed" directories
- TestFileUtils.createTempFileInDirectory(
- tempFile.replace("file:", ""),
- "this creates a test file with a random extension (at
least not .deflate)");
-
- final DummyFileInputFormat formatMixed = new
DummyFileInputFormat();
- formatMixed.setFilePath(tempFile);
- formatMixed.configure(new Configuration());
- FileInputSplit[] splitsMixed = formatMixed.createInputSplits(2);
- Assert.assertEquals(supportedCompressionFormats.size() + 1,
splitsMixed.length);
- for (FileInputSplit split : splitsMixed) {
- final String extension =
-
FileInputFormat.extractFileExtension(split.getPath().getName());
- if (supportedCompressionFormats.contains(extension)) {
- Assert.assertEquals(
- FileInputFormat.READ_WHOLE_SPLIT_FLAG,
- split.getLength()); // unsplittable compressed
files have this size as a
- // flag for "read whole file"
- Assert.assertEquals(0L, split.getStart()); // always read
from the beginning.
- } else {
- Assert.assertEquals(0L, split.getStart());
- Assert.assertTrue("split size not correct",
split.getLength() > 0);
- }
- }
+ String tempFile =
+ TestFileUtils.createTempFileDirForProvidedFormats(
+ TempDirUtils.newFolder(temporaryFolder),
+ FileInputFormat.getSupportedCompressionFormats());
+ final DummyFileInputFormat format = new DummyFileInputFormat();
+ format.setFilePath(tempFile);
+ format.configure(new Configuration());
+ FileInputSplit[] splits = format.createInputSplits(2);
+ final Set<String> supportedCompressionFormats =
+ FileInputFormat.getSupportedCompressionFormats();
+ assertThat(splits).hasSize(supportedCompressionFormats.size());
+ for (FileInputSplit split : splits) {
+ assertThat(split.getLength())
+ .isEqualTo(
+ FileInputFormat.READ_WHOLE_SPLIT_FLAG); //
unsplittable compressed files
+ // have this size as a
+ // flag for "read whole file"
+ assertThat(split.getStart()).isEqualTo(0L); // always read from
the beginning.
Review Comment:
```suggestion
assertThat(split.getStart()).isZero(); // always read from the
beginning.
```
##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -662,35 +643,32 @@ public void testFileInputFormatWithCompression() {
* split is not the compressed file size and that the compression
decorator is called.
*/
@Test
- public void testFileInputFormatWithCompressionFromFileSource() {
- try {
- String tempFile =
- TestFileUtils.createTempFileDirForProvidedFormats(
- temporaryFolder.newFolder(),
- FileInputFormat.getSupportedCompressionFormats());
- DummyFileInputFormat format = new DummyFileInputFormat();
- format.setFilePath(tempFile);
- format.configure(new Configuration());
-
- // manually create a FileInputSplit per file as FileSource would do
- // see
org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
- List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
- final Set<String> supportedCompressionFormats =
- FileInputFormat.getSupportedCompressionFormats();
- // one file per compression format, one split per file
- Assert.assertEquals(supportedCompressionFormats.size(),
splits.size());
- for (FileInputSplit split : splits) {
- Assert.assertEquals(0L, split.getStart()); // always read from
the beginning.
- format.open(split);
- Assert.assertTrue(format.compressedRead);
- Assert.assertEquals(
- FileInputFormat.READ_WHOLE_SPLIT_FLAG,
- format.getSplitLength()); // unsplittable compressed
files have this size
- // as flag for "read whole file"
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- Assert.fail(ex.getMessage());
+ void testFileInputFormatWithCompressionFromFileSource() throws IOException
{
+
+ String tempFile =
+ TestFileUtils.createTempFileDirForProvidedFormats(
+ TempDirUtils.newFolder(temporaryFolder),
+ FileInputFormat.getSupportedCompressionFormats());
+ DummyFileInputFormat format = new DummyFileInputFormat();
+ format.setFilePath(tempFile);
+ format.configure(new Configuration());
+
+ // manually create a FileInputSplit per file as FileSource would do
+ // see
org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
+ List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
+ final Set<String> supportedCompressionFormats =
+ FileInputFormat.getSupportedCompressionFormats();
+ // one file per compression format, one split per file
+ assertThat(splits).hasSize(supportedCompressionFormats.size());
+ for (FileInputSplit split : splits) {
+ assertThat(split.getStart()).isEqualTo(0L); // always read from
the beginning.
Review Comment:
```suggestion
assertThat(split.getStart()).isZero(); // always read from the
beginning.
```
##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -662,35 +643,32 @@ public void testFileInputFormatWithCompression() {
* split is not the compressed file size and that the compression
decorator is called.
*/
@Test
- public void testFileInputFormatWithCompressionFromFileSource() {
- try {
- String tempFile =
- TestFileUtils.createTempFileDirForProvidedFormats(
- temporaryFolder.newFolder(),
- FileInputFormat.getSupportedCompressionFormats());
- DummyFileInputFormat format = new DummyFileInputFormat();
- format.setFilePath(tempFile);
- format.configure(new Configuration());
-
- // manually create a FileInputSplit per file as FileSource would do
- // see
org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
- List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
- final Set<String> supportedCompressionFormats =
- FileInputFormat.getSupportedCompressionFormats();
- // one file per compression format, one split per file
- Assert.assertEquals(supportedCompressionFormats.size(),
splits.size());
- for (FileInputSplit split : splits) {
- Assert.assertEquals(0L, split.getStart()); // always read from
the beginning.
- format.open(split);
- Assert.assertTrue(format.compressedRead);
- Assert.assertEquals(
- FileInputFormat.READ_WHOLE_SPLIT_FLAG,
- format.getSplitLength()); // unsplittable compressed
files have this size
- // as flag for "read whole file"
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- Assert.fail(ex.getMessage());
+ void testFileInputFormatWithCompressionFromFileSource() throws IOException
{
+
+ String tempFile =
+ TestFileUtils.createTempFileDirForProvidedFormats(
+ TempDirUtils.newFolder(temporaryFolder),
+ FileInputFormat.getSupportedCompressionFormats());
+ DummyFileInputFormat format = new DummyFileInputFormat();
+ format.setFilePath(tempFile);
+ format.configure(new Configuration());
+
+ // manually create a FileInputSplit per file as FileSource would do
+ // see
org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
+ List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
+ final Set<String> supportedCompressionFormats =
+ FileInputFormat.getSupportedCompressionFormats();
+ // one file per compression format, one split per file
+ assertThat(splits).hasSize(supportedCompressionFormats.size());
Review Comment:
```suggestion
assertThat(splits).hasSameSizeAs(supportedCompressionFormats);
```
##########
flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java:
##########
@@ -21,38 +21,40 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.types.Value;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
-import java.util.concurrent.Future;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests runtime context access from inside an RichOutputFormat class. */
-public class RichOutputFormatTest {
+class RichOutputFormatTest {
@Test
- public void testCheckRuntimeContextAccess() {
- final SerializedOutputFormat<Value> inputFormat = new
SerializedOutputFormat<Value>();
+ void testCheckRuntimeContextAccess() {
+ final SerializedOutputFormat<Value> inputFormat = new
SerializedOutputFormat<>();
final TaskInfo taskInfo = new TaskInfoImpl("test name", 3, 1, 3, 0);
inputFormat.setRuntimeContext(
new RuntimeUDFContext(
taskInfo,
getClass().getClassLoader(),
new ExecutionConfig(),
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
+ new HashMap<>(),
+ new HashMap<>(),
UnregisteredMetricsGroup.createOperatorMetricGroup()));
-
assertEquals(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
1);
- assertEquals(
-
inputFormat.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), 3);
+ assertThat(1)
+
.isEqualTo(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
+ assertThat(3)
+ .isEqualTo(
+ inputFormat
+ .getRuntimeContext()
+ .getTaskInfo()
+ .getNumberOfParallelSubtasks());
Review Comment:
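1 and 3 are the expected values here, right? The actual value should be passed to `assertThat(...)` and the expected value to `isEqualTo(...)`, e.g. `assertThat(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(1)`.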
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -179,12 +178,12 @@ public void testGetSourceField() {
sp.addForwardedField(1, 2, 3);
sp.addForwardedField(1, 3, 2);
- assertEquals(0, sp.getForwardingSourceField(1, 1));
- assertEquals(1, sp.getForwardingSourceField(1, 4));
- assertEquals(2, sp.getForwardingSourceField(1, 3));
- assertEquals(3, sp.getForwardingSourceField(1, 2));
- assertTrue(sp.getForwardingSourceField(1, 0) < 0);
- assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+ assertThat(sp.getForwardingSourceField(1, 1)).isZero();
+ assertThat(sp.getForwardingSourceField(1, 4)).isEqualTo(1);
Review Comment:
Use `isOne()` here instead of `isEqualTo(1)`.
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java:
##########
@@ -46,39 +44,48 @@ public void testBuildSlotSharingGroupWithSpecificResource()
{
.setExternalResource("gpu", 1)
.build();
- assertThat(slotSharingGroup.getName(), is(name));
- assertThat(slotSharingGroup.getCpuCores().get(), is(1.0));
- assertThat(slotSharingGroup.getTaskHeapMemory().get(), is(heap));
- assertThat(slotSharingGroup.getTaskOffHeapMemory().get(), is(offHeap));
- assertThat(slotSharingGroup.getManagedMemory().get(), is(managed));
- assertThat(
- slotSharingGroup.getExternalResources(),
is(Collections.singletonMap("gpu", 1.0)));
+ assertThat(slotSharingGroup.getName()).isEqualTo(name);
+ assertThat(slotSharingGroup.getCpuCores()).contains(1.0);
+ assertThat(slotSharingGroup.getTaskHeapMemory()).contains(heap);
+ assertThat(slotSharingGroup.getTaskOffHeapMemory()).contains(offHeap);
+ assertThat(slotSharingGroup.getManagedMemory()).contains(managed);
+ assertThat(slotSharingGroup.getExternalResources())
+ .isEqualTo(Collections.singletonMap("gpu", 1.0));
}
@Test
- public void testBuildSlotSharingGroupWithUnknownResource() {
+ void testBuildSlotSharingGroupWithUnknownResource() {
final String name = "ssg";
final SlotSharingGroup slotSharingGroup =
SlotSharingGroup.newBuilder(name).build();
- assertThat(slotSharingGroup.getName(), is(name));
- assertFalse(slotSharingGroup.getCpuCores().isPresent());
- assertFalse(slotSharingGroup.getTaskHeapMemory().isPresent());
- assertFalse(slotSharingGroup.getManagedMemory().isPresent());
- assertFalse(slotSharingGroup.getTaskOffHeapMemory().isPresent());
- assertTrue(slotSharingGroup.getExternalResources().isEmpty());
+ assertThat(slotSharingGroup.getName()).isEqualTo(name);
+ assertThat(slotSharingGroup.getCpuCores().isPresent()).isFalse();
+ assertThat(slotSharingGroup.getTaskHeapMemory().isPresent()).isFalse();
+ assertThat(slotSharingGroup.getManagedMemory().isPresent()).isFalse();
+
assertThat(slotSharingGroup.getTaskOffHeapMemory().isPresent()).isFalse();
+ assertThat(slotSharingGroup.getExternalResources().isEmpty()).isTrue();
Review Comment:
```suggestion
assertThat(slotSharingGroup.getCpuCores()).isNotPresent();
assertThat(slotSharingGroup.getTaskHeapMemory()).isNotPresent();
assertThat(slotSharingGroup.getManagedMemory()).isNotPresent();
assertThat(slotSharingGroup.getTaskOffHeapMemory()).isNotPresent();
assertThat(slotSharingGroup.getExternalResources()).isEmpty();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java:
##########
@@ -249,11 +252,11 @@ private void testOuterJoin(
baseOperator.executeOnCollections(
leftInput, rightInput, runtimeContext,
executionConfig);
- assertEquals(expected, resultSafe);
- assertEquals(expected, resultRegular);
+ assertThat(resultSafe).isEqualTo(expected);
+ assertThat(resultRegular).isEqualTo(expected);
- assertTrue(joiner.opened.get());
- assertTrue(joiner.closed.get());
+ assertThat(joiner.opened.get()).isTrue();
+ assertThat(joiner.closed.get()).isTrue();
Review Comment:
```suggestion
assertThat(joiner.opened).isTrue();
assertThat(joiner.closed).isTrue();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java:
##########
@@ -19,50 +19,47 @@
import org.apache.flink.core.fs.Path;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import lombok.Getter;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
-@RunWith(Parameterized.class)
Review Comment:
```java
/**
 * Parameterized test for the default {@link FilePathFilter}: every case pairs a file path with
 * the result the default filter is expected to return for it.
 */
@ExtendWith(ParameterizedTestExtension.class)
class DefaultFilterTest {

    /** Supplies the {@code {filePath, shouldFilter}} pairs consumed by the constructor. */
    @Parameters
    private static Collection<Object[]> data() {
        final Object[][] cases = {
            {"file.txt", false},
            {".file.txt", true},
            {"dir/.file.txt", true},
            {".dir/file.txt", false},
            {"_file.txt", true},
            {"dir/_file.txt", true},
            {"_dir/file.txt", false},
            // Check filtering Hadoop's unfinished files
            {FilePathFilter.HADOOP_COPYING, true},
            {"dir/" + FilePathFilter.HADOOP_COPYING, true},
            {FilePathFilter.HADOOP_COPYING + "/file.txt", false},
        };
        return Arrays.asList(cases);
    }

    /** Path handed to the default filter. */
    private final String filePath;

    /** Result that {@code filterPath} is expected to return for {@link #filePath}. */
    private final boolean shouldFilter;

    DefaultFilterTest(String filePath, boolean shouldFilter) {
        this.shouldFilter = shouldFilter;
        this.filePath = filePath;
    }

    /** Asserts that the default filter's verdict for the path matches the expectation. */
    @TestTemplate
    void test() {
        final FilePathFilter filter = FilePathFilter.createDefaultFilter();
        final boolean filtered = filter.filterPath(new Path(filePath));

        assertThat(filtered).as("File: " + filePath).isEqualTo(shouldFilter);
    }
}
```
##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -603,55 +587,52 @@ public void
testGetStatisticsMultipleOneFileWithCachedVersion() throws IOExcepti
* split has to start from the beginning.
*/
@Test
- public void testFileInputFormatWithCompression() {
- try {
- String tempFile =
- TestFileUtils.createTempFileDirForProvidedFormats(
- temporaryFolder.newFolder(),
- FileInputFormat.getSupportedCompressionFormats());
- final DummyFileInputFormat format = new DummyFileInputFormat();
- format.setFilePath(tempFile);
- format.configure(new Configuration());
- FileInputSplit[] splits = format.createInputSplits(2);
- final Set<String> supportedCompressionFormats =
- FileInputFormat.getSupportedCompressionFormats();
- Assert.assertEquals(supportedCompressionFormats.size(),
splits.length);
- for (FileInputSplit split : splits) {
- Assert.assertEquals(
- FileInputFormat.READ_WHOLE_SPLIT_FLAG,
- split.getLength()); // unsplittable compressed files
have this size as a
- // flag for "read whole file"
- Assert.assertEquals(0L, split.getStart()); // always read from
the beginning.
- }
+ void testFileInputFormatWithCompression() throws IOException {
- // test if this also works for "mixed" directories
- TestFileUtils.createTempFileInDirectory(
- tempFile.replace("file:", ""),
- "this creates a test file with a random extension (at
least not .deflate)");
-
- final DummyFileInputFormat formatMixed = new
DummyFileInputFormat();
- formatMixed.setFilePath(tempFile);
- formatMixed.configure(new Configuration());
- FileInputSplit[] splitsMixed = formatMixed.createInputSplits(2);
- Assert.assertEquals(supportedCompressionFormats.size() + 1,
splitsMixed.length);
- for (FileInputSplit split : splitsMixed) {
- final String extension =
-
FileInputFormat.extractFileExtension(split.getPath().getName());
- if (supportedCompressionFormats.contains(extension)) {
- Assert.assertEquals(
- FileInputFormat.READ_WHOLE_SPLIT_FLAG,
- split.getLength()); // unsplittable compressed
files have this size as a
- // flag for "read whole file"
- Assert.assertEquals(0L, split.getStart()); // always read
from the beginning.
- } else {
- Assert.assertEquals(0L, split.getStart());
- Assert.assertTrue("split size not correct",
split.getLength() > 0);
- }
- }
+ String tempFile =
+ TestFileUtils.createTempFileDirForProvidedFormats(
+ TempDirUtils.newFolder(temporaryFolder),
+ FileInputFormat.getSupportedCompressionFormats());
+ final DummyFileInputFormat format = new DummyFileInputFormat();
+ format.setFilePath(tempFile);
+ format.configure(new Configuration());
+ FileInputSplit[] splits = format.createInputSplits(2);
+ final Set<String> supportedCompressionFormats =
+ FileInputFormat.getSupportedCompressionFormats();
+ assertThat(splits).hasSize(supportedCompressionFormats.size());
+ for (FileInputSplit split : splits) {
+ assertThat(split.getLength())
+ .isEqualTo(
+ FileInputFormat.READ_WHOLE_SPLIT_FLAG); //
unsplittable compressed files
+ // have this size as a
+ // flag for "read whole file"
+ assertThat(split.getStart()).isEqualTo(0L); // always read from
the beginning.
+ }
- } catch (Exception ex) {
- ex.printStackTrace();
- Assert.fail(ex.getMessage());
+ // test if this also works for "mixed" directories
+ TestFileUtils.createTempFileInDirectory(
+ tempFile.replace("file:", ""),
+ "this creates a test file with a random extension (at least
not .deflate)");
+
+ final DummyFileInputFormat formatMixed = new DummyFileInputFormat();
+ formatMixed.setFilePath(tempFile);
+ formatMixed.configure(new Configuration());
+ FileInputSplit[] splitsMixed = formatMixed.createInputSplits(2);
+ assertThat(splitsMixed).hasSize(supportedCompressionFormats.size() +
1);
+ for (FileInputSplit split : splitsMixed) {
+ final String extension =
+
FileInputFormat.extractFileExtension(split.getPath().getName());
+ if (supportedCompressionFormats.contains(extension)) {
+ assertThat(split.getLength())
+ .isEqualTo(
+ FileInputFormat.READ_WHOLE_SPLIT_FLAG); //
unsplittable compressed
+ // files have this size as a
+ // flag for "read whole file"
+ assertThat(split.getStart()).isEqualTo(0L); // always read
from the beginning.
+ } else {
+ assertThat(split.getStart()).isEqualTo(0L);
+ assertThat(split.getLength() > 0).as("split size not
correct").isTrue();
Review Comment:
```suggestion
assertThat(split.getStart()).isZero(); // always read from
the beginning.
} else {
assertThat(split.getStart()).isZero();
assertThat(split.getLength()).as("split size not
correct").isPositive();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java:
##########
@@ -24,199 +24,160 @@
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
/** Test for the {@link RuntimeUDFContext}. */
-public class RuntimeUDFContextTest {
+class RuntimeUDFContextTest {
private final TaskInfo taskInfo = new TaskInfoImpl("test name", 3, 1, 3,
0);
@Test
- public void testBroadcastVariableNotFound() {
- try {
- RuntimeUDFContext ctx =
- new RuntimeUDFContext(
- taskInfo,
- getClass().getClassLoader(),
- new ExecutionConfig(),
- new HashMap<>(),
- new HashMap<>(),
-
UnregisteredMetricsGroup.createOperatorMetricGroup());
-
- assertFalse(ctx.hasBroadcastVariable("some name"));
-
- try {
- ctx.getBroadcastVariable("some name");
- fail("should throw an exception");
- } catch (IllegalArgumentException e) {
- // expected
- }
-
- try {
- ctx.getBroadcastVariableWithInitializer(
- "some name",
- new BroadcastVariableInitializer<Object, Object>() {
- public Object
initializeBroadcastVariable(Iterable<Object> data) {
- return null;
- }
- });
-
- fail("should throw an exception");
- } catch (IllegalArgumentException e) {
- // expected
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ void testBroadcastVariableNotFound() {
+ RuntimeUDFContext ctx =
+ new RuntimeUDFContext(
+ taskInfo,
+ getClass().getClassLoader(),
+ new ExecutionConfig(),
+ new HashMap<>(),
+ new HashMap<>(),
+ UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+ assertThat(ctx.hasBroadcastVariable("some name")).isFalse();
+
+ assertThatThrownBy(() -> ctx.getBroadcastVariable("some name"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("some name");
+
+ assertThatThrownBy(() -> ctx.getBroadcastVariableWithInitializer("some
name", data -> null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("some name");
}
@Test
- public void testBroadcastVariableSimple() {
- try {
- RuntimeUDFContext ctx =
- new RuntimeUDFContext(
- taskInfo,
- getClass().getClassLoader(),
- new ExecutionConfig(),
- new HashMap<>(),
- new HashMap<>(),
-
UnregisteredMetricsGroup.createOperatorMetricGroup());
-
- ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
- ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0,
4.0));
-
- assertTrue(ctx.hasBroadcastVariable("name1"));
- assertTrue(ctx.hasBroadcastVariable("name2"));
-
- List<Integer> list1 = ctx.getBroadcastVariable("name1");
- List<Double> list2 = ctx.getBroadcastVariable("name2");
-
- assertEquals(Arrays.asList(1, 2, 3, 4), list1);
- assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
-
- // access again
- List<Integer> list3 = ctx.getBroadcastVariable("name1");
- List<Double> list4 = ctx.getBroadcastVariable("name2");
-
- assertEquals(Arrays.asList(1, 2, 3, 4), list3);
- assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list4);
-
- // and again ;-)
- List<Integer> list5 = ctx.getBroadcastVariable("name1");
- List<Double> list6 = ctx.getBroadcastVariable("name2");
-
- assertEquals(Arrays.asList(1, 2, 3, 4), list5);
- assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list6);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ void testBroadcastVariableSimple() {
+ RuntimeUDFContext ctx =
+ new RuntimeUDFContext(
+ taskInfo,
+ getClass().getClassLoader(),
+ new ExecutionConfig(),
+ new HashMap<>(),
+ new HashMap<>(),
+ UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+ ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
+ ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+ assertThat(ctx.hasBroadcastVariable("name1")).isTrue();
+ assertThat(ctx.hasBroadcastVariable("name2")).isTrue();
+
+ List<Integer> list1 = ctx.getBroadcastVariable("name1");
+ List<Double> list2 = ctx.getBroadcastVariable("name2");
+
+ assertThat(list1).isEqualTo(Arrays.asList(1, 2, 3, 4));
+ assertThat(list2).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+ // access again
+ List<Integer> list3 = ctx.getBroadcastVariable("name1");
+ List<Double> list4 = ctx.getBroadcastVariable("name2");
+
+ assertThat(list3).isEqualTo(Arrays.asList(1, 2, 3, 4));
+ assertThat(list4).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+ // and again ;-)
+ List<Integer> list5 = ctx.getBroadcastVariable("name1");
+ List<Double> list6 = ctx.getBroadcastVariable("name2");
+
+ assertThat(list5).isEqualTo(Arrays.asList(1, 2, 3, 4));
+ assertThat(list6).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
}
@Test
- public void testBroadcastVariableWithInitializer() {
- try {
- RuntimeUDFContext ctx =
- new RuntimeUDFContext(
- taskInfo,
- getClass().getClassLoader(),
- new ExecutionConfig(),
- new HashMap<>(),
- new HashMap<>(),
-
UnregisteredMetricsGroup.createOperatorMetricGroup());
-
- ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
-
- // access it the first time with an initializer
- List<Double> list =
- ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
- assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list);
-
- // access it the second time with an initializer (which might not
get executed)
- List<Double> list2 =
- ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
- assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
-
- // access it the third time without an initializer (should work by
"chance", because the
- // result is a list)
- List<Double> list3 = ctx.getBroadcastVariable("name");
- assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list3);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ void testBroadcastVariableWithInitializer() {
+ RuntimeUDFContext ctx =
+ new RuntimeUDFContext(
+ taskInfo,
+ getClass().getClassLoader(),
+ new ExecutionConfig(),
+ new HashMap<>(),
+ new HashMap<>(),
+ UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+ ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+ // access it the first time with an initializer
+ List<Double> list =
+ ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
+ assertThat(list).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+ // access it the second time with an initializer (which might not get
executed)
+ List<Double> list2 =
+ ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
+ assertThat(list2).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+ // access it the third time without an initializer (should work by
"chance", because the
+ // result is a list)
+ List<Double> list3 = ctx.getBroadcastVariable("name");
+ assertThat(list3).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
}
@Test
- public void testResetBroadcastVariableWithInitializer() {
- try {
- RuntimeUDFContext ctx =
- new RuntimeUDFContext(
- taskInfo,
- getClass().getClassLoader(),
- new ExecutionConfig(),
- new HashMap<>(),
- new HashMap<>(),
-
UnregisteredMetricsGroup.createOperatorMetricGroup());
-
- ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
-
- // access it the first time with an initializer
- List<Double> list =
- ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
- assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list);
-
- // set it again to something different
- ctx.setBroadcastVariable("name", Arrays.asList(2, 3, 4, 5));
-
- List<Double> list2 =
- ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
- assertEquals(Arrays.asList(2.0, 3.0, 4.0, 5.0), list2);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ void testResetBroadcastVariableWithInitializer() {
+ RuntimeUDFContext ctx =
+ new RuntimeUDFContext(
+ taskInfo,
+ getClass().getClassLoader(),
+ new ExecutionConfig(),
+ new HashMap<>(),
+ new HashMap<>(),
+ UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+ ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+ // access it the first time with an initializer
+ List<Double> list =
+ ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
+ assertThat(list).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+ // set it again to something different
+ ctx.setBroadcastVariable("name", Arrays.asList(2, 3, 4, 5));
+
+ List<Double> list2 =
+ ctx.getBroadcastVariableWithInitializer("name", new
ConvertingInitializer());
+ assertThat(list2).isEqualTo(Arrays.asList(2.0, 3.0, 4.0, 5.0));
}
@Test
- public void testBroadcastVariableWithInitializerAndMismatch() {
+ void testBroadcastVariableWithInitializerAndMismatch() {
+ RuntimeUDFContext ctx =
+ new RuntimeUDFContext(
+ taskInfo,
+ getClass().getClassLoader(),
+ new ExecutionConfig(),
+ new HashMap<>(),
+ new HashMap<>(),
+ UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+ ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+ // access it the first time with an initializer
+ int sum = ctx.getBroadcastVariableWithInitializer("name", new
SumInitializer());
+ assertThat(sum).isEqualTo(10);
+
+ // access it the second time with no initializer -> should fail due to
type mismatch
try {
- RuntimeUDFContext ctx =
- new RuntimeUDFContext(
- taskInfo,
- getClass().getClassLoader(),
- new ExecutionConfig(),
- new HashMap<>(),
- new HashMap<>(),
-
UnregisteredMetricsGroup.createOperatorMetricGroup());
-
- ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
-
- // access it the first time with an initializer
- int sum = ctx.getBroadcastVariableWithInitializer("name", new
SumInitializer());
- assertEquals(10, sum);
-
- // access it the second time with no initializer -> should fail
due to type mismatch
- try {
- ctx.getBroadcastVariable("name");
- fail("should throw an exception");
- } catch (IllegalStateException e) {
- // expected
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ ctx.getBroadcastVariable("name");
+ fail("should throw an exception");
+ } catch (IllegalStateException e) {
+ // expected
}
Review Comment:
Please replace the try/fail/catch block with `assertThatThrownBy(() -> ctx.getBroadcastVariable("name")).isInstanceOf(IllegalStateException.class)`.
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -193,61 +192,69 @@ public void testGetSourceField() {
sp.addForwardedField(1, 1, 2);
sp.addForwardedField(1, 1, 3);
- assertEquals(0, sp.getForwardingSourceField(1, 0));
- assertEquals(0, sp.getForwardingSourceField(1, 4));
- assertEquals(1, sp.getForwardingSourceField(1, 1));
- assertEquals(1, sp.getForwardingSourceField(1, 2));
- assertEquals(1, sp.getForwardingSourceField(1, 3));
- assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+ assertThat(sp.getForwardingSourceField(1, 0)).isZero();
+ assertThat(sp.getForwardingSourceField(1, 4)).isZero();
+ assertThat(sp.getForwardingSourceField(1, 1)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(1, 2)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(1, 3)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(1, 5) < 0).isTrue();
Review Comment:
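Use `isOne()` instead of `isEqualTo(1)`, and `assertThat(sp.getForwardingSourceField(1, 5)).isNegative()` instead of asserting on the `< 0` comparison.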
##########
flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java:
##########
@@ -21,37 +21,39 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.types.Value;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
-import java.util.concurrent.Future;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests runtime context access from inside an RichInputFormat class. */
-public class RichInputFormatTest {
+class RichInputFormatTest {
@Test
- public void testCheckRuntimeContextAccess() {
- final SerializedInputFormat<Value> inputFormat = new
SerializedInputFormat<Value>();
+ void testCheckRuntimeContextAccess() {
+ final SerializedInputFormat<Value> inputFormat = new
SerializedInputFormat<>();
final TaskInfo taskInfo = new TaskInfoImpl("test name", 3, 1, 3, 0);
inputFormat.setRuntimeContext(
new RuntimeUDFContext(
taskInfo,
getClass().getClassLoader(),
new ExecutionConfig(),
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
+ new HashMap<>(),
+ new HashMap<>(),
UnregisteredMetricsGroup.createOperatorMetricGroup()));
-
assertEquals(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
1);
- assertEquals(
-
inputFormat.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), 3);
+ assertThat(1)
+
.isEqualTo(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
+ assertThat(3)
+ .isEqualTo(
+ inputFormat
+ .getRuntimeContext()
+ .getTaskInfo()
+ .getNumberOfParallelSubtasks());
Review Comment:
1 and 3 are the expected values, right? The actual value should be passed to assertThat(...) and the expected value to isEqualTo(...).
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -179,12 +178,12 @@ public void testGetSourceField() {
sp.addForwardedField(1, 2, 3);
sp.addForwardedField(1, 3, 2);
- assertEquals(0, sp.getForwardingSourceField(1, 1));
- assertEquals(1, sp.getForwardingSourceField(1, 4));
- assertEquals(2, sp.getForwardingSourceField(1, 3));
- assertEquals(3, sp.getForwardingSourceField(1, 2));
- assertTrue(sp.getForwardingSourceField(1, 0) < 0);
- assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+ assertThat(sp.getForwardingSourceField(1, 1)).isZero();
+ assertThat(sp.getForwardingSourceField(1, 4)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(1, 3)).isEqualTo(2);
+ assertThat(sp.getForwardingSourceField(1, 2)).isEqualTo(3);
+ assertThat(sp.getForwardingSourceField(1, 0) < 0).isTrue();
+ assertThat(sp.getForwardingSourceField(1, 5) < 0).isTrue();
Review Comment:
Please use assertThat(...).isNegative() instead of assertThat(... < 0).isTrue().
##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java:
##########
@@ -24,620 +24,617 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.IntValue;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
-public class FileOutputFormatTest {
+class FileOutputFormatTest {
@Test
- public void testCreateNonParallelLocalFS() throws IOException {
+ void testCreateNonParallelLocalFS() throws IOException {
File tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
File tmpOutFile = new File(tmpOutPath.getAbsolutePath() + "/1");
String tmpFilePath = tmpOutPath.toURI().toString();
// check fail if file exists
- DummyFileOutputFormat dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- fail();
- } catch (Exception e) {
- // exception expected
- }
+ assertThatThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ })
+ .isInstanceOf(Exception.class);
tmpOutPath.delete();
// check fail if directory exists
- Assert.assertTrue("Directory could not be created.",
tmpOutPath.mkdir());
+ assertThat(tmpOutPath.mkdir()).as("Directory could not be
created.").isTrue();
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+ assertThatThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
- dfof.configure(new Configuration());
+ dfof.configure(new Configuration());
- try {
- dfof.open(0, 1);
- dfof.close();
- fail();
- } catch (Exception e) {
- // exception expected
- }
+ dfof.open(0, 1);
+ dfof.close();
+ })
+ .isInstanceOf(Exception.class);
tmpOutPath.delete();
// check success
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- } catch (Exception e) {
- fail();
- }
- Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ });
+ assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
tmpOutPath.delete();
// check fail for path with tailing '/'
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath + "/"));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- } catch (Exception e) {
- fail();
- }
- Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath +
"/"));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ });
+ assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
tmpOutPath.delete();
// ----------- test again with always directory mode
// check fail if file exists
tmpOutPath.createNewFile();
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+ assertThatThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
- dfof.configure(new Configuration());
+ dfof.configure(new Configuration());
- try {
- dfof.open(0, 1);
- dfof.close();
- fail();
- } catch (Exception e) {
- // exception expected
- }
+ dfof.open(0, 1);
+ dfof.close();
+ })
+ .isInstanceOf(Exception.class);
tmpOutPath.delete();
// check success if directory exists
- Assert.assertTrue("Directory could not be created.",
tmpOutPath.mkdir());
-
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- } catch (Exception e) {
- fail();
- }
- Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
- Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+ assertThat(tmpOutPath.mkdir()).as("Directory could not be
created.").isTrue();
+
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ });
+ assertThat(tmpOutPath.exists() && tmpOutPath.isDirectory()).isTrue();
+ assertThat(tmpOutFile.exists() && tmpOutFile.isFile()).isTrue();
(new File(tmpOutPath.getAbsoluteFile() + "/1")).delete();
// check custom file name inside directory if directory exists
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
- dfof.testFileName = true;
- Configuration c = new Configuration();
- dfof.configure(c);
-
- try {
- dfof.open(0, 1);
- dfof.close();
- } catch (Exception e) {
- fail();
- }
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+ dfof.testFileName = true;
+ Configuration c = new Configuration();
+ dfof.configure(c);
+
+ dfof.open(0, 1);
+ dfof.close();
+ });
File customOutFile = new File(tmpOutPath.getAbsolutePath() +
"/fancy-1-0.avro");
- Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
- Assert.assertTrue(customOutFile.exists() && customOutFile.isFile());
+ assertThat(tmpOutPath.exists() && tmpOutPath.isDirectory()).isTrue();
+ assertThat(customOutFile.exists() && customOutFile.isFile()).isTrue();
Review Comment:
```suggestion
assertThat(tmpOutPath).exists().isDirectory();
assertThat(customOutFile).exists().isFile();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java:
##########
@@ -24,620 +24,617 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.IntValue;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
-public class FileOutputFormatTest {
+class FileOutputFormatTest {
@Test
- public void testCreateNonParallelLocalFS() throws IOException {
+ void testCreateNonParallelLocalFS() throws IOException {
File tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
File tmpOutFile = new File(tmpOutPath.getAbsolutePath() + "/1");
String tmpFilePath = tmpOutPath.toURI().toString();
// check fail if file exists
- DummyFileOutputFormat dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- fail();
- } catch (Exception e) {
- // exception expected
- }
+ assertThatThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ })
+ .isInstanceOf(Exception.class);
tmpOutPath.delete();
// check fail if directory exists
- Assert.assertTrue("Directory could not be created.",
tmpOutPath.mkdir());
+ assertThat(tmpOutPath.mkdir()).as("Directory could not be
created.").isTrue();
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+ assertThatThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
- dfof.configure(new Configuration());
+ dfof.configure(new Configuration());
- try {
- dfof.open(0, 1);
- dfof.close();
- fail();
- } catch (Exception e) {
- // exception expected
- }
+ dfof.open(0, 1);
+ dfof.close();
+ })
+ .isInstanceOf(Exception.class);
tmpOutPath.delete();
// check success
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- } catch (Exception e) {
- fail();
- }
- Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ });
+ assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
tmpOutPath.delete();
// check fail for path with tailing '/'
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath + "/"));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- } catch (Exception e) {
- fail();
- }
- Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath +
"/"));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ });
+ assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
tmpOutPath.delete();
// ----------- test again with always directory mode
// check fail if file exists
tmpOutPath.createNewFile();
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+ assertThatThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
- dfof.configure(new Configuration());
+ dfof.configure(new Configuration());
- try {
- dfof.open(0, 1);
- dfof.close();
- fail();
- } catch (Exception e) {
- // exception expected
- }
+ dfof.open(0, 1);
+ dfof.close();
+ })
+ .isInstanceOf(Exception.class);
tmpOutPath.delete();
// check success if directory exists
- Assert.assertTrue("Directory could not be created.",
tmpOutPath.mkdir());
-
- dfof = new DummyFileOutputFormat();
- dfof.setOutputFilePath(new Path(tmpFilePath));
- dfof.setWriteMode(WriteMode.NO_OVERWRITE);
- dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
-
- dfof.configure(new Configuration());
-
- try {
- dfof.open(0, 1);
- dfof.close();
- } catch (Exception e) {
- fail();
- }
- Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
- Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+ assertThat(tmpOutPath.mkdir()).as("Directory could not be
created.").isTrue();
+
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ DummyFileOutputFormat dfof = new
DummyFileOutputFormat();
+ dfof.setOutputFilePath(new Path(tmpFilePath));
+ dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+
dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+ dfof.configure(new Configuration());
+
+ dfof.open(0, 1);
+ dfof.close();
+ });
+ assertThat(tmpOutPath.exists() && tmpOutPath.isDirectory()).isTrue();
+ assertThat(tmpOutFile.exists() && tmpOutFile.isFile()).isTrue();
Review Comment:
```suggestion
assertThat(tmpOutPath).exists().isDirectory();
assertThat(tmpOutFile).exists().isFile();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java:
##########
@@ -41,126 +41,106 @@
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
/** The test for map operator. */
-@SuppressWarnings("serial")
public class MapOperatorTest implements java.io.Serializable {
@Test
- public void testMapPlain() {
- try {
- final MapFunction<String, Integer> parser =
- new MapFunction<String, Integer>() {
- @Override
- public Integer map(String value) {
- return Integer.parseInt(value);
- }
- };
-
- MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
- new MapOperatorBase<String, Integer, MapFunction<String,
Integer>>(
- parser,
- new UnaryOperatorInformation<String, Integer>(
- BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
- "TestMapper");
-
- List<String> input = new ArrayList<String>(asList("1", "2", "3",
"4", "5", "6"));
-
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.disableObjectReuse();
- List<Integer> resultMutableSafe = op.executeOnCollections(input,
null, executionConfig);
- executionConfig.enableObjectReuse();
- List<Integer> resultRegular = op.executeOnCollections(input, null,
executionConfig);
-
- assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
- assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ void testMapPlain() throws Exception {
+ final MapFunction<String, Integer> parser = Integer::parseInt;
+
+ MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
+ new MapOperatorBase<>(
+ parser,
+ new UnaryOperatorInformation<>(
+ BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
+ "TestMapper");
+
+ List<String> input = new ArrayList<>(asList("1", "2", "3", "4", "5",
"6"));
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableObjectReuse();
+ List<Integer> resultMutableSafe = op.executeOnCollections(input, null,
executionConfig);
+ executionConfig.enableObjectReuse();
+ List<Integer> resultRegular = op.executeOnCollections(input, null,
executionConfig);
+
+ assertThat(resultMutableSafe).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+ assertThat(resultRegular).isEqualTo(asList(1, 2, 3, 4, 5, 6));
}
@Test
- public void testMapWithRuntimeContext() {
- try {
- final String taskName = "Test Task";
- final AtomicBoolean opened = new AtomicBoolean();
- final AtomicBoolean closed = new AtomicBoolean();
-
- final MapFunction<String, Integer> parser =
- new RichMapFunction<String, Integer>() {
-
- @Override
- public void open(OpenContext openContext) throws
Exception {
- opened.set(true);
- RuntimeContext ctx = getRuntimeContext();
- assertEquals(0,
ctx.getTaskInfo().getIndexOfThisSubtask());
- assertEquals(1,
ctx.getTaskInfo().getNumberOfParallelSubtasks());
- assertEquals(taskName,
ctx.getTaskInfo().getTaskName());
- }
-
- @Override
- public Integer map(String value) {
- return Integer.parseInt(value);
- }
-
- @Override
- public void close() throws Exception {
- closed.set(true);
- }
- };
-
- MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
- new MapOperatorBase<String, Integer, MapFunction<String,
Integer>>(
- parser,
- new UnaryOperatorInformation<String, Integer>(
- BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
- taskName);
-
- List<String> input = new ArrayList<String>(asList("1", "2", "3",
"4", "5", "6"));
- final HashMap<String, Accumulator<?, ?>> accumulatorMap =
- new HashMap<String, Accumulator<?, ?>>();
- final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
- final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.disableObjectReuse();
-
- List<Integer> resultMutableSafe =
- op.executeOnCollections(
- input,
- new RuntimeUDFContext(
- taskInfo,
- null,
- executionConfig,
- cpTasks,
- accumulatorMap,
-
UnregisteredMetricsGroup.createOperatorMetricGroup()),
- executionConfig);
-
- executionConfig.enableObjectReuse();
- List<Integer> resultRegular =
- op.executeOnCollections(
- input,
- new RuntimeUDFContext(
- taskInfo,
- null,
- executionConfig,
- cpTasks,
- accumulatorMap,
-
UnregisteredMetricsGroup.createOperatorMetricGroup()),
- executionConfig);
-
- assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
- assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
-
- assertTrue(opened.get());
- assertTrue(closed.get());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ void testMapWithRuntimeContext() throws Exception {
+ final String taskName = "Test Task";
+ final AtomicBoolean opened = new AtomicBoolean();
+ final AtomicBoolean closed = new AtomicBoolean();
+
+ final MapFunction<String, Integer> parser =
+ new RichMapFunction<String, Integer>() {
+
+ @Override
+ public void open(OpenContext openContext) {
+ opened.set(true);
+ RuntimeContext ctx = getRuntimeContext();
+
assertThat(ctx.getTaskInfo().getIndexOfThisSubtask()).isZero();
+
assertThat(ctx.getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(1);
+
assertThat(ctx.getTaskInfo().getTaskName()).isEqualTo(taskName);
+ }
+
+ @Override
+ public Integer map(String value) {
+ return Integer.parseInt(value);
+ }
+
+ @Override
+ public void close() {
+ closed.set(true);
+ }
+ };
+
+ MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
+ new MapOperatorBase<>(
+ parser,
+ new UnaryOperatorInformation<>(
+ BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
+ taskName);
+
+ List<String> input = new ArrayList<>(asList("1", "2", "3", "4", "5",
"6"));
+ final HashMap<String, Accumulator<?, ?>> accumulatorMap = new
HashMap<>();
+ final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+ final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableObjectReuse();
+
+ List<Integer> resultMutableSafe =
+ op.executeOnCollections(
+ input,
+ new RuntimeUDFContext(
+ taskInfo,
+ null,
+ executionConfig,
+ cpTasks,
+ accumulatorMap,
+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
+ executionConfig);
+
+ executionConfig.enableObjectReuse();
+ List<Integer> resultRegular =
+ op.executeOnCollections(
+ input,
+ new RuntimeUDFContext(
+ taskInfo,
+ null,
+ executionConfig,
+ cpTasks,
+ accumulatorMap,
+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
+ executionConfig);
+
+ assertThat(resultMutableSafe).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+ assertThat(resultRegular).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+
+ assertThat(opened.get()).isTrue();
+ assertThat(closed.get()).isTrue();
Review Comment:
```suggestion
assertThat(opened).isTrue();
assertThat(closed).isTrue();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -165,12 +164,12 @@ public void testGetSourceField() {
sp.addForwardedField(0, 1, 2);
sp.addForwardedField(0, 1, 3);
- assertEquals(0, sp.getForwardingSourceField(0, 0));
- assertEquals(0, sp.getForwardingSourceField(0, 4));
- assertEquals(1, sp.getForwardingSourceField(0, 1));
- assertEquals(1, sp.getForwardingSourceField(0, 2));
- assertEquals(1, sp.getForwardingSourceField(0, 3));
- assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+ assertThat(sp.getForwardingSourceField(0, 0)).isZero();
+ assertThat(sp.getForwardingSourceField(0, 4)).isZero();
+ assertThat(sp.getForwardingSourceField(0, 1)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();
Review Comment:
```suggestion
assertThat(sp.getForwardingSourceField(0, 1)).isOne();
assertThat(sp.getForwardingSourceField(0, 2)).isOne();
assertThat(sp.getForwardingSourceField(0, 3)).isOne();
assertThat(sp.getForwardingSourceField(0, 5)).isNegative();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -151,12 +150,12 @@ public void testGetSourceField() {
sp.addForwardedField(0, 2, 3);
sp.addForwardedField(0, 3, 2);
- assertEquals(0, sp.getForwardingSourceField(0, 1));
- assertEquals(1, sp.getForwardingSourceField(0, 4));
- assertEquals(2, sp.getForwardingSourceField(0, 3));
- assertEquals(3, sp.getForwardingSourceField(0, 2));
- assertTrue(sp.getForwardingSourceField(0, 0) < 0);
- assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+ assertThat(sp.getForwardingSourceField(0, 1)).isZero();
+ assertThat(sp.getForwardingSourceField(0, 4)).isEqualTo(1);
Review Comment:
```suggestion
assertThat(sp.getForwardingSourceField(0, 4)).isOne();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java:
##########
@@ -54,32 +55,32 @@ public void testGetTargetFields() {
sp.addForwardedField(1, 2);
sp.addForwardedField(1, 3);
- assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
- assertEquals(3, sp.getForwardingTargetFields(0, 1).size());
- assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
- assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
- assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
- assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
- assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
- assertNotNull(sp.getForwardingTargetFields(0, 2));
- assertEquals(0, sp.getForwardingTargetFields(0, 2).size());
+ assertThat(sp.getForwardingTargetFields(0, 0)).hasSize(2);
+ assertThat(sp.getForwardingTargetFields(0, 1)).hasSize(3);
+ assertThat(sp.getForwardingTargetFields(0, 0).contains(0)).isTrue();
+ assertThat(sp.getForwardingTargetFields(0, 0).contains(4)).isTrue();
+ assertThat(sp.getForwardingTargetFields(0, 1).contains(1)).isTrue();
+ assertThat(sp.getForwardingTargetFields(0, 1).contains(2)).isTrue();
+ assertThat(sp.getForwardingTargetFields(0, 1).contains(3)).isTrue();
+ assertThat(sp.getForwardingTargetFields(0, 2)).isNotNull();
+ assertThat(sp.getForwardingTargetFields(0, 2)).isEmpty();
}
@Test
- public void testGetSourceField() {
+ void testGetSourceField() {
SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
sp.addForwardedField(0, 1);
sp.addForwardedField(1, 4);
sp.addForwardedField(2, 3);
sp.addForwardedField(3, 2);
- assertEquals(0, sp.getForwardingSourceField(0, 1));
- assertEquals(1, sp.getForwardingSourceField(0, 4));
- assertEquals(2, sp.getForwardingSourceField(0, 3));
- assertEquals(3, sp.getForwardingSourceField(0, 2));
- assertTrue(sp.getForwardingSourceField(0, 0) < 0);
- assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+ assertThat(sp.getForwardingSourceField(0, 1)).isZero();
+ assertThat(sp.getForwardingSourceField(0, 4)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(2);
+ assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(3);
+ assertThat(sp.getForwardingSourceField(0, 0) < 0).isTrue();
+ assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();
Review Comment:
Please use isOne() instead of isEqualTo(1), and assertThat(...).isNegative() instead of assertThat(... < 0).isTrue().
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -151,12 +150,12 @@ public void testGetSourceField() {
sp.addForwardedField(0, 2, 3);
sp.addForwardedField(0, 3, 2);
- assertEquals(0, sp.getForwardingSourceField(0, 1));
- assertEquals(1, sp.getForwardingSourceField(0, 4));
- assertEquals(2, sp.getForwardingSourceField(0, 3));
- assertEquals(3, sp.getForwardingSourceField(0, 2));
- assertTrue(sp.getForwardingSourceField(0, 0) < 0);
- assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+ assertThat(sp.getForwardingSourceField(0, 1)).isZero();
+ assertThat(sp.getForwardingSourceField(0, 4)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(2);
+ assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(3);
+ assertThat(sp.getForwardingSourceField(0, 0) < 0).isTrue();
+ assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();
Review Comment:
```suggestion
assertThat(sp.getForwardingSourceField(0, 0)).isNegative();
assertThat(sp.getForwardingSourceField(0, 5)).isNegative();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java:
##########
@@ -129,69 +116,58 @@ public void join(String first, String second,
Collector<Integer> out)
InnerJoinOperatorBase<
String, String, Integer, RichFlatJoinFunction<String,
String, Integer>>
base =
- new InnerJoinOperatorBase<
- String,
- String,
- Integer,
- RichFlatJoinFunction<String, String, Integer>>(
+ new InnerJoinOperatorBase<>(
joiner,
- new BinaryOperatorInformation<String, String,
Integer>(
+ new BinaryOperatorInformation<>(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
new int[0],
new int[0],
taskName);
- final List<String> inputData1 =
- new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
- final List<String> inputData2 = new
ArrayList<String>(Arrays.asList("foobar", "foo"));
- final List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3,
3, 6, 6));
-
- try {
- final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
- final HashMap<String, Accumulator<?, ?>> accumulatorMap =
- new HashMap<String, Accumulator<?, ?>>();
- final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-
- ExecutionConfig executionConfig = new ExecutionConfig();
-
- executionConfig.disableObjectReuse();
- List<Integer> resultSafe =
- base.executeOnCollections(
- inputData1,
- inputData2,
- new RuntimeUDFContext(
- taskInfo,
- null,
- executionConfig,
- cpTasks,
- accumulatorMap,
-
UnregisteredMetricsGroup.createOperatorMetricGroup()),
- executionConfig);
-
- executionConfig.enableObjectReuse();
- List<Integer> resultRegular =
- base.executeOnCollections(
- inputData1,
- inputData2,
- new RuntimeUDFContext(
- taskInfo,
- null,
- executionConfig,
- cpTasks,
- accumulatorMap,
-
UnregisteredMetricsGroup.createOperatorMetricGroup()),
- executionConfig);
-
- assertEquals(expected, resultSafe);
- assertEquals(expected, resultRegular);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- assertTrue(opened.get());
- assertTrue(closed.get());
+ final List<String> inputData1 = new ArrayList<>(Arrays.asList("foo",
"bar", "foobar"));
+ final List<String> inputData2 = new
ArrayList<>(Arrays.asList("foobar", "foo"));
+ final List<Integer> expected = new ArrayList<>(Arrays.asList(3, 3, 6,
6));
+
+ final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
+ final HashMap<String, Accumulator<?, ?>> accumulatorMap = new
HashMap<>();
+ final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+
+ executionConfig.disableObjectReuse();
+ List<Integer> resultSafe =
+ base.executeOnCollections(
+ inputData1,
+ inputData2,
+ new RuntimeUDFContext(
+ taskInfo,
+ null,
+ executionConfig,
+ cpTasks,
+ accumulatorMap,
+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
+ executionConfig);
+
+ executionConfig.enableObjectReuse();
+ List<Integer> resultRegular =
+ base.executeOnCollections(
+ inputData1,
+ inputData2,
+ new RuntimeUDFContext(
+ taskInfo,
+ null,
+ executionConfig,
+ cpTasks,
+ accumulatorMap,
+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
+ executionConfig);
+
+ assertThat(resultSafe).isEqualTo(expected);
+ assertThat(resultRegular).isEqualTo(expected);
+
+ assertThat(opened.get()).isTrue();
+ assertThat(closed.get()).isTrue();
Review Comment:
```suggestion
assertThat(opened).isTrue();
assertThat(closed).isTrue();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java:
##########
@@ -407,49 +397,49 @@ public void testInvalidPojo() throws Throwable {
} catch (Throwable t) {
e = t;
}
- Assert.assertNotNull(e);
+ assertThat(e).isNotNull();
Review Comment:
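Consider using `assertThatThrownBy(...)` here instead of catching the `Throwable` manually and asserting that it is not null.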
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java:
##########
@@ -148,24 +152,25 @@ public void testInvalidTuple() throws Throwable {
} catch (Throwable t) {
e = t;
}
- Assert.assertNotNull(e);
+ assertThat(e).isNotNull();
Review Comment:
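Consider using `assertThatThrownBy(...)` here as well, instead of catching the `Throwable` manually and asserting that it is not null.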
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java:
##########
@@ -262,20 +265,21 @@ private static class MockRichFlatJoinFunction
final AtomicBoolean closed = new AtomicBoolean(false);
@Override
- public void open(OpenContext openContext) throws Exception {
+ public void open(OpenContext openContext) {
opened.compareAndSet(false, true);
- assertEquals(0,
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
- assertEquals(1,
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
+
assertThat(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()).isZero();
+
assertThat(getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
+ .isEqualTo(1);
Review Comment:
```suggestion
.isOne();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java:
##########
@@ -88,98 +89,113 @@ public void testGetSourceField() {
sp.addForwardedField(1, 2);
sp.addForwardedField(1, 3);
- assertEquals(0, sp.getForwardingSourceField(0, 0));
- assertEquals(0, sp.getForwardingSourceField(0, 4));
- assertEquals(1, sp.getForwardingSourceField(0, 1));
- assertEquals(1, sp.getForwardingSourceField(0, 2));
- assertEquals(1, sp.getForwardingSourceField(0, 3));
- assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+ assertThat(sp.getForwardingSourceField(0, 0)).isZero();
+ assertThat(sp.getForwardingSourceField(0, 4)).isZero();
+ assertThat(sp.getForwardingSourceField(0, 1)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(1);
+ assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();
Review Comment:
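Use `isOne()` instead of `isEqualTo(1)`, and `assertThat(sp.getForwardingSourceField(0, 5)).isNegative()` instead of asserting `< 0` with `isTrue()`.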
##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java:
##########
@@ -218,38 +193,30 @@ public void testInequalityWithReference() {
}
protected void testGreatSmallAscDescWithReference(boolean ascending,
boolean greater) {
- try {
- T[] data = getSortedData();
+ T[] data = getSortedData();
- TypeComparator<T> comparatorLow = getComparator(ascending);
- TypeComparator<T> comparatorHigh = getComparator(ascending);
+ TypeComparator<T> comparatorLow = getComparator(ascending);
+ TypeComparator<T> comparatorHigh = getComparator(ascending);
- // compares every element in high with every element in low
- for (int x = 0; x < data.length - 1; x++) {
- for (int y = x + 1; y < data.length; y++) {
- comparatorLow.setReference(data[x]);
- comparatorHigh.setReference(data[y]);
+ // compares every element in high with every element in low
+ for (int x = 0; x < data.length - 1; x++) {
+ for (int y = x + 1; y < data.length; y++) {
+ comparatorLow.setReference(data[x]);
+ comparatorHigh.setReference(data[y]);
- if (greater && ascending) {
-
assertThat(comparatorLow.compareToReference(comparatorHigh))
- .isGreaterThan(0);
- }
- if (greater && !ascending) {
-
assertThat(comparatorLow.compareToReference(comparatorHigh)).isLessThan(0);
- }
- if (!greater && ascending) {
-
assertThat(comparatorHigh.compareToReference(comparatorLow)).isLessThan(0);
- }
- if (!greater && !ascending) {
-
assertThat(comparatorHigh.compareToReference(comparatorLow))
- .isGreaterThan(0);
- }
+ if (greater && ascending) {
+
assertThat(comparatorLow.compareToReference(comparatorHigh)).isGreaterThan(0);
+ }
+ if (greater && !ascending) {
+
assertThat(comparatorLow.compareToReference(comparatorHigh)).isLessThan(0);
+ }
+ if (!greater && ascending) {
+
assertThat(comparatorHigh.compareToReference(comparatorLow)).isLessThan(0);
+ }
+ if (!greater && !ascending) {
+
assertThat(comparatorHigh.compareToReference(comparatorLow)).isGreaterThan(0);
Review Comment:
How about isPositive and isNegative?
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java:
##########
@@ -90,13 +82,13 @@ private void testExecuteOnCollection(
taskInfo,
null,
executionConfig,
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?,
?>>(),
+ new HashMap<>(),
+ new HashMap<>(),
UnregisteredMetricsGroup.createOperatorMetricGroup()),
executionConfig);
- Assert.assertEquals(input.size(), result.size());
- Assert.assertEquals(input, result);
+ assertThat(result).hasSize(input.size());
+ assertThat(result).isEqualTo(input);
Review Comment:
```suggestion
assertThat(result).hasSameSizeAs(input).isEqualTo(input);
```
Or just use `isEqualTo(input)` on its own for short, since list equality already implies the same size.
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java:
##########
@@ -21,115 +21,103 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Collector;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
/** The test for partition map operator. */
-@SuppressWarnings("serial")
public class PartitionMapOperatorTest implements java.io.Serializable {
@Test
- public void testMapPartitionWithRuntimeContext() {
- try {
- final String taskName = "Test Task";
- final AtomicBoolean opened = new AtomicBoolean();
- final AtomicBoolean closed = new AtomicBoolean();
-
- final MapPartitionFunction<String, Integer> parser =
- new RichMapPartitionFunction<String, Integer>() {
-
- @Override
- public void open(OpenContext openContext) throws
Exception {
- opened.set(true);
- RuntimeContext ctx = getRuntimeContext();
- assertEquals(0,
ctx.getTaskInfo().getIndexOfThisSubtask());
- assertEquals(1,
ctx.getTaskInfo().getNumberOfParallelSubtasks());
- assertEquals(taskName,
ctx.getTaskInfo().getTaskName());
+ void testMapPartitionWithRuntimeContext() throws Exception {
+ final String taskName = "Test Task";
+ final AtomicBoolean opened = new AtomicBoolean();
+ final AtomicBoolean closed = new AtomicBoolean();
+
+ final MapPartitionFunction<String, Integer> parser =
+ new RichMapPartitionFunction<String, Integer>() {
+
+ @Override
+ public void open(OpenContext openContext) {
+ opened.set(true);
+ RuntimeContext ctx = getRuntimeContext();
+
assertThat(ctx.getTaskInfo().getIndexOfThisSubtask()).isZero();
+
assertThat(ctx.getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(1);
+
assertThat(ctx.getTaskInfo().getTaskName()).isEqualTo(taskName);
+ }
+
+ @Override
+ public void mapPartition(Iterable<String> values,
Collector<Integer> out) {
+ for (String s : values) {
+ out.collect(Integer.parseInt(s));
}
-
- @Override
- public void mapPartition(Iterable<String> values,
Collector<Integer> out) {
- for (String s : values) {
- out.collect(Integer.parseInt(s));
- }
- }
-
- @Override
- public void close() throws Exception {
- closed.set(true);
- }
- };
-
- MapPartitionOperatorBase<String, Integer,
MapPartitionFunction<String, Integer>> op =
- new MapPartitionOperatorBase<
- String, Integer, MapPartitionFunction<String,
Integer>>(
- parser,
- new UnaryOperatorInformation<String, Integer>(
- BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
- taskName);
-
- List<String> input = new ArrayList<String>(asList("1", "2", "3",
"4", "5", "6"));
-
- final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
-
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.disableObjectReuse();
-
- List<Integer> resultMutableSafe =
- op.executeOnCollections(
- input,
- new RuntimeUDFContext(
- taskInfo,
- null,
- executionConfig,
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
-
UnregisteredMetricsGroup.createOperatorMetricGroup()),
- executionConfig);
-
- executionConfig.enableObjectReuse();
- List<Integer> resultRegular =
- op.executeOnCollections(
- input,
- new RuntimeUDFContext(
- taskInfo,
- null,
- executionConfig,
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
-
UnregisteredMetricsGroup.createOperatorMetricGroup()),
- executionConfig);
-
- assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
- assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
-
- assertTrue(opened.get());
- assertTrue(closed.get());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+
+ @Override
+ public void close() {
+ closed.set(true);
+ }
+ };
+
+ MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String,
Integer>> op =
+ new MapPartitionOperatorBase<>(
+ parser,
+ new UnaryOperatorInformation<>(
+ BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO),
+ taskName);
+
+ List<String> input = new ArrayList<>(asList("1", "2", "3", "4", "5",
"6"));
+
+ final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableObjectReuse();
+
+ List<Integer> resultMutableSafe =
+ op.executeOnCollections(
+ input,
+ new RuntimeUDFContext(
+ taskInfo,
+ null,
+ executionConfig,
+ new HashMap<>(),
+ new HashMap<>(),
+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
+ executionConfig);
+
+ executionConfig.enableObjectReuse();
+ List<Integer> resultRegular =
+ op.executeOnCollections(
+ input,
+ new RuntimeUDFContext(
+ taskInfo,
+ null,
+ executionConfig,
+ new HashMap<>(),
+ new HashMap<>(),
+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
+ executionConfig);
+
+ assertThat(resultMutableSafe).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+ assertThat(resultRegular).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+
+ assertThat(opened.get()).isTrue();
+ assertThat(closed.get()).isTrue();
Review Comment:
```suggestion
assertThat(opened).isTrue();
assertThat(closed).isTrue();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java:
##########
@@ -101,8 +98,8 @@ private static void check(FieldList set, int... elements) {
for (int i = 0; i < fromIter.length; i++) {
fromIter[i] = iter.next();
}
- assertFalse(iter.hasNext());
- assertTrue(Arrays.equals(fromIter, elements));
+ assertThat(iter.hasNext()).isFalse();
Review Comment:
```suggestion
assertThat(iter).isExhausted();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java:
##########
@@ -62,26 +59,26 @@ public void testImmutability() {
s1.addField(Integer.valueOf(14));
s2.addFields(78, 13, 66, 3);
- assertEquals(0, s1.size());
- assertEquals(1, s2.size());
- assertEquals(1, s3.size());
- assertEquals(4, s4.size());
+ assertThat(s1).isEmpty();
+ assertThat(s2).hasSize(1);
+ assertThat(s3).hasSize(1);
+ assertThat(s4).hasSize(4);
}
@Test
- public void testAddSetToList() {
+ void testAddSetToList() {
check(new FieldList().addFields(new FieldSet(1)).addFields(2), 1, 2);
check(new FieldList().addFields(1).addFields(new FieldSet(2)), 1, 2);
check(new FieldList().addFields(new FieldSet(2)), 2);
}
private static void check(FieldList set, int... elements) {
if (elements == null) {
- assertEquals(0, set.size());
+ assertThat(set).isEmpty();
return;
}
- assertEquals(elements.length, set.size());
+ assertThat(set).hasSize(elements.length);
Review Comment:
```suggestion
assertThat(set).hasSameSizeAs(elements);
```
##########
flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldSetTest.java:
##########
@@ -105,9 +103,9 @@ private static void check(FieldSet set, int... elements) {
for (int i = 0; i < fromIter.length; i++) {
fromIter[i] = iter.next();
}
- assertFalse(iter.hasNext());
+ assertThat(iter.hasNext()).isFalse();
Review Comment:
```suggestion
assertThat(iter).isExhausted();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/resources/ResourceTest.java:
##########
@@ -20,194 +20,198 @@
import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/** Tests for {@link Resource}. */
-public class ResourceTest extends TestLogger {
+@SuppressWarnings("rawtypes")
+class ResourceTest extends TestLogger {
@Test
- public void testConstructorValid() {
+ void testConstructorValid() {
final Resource v1 = new TestResource(0.1);
assertTestResourceValueEquals(0.1, v1);
final Resource v2 = new TestResource(BigDecimal.valueOf(0.1));
assertTestResourceValueEquals(0.1, v2);
}
- @Test(expected = IllegalArgumentException.class)
- public void testConstructorInvalidValue() {
- new TestResource(-0.1);
+ @Test
+ void testConstructorInvalidValue() {
+ assertThatThrownBy(() -> new TestResource(-0.1))
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testEquals() {
+ void testEquals() {
final Resource v1 = new TestResource(0.1);
final Resource v2 = new TestResource(0.1);
final Resource v3 = new TestResource(0.2);
- assertTrue(v1.equals(v2));
- assertFalse(v1.equals(v3));
+ assertThat(v2).isEqualTo(v1);
+ assertThat(v3).isNotEqualTo(v1);
}
@Test
- public void testEqualsIgnoringScale() {
+ void testEqualsIgnoringScale() {
final Resource v1 = new TestResource(new BigDecimal("0.1"));
final Resource v2 = new TestResource(new BigDecimal("0.10"));
- assertTrue(v1.equals(v2));
+ assertThat(v2).isEqualTo(v1);
}
@Test
- public void testHashCodeIgnoringScale() {
+ void testHashCodeIgnoringScale() {
final Resource v1 = new TestResource(new BigDecimal("0.1"));
final Resource v2 = new TestResource(new BigDecimal("0.10"));
- assertTrue(v1.hashCode() == v2.hashCode());
+ assertThat(v2).hasSameHashCodeAs(v1);
}
@Test
- public void testMerge() {
+ void testMerge() {
final Resource v1 = new TestResource(0.1);
final Resource v2 = new TestResource(0.2);
assertTestResourceValueEquals(0.3, v1.merge(v2));
}
- @Test(expected = IllegalArgumentException.class)
- public void testMergeErrorOnDifferentTypes() {
+ @Test
+ void testMergeErrorOnDifferentTypes() {
final Resource v1 = new TestResource(0.1);
final Resource v2 = new CPUResource(0.1);
- v1.merge(v2);
+ // v1.merge(v2);
+ assertThatThrownBy(() ->
v1.merge(v2)).isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testSubtract() {
+ void testSubtract() {
final Resource v1 = new TestResource(0.2);
final Resource v2 = new TestResource(0.1);
assertTestResourceValueEquals(0.1, v1.subtract(v2));
}
- @Test(expected = IllegalArgumentException.class)
- public void testSubtractLargerValue() {
+ @Test
+ void testSubtractLargerValue() {
final Resource v1 = new TestResource(0.1);
final Resource v2 = new TestResource(0.2);
- v1.subtract(v2);
+ assertThatThrownBy(() ->
v1.subtract(v2)).isInstanceOf(IllegalArgumentException.class);
}
- @Test(expected = IllegalArgumentException.class)
- public void testSubtractErrorOnDifferentTypes() {
+ @Test
+ void testSubtractErrorOnDifferentTypes() {
final Resource v1 = new TestResource(0.1);
final Resource v2 = new CPUResource(0.1);
- v1.subtract(v2);
+ assertThatThrownBy(() ->
v1.subtract(v2)).isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testDivide() {
+ void testDivide() {
final Resource resource = new TestResource(0.04);
final BigDecimal by = BigDecimal.valueOf(0.1);
assertTestResourceValueEquals(0.4, resource.divide(by));
}
- @Test(expected = IllegalArgumentException.class)
- public void testDivideNegative() {
+ @Test
+ void testDivideNegative() {
final Resource resource = new TestResource(1.2);
final BigDecimal by = BigDecimal.valueOf(-0.5);
- resource.divide(by);
+ assertThatThrownBy(() ->
resource.divide(by)).isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testDivideInteger() {
+ void testDivideInteger() {
final Resource resource = new TestResource(0.12);
final int by = 4;
assertTestResourceValueEquals(0.03, resource.divide(by));
}
- @Test(expected = IllegalArgumentException.class)
- public void testDivideNegativeInteger() {
+ @Test
+ void testDivideNegativeInteger() {
final Resource resource = new TestResource(1.2);
final int by = -5;
- resource.divide(by);
+ assertThatThrownBy(() ->
resource.divide(by)).isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testMultiply() {
+ void testMultiply() {
final Resource resource = new TestResource(0.3);
final BigDecimal by = BigDecimal.valueOf(0.2);
assertTestResourceValueEquals(0.06, resource.multiply(by));
}
- @Test(expected = IllegalArgumentException.class)
- public void testMutiplyNegative() {
+ @Test
+ void testMutiplyNegative() {
final Resource resource = new TestResource(0.3);
final BigDecimal by = BigDecimal.valueOf(-0.2);
- resource.multiply(by);
+ assertThatThrownBy(() -> resource.multiply(by))
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testMultiplyInteger() {
+ void testMultiplyInteger() {
final Resource resource = new TestResource(0.3);
final int by = 2;
assertTestResourceValueEquals(0.6, resource.multiply(by));
}
- @Test(expected = IllegalArgumentException.class)
- public void testMutiplyNegativeInteger() {
+ @Test
+ void testMutiplyNegativeInteger() {
final Resource resource = new TestResource(0.3);
final int by = -2;
- resource.multiply(by);
+ assertThatThrownBy(() -> resource.multiply(by))
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testIsZero() {
+ void testIsZero() {
final Resource resource1 = new TestResource(0.0);
final Resource resource2 = new TestResource(1.0);
- assertTrue(resource1.isZero());
- assertFalse(resource2.isZero());
+ assertThat(resource1.isZero()).isTrue();
+ assertThat(resource2.isZero()).isFalse();
}
@Test
- public void testCompareTo() {
+ void testCompareTo() {
final Resource resource1 = new TestResource(0.0);
final Resource resource2 = new TestResource(0.0);
final Resource resource3 = new TestResource(1.0);
- assertThat(resource1.compareTo(resource1), is(0));
- assertThat(resource1.compareTo(resource2), is(0));
- assertThat(resource1.compareTo(resource3), lessThan(0));
- assertThat(resource3.compareTo(resource1), greaterThan(0));
+ assertThat(resource1.compareTo(resource1)).isZero();
+ assertThat(resource1.compareTo(resource2)).isZero();
+ assertThat(resource1.compareTo(resource3)).isLessThan(0);
+ assertThat(resource3.compareTo(resource1)).isGreaterThan(0);
Review Comment:
```suggestion
assertThat(resource1.compareTo(resource3)).isNegative();
assertThat(resource3.compareTo(resource1)).isPositive();
```
##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java:
##########
@@ -68,148 +67,124 @@ protected Order[] getTestedOrder() {
// -------------------------------- test duplication
------------------------------------------
@Test
- public void testDuplicate() {
- try {
- boolean ascending = isAscending(getTestedOrder()[0]);
- TypeComparator<T> comparator = getComparator(ascending);
- TypeComparator<T> clone = comparator.duplicate();
-
- T[] data = getSortedData();
- comparator.setReference(data[0]);
- clone.setReference(data[1]);
-
- assertThat(comparator.equalToReference(data[0]) &&
clone.equalToReference(data[1]))
- .as(
- "Comparator duplication does not work: Altering
the reference in a duplicated comparator alters the original comparator's
reference.")
- .isTrue();
- } catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+ protected void testDuplicate() {
+ boolean ascending = isAscending(getTestedOrder()[0]);
+ TypeComparator<T> comparator = getComparator(ascending);
+ TypeComparator<T> clone = comparator.duplicate();
+
+ T[] data = getSortedData();
+ comparator.setReference(data[0]);
+ clone.setReference(data[1]);
+
+ assertThat(comparator.equalToReference(data[0]) &&
clone.equalToReference(data[1]))
+ .as(
+ "Comparator duplication does not work: Altering the
reference in a duplicated comparator alters the original comparator's
reference.")
+ .isTrue();
}
// --------------------------------- equality tests
-------------------------------------------
@Test
- public void testEquality() {
+ protected void testEquality() throws IOException {
for (Order order : getTestedOrder()) {
boolean ascending = isAscending(order);
testEquals(ascending);
}
}
- protected void testEquals(boolean ascending) {
- try {
- // Just setup two identical output/inputViews and go over their
data to see if compare
- // works
- TestOutputView out1;
- TestOutputView out2;
- TestInputView in1;
- TestInputView in2;
+ protected void testEquals(boolean ascending) throws IOException {
+ // Just setup two identical output/inputViews and go over their data
to see if compare
+ // works
+ TestOutputView out1;
+ TestOutputView out2;
+ TestInputView in1;
+ TestInputView in2;
- // Now use comparator and compare
- TypeComparator<T> comparator = getComparator(ascending);
- T[] data = getSortedData();
- for (T d : data) {
+ // Now use comparator and compare
+ TypeComparator<T> comparator = getComparator(ascending);
+ T[] data = getSortedData();
+ for (T d : data) {
- out2 = new TestOutputView();
- writeSortedData(d, out2);
- in2 = out2.getInputView();
+ out2 = new TestOutputView();
+ writeSortedData(d, out2);
+ in2 = out2.getInputView();
- out1 = new TestOutputView();
- writeSortedData(d, out1);
- in1 = out1.getInputView();
+ out1 = new TestOutputView();
+ writeSortedData(d, out1);
+ in1 = out1.getInputView();
- assertThat(comparator.compareSerialized(in1,
in2)).isEqualTo(0);
- }
- } catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Exception in test: " + e.getMessage());
+ assertThat(comparator.compareSerialized(in1, in2)).isZero();
}
}
@Test
- public void testEqualityWithReference() {
- try {
- TypeSerializer<T> serializer = createSerializer();
- boolean ascending = isAscending(getTestedOrder()[0]);
- TypeComparator<T> comparator = getComparator(ascending);
- TypeComparator<T> comparator2 = getComparator(ascending);
- T[] data = getSortedData();
- for (T d : data) {
- comparator.setReference(d);
- // Make a copy to compare
- T copy = serializer.copy(d, serializer.createInstance());
-
- // And then test equalTo and compareToReference method of
comparator
- assertThat(comparator.equalToReference(d)).isTrue();
- comparator2.setReference(copy);
-
assertThat(comparator.compareToReference(comparator2)).isEqualTo(0);
- }
- } catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Exception in test: " + e.getMessage());
+ protected void testEqualityWithReference() {
+ TypeSerializer<T> serializer = createSerializer();
+ boolean ascending = isAscending(getTestedOrder()[0]);
+ TypeComparator<T> comparator = getComparator(ascending);
+ TypeComparator<T> comparator2 = getComparator(ascending);
+ T[] data = getSortedData();
+ for (T d : data) {
+ comparator.setReference(d);
+ // Make a copy to compare
+ T copy = serializer.copy(d, serializer.createInstance());
+
+ // And then test equalTo and compareToReference method of
comparator
+ assertThat(comparator.equalToReference(d)).isTrue();
+ comparator2.setReference(copy);
+ assertThat(comparator.compareToReference(comparator2)).isZero();
}
}
// --------------------------------- inequality tests
----------------------------------------
@Test
- public void testInequality() {
+ protected void testInequality() throws IOException {
for (Order order : getTestedOrder()) {
boolean ascending = isAscending(order);
testGreatSmallAscDesc(ascending, true);
testGreatSmallAscDesc(ascending, false);
}
}
- protected void testGreatSmallAscDesc(boolean ascending, boolean greater) {
- try {
- // split data into low and high part
- T[] data = getSortedData();
-
- TypeComparator<T> comparator = getComparator(ascending);
- TestOutputView out1;
- TestOutputView out2;
- TestInputView in1;
- TestInputView in2;
-
- // compares every element in high with every element in low
- for (int x = 0; x < data.length - 1; x++) {
- for (int y = x + 1; y < data.length; y++) {
- out1 = new TestOutputView();
- writeSortedData(data[x], out1);
- in1 = out1.getInputView();
-
- out2 = new TestOutputView();
- writeSortedData(data[y], out2);
- in2 = out2.getInputView();
-
- if (greater && ascending) {
- assertThat(comparator.compareSerialized(in1,
in2)).isLessThan(0);
- }
- if (greater && !ascending) {
- assertThat(comparator.compareSerialized(in1,
in2)).isGreaterThan(0);
- }
- if (!greater && ascending) {
- assertThat(comparator.compareSerialized(in2,
in1)).isGreaterThan(0);
- }
- if (!greater && !ascending) {
- assertThat(comparator.compareSerialized(in2,
in1)).isLessThan(0);
- }
+ protected void testGreatSmallAscDesc(boolean ascending, boolean greater)
throws IOException {
+ // split data into low and high part
+ T[] data = getSortedData();
+
+ TypeComparator<T> comparator = getComparator(ascending);
+ TestOutputView out1;
+ TestOutputView out2;
+ TestInputView in1;
+ TestInputView in2;
+
+ // compares every element in high with every element in low
+ for (int x = 0; x < data.length - 1; x++) {
+ for (int y = x + 1; y < data.length; y++) {
+ out1 = new TestOutputView();
+ writeSortedData(data[x], out1);
+ in1 = out1.getInputView();
+
+ out2 = new TestOutputView();
+ writeSortedData(data[y], out2);
+ in2 = out2.getInputView();
+
+ if (greater && ascending) {
+ assertThat(comparator.compareSerialized(in1,
in2)).isLessThan(0);
+ }
+ if (greater && !ascending) {
+ assertThat(comparator.compareSerialized(in1,
in2)).isGreaterThan(0);
+ }
+ if (!greater && ascending) {
+ assertThat(comparator.compareSerialized(in2,
in1)).isGreaterThan(0);
+ }
+ if (!greater && !ascending) {
Review Comment:
How about isPositive and isNegative?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]