dmcarver opened a new issue #11288:
URL: https://github.com/apache/pulsar/issues/11288
**Describe the bug**
I have a simple Python producer that has been sending messages to Pulsar
since 2.6.1 with `batching_enabled=True`. These messages are processed by an
instance of the built-in InfluxDB sink connector. Since upgrading to 2.8.0, I
receive the following warning and the records are not processed:
```
12:53:20.020 [pool-2-thread-1] WARN org.apache.pulsar.io.influxdb.BatchSink - Record flush thread was exception
org.apache.pulsar.client.api.SchemaSerializationException: measurement is a required field.
    at org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink.buildPoint(InfluxDBGenericRecordSink.java:55) ~[pulsar-io-influxdb-2.8.0.nar-unpacked/:?]
    at org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink.buildPoint(InfluxDBGenericRecordSink.java:41) ~[pulsar-io-influxdb-2.8.0.nar-unpacked/:?]
    at org.apache.pulsar.io.influxdb.BatchSink.flush(BatchSink.java:85) ~[pulsar-io-influxdb-2.8.0.nar-unpacked/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
```
My messages follow the simple schema the sink requires, and the `measurement`
field named in the error above is defined and present in them. More
importantly, messages sent with `batching_enabled=False` continue to work fine.
I added some extra logging to the sink and observed that when
`batching_enabled=False`, the records seen in
`org.apache.pulsar.io.influxdb.BatchSink.flush()` are of class
`GenericJsonRecord`, but when batching is enabled they are of class
`GenericObjectWrapper`. In 2.7.x, they are `GenericJsonRecord` in both cases.
I also tested with Avro instead of JSON and saw the same behavior.
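The class difference described above can be pictured with a toy Python model. It is not the connector's Java code: `FieldRecord`, `WrappedRecord`, `build_point` and `unwrap` are hypothetical stand-ins, and the model assumes (consistent with the warning, but not confirmed here) that on the batched path the sink receives a wrapper whose struct fields cannot be read directly, so the `measurement` lookup comes back empty and the required-field check fails.

```python
"""Toy model (not Pulsar code) of the required-field check in the sink."""


class SchemaSerializationError(Exception):
    """Stand-in for Pulsar's SchemaSerializationException."""


class FieldRecord:
    """Stand-in for a record whose fields are directly readable
    (the GenericJsonRecord case)."""

    def __init__(self, fields):
        self._fields = dict(fields)

    def get_field(self, name):
        # Missing fields read as None; the caller decides what is required.
        return self._fields.get(name)


class WrappedRecord:
    """Stand-in for the GenericObjectWrapper case (behaviour assumed)."""

    def __init__(self, inner):
        self.inner = inner

    def get_field(self, name):
        # Assumption: the wrapper does not forward field lookups.
        return None


def build_point(record):
    """Mimics the 'measurement is a required field.' check from the log."""
    measurement = record.get_field("measurement")
    if measurement is None:
        raise SchemaSerializationError("measurement is a required field.")
    return {"measurement": measurement,
            "fields": {"field1": record.get_field("field1")}}


def unwrap(record):
    """Hypothetical fix direction: peel wrappers off before reading fields."""
    while isinstance(record, WrappedRecord):
        record = record.inner
    return record


if __name__ == "__main__":
    payload = {"measurement": "foo", "field1": "bar"}
    print(build_point(FieldRecord(payload))["measurement"])  # foo
    try:
        build_point(WrappedRecord(FieldRecord(payload)))
    except SchemaSerializationError as exc:
        print("batched path:", exc)  # batched path: measurement is a required field.
    print(build_point(unwrap(WrappedRecord(FieldRecord(payload))))["measurement"])  # foo
```

In this model the payload is identical in both paths; only the object that carries it differs, which matches the report that the same messages succeed unbatched and fail batched.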
**To Reproduce**
Steps to reproduce the behavior:
1. Create a simple Python producer:
```
from pulsar import Client
from pulsar.schema import JsonSchema, Record, String


# JSON schema with the field the InfluxDB sink requires.
class Test_Schema(Record):
    measurement = String(required=True)
    field1 = String()


client = Client('pulsar://localhost:6650')
# Producer-side batching is what triggers the failure on 2.8.0.
producer = client.create_producer(topic='mytopic',
                                  schema=JsonSchema(Test_Schema),
                                  batching_enabled=True)
producer.send(Test_Schema(measurement='foo', field1='bar'))
client.close()
```
2. Connect the InfluxDB sink to `mytopic` in localrun mode. I am using this
config:
```
configs:
  influxdbUrl: "http://influxdb:8086"
  database: "mydb"
  consistencyLevel: "ONE"
  logLevel: "NONE"
  retentionPolicy: "autogen"
  batchTimeMs: 1000
  batchSize: 100
```
3. Run the producer script.
4. See the warning above in the sink log; the record is not written.
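The `batchSize` and `batchTimeMs` settings in the config above control the sink's own write batching, which is separate from the producer's `batching_enabled`. Below is a toy Python model of that sink-side behavior, assuming the sink flushes once `batchSize` records are buffered and also whenever a periodic timer fires every `batchTimeMs` (the scheduled-executor frames in the stack trace suggest such a timer). `ToyBatchSink` and its methods are hypothetical, not the connector's API.

```python
"""Toy model (not the connector's code) of size-or-timer sink batching."""


class ToyBatchSink:
    """Hypothetical stand-in for a batching sink; names are illustrative."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.buffer = []
        self.flushed = []  # each entry is one batch that would be written

    def write(self, record):
        # Size trigger: flush as soon as batch_size records are buffered.
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def on_timer(self):
        # Time trigger: a scheduler would call this every batchTimeMs
        # (the scheduling itself is not modeled here).
        self.flush()

    def flush(self):
        # An empty buffer is skipped, so an idle timer tick is a no-op.
        if self.buffer:
            self.flushed.append(list(self.buffer))
            self.buffer.clear()


if __name__ == "__main__":
    sink = ToyBatchSink(batch_size=100)
    sink.write({"measurement": "foo", "field1": "bar"})
    print(len(sink.flushed))  # 0: one record is below batchSize
    sink.on_timer()
    print(len(sink.flushed))  # 1: the timer tick flushed it
```

With the config above, a single test message stays buffered until the timer fires, so the warning can appear up to about a second after the send.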
**Expected behavior**
No error is generated; the message is sent to InfluxDB and written to the database.
**Additional context**
As mentioned, this same code works without issue on Pulsar 2.6.1 and 2.7.x.
The Python client version does not seem to matter either (I tested with 2.6.1
and 2.8.0). Batch processing stopped working only after the recent upgrade of
Pulsar to 2.8.0.
--
This is an automated message from the Apache Git Service.