I've copied this failing test into my client, and it passes for me. I can't reproduce the failure.
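
In case it helps with narrowing this down, here is a sketch of one mechanism that could produce this kind of symptom: a utility class whose static initializer registers a conversion in a process-wide registry, so the first call to any of its methods changes how values are read afterwards. This is an assumption, not something I have confirmed against the Beam or Avro source. Registry and SchemaUtils are hypothetical stand-ins, and the sketch uses only the JDK.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StaticSideEffectSketch {

    // Hypothetical stand-in for a process-wide singleton that holds
    // logical-type conversions. Not a Beam or Avro class.
    static final class Registry {
        private static final Map<String, Function<Long, Object>> CONVERSIONS = new HashMap<>();

        static void register(String logicalType, Function<Long, Object> conversion) {
            CONVERSIONS.put(logicalType, conversion);
        }

        // Returns the raw long unless a conversion has been registered.
        static Object read(String logicalType, long raw) {
            Function<Long, Object> conversion = CONVERSIONS.get(logicalType);
            return conversion == null ? (Object) Long.valueOf(raw) : conversion.apply(raw);
        }
    }

    // Hypothetical utility class. Its static initializer runs the first time
    // the class is initialized, for example on the first static method call.
    static final class SchemaUtils {
        static {
            Registry.register("timestamp-millis", Instant::ofEpochMilli);
        }

        static String describe(String recordName) {
            return "schema:" + recordName;
        }
    }

    public static void main(String[] args) {
        Object before = Registry.read("timestamp-millis", 1563926400000L);
        System.out.println(before.getClass().getSimpleName()); // prints "Long"

        // No visible output is used, but class initialization mutates the registry.
        SchemaUtils.describe("record");

        Object after = Registry.read("timestamp-millis", 1563926400000L);
        System.out.println(after.getClass().getSimpleName()); // prints "Instant"
    }
}
```

If something like this is going on, the result would depend on whether the utility class had already been initialized in the JVM before the first pipeline ran, so test order and forking settings could matter.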

On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote:

> +dev <dev@beam.apache.org> +Brian Hulette <bhule...@google.com> +Reuven Lax <re...@google.com>
>
> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote:
>
>> Hi all,
>>
>> I think the method AvroUtils.toBeamSchema has an unexpected side effect.
>> I found that if you invoke it and then run a pipeline of GenericRecords
>> containing a timestamp (I tried with logical type timestamp-millis),
>> Beam converts the timestamp from long to org.joda.time.DateTime, even if
>> you don't apply any transformation to the pipeline.
>> Do you think it's a bug?
>>
>> Below is a simple test class I wrote to reproduce the problem.
>> The first test passes while the second fails.
>>
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.SchemaBuilder;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.beam.sdk.coders.AvroCoder;
>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
>> import org.apache.beam.sdk.testing.TestPipeline;
>> import org.apache.beam.sdk.transforms.Combine;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.SerializableFunction;
>> import org.junit.Rule;
>>
>> import java.sql.Timestamp;
>>
>> import static org.junit.Assert.assertEquals;
>>
>> public class AvroUtilsSideEffect {
>>
>>     @Rule
>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>     @Rule
>>     public final transient TestPipeline pipeline2 = TestPipeline.create();
>>
>>     public final Schema testSchema = SchemaBuilder
>>             .record("record").namespace("test")
>>             .fields()
>>             .name("timestamp").type().longBuilder()
>>                 .prop("logicalType", "timestamp-millis").endLong().noDefault()
>>             .endRecord();
>>
>>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
>>             .set("timestamp", new Timestamp(1563926400000L).getTime())
>>             .build();
>>
>>     @org.junit.Test
>>     public void test() {
>>         pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>                 .apply(Combine.globally(new TestFn()));
>>
>>         pipeline.run().waitUntilFinish();
>>     }
>>
>>     @org.junit.Test
>>     public void test2() {
>>
>>         AvroUtils.toBeamSchema(testSchema);
>>
>>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>                 .apply(Combine.globally(new TestFn()));
>>
>>         pipeline2.run().waitUntilFinish();
>>     }
>>
>>     public static class TestFn implements
>>             SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
>>
>>         @Override
>>         public GenericRecord apply(Iterable<GenericRecord> input) {
>>             for (GenericRecord item : input) {
>>                 if (item != null) {
>>                     assertEquals(Long.class, item.get("timestamp").getClass());
>>                     assertEquals(1563926400000L, item.get("timestamp"));
>>                 }
>>                 return item;
>>             }
>>             return null;
>>         }
>>     }
>> }
>>
>> Thanks,
>> Paolo
>>
>> --
>> Paolo Tomeo, PhD
>>
>> Big Data and Machine Learning Engineer
>>
>> linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo>
>>
>