I'm not sure who added that, but it's been there for a while. Making global static changes like that in our module seems like poor form - I wonder if there's a better approach.
On Thu, Apr 30, 2020 at 8:36 AM Brian Hulette <bhule...@google.com> wrote: > It seems likely this is a side effect of some static initialization in > AvroUtils: > https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99 > > On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <re...@google.com> wrote: > >> I've copied this failing test into my client, and it passes for me. I >> can't reproduce the failure. >> >> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote: >> >>> +dev <dev@beam.apache.org> +Brian Hulette <bhule...@google.com> +Reuven >>> Lax <re...@google.com> >>> >>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote: >>> >>>> Hi all, >>>> >>>> I think the method AvroUtils.toBeamSchema has a not expected side >>>> effect. >>>> I found out that, if you invoke it and then you run a pipeline of >>>> GenericRecords containing a timestamp (l tried with logical-type >>>> timestamp-millis), Beam converts such timestamp from long to >>>> org.joda.time.DateTime. Even if you don't apply any transformation to the >>>> pipeline. >>>> Do you think it's a bug? >>>> >>>> Below you can find a simple test class I wrote in order to replicate >>>> the problem. >>>> The first test passes while the second fails. >>>> >>>> >>>> import org.apache.avro.Schema; >>>> import org.apache.avro.SchemaBuilder; >>>> import org.apache.avro.generic.GenericRecord; >>>> import org.apache.avro.generic.GenericRecordBuilder; >>>> import org.apache.beam.sdk.coders.AvroCoder; >>>> import org.apache.beam.sdk.schemas.utils.AvroUtils; >>>> import org.apache.beam.sdk.testing.TestPipeline; >>>> import org.apache.beam.sdk.transforms.Combine; >>>> import org.apache.beam.sdk.transforms.Create; >>>> import org.apache.beam.sdk.transforms.SerializableFunction; >>>> import org.junit.Rule; >>>> >>>> import java.sql.Timestamp; >>>> >>>> import static org.junit.Assert.assertEquals; >>>> >>>> public class AvroUtilsSideEffect { >>>> >>>> @Rule >>>> public final transient TestPipeline pipeline = TestPipeline.create(); >>>> @Rule >>>> public final transient TestPipeline pipeline2 = TestPipeline.create(); >>>> public final Schema testSchema = SchemaBuilder >>>> .record("record").namespace("test") >>>> .fields() >>>> .name("timestamp").type().longBuilder().prop("logicalType", >>>> "timestamp-millis").endLong().noDefault() >>>> .endRecord(); >>>> public final GenericRecord record = new >>>> GenericRecordBuilder(testSchema) >>>> .set("timestamp", new Timestamp(1563926400000L).getTime()) >>>> .build(); >>>> >>>> >>>> @org.junit.Test >>>> public void test() { >>>> pipeline.apply( >>>> Create.of(record).withCoder(AvroCoder.of(testSchema))) >>>> .apply( Combine.globally(new TestFn())); >>>> >>>> pipeline.run().waitUntilFinish(); >>>> } >>>> @org.junit.Test >>>> public void test2() { >>>> >>>> AvroUtils.toBeamSchema(testSchema); >>>> >>>> >>>> pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema))) >>>> .apply(Combine.globally(new TestFn())); >>>> >>>> pipeline2.run().waitUntilFinish(); >>>> } >>>> >>>> public static class TestFn implements >>>> SerializableFunction<Iterable<GenericRecord>, GenericRecord> { >>>> >>>> @Override >>>> public GenericRecord apply(Iterable<GenericRecord> input) { >>>> for (GenericRecord item : input) { >>>> if(item != null){ >>>> assertEquals(Long.class, >>>> item.get("timestamp").getClass()); >>>> assertEquals(1563926400000L, item.get("timestamp")); >>>> } >>>> return item; >>>> } >>>> return null; >>>> } >>>> } >>>> } >>>> >>>> Thanks, >>>> Paolo >>>> >>>> -- >>>> Paolo Tomeo, PhD >>>> >>>> Big Data and Machine Learning Engineer >>>> >>>> linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo> >>>> >>>