gaoran10 opened a new issue #9704:
URL: https://github.com/apache/pulsar/issues/9704


   **Describe the bug**
   If a producer uses the key-value schema with the `SEPARATED` encoding type 
and has batching disabled, Pulsar SQL queries on the topic fail with an Avro 
decoding error.
   
   **To Reproduce**
   1. Produce key-value schema data using the `SEPARATED` encoding type, with 
`enableBatching` set to `false`.
   ```
   @Cleanup
   PulsarClient pulsarClient = PulsarClient.builder()
           .serviceUrl("pulsar://localhost:6650")
           .build();
   Schema<KeyValue<Stock, Stock>> schema = Schema.KeyValue(
           Schema.AVRO(Stock.class), Schema.AVRO(Stock.class),
           KeyValueEncodingType.SEPARATED);
   String topic = "kv-schema-test2";
   
   @Cleanup
   Producer<KeyValue<Stock, Stock>> producer = pulsarClient
           .newProducer(schema)
           .topic(topic)
           .enableBatching(false)
           .create();
   
   for (int i = 0; i < 10; i++) {
       producer.send(new KeyValue<>(
               new Stock(i, "K_STOCK_" + i, i * 100),
               new Stock(i, "V_STOCK_" + i, i * 100)));
   }
   ```
   
   ```
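   @Data
   @NoArgsConstructor   // Avro reflection instantiates via a no-arg constructor
   @AllArgsConstructor  // required by new Stock(i, "...", i * 100) in the producer
   public class Stock {
       private int entryId;
       private String symbol;
       private double sharePrice;
   }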
   ```
   
   2. Query the topic with Pulsar SQL.
   3. Check the error logs of the SQL worker.
   
   
   ```
   2021-02-24T21:01:22.914+0800 INFO    20210224_130122_00004_yzrcx.1.0-0-114   
org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with 
parameters: PulsarSplit{splitId=0, connectorId='pulsar', 
originSchemaName='kv-schema-test2', schemaName='public/default', 
tableName='kv-schema-test2', splitSize=5, 
schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}',
 schemaType=KEY_VALUE, startPositionEntryId=0, endPositionEntryId=5, 
startPositionLedgerId=12, endPositionLedgerId=12, 
schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
   2021-02-24T21:01:22.914+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   
org.apache.pulsar.sql.presto.PulsarRecordCursor Initializing split with 
parameters: PulsarSplit{splitId=1, connectorId='pulsar', 
originSchemaName='kv-schema-test2', schemaName='public/default', 
tableName='kv-schema-test2', splitSize=4, 
schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}',
 schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, 
startPositionLedgerId=12, endPositionLedgerId=12, 
schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
   2021-02-24T21:01:22.959+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr  java.lang.ArrayIndexOutOfBoundsException: -52
   2021-02-24T21:01:22.959+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
   2021-02-24T21:01:22.959+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
   2021-02-24T21:01:22.959+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   2021-02-24T21:01:22.959+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   2021-02-24T21:01:22.959+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
   2021-02-24T21:01:22.959+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.operator.Driver.processInternal(Driver.java:379)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
   2021-02-24T21:01:22.960+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.operator.Driver.processFor(Driver.java:276)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   2021-02-24T21:01:22.961+0800 INFO    20210224_130122_00004_yzrcx.1.0-1-106   stderr          at java.lang.Thread.run(Thread.java:748)
   2021-02-24T21:01:22.974+0800 ERROR   SplitRunner-5-106       
io.prestosql.execution.executor.TaskExecutor    Error processing Split 
20210224_130122_00004_yzrcx.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', 
originSchemaName='kv-schema-test2', schemaName='public/default', 
tableName='kv-schema-test2', splitSize=4, 
schema='ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}ä{"type":"record","name":"Stock","namespace":"org.apache.pulsar.tests.integration.presto","fields":[{"name":"entryId","type":"int"},{"name":"symbol","type":["null","string"],"default":null},{"name":"sharePrice","type":"double"}]}',
 schemaType=KEY_VALUE, startPositionEntryId=5, endPositionEntryId=9, 
startPositionLedgerId=12, endPositionLedgerId=12, 
schemaInfoProperties={"key.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.properties":"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"}","value.schema.type":"AVRO","key.schema.name":"","value.schema.name":"","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}
 (start = 2.86554176341414E8, wall = 53 ms, cpu = 0 ms, wait = 0 ms, calls = 1): GENERIC_INTERNAL_ERROR: Decoding avro record failed.
   io.prestosql.spi.PrestoException: Decoding avro record failed.
        at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:70)
        at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:499)
        at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
        at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
        at io.prestosql.operator.Driver.processInternal(Driver.java:379)
        at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
        at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
        at io.prestosql.operator.Driver.processFor(Driver.java:276)
        at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
        at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
        at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
        at io.prestosql.$gen.Presto_332__testversion____20210224_125443_2.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.ArrayIndexOutOfBoundsException: -52
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:248)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
        at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:101)
        at org.apache.pulsar.client.impl.schema.generic.GenericAvroReader.read(GenericAvroReader.java:42)
        at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:67)
        at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:65)
        at org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoder.decodeRow(PulsarAvroRowDecoder.java:66)
        ... 14 more
   ```
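
The index `-52` in the exception can be traced back to a single input byte. Avro writes a union branch index as a zigzag-encoded varint, and the `readIndex` frame in the trace is presumably reading the branch of the `symbol` union (`["null","string"]`), where only 0 and 1 are valid. The sketch below is a hand-written, single-byte version of zigzag decoding, not Avro's own code; `ZigZagDemo` and `decodeSingleByteZigZag` are names made up for illustration. It shows that -52 is what the byte 0x67, the ASCII letter `g`, decodes to, which hints that the decoder may have been handed text rather than Avro binary.

```java
public class ZigZagDemo {
    // Decodes a one-byte zigzag varint (raw values 0..127, no continuation bit).
    // Hypothetical helper for illustration; Avro's real decoder also handles
    // multi-byte varints.
    static int decodeSingleByteZigZag(int raw) {
        if (raw < 0 || raw > 0x7F) {
            throw new IllegalArgumentException("not a single-byte varint: " + raw);
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        System.out.println(decodeSingleByteZigZag(0x00)); // 0: the "null" branch
        System.out.println(decodeSingleByteZigZag(0x02)); // 1: the "string" branch
        System.out.println(decodeSingleByteZigZag('g'));  // -52: byte 0x67
    }
}
```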
   
   **Expected behavior**
   The query returns the produced data successfully.
   
   **Desktop (please complete the following information):**
    - OS: MacOS 11.2.1
   
   **Additional context**
   This may be related to the `getKeyBytes()` method in the `RawMessageImpl` class.
   
   ```
   public Optional<ByteBuf> getKeyBytes() {
       if (getKey().isPresent()) {
           if (hasBase64EncodedKey()) {
               return Optional.of(Unpooled.wrappedBuffer(
                       Base64.getDecoder().decode(getKey().get())));
           } else {
               return Optional.of(Unpooled.wrappedBuffer(
                       getKey().get().getBytes(StandardCharsets.UTF_8)));
           }
       }
       return Optional.empty();
   }
   ```
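
To make the two branches of `getKeyBytes()` concrete, here is a small standalone sketch that uses only the JDK. It assumes (the report does not confirm this) that with `SEPARATED` encoding the Avro-encoded key is stored as Base64 text, and that the failure occurs when the `else` branch is taken for such a key. The three sample bytes are a hand-computed prefix of the Avro encoding of a `Stock` with `entryId` 5 and a 9-character `symbol`: zigzag int 5, union index 1, string length 9. `KeyBytesDemo` and `keyBytes` are illustrative names, not Pulsar APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class KeyBytesDemo {
    // Mirrors the two branches of getKeyBytes(), using byte[] instead of ByteBuf.
    static byte[] keyBytes(String key, boolean base64Encoded) {
        return base64Encoded
                ? Base64.getDecoder().decode(key)
                : key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Assumed Avro prefix: entryId = 5, union index = 1, string length = 9.
        byte[] avroPrefix = {0x0A, 0x02, 0x12};
        String storedKey = Base64.getEncoder().encodeToString(avroPrefix);

        System.out.println(storedKey);                                   // CgIS
        System.out.println(Arrays.toString(keyBytes(storedKey, true)));  // [10, 2, 18]
        System.out.println(Arrays.toString(keyBytes(storedKey, false))); // [67, 103, 73, 83]
    }
}
```

If the second result were passed to the Avro decoder, the byte at the union-index position would be 103 (`g`) instead of 2. As a zigzag varint, 103 decodes to -52, which matches the index in the `ArrayIndexOutOfBoundsException`.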
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
