gaoran10 opened a new issue #9704:
URL: https://github.com/apache/pulsar/issues/9704
**Describe the bug**
If a producer uses the key-value schema in `SEPARATED` encoding mode with
batching disabled, Pulsar SQL queries on the topic fail with
`Decoding avro record failed`.
**To Reproduce**
1. Produce key-value schema data in `SEPARATED` encoding mode, with
`enableBatching` set to `false`.
```
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Schema<KeyValue<Stock, Stock>> schema = Schema.KeyValue(
        Schema.AVRO(Stock.class), Schema.AVRO(Stock.class),
        KeyValueEncodingType.SEPARATED);

String topic = "kv-schema-test2";

@Cleanup
Producer<KeyValue<Stock, Stock>> producer = pulsarClient
        .newProducer(schema)
        .topic(topic)
        .enableBatching(false)
        .create();

for (int i = 0; i < 10; i++) {
    producer.send(new KeyValue<>(
            new Stock(i, "K_STOCK_" + i, i * 100),
            new Stock(i, "V_STOCK_" + i, i * 100)));
}
```
```
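@Data
@NoArgsConstructor   // Avro reflection needs a no-arg constructor
@AllArgsConstructor  // required by the three-argument calls in the producer loop
public class Stock {
    private int entryId;
    private String symbol;
    private double sharePrice;
}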
```
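2. Query the data with Pulsar SQL, for example
`select * from pulsar."public/default"."kv-schema-test2"`.
3. Check the error logs of the Pulsar SQL worker.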
```
2021-02-24T21:01:22.914+0800 INFO 20210224_130122_00004_yzrcx.1.0-0-114
org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with
parameters: PulsarSplit{splitId=0, connectorId='pulsar',
originSchemaName='kv-schema-test2', schemaName='public/default',
tableName='kv-schema-test2', splitSize=5,
schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}',
schemaType=KEY_VALUE, startPositionEntryId=0, endPositionEntryId=5,
startPositionLedgerId=12, endPositionLedgerId=12,
schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.914+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106
org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with
parameters: PulsarSplit{splitId=1, connectorId='pulsar',
originSchemaName='kv-schema-test2', schemaName='public/default',
tableName='kv-schema-test2', splitSize=4,
schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}',
schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9,
startPositionLedgerId=12, endPositionLedgerId=12,
schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
2021-02-24T21:01:22.959+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr java.lang.ArrayIndexOutOfBoundsException: -52
2021-02-24T21:01:22.959+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
2021-02-24T21:01:22.959+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
2021-02-24T21:01:22.959+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
2021-02-24T21:01:22.959+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.959+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
2021-02-24T21:01:22.959+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.operator.Driver.processInternal(Driver.java:379)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
2021-02-24T21:01:22.960+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.operator.Driver.processFor(Driver.java:276)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2021-02-24T21:01:22.961+0800 INFO 20210224_130122_00004_yzrcx.1.0-1-106 stderr at java.lang.Thread.run(Thread.java:748)
2021-02-24T21:01:22.974+0800 ERROR SplitRunner-5-106
io.prestosql.execution.executor.TaskExecutor Error processing Split
20210224_130122_00004_yzrcx.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar',
originSchemaName='kv-schema-test2', schemaName='public/default',
tableName='kv-schema-test2', splitSize=4,
schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}',
schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9,
startPositionLedgerId=12, endPositionLedgerId=12,
schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
(start = 2.86554176341414E8, wall = 53 ms, cpu = 0 ms, wait = 0 ms, calls =
1): GENERIC_INTERNAL_ERROR: Decoding avro record failed.
io.prestosql.spi.PrestoException: Decoding avro record failed.
    at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:70)
    at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
    at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
    at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
    at io.prestosql.operator.Driver.processInternal(Driver.java:379)
    at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
    at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
    at io.prestosql.operator.Driver.processFor(Driver.java:276)
    at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
    at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
    at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
    at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -52
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
    at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
    at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
    at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
    ... 14 more
```
**Expected behavior**
The query succeeds and returns the produced messages.
**Desktop (please complete the following information):**
- OS: macOS 11.2.1
**Additional context**
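This may be related to the `getKeyBytes()` method in the class `RawMessageImpl`.
If `hasBase64EncodedKey()` returns false for a non-batched message, the
Base64-encoded key string is converted to UTF-8 bytes instead of being decoded,
and the Avro decoder would then receive bytes that are not a valid record.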
```
public Optional<ByteBuf> getKeyBytes() {
    if (getKey().isPresent()) {
        if (hasBase64EncodedKey()) {
            return Optional.of(Unpooled.wrappedBuffer(Base64.getDecoder().decode(getKey().get())));
        } else {
            return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes(StandardCharsets.UTF_8)));
        }
    }
    return Optional.empty();
}
```
```
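To sanity-check this suspicion, here is a minimal standalone sketch using only the JDK, not Pulsar code. It assumes that in `SEPARATED` mode the key bytes are stored as a Base64 string in the message key, and that `hasBase64EncodedKey()` wrongly returns false for non-batched messages; both are assumptions, not confirmed here. The class and method names (`KeyBytesMismatchDemo`, `decodedKeyBytes`, `utf8KeyBytes`, `zigZagDecode`) are hypothetical. The input is the first three bytes that Avro binary encoding would produce for a key such as `Stock(5, "K_STOCK_5", ...)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class KeyBytesMismatchDemo {

    // Avro zig-zag decoding for a value that fits in a single varint byte.
    static int zigZagDecode(int n) {
        return (n >>> 1) ^ -(n & 1);
    }

    // Expected path: decode the Base64 string back into the original key bytes.
    static byte[] decodedKeyBytes(String storedKey) {
        return Base64.getDecoder().decode(storedKey);
    }

    // Suspected faulty path: treat the Base64 string as if it were the raw key.
    static byte[] utf8KeyBytes(String storedKey) {
        return storedKey.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // entryId=5 -> 0x0A, union branch 1 ("string") -> 0x02, string length 9 -> 0x12
        byte[] avroPrefix = {0x0A, 0x02, 0x12};
        String storedKey = Base64.getEncoder().encodeToString(avroPrefix);
        System.out.println(storedKey); // prints "CgIS"

        byte[] good = decodedKeyBytes(storedKey);
        byte[] bad = utf8KeyBytes(storedKey);

        System.out.println(Arrays.equals(good, avroPrefix)); // prints "true"
        // The second byte is read as the union branch index of the "symbol" field.
        System.out.println(zigZagDecode(good[1])); // prints "1"
        System.out.println(zigZagDecode(bad[1]));  // prints "-52"
    }
}
```

On the UTF-8 path the second byte is the character `g` (103), which zig-zag decodes to -52. That matches the index in the `ArrayIndexOutOfBoundsException: -52` above, so the log is at least consistent with the key being read without Base64 decoding.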