Let's discuss the details on the JIRA (BEAM-9863). I could maybe take it, but I could use advice on the right course of action.
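As a reference point for that discussion, here is a minimal, library-free sketch of the kind of mechanism suspected in the thread below: a static initializer in a utility class registers a conversion on a process-wide registry, so merely touching the utility class changes how unrelated code reads a timestamp-millis value. GlobalRegistry, SchemaUtils and readType are hypothetical stand-ins (not Beam or Avro APIs), and java.time.Instant stands in for org.joda.time.DateTime. It illustrates the pattern under those assumptions; it is not the actual AvroUtils code.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StaticSideEffectSketch {

    // Hypothetical stand-in for a process-wide singleton that holds logical-type conversions.
    static final class GlobalRegistry {
        private static final Map<String, Function<Long, Object>> CONVERSIONS = new HashMap<>();

        static void register(String logicalType, Function<Long, Object> conversion) {
            CONVERSIONS.put(logicalType, conversion);
        }

        // Returns the raw (boxed) long unless a conversion is registered for the logical type.
        static Object read(String logicalType, long raw) {
            Function<Long, Object> conversion = CONVERSIONS.get(logicalType);
            return conversion == null ? (Object) raw : conversion.apply(raw);
        }
    }

    // Hypothetical stand-in for a utility class whose static initializer mutates global state.
    static final class SchemaUtils {
        static {
            // Runs once, the first time this class is initialized (e.g. on the first static call).
            GlobalRegistry.register("timestamp-millis", Instant::ofEpochMilli);
        }

        // An apparently harmless helper; calling it triggers the static initializer above.
        static String describe(String logicalType) {
            return "field:" + logicalType;
        }
    }

    // Reads the same value the same way every time; only the global state differs.
    static Class<?> readType() {
        return GlobalRegistry.read("timestamp-millis", 1563926400000L).getClass();
    }

    public static void main(String[] args) {
        System.out.println(readType());           // prints "class java.lang.Long"
        SchemaUtils.describe("timestamp-millis"); // side effect: conversion gets registered
        System.out.println(readType());           // prints "class java.time.Instant"
    }
}
```

If the real cause has this shape, the result would depend on whether the utility class was already initialized in the same JVM, which might explain why the failure does not reproduce everywhere; that is a guess, not something confirmed in this thread.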
On Fri, May 1, 2020 at 6:05 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> I dug deeper and found that this global static change was introduced
> at the beginning of the Avro / Beam Schema support (Beam 2.15.0):
> https://github.com/apache/beam/commit/2a40c576cfb
>
> On Thu, Apr 30, 2020 at 8:52 PM Ismaël Mejía <ieme...@gmail.com> wrote:
> >
> > Created https://issues.apache.org/jira/browse/BEAM-9863 to track this.
> > Any taker?
> >
> > On Thu, Apr 30, 2020 at 5:54 PM Reuven Lax <re...@google.com> wrote:
> > >
> > > I'm not sure who added that, but it's been there for a while. Making
> > > global static changes like that in our module seems like poor form - I
> > > wonder if there's a better approach.
> > >
> > > On Thu, Apr 30, 2020 at 8:36 AM Brian Hulette <bhule...@google.com> wrote:
> > >>
> > >> It seems likely this is a side effect of some static initialization
> > >> in AvroUtils:
> > >> https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99
> > >>
> > >> On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <re...@google.com> wrote:
> > >>>
> > >>> I've copied this failing test into my client, and it passes for me.
> > >>> I can't reproduce the failure.
> > >>>
> > >>> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote:
> > >>>>
> > >>>> +dev +Brian Hulette +Reuven Lax
> > >>>>
> > >>>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote:
> > >>>>>
> > >>>>> Hi all,
> > >>>>>
> > >>>>> I think the method AvroUtils.toBeamSchema has an unexpected side
> > >>>>> effect.
> > >>>>> I found out that, if you invoke it and then run a pipeline of
> > >>>>> GenericRecords containing a timestamp (I tried with logical-type
> > >>>>> timestamp-millis), Beam converts that timestamp from long to
> > >>>>> org.joda.time.DateTime, even if you don't apply any transformation
> > >>>>> to the pipeline.
> > >>>>> Do you think it's a bug?
> > >>>>>
> > >>>>> Below you can find a simple test class I wrote in order to
> > >>>>> replicate the problem.
> > >>>>> The first test passes while the second fails.
> > >>>>>
> > >>>>>
> > >>>>> import org.apache.avro.Schema;
> > >>>>> import org.apache.avro.SchemaBuilder;
> > >>>>> import org.apache.avro.generic.GenericRecord;
> > >>>>> import org.apache.avro.generic.GenericRecordBuilder;
> > >>>>> import org.apache.beam.sdk.coders.AvroCoder;
> > >>>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
> > >>>>> import org.apache.beam.sdk.testing.TestPipeline;
> > >>>>> import org.apache.beam.sdk.transforms.Combine;
> > >>>>> import org.apache.beam.sdk.transforms.Create;
> > >>>>> import org.apache.beam.sdk.transforms.SerializableFunction;
> > >>>>> import org.junit.Rule;
> > >>>>>
> > >>>>> import java.sql.Timestamp;
> > >>>>>
> > >>>>> import static org.junit.Assert.assertEquals;
> > >>>>>
> > >>>>> public class AvroUtilsSideEffect {
> > >>>>>
> > >>>>>     @Rule
> > >>>>>     public final transient TestPipeline pipeline = TestPipeline.create();
> > >>>>>     @Rule
> > >>>>>     public final transient TestPipeline pipeline2 = TestPipeline.create();
> > >>>>>     public final Schema testSchema = SchemaBuilder
> > >>>>>             .record("record").namespace("test")
> > >>>>>             .fields()
> > >>>>>             .name("timestamp").type().longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault()
> > >>>>>             .endRecord();
> > >>>>>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
> > >>>>>             .set("timestamp", new Timestamp(1563926400000L).getTime())
> > >>>>>             .build();
> > >>>>>
> > >>>>>
> > >>>>>     @org.junit.Test
> > >>>>>     public void test() {
> > >>>>>         pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> > >>>>>                 .apply(Combine.globally(new TestFn()));
> > >>>>>
> > >>>>>         pipeline.run().waitUntilFinish();
> > >>>>>     }
> > >>>>>
> > >>>>>     @org.junit.Test
> > >>>>>     public void test2() {
> > >>>>>
> > >>>>>         AvroUtils.toBeamSchema(testSchema);
> > >>>>>
> > >>>>>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
> > >>>>>                 .apply(Combine.globally(new TestFn()));
> > >>>>>
> > >>>>>         pipeline2.run().waitUntilFinish();
> > >>>>>     }
> > >>>>>
> > >>>>>     public static class TestFn implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
> > >>>>>
> > >>>>>         @Override
> > >>>>>         public GenericRecord apply(Iterable<GenericRecord> input) {
> > >>>>>             for (GenericRecord item : input) {
> > >>>>>                 if (item != null) {
> > >>>>>                     assertEquals(Long.class, item.get("timestamp").getClass());
> > >>>>>                     assertEquals(1563926400000L, item.get("timestamp"));
> > >>>>>                 }
> > >>>>>                 return item;
> > >>>>>             }
> > >>>>>             return null;
> > >>>>>         }
> > >>>>>     }
> > >>>>> }
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Paolo
> > >>>>>
> > >>>>> --
> > >>>>> Paolo Tomeo, PhD
> > >>>>>
> > >>>>> Big Data and Machine Learning Engineer
> > >>>>>
> > >>>>> linkedin.com/in/ptomeo