tonisojandu-sympower opened a new issue #11819:
URL: https://github.com/apache/pulsar/issues/11819


   I think we found a bug in the Pulsar Java Client. You maybe aware of it or 
you may have already addressed it. When writing Avro Specific Records into 
Pulsar producer then it uses shaded Avro code. This means that when it reaches:
   
`org.apache.pulsar.shade.org.apache.avro.specific.SpecificDatumWriter.writeField(Object
 datum, Field f, Encoder out, Object state)`
   ```
   at 
org.apache.pulsar.shade.org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:101)
   at 
org.apache.pulsar.shade.org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:184)
   at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
   at 
org.apache.pulsar.shade.org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
   at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
   at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
   at 
org.apache.pulsar.shade.org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:158)
   at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
   at 
org.apache.pulsar.client.impl.schema.writer.AvroWriter.write(AvroWriter.java:53)
   at 
org.apache.pulsar.client.impl.schema.AbstractStructSchema.encode(AbstractStructSchema.java:57)
   at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:175)
   at 
net.sympower.usernotification.disturbance.DisturbanceStartedUseCaseIT.disturbance(DisturbanceStartedUseCaseIT.kt:58)
   ```
   it has a decision to make:
   ```
   if (datum instanceof SpecificRecordBase) {
     Conversion<?> conversion = 
((SpecificRecordBase)datum).getConversion(f.pos());
     Schema fieldSchema = f.schema();
     LogicalType logicalType = fieldSchema.getLogicalType();
     Object value = this.getData().getField(datum, f.name(), f.pos());
     if (conversion != null && logicalType != null) {
        value = this.convert(fieldSchema, logicalType, conversion, value);
     }
     this.writeWithoutConversion(fieldSchema, value, out);
   } else {
     super.writeField(datum, f, out, state);
   }
   ```
   Since Specific Avro classes created by Avro code generators do not extend 
this shaded one:
   `org.apache.pulsar.shade.org.apache.avro.specific.SpecificRecordBase`
   
   It will end up taking the else path that serializes without special 
conversion logic provided by the Specific Avro record. We found out about it 
when we used our generated Avro class that had a field with `timestamp-millis` 
logical type (`long` in reality). Generator was able to turn it into an Instant 
on language level. Since the the generated custom conversion logic is ignored 
then in ends up serializing it as an `IndexedRecord` instead. This path cannot 
handle Instant class when it is returned the record.
   
   We are not sure what the correct path is here. We got it working by just 
using Pulsar Admin Java Client. For some reason, for the latter,
   `org.apache.pulsar.client.impl.schema.writer.AvroWriter` does not use the 
shaded `org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter` but 
the official
   `org.apache.avro.generic.GenericDatumWriter` . This might be some build-time 
 before/after shading order of operation thing. So if the shading is important, 
then this may be also a bug.
   
   Some blame is maybe on the the code generator that generates 
`IndexRecord.get(int field)` method that does not do the immediate conversion, 
but I am not quite sure, if that would be by the Avro spec.
   Our exception:
   
   ```
   Caused by: java.lang.ClassCastException: class java.time.Instant cannot be 
cast to class java.lang.Number (java.time.Instant and java.lang.Number are in 
module java.base of loader 'bootstrap') in field startedAt
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.addClassCastMsg(GenericDatumWriter.java:191)
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:229)
     at 
org.apache.pulsar.shade.org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:101)
     at 
org.apache.pulsar.shade.org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:184)
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
     at 
org.apache.pulsar.shade.org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
     at 
org.apache.pulsar.shade.org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:158)
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
     at 
org.apache.pulsar.client.impl.schema.writer.AvroWriter.write(AvroWriter.java:53)
     ... 93 more
   Caused by: java.lang.ClassCastException: class java.time.Instant cannot be 
cast to class java.lang.Number (java.time.Instant and java.lang.Number are in 
module java.base of loader 'bootstrap')
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:160)
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:81)
     at 
org.apache.pulsar.shade.org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:158)
     at 
org.apache.pulsar.shade.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
   ```
   
   To reproduce:
   1) using Avro Schema:
   `src/main/avro/some-object.avsc`
   ```json
   {
     "name": "SomeObject",
     "namespace": "test",
     "type": "record",
     "version": "1",
     "fields" : [
       {
         "name": "startedAt",
         "type": {
           "type":  "long",
           "logicalType": "timestamp-millis"
         }
       }
     ]
   }
   ```
   generate a class using Gradle plugin:
   ```kotlin
   plugins {
     kotlin("jvm") version "1.5.21"
     kotlin("plugin.spring") version "1.5.21"
     id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
     id("maven-publish")
   }
   
   dependencies {
     implementation("org.apache.avro:avro:1.10.2")
     
implementation("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.2.1")
     implementation("org.apache.pulsar:pulsar-client:2.8.0")
   }
   ```
   2) Send an instance of this object to a producer:
   ```kotlin
   val producer = 
pulsarClient.newProducer(AvroSchema.of(SomeObject::class.java))
         .topic("some-topic")
         .create()
   val someObject = SomeObject.newBuilder()
         .setStartedAt(Instant.now(clock))
         .build()
   producer.newMessage().value(someObject).send()
   ```
   
   **Expected behaviour:**
   Custom converters are used when sending Avro Specific Records


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to