kezhuw commented on a change in pull request #15507:
URL: https://github.com/apache/flink/pull/15507#discussion_r608798087
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
##########
@@ -68,6 +88,135 @@ public void testStrings() {
}
}
+ @Test
+ public void testNullElement() throws Exception {
+ try {
+ new FromElementsFunction<>("a", null, "b");
+ fail("expect exception");
+ } catch (Exception ex) {
+ assertThat(ex, instanceOf(IllegalArgumentException.class));
+ assertThat(ex.getMessage(), containsString("contains a null
element"));
+ }
+ }
+
+ @Test
+ public void testSetOutputTypeWithNoSerializer() throws Exception {
+ FromElementsFunction<String> source = new
FromElementsFunction<>(STRING_ARRAY_DATA);
+
+ assertNull(source.getSerializer());
+
+ source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new
ExecutionConfig());
+
+ assertNotNull(source.getSerializer());
+ assertEquals(
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new
ExecutionConfig()),
+ source.getSerializer());
+
+ List<String> result = runSource(source);
+
+ assertEquals(STRING_LIST_DATA, result);
+ }
+
+ @Test
+ public void testSetOutputTypeWithSameSerializer() throws Exception {
+ FromElementsFunction<String> source =
+ new FromElementsFunction<>(
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new
ExecutionConfig()),
+ STRING_LIST_DATA);
+
+ TypeSerializer<String> existingSerializer = source.getSerializer();
+
+ source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new
ExecutionConfig());
+
+ TypeSerializer<String> newSerializer = source.getSerializer();
+
+ assertEquals(existingSerializer, newSerializer);
+
+ List<String> result = runSource(source);
+
+ assertEquals(STRING_LIST_DATA, result);
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testSetOutputTypeWithIncompatibleType() throws Exception {
+ FromElementsFunction<String> source = new
FromElementsFunction<>(STRING_LIST_DATA);
+
+ try {
+ source.setOutputType(
+ (TypeInformation) BasicTypeInfo.INT_TYPE_INFO, new
ExecutionConfig());
+ fail("expect exception");
+ } catch (Exception ex) {
+ assertThat(ex, instanceOf(IllegalArgumentException.class));
+ assertThat(ex.getMessage(), containsString("not all subclasses of
java.lang.Integer"));
+ }
+ }
+
+ @Test
+ public void testSetOutputTypeWithDifferentSerializer() throws Exception {
+ FromElementsFunction<String> source =
+ new FromElementsFunction<>(
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new
ExecutionConfig()),
+ STRING_LIST_DATA);
+
+ TypeSerializer<String> existingSerializer = source.getSerializer();
+
+ source.setOutputType(new GenericTypeInfo<>(String.class), new
ExecutionConfig());
+
+ TypeSerializer<String> newSerializer = source.getSerializer();
+
+ assertNotEquals(existingSerializer, newSerializer);
+
+ List<String> result = runSource(source);
+
+ assertEquals(STRING_LIST_DATA, result);
+ }
+
+ @Test
+ public void testSetOutputTypeWithExistingBrokenSerializer() throws
Exception {
Review comment:
Merged to one `testSetOutputTypeWithExistingBrokenSerializer`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org