afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371571177


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##########
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws 
Exception {
                         })
                 .print();
 
-        thrown.expect(ProgramInvocationException.class);
-        env.execute();
+        
assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroSpecificRecordsInFromElements() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        User user1 = new User("Foo", 1);
+        User user2 = new User("Bar", 2);
+        User[] data = {user1, user2};
+        DataStreamSource<User> stream = env.fromElements(User.class, user1, 
user2);
+        DataGeneratorSource<User> source = getSourceFromStream(stream);
+        FromElementsGeneratorFunction<User> generatorFunction =
+                (FromElementsGeneratorFunction<User>) 
source.getGeneratorFunction();
+
+        List<User> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<User> serializer = generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroGenericRecordsInFromElements() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        Schema schema = getSchemaFromResources("/avro/user.avsc");
+        GenericRecord user1 =
+                new GenericRecordBuilder(schema).set("name", "Foo").set("age", 
40).build();
+        GenericRecord user2 =
+                new GenericRecordBuilder(schema).set("name", "Bar").set("age", 
45).build();
+        GenericRecord[] data = {user1, user2};
+        DataStream<GenericRecord> stream =
+                env.fromElements(data).returns(new 
GenericRecordAvroTypeInfo(schema));
+        DataGeneratorSource<GenericRecord> source = 
getSourceFromStream(stream);
+        FromElementsGeneratorFunction<GenericRecord> generatorFunction =
+                (FromElementsGeneratorFunction<GenericRecord>) 
source.getGeneratorFunction();
+
+        List<GenericRecord> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<GenericRecord> serializer = 
generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    private Schema getSchemaFromResources(String path) throws Exception {
+        try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+            if (schemaStream == null) {
+                throw new IllegalStateException("Could not find " + path + " 
in classpath");
+            }
+            return new Schema.Parser().parse(schemaStream);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T, S extends Source<T, ?, ?>> S 
getSourceFromStream(DataStream<T> stream) {
+        return (S) ((SourceTransformation<T, ?, ?>) 
stream.getTransformation()).getSource();

Review Comment:
   Missed this one. 
   `fromElements` returns `DataStreamSource` (`SingleOutputStreamOperator`), 
not the FLIP-27 Source that I can cast to `DataGeneratorSource` to get a hold 
of the `FromElementsGeneratorFunction`. Or do you have something completely 
different in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to