dmcarver opened a new issue #11288:
URL: https://github.com/apache/pulsar/issues/11288


   **Describe the bug**
   I have a simple Python producer that has been sending messages to Pulsar since 2.6.1 with `batching_enabled=True`. These messages are processed by an instance of the built-in InfluxDB sink connector. Since upgrading to 2.8.0, I receive the following warning and the records are not processed:
   
   ```
   12:53:20.020 [pool-2-thread-1] WARN  org.apache.pulsar.io.influxdb.BatchSink - Record flush thread was exception
   org.apache.pulsar.client.api.SchemaSerializationException: measurement is a required field.
           at org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink.buildPoint(InfluxDBGenericRecordSink.java:55) ~[pulsar-io-influxdb-2.8.0.nar-unpacked/:?]
           at org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink.buildPoint(InfluxDBGenericRecordSink.java:41) ~[pulsar-io-influxdb-2.8.0.nar-unpacked/:?]
           at org.apache.pulsar.io.influxdb.BatchSink.flush(BatchSink.java:85) ~[pulsar-io-influxdb-2.8.0.nar-unpacked/:?]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_292]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_292]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
   ```
   
   My messages do have the simple schema the sink requires, and `measurement` (the field named in the error above) is defined and present in them. More importantly, messages sent with `batching_enabled=False` continue to work fine.
   
   I added some logging to the sink and observed that when `batching_enabled=False`, the records seen in `org.apache.pulsar.io.influxdb.BatchSink.flush()` are of class `GenericJsonRecord`, but when batching is enabled, they are of class `GenericObjectWrapper`. In 2.7.x, they are `GenericJsonRecord` in both cases.
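To make the suspected failure mode concrete, here is a toy Python model of the failing check. It is not Pulsar code: every class and function name below (`JsonRecordLike`, `ObjectWrapperLike`, `build_point`) is hypothetical. It assumes the sink looks up `measurement` by name on each record and that a `GenericObjectWrapper` exposes no named fields, so a lookup that succeeds on a `GenericJsonRecord` comes back empty even though the payload contains the field.

```python
"""Toy model (hypothetical, not Pulsar code) of the sink's required-field check."""
from typing import Any, Dict, Optional


class SchemaSerializationError(Exception):
    """Stands in for Pulsar's SchemaSerializationException."""


class JsonRecordLike:
    """Models a record whose fields can be looked up by name
    (the role GenericJsonRecord plays in this report)."""

    def __init__(self, fields: Dict[str, Any]) -> None:
        self._fields = dict(fields)

    def get_field(self, name: str) -> Optional[Any]:
        return self._fields.get(name)


class ObjectWrapperLike:
    """Models a wrapper around an opaque decoded object. ASSUMPTION: like the
    suspected GenericObjectWrapper behavior, it offers no per-field view."""

    def __init__(self, native_object: Any) -> None:
        self._native = native_object  # the data is present, just not reachable by name

    def get_field(self, name: str) -> Optional[Any]:
        # No named-field access to the wrapped object, so every lookup misses.
        return None


def build_point(record: Any) -> Dict[str, Any]:
    """Hypothetical analogue of buildPoint: reject records without a measurement."""
    measurement = record.get_field("measurement")
    if measurement is None:
        raise SchemaSerializationError("measurement is a required field.")
    return {"measurement": measurement, "field1": record.get_field("field1")}


payload = {"measurement": "foo", "field1": "bar"}

print(build_point(JsonRecordLike(payload)))  # models the unbatched path
try:
    build_point(ObjectWrapperLike(payload))  # models the batched path
except SchemaSerializationError as exc:
    print("error:", exc)
```

Under these assumptions, the first call returns a point and the second fails with the same message as the log above, even though both records carry identical data. Whether `GenericObjectWrapper` in 2.8.0 actually behaves this way is an assumption, not something confirmed here.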
   
   I also tested with Avro instead of JSON and saw the same behavior.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a simple Python producer:
   ```python
   from pulsar import Client
   from pulsar.schema import JsonSchema, Record, String
   
   # Minimal JSON schema; the InfluxDB sink requires a `measurement` field
   class Test_Schema(Record):
       measurement = String(required=True)
       field1 = String()
   
   client = Client('pulsar://localhost:6650')
   # Batching enabled: messages sent this way fail in the 2.8.0 sink
   producer = client.create_producer(topic='mytopic',
                                     schema=JsonSchema(Test_Schema),
                                     batching_enabled=True)
   producer.send(Test_Schema(measurement='foo', field1='bar'))
   client.close()
   ```
   2. Connect the InfluxDB sink to 'mytopic' in localrun mode. I am using this config:
   ```
   configs:
     influxdbUrl: "http://influxdb:8086"
     database: "mydb"
     consistencyLevel: "ONE"
     logLevel: "NONE"
     retentionPolicy: "autogen"
     batchTimeMs: 1000
     batchSize: 100
   ```
   3. Run the script.
   4. See the error.
   
   **Expected behavior**
   No error generated. Message sent to InfluxDB and written to database.
   
   **Additional context**
   As mentioned, this same code works without issue on Pulsar 2.6.1 and 2.7.x. The Python client version does not seem to matter either (I tested with 2.6.1 and 2.8.0). Batch processing stopped working only after the recent Pulsar upgrade to 2.8.0.
   

