afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371571177
########## flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java: ########## @@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception { }) .print(); - thrown.expect(ProgramInvocationException.class); - env.execute(); + assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class); + } + + @Test + @SuppressWarnings("unchecked") + void testAvroSpecificRecordsInFromElements() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + User user1 = new User("Foo", 1); + User user2 = new User("Bar", 2); + User[] data = {user1, user2}; + DataStreamSource<User> stream = env.fromElements(User.class, user1, user2); + DataGeneratorSource<User> source = getSourceFromStream(stream); + FromElementsGeneratorFunction<User> generatorFunction = + (FromElementsGeneratorFunction<User>) source.getGeneratorFunction(); + + List<User> result = stream.executeAndCollect(data.length + 1); + TypeSerializer<User> serializer = generatorFunction.getSerializer(); + + assertThat(serializer).isInstanceOf(AvroSerializer.class); + assertThat(result).containsExactly(data); + } + + @Test + @SuppressWarnings("unchecked") + void testAvroGenericRecordsInFromElements() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Schema schema = getSchemaFromResources("/avro/user.avsc"); + GenericRecord user1 = + new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); + GenericRecord user2 = + new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); + GenericRecord[] data = {user1, user2}; + DataStream<GenericRecord> stream = + env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema)); + DataGeneratorSource<GenericRecord> source = getSourceFromStream(stream); + FromElementsGeneratorFunction<GenericRecord> generatorFunction = + 
(FromElementsGeneratorFunction<GenericRecord>) source.getGeneratorFunction(); + + List<GenericRecord> result = stream.executeAndCollect(data.length + 1); + TypeSerializer<GenericRecord> serializer = generatorFunction.getSerializer(); + + assertThat(serializer).isInstanceOf(AvroSerializer.class); + assertThat(result).containsExactly(data); + } + + private Schema getSchemaFromResources(String path) throws Exception { + try (InputStream schemaStream = getClass().getResourceAsStream(path)) { + if (schemaStream == null) { + throw new IllegalStateException("Could not find " + path + " in classpath"); + } + return new Schema.Parser().parse(schemaStream); + } + } + + @SuppressWarnings("unchecked") + private static <T, S extends Source<T, ?, ?>> S getSourceFromStream(DataStream<T> stream) { + return (S) ((SourceTransformation<T, ?, ?>) stream.getTransformation()).getSource(); Review Comment: I missed this comment earlier. `fromElements` returns a `DataStreamSource` (a `SingleOutputStreamOperator`), not the FLIP-27 `Source` itself, which is what I need to cast to `DataGeneratorSource` in order to get hold of the `FromElementsGeneratorFunction`. Or do you have something completely different in mind? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org