zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371718910


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##########
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws 
Exception {
                         })
                 .print();
 
-        thrown.expect(ProgramInvocationException.class);
-        env.execute();
+        
assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroSpecificRecordsInFromElements() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        User user1 = new User("Foo", 1);
+        User user2 = new User("Bar", 2);
+        User[] data = {user1, user2};
+        DataStreamSource<User> stream = env.fromElements(User.class, user1, 
user2);
+        DataGeneratorSource<User> source = getSourceFromStream(stream);
+        FromElementsGeneratorFunction<User> generatorFunction =
+                (FromElementsGeneratorFunction<User>) 
source.getGeneratorFunction();
+
+        List<User> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<User> serializer = generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroGenericRecordsInFromElements() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        Schema schema = getSchemaFromResources("/avro/user.avsc");
+        GenericRecord user1 =
+                new GenericRecordBuilder(schema).set("name", "Foo").set("age", 
40).build();
+        GenericRecord user2 =
+                new GenericRecordBuilder(schema).set("name", "Bar").set("age", 
45).build();
+        GenericRecord[] data = {user1, user2};
+        DataStream<GenericRecord> stream =
+                env.fromElements(data).returns(new 
GenericRecordAvroTypeInfo(schema));
+        DataGeneratorSource<GenericRecord> source = 
getSourceFromStream(stream);
+        FromElementsGeneratorFunction<GenericRecord> generatorFunction =
+                (FromElementsGeneratorFunction<GenericRecord>) 
source.getGeneratorFunction();
+
+        List<GenericRecord> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<GenericRecord> serializer = 
generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    private Schema getSchemaFromResources(String path) throws Exception {
+        try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+            if (schemaStream == null) {
+                throw new IllegalStateException("Could not find " + path + " 
in classpath");
+            }
+            return new Schema.Parser().parse(schemaStream);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T, S extends Source<T, ?, ?>> S 
getSourceFromStream(DataStream<T> stream) {
+        return (S) ((SourceTransformation<T, ?, ?>) 
stream.getTransformation()).getSource();

Review Comment:
   Ah, I see. It feels a bit weird to break the abstraction layers like this; 
there are 3 casts being done here.
   
   A higher-level test that just uses Avro + fromElements and verifies that 
things _work_ seems more appropriate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to