I've copied this failing test into my client, and it passes for me. I can't
reproduce the failure.
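
One possible explanation for the difference, offered as an assumption and not
a confirmed diagnosis: if loading a utility class registers a conversion in a
JVM-wide registry, the result would depend on which classes were already
initialized when the record is decoded. The sketch below shows only that
general mechanism. Every name in it (CONVERSIONS, SchemaUtils, toOtherSchema,
decode) is hypothetical, and it uses java.util.Date as a stand-in; it does not
use or describe Beam's or Avro's actual code.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StaticRegistrySideEffect {

    // Hypothetical stand-in for a process-wide registry of logical-type conversions.
    static final Map<String, Function<Long, Object>> CONVERSIONS = new HashMap<>();

    // Hypothetical utility class: its static initializer mutates the shared registry.
    static class SchemaUtils {
        static {
            CONVERSIONS.put("timestamp-millis", millis -> new Date(millis));
        }

        // The method itself has no visible side effect; initializing the class does.
        static String toOtherSchema(String schemaName) {
            return schemaName.toUpperCase();
        }
    }

    // Hypothetical decoder: returns the raw long unless a conversion is registered.
    static Object decode(String logicalType, long raw) {
        Function<Long, Object> conversion = CONVERSIONS.get(logicalType);
        if (conversion == null) {
            return Long.valueOf(raw);
        }
        return conversion.apply(raw);
    }

    public static void main(String[] args) {
        // SchemaUtils has not been initialized yet, so the value stays a Long.
        Object before = decode("timestamp-millis", 1563926400000L);

        // The first static call initializes SchemaUtils and registers the conversion.
        SchemaUtils.toOtherSchema("record");

        // The same decode call now yields a Date.
        Object after = decode("timestamp-millis", 1563926400000L);

        System.out.println(before.getClass().getSimpleName()); // prints "Long"
        System.out.println(after.getClass().getSimpleName());  // prints "Date"
    }
}
```

If something like this is at play, both tests running in one JVM could pass or
fail together depending on execution order, which might be why the failure
does not show up everywhere.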

On Wed, Apr 29, 2020 at 6:34 PM Luke Cwik <lc...@google.com> wrote:

> +dev <dev@beam.apache.org> +Brian Hulette <bhule...@google.com> +Reuven
> Lax <re...@google.com>
>
> On Wed, Apr 29, 2020 at 4:21 AM Paolo Tomeo <p.tome...@gmail.com> wrote:
>
>> Hi all,
>>
>> I think the method AvroUtils.toBeamSchema has an unexpected side effect.
>> I found that if you invoke it and then run a pipeline of GenericRecords
>> containing a timestamp (I tried with the logical type timestamp-millis),
>> Beam converts that timestamp from long to org.joda.time.DateTime, even if
>> you don't apply any transformation to the pipeline.
>> Do you think it's a bug?
>>
>> Below you can find a simple test class I wrote in order to replicate the
>> problem.
>> The first test passes while the second fails.
>>
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.SchemaBuilder;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.beam.sdk.coders.AvroCoder;
>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
>> import org.apache.beam.sdk.testing.TestPipeline;
>> import org.apache.beam.sdk.transforms.Combine;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.SerializableFunction;
>> import org.junit.Rule;
>>
>> import java.sql.Timestamp;
>>
>> import static org.junit.Assert.assertEquals;
>>
>> public class AvroUtilsSideEffect {
>>
>>     @Rule
>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>     @Rule
>>     public final transient TestPipeline pipeline2 = TestPipeline.create();
>>     public final Schema testSchema = SchemaBuilder
>>             .record("record").namespace("test")
>>             .fields()
>>             .name("timestamp").type().longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault()
>>             .endRecord();
>>     public final GenericRecord record = new GenericRecordBuilder(testSchema)
>>             .set("timestamp", new Timestamp(1563926400000L).getTime())
>>             .build();
>>
>>
>>     @org.junit.Test
>>     public void test() {
>>         pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>                 .apply(Combine.globally(new TestFn()));
>>
>>         pipeline.run().waitUntilFinish();
>>     }
>>     @org.junit.Test
>>     public void test2() {
>>
>>         AvroUtils.toBeamSchema(testSchema);
>>
>>         pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
>>                 .apply(Combine.globally(new TestFn()));
>>
>>         pipeline2.run().waitUntilFinish();
>>     }
>>
>>     public static class TestFn implements SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
>>
>>         @Override
>>         public GenericRecord apply(Iterable<GenericRecord> input) {
>>             for (GenericRecord item : input) {
>>                 if(item != null){
>>                     assertEquals(Long.class, item.get("timestamp").getClass());
>>                     assertEquals(1563926400000L, item.get("timestamp"));
>>                 }
>>                 return item;
>>             }
>>             return null;
>>         }
>>     }
>> }
>>
>> Thanks,
>> Paolo
>>
>> --
>> Paolo Tomeo, PhD
>>
>> Big Data and Machine Learning Engineer
>>
>> linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo>
>>
>
