zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371718910
########## flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java: ########## @@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception { }) .print(); - thrown.expect(ProgramInvocationException.class); - env.execute(); + assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class); + } + + @Test + @SuppressWarnings("unchecked") + void testAvroSpecificRecordsInFromElements() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + User user1 = new User("Foo", 1); + User user2 = new User("Bar", 2); + User[] data = {user1, user2}; + DataStreamSource<User> stream = env.fromElements(User.class, user1, user2); + DataGeneratorSource<User> source = getSourceFromStream(stream); + FromElementsGeneratorFunction<User> generatorFunction = + (FromElementsGeneratorFunction<User>) source.getGeneratorFunction(); + + List<User> result = stream.executeAndCollect(data.length + 1); + TypeSerializer<User> serializer = generatorFunction.getSerializer(); + + assertThat(serializer).isInstanceOf(AvroSerializer.class); + assertThat(result).containsExactly(data); + } + + @Test + @SuppressWarnings("unchecked") + void testAvroGenericRecordsInFromElements() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Schema schema = getSchemaFromResources("/avro/user.avsc"); + GenericRecord user1 = + new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); + GenericRecord user2 = + new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); + GenericRecord[] data = {user1, user2}; + DataStream<GenericRecord> stream = + env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema)); + DataGeneratorSource<GenericRecord> source = getSourceFromStream(stream); + FromElementsGeneratorFunction<GenericRecord> generatorFunction = + 
(FromElementsGeneratorFunction<GenericRecord>) source.getGeneratorFunction(); + + List<GenericRecord> result = stream.executeAndCollect(data.length + 1); + TypeSerializer<GenericRecord> serializer = generatorFunction.getSerializer(); + + assertThat(serializer).isInstanceOf(AvroSerializer.class); + assertThat(result).containsExactly(data); + } + + private Schema getSchemaFromResources(String path) throws Exception { + try (InputStream schemaStream = getClass().getResourceAsStream(path)) { + if (schemaStream == null) { + throw new IllegalStateException("Could not find " + path + " in classpath"); + } + return new Schema.Parser().parse(schemaStream); + } + } + + @SuppressWarnings("unchecked") + private static <T, S extends Source<T, ?, ?>> S getSourceFromStream(DataStream<T> stream) { + return (S) ((SourceTransformation<T, ?, ?>) stream.getTransformation()).getSource(); Review Comment: Ah, I see. It feels a bit weird to break the abstraction layers like this; there are 3 casts being done here. A higher-level test that just uses avro + fromElements and verifies that things _work_ seems more appropriate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org