Created https://issues.apache.org/jira/browse/BEAM-9863 to track this.
Any takers?

On Thu, Apr 30, 2020 at 5:54 PM Reuven Lax <re...@google.com> wrote:
>
> I'm not sure who added that, but it's been there for a while. Making global 
> static changes like that in our module seems like poor form - I wonder if 
> there's a better approach.
>
> On Thu, Apr 30, 2020 at 8:36 AM Brian Hulette <bhule...@google.com> wrote:
>>
>> It seems likely this is a side effect of some static initialization in 
>> AvroUtils: 
>> https://github.com/apache/beam/blob/763b7ccd17a420eb634d6799adcd3ecfcf33d6a7/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L99
>>
>> On Wed, Apr 29, 2020 at 9:59 PM Reuven Lax <re...@google.com> wrote:
>>>
>>> I've copied this failing test into my client, and it passes for me. I can't 
>>> reproduce the failure.
>>>
>>> On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> +dev +Brian Hulette +Reuven Lax
>>>>
>>>> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I think the method AvroUtils.toBeamSchema has an unexpected side effect.
>>>>> I found that if you invoke it and then run a pipeline of
>>>>> GenericRecords containing a timestamp (I tried with logical type
>>>>> timestamp-millis), Beam converts the timestamp from long to
>>>>> org.joda.time.DateTime, even if you don't apply any transformation to the
>>>>> pipeline.
>>>>> Do you think it's a bug?
>>>>>
>>>>> Below you can find a simple test class I wrote in order to replicate the 
>>>>> problem.
>>>>> The first test passes while the second fails.
>>>>>
>>>>>
>>>>> import org.apache.avro.Schema;
>>>>> import org.apache.avro.SchemaBuilder;
>>>>> import org.apache.avro.generic.GenericRecord;
>>>>> import org.apache.avro.generic.GenericRecordBuilder;
>>>>> import org.apache.beam.sdk.coders.AvroCoder;
>>>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
>>>>> import org.apache.beam.sdk.testing.TestPipeline;
>>>>> import org.apache.beam.sdk.transforms.Combine;
>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>> import org.apache.beam.sdk.transforms.SerializableFunction;
>>>>> import org.junit.Rule;
>>>>>
>>>>> import java.sql.Timestamp;
>>>>>
>>>>> import static org.junit.Assert.assertEquals;
>>>>>
>>>>> public class AvroUtilsSideEffect {
>>>>>
>>>>>     @Rule
>>>>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>>>>     @Rule
>>>>>     public final transient TestPipeline pipeline2 = TestPipeline.create();
>>>>>     public final Schema testSchema = SchemaBuilder
>>>>>             .record("record").namespace("test")
>>>>>             .fields()
>>>>>             .name("timestamp").type().longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault()
>>>>>             .endRecord();
>>>>>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
>>>>>             .set("timestamp", new Timestamp(1563926400000L).getTime())
>>>>>             .build();
>>>>>
>>>>>
>>>>>     @org.junit.Test
>>>>>     public void test() {
>>>>>         pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>>>>                 .apply(Combine.globally(new TestFn()));
>>>>>
>>>>>         pipeline.run().waitUntilFinish();
>>>>>     }
>>>>>     @org.junit.Test
>>>>>     public void test2() {
>>>>>
>>>>>         AvroUtils.toBeamSchema(testSchema);
>>>>>
>>>>>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>>>>                 .apply(Combine.globally(new TestFn()));
>>>>>
>>>>>         pipeline2.run().waitUntilFinish();
>>>>>     }
>>>>>
>>>>>     public static class TestFn implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
>>>>>
>>>>>         @Override
>>>>>         public GenericRecord apply(Iterable<GenericRecord> input) {
>>>>>             for (GenericRecord item : input) {
>>>>>                 if (item != null) {
>>>>>                     assertEquals(Long.class, item.get("timestamp").getClass());
>>>>>                     assertEquals(1563926400000L, item.get("timestamp"));
>>>>>                 }
>>>>>                 return item;
>>>>>             }
>>>>>             return null;
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Thanks,
>>>>> Paolo
>>>>>
>>>>> --
>>>>> Paolo Tomeo, PhD
>>>>>
>>>>> Big Data and Machine Learning Engineer
>>>>>
>>>>> linkedin.com/in/ptomeo
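
For anyone picking up the ticket, here is a minimal, JDK-only sketch of the kind of mechanism suspected earlier in the thread: a utility class whose static initializer registers a conversion on a process-wide singleton, so that merely touching the class changes how unrelated code reads values. Everything in it is hypothetical and for illustration only: GlobalModel stands in for a shared data model, SchemaUtils for a utility class with a static block, and java.time.Instant is used in place of org.joda.time.DateTime to avoid extra dependencies. It is not Beam's or Avro's actual code.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class StaticInitSideEffect {

    // Hypothetical stand-in for a process-wide singleton (a shared data model).
    static final class GlobalModel {
        private static final GlobalModel INSTANCE = new GlobalModel();
        private final Map<String, Function<Long, Object>> conversions =
                new ConcurrentHashMap<>();

        static GlobalModel get() {
            return INSTANCE;
        }

        void addConversion(String logicalType, Function<Long, Object> fn) {
            conversions.put(logicalType, fn);
        }

        // Returns the raw long (boxed) unless a conversion is registered
        // for the given logical type.
        Object read(String logicalType, long raw) {
            Function<Long, Object> fn = conversions.get(logicalType);
            return fn == null ? (Object) raw : fn.apply(raw);
        }
    }

    // Hypothetical utility class: its static initializer mutates the singleton
    // the first time any of its static members is used.
    static final class SchemaUtils {
        static {
            GlobalModel.get().addConversion("timestamp-millis", Instant::ofEpochMilli);
        }

        static String describe(String logicalType) {
            return "schema for " + logicalType;
        }
    }

    // What an unrelated reader of timestamp-millis values would observe.
    static Class<?> readTimestampClass() {
        return GlobalModel.get().read("timestamp-millis", 1563926400000L).getClass();
    }

    public static void main(String[] args) {
        System.out.println(readTimestampClass()); // class java.lang.Long
        SchemaUtils.describe("timestamp-millis"); // triggers the static block
        System.out.println(readTimestampClass()); // class java.time.Instant
    }
}
```

If something like this is what happens in AvroUtils, it would also be consistent with the test passing in one environment and failing in another, since the outcome depends on whether the class has already been initialized in the same JVM before the records are read. One possible direction, again only as a suggestion, is to register conversions on a private model instance rather than on the shared one.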
