afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371775741


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##########
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws 
Exception {
                         })
                 .print();
 
-        thrown.expect(ProgramInvocationException.class);
-        env.execute();
+        
assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroSpecificRecordsInFromElements() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        User user1 = new User("Foo", 1);
+        User user2 = new User("Bar", 2);
+        User[] data = {user1, user2};
+        DataStreamSource<User> stream = env.fromElements(User.class, user1, 
user2);
+        DataGeneratorSource<User> source = getSourceFromStream(stream);
+        FromElementsGeneratorFunction<User> generatorFunction =
+                (FromElementsGeneratorFunction<User>) 
source.getGeneratorFunction();
+
+        List<User> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<User> serializer = generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroGenericRecordsInFromElements() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        Schema schema = getSchemaFromResources("/avro/user.avsc");
+        GenericRecord user1 =
+                new GenericRecordBuilder(schema).set("name", "Foo").set("age", 
40).build();
+        GenericRecord user2 =
+                new GenericRecordBuilder(schema).set("name", "Bar").set("age", 
45).build();
+        GenericRecord[] data = {user1, user2};
+        DataStream<GenericRecord> stream =
+                env.fromElements(data).returns(new 
GenericRecordAvroTypeInfo(schema));
+        DataGeneratorSource<GenericRecord> source = 
getSourceFromStream(stream);
+        FromElementsGeneratorFunction<GenericRecord> generatorFunction =
+                (FromElementsGeneratorFunction<GenericRecord>) 
source.getGeneratorFunction();
+
+        List<GenericRecord> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<GenericRecord> serializer = 
generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    private Schema getSchemaFromResources(String path) throws Exception {
+        try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+            if (schemaStream == null) {
+                throw new IllegalStateException("Could not find " + path + " 
in classpath");
+            }
+            return new Schema.Parser().parse(schemaStream);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T, S extends Source<T, ?, ?>> S 
getSourceFromStream(DataStream<T> stream) {
+        return (S) ((SourceTransformation<T, ?, ?>) 
stream.getTransformation()).getSource();

Review Comment:
   
https://github.com/apache/flink/pull/23553/commits/2962647e494b569d12bcaf3b4abc314ef0bbc623
 , I also removed the specific record test because without the serializer check 
it is a bit pointless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to