This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 371b311  [Pulsar SQL] Fix Pulsar SQL query bytes schema data error 
(#9631)
371b311 is described below

commit 371b311eac93ac1cf5d5a3ceeeb36f7ae11538a1
Author: ran <[email protected]>
AuthorDate: Thu Feb 25 16:49:53 2021 +0800

    [Pulsar SQL] Fix Pulsar SQL query bytes schema data error (#9631)
    
    ### Motivation
    
    Currently, querying data that uses the bytes schema with Pulsar SQL causes an error.
    
    *Reproduce*
    1. Produce data using the bytes schema.
    2. Query the data with Pulsar SQL.
    3. The error below appears in the log.
    
    *Error log*
    ```
    com.google.common.util.concurrent.UncheckedExecutionException: 
java.nio.BufferUnderflowException
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
        at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at 
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
        at 
org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.getSchemaByVersion(PulsarSqlSchemaInfoProvider.java:76)
        at 
org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:471)
        at 
io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
        at 
io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
        at io.prestosql.operator.Driver.processInternal(Driver.java:379)
        at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
        at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
        at io.prestosql.operator.Driver.processFor(Driver.java:276)
        at 
io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
        at 
io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
        at 
io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
        at 
io.prestosql.$gen.Presto_332__testversion____20210219_094906_2.run(Unknown 
Source)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.BufferUnderflowException
        at java.nio.Buffer.nextGetIndex(Buffer.java:509)
        at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:415)
        at 
org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.loadSchema(PulsarSqlSchemaInfoProvider.java:106)
        at 
org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.access$000(PulsarSqlSchemaInfoProvider.java:49)
        at 
org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:61)
        at 
org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:58)
        at 
com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
        at 
com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
        at 
com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
        ... 18 more
    
    ```
    
    ### Modifications
    
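    Add a check for the bytes schema. If the split's schema type is BYTES or
    NONE, use the SchemaInfo of `Schema.BYTES` (or `Schema.BYTEBUFFER`, when
    the schema name matches) directly, instead of looking up the schema by
    the message's schema version.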
    
    ### Verifying this change
    
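    Add a new integration test that queries topics using different schemas
    (BYTES, BYTEBUFFER, STRING, AVRO, JSON, and KEY_VALUE with INLINE and
    SEPARATED encoding).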
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  19 +-
 .../tests/integration/presto/TestBasicPresto.java  | 216 +++++++++++++++------
 .../presto/TestPrestoQueryTieredStorage.java       |  22 ++-
 .../integration/presto/TestPulsarSQLBase.java      |  49 +++--
 4 files changed, 233 insertions(+), 73 deletions(-)

diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index c782de6..1171902 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
@@ -441,9 +442,11 @@ public class PulsarRecordCursor implements RecordCursor {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        SchemaInfo schemaInfo;
+        SchemaInfo schemaInfo = 
getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
         try {
-            schemaInfo =  
schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+            if (schemaInfo == null) {
+                schemaInfo =  
schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+            }
         } catch (InterruptedException | ExecutionException e) {
             throw new RuntimeException(e);
         }
@@ -560,6 +563,18 @@ public class PulsarRecordCursor implements RecordCursor {
         return true;
     }
 
+    private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String 
schemaName) {
+        if (!schemaType.equals(SchemaType.BYTES) && 
!schemaType.equals(SchemaType.NONE)) {
+            return null;
+        }
+        if (schemaName.equals(Schema.BYTES.getSchemaInfo().getName())) {
+            return Schema.BYTES.getSchemaInfo();
+        } else if 
(schemaName.equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
+            return Schema.BYTEBUFFER.getSchemaInfo();
+        } else {
+            return Schema.BYTES.getSchemaInfo();
+        }
+    }
 
     @Override
     public boolean getBoolean(int field) {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 5597a95..fe8ba7e 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -18,26 +18,31 @@
  */
 package org.apache.pulsar.tests.integration.presto;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.awaitility.Awaitility;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.assertj.core.api.Assertions.assertThat;
 
+/**
+ * Test basic Pulsar SQL query, the Pulsar SQL is standalone mode.
+ */
 @Slf4j
 public class TestBasicPresto extends TestPulsarSQLBase {
 
@@ -55,85 +60,124 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         pulsarCluster.stopPrestoWorker();
     }
 
+    @DataProvider(name = "schemaProvider")
+    public Object[][] schemaProvider() {
+        return new Object[][] {
+                { Schema.BYTES},
+                { Schema.BYTEBUFFER},
+                { Schema.STRING},
+                { AvroSchema.of(Stock.class)},
+                { JSONSchema.of(Stock.class)},
+                { Schema.KeyValue(Schema.AVRO(Stock.class), 
Schema.AVRO(Stock.class), KeyValueEncodingType.INLINE) },
+                { Schema.KeyValue(Schema.AVRO(Stock.class), 
Schema.AVRO(Stock.class), KeyValueEncodingType.SEPARATED) }
+        };
+    }
+
     @Test
     public void testSimpleSQLQueryBatched() throws Exception {
         TopicName topicName = TopicName.get("public/default/stocks_batched_" + 
randomName(5));
-        pulsarSQLBasicTest(topicName, true, false);
+        pulsarSQLBasicTest(topicName, true, false, JSONSchema.of(Stock.class));
     }
 
     @Test
     public void testSimpleSQLQueryNonBatched() throws Exception {
         TopicName topicName = 
TopicName.get("public/default/stocks_nonbatched_" + randomName(5));
-        pulsarSQLBasicTest(topicName, false, false);
+        pulsarSQLBasicTest(topicName, false, false, 
JSONSchema.of(Stock.class));
     }
 
-    @DataProvider(name = "keyValueEncodingType")
-    public Object[][] keyValueEncodingType() {
-        return new Object[][] { { KeyValueEncodingType.INLINE }, { 
KeyValueEncodingType.SEPARATED } };
+    @Test(dataProvider = "schemaProvider")
+    public void testForSchema(Schema schema) throws Exception {
+        String schemaFlag;
+        if (schema.getSchemaInfo().getType().isStruct()) {
+            schemaFlag = schema.getSchemaInfo().getType().name();
+        } else 
if(schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
+            schemaFlag = schema.getSchemaInfo().getType().name() + "_"
+                    + ((KeyValueSchema) schema).getKeyValueEncodingType();
+        } else {
+            // Because some schema types are same(such as BYTES and 
BYTEBUFFER), so use the schema name as flag.
+            schemaFlag = schema.getSchemaInfo().getName();
+        }
+        String topic = String.format("public/default/schema_%s_test_%s", 
schemaFlag, randomName(5)).toLowerCase();
+        pulsarSQLBasicTest(TopicName.get(topic), false, false, schema);
     }
 
-    @Test(dataProvider = "keyValueEncodingType")
-    public void testKeyValueSchema(KeyValueEncodingType type) throws Exception 
{
-        waitPulsarSQLReady();
-        TopicName topicName = TopicName.get("public/default/stocks" + 
randomName(20));
+    @Override
+    protected int prepareData(TopicName topicName,
+                              boolean isBatch,
+                              boolean useNsOffloadPolices,
+                              Schema schema) throws Exception {
         @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                 .build();
 
+        if 
(schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName()))
 {
+            prepareDataForBytesSchema(pulsarClient, topicName, isBatch);
+        } else if 
(schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName()))
 {
+            prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch);
+        } else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) 
{
+            prepareDataForStringSchema(pulsarClient, topicName, isBatch);
+        } else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON)
+                || schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) {
+            prepareDataForStructSchema(pulsarClient, topicName, isBatch, 
schema);
+        } else if 
(schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
+            prepareDataForKeyValueSchema(pulsarClient, topicName, schema);
+        }
+
+        return NUM_OF_STOCKS;
+    }
+
+    private void prepareDataForBytesSchema(PulsarClient pulsarClient,
+                                           TopicName topicName,
+                                           boolean isBatch) throws 
PulsarClientException {
         @Cleanup
-        Producer<KeyValue<Stock,Stock>> producer = 
pulsarClient.newProducer(Schema
-                .KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), 
type))
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                 .topic(topicName.toString())
+                .enableBatching(isBatch)
                 .create();
 
         for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
-            int j = 100 * i;
-            final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
-            final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
-            producer.send(new KeyValue<>(stock1, stock2));
+            producer.send(("bytes schema test" + i).getBytes());
         }
-
         producer.flush();
-
-        validateMetadata(topicName);
-
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
-                () -> {
-                    ContainerExecResult containerExecResult = execQuery(
-                            String.format("select * from pulsar.\"%s\".\"%s\" 
order by entryid;",
-                                    topicName.getNamespace(), 
topicName.getLocalName()));
-                    assertThat(containerExecResult.getExitCode()).isEqualTo(0);
-                    log.info("select sql query output \n{}", 
containerExecResult.getStdout());
-                    String[] split = 
containerExecResult.getStdout().split("\n");
-                    assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
-                    String[] split2 = 
containerExecResult.getStdout().split("\n|,");
-                    for (int i = 0; i < NUM_OF_STOCKS; ++i) {
-                        int j = 100 * i;
-                        assertThat(split2).contains("\"" + i + "\"");
-                        assertThat(split2).contains("\"" + "STOCK_" + i + 
"\"");
-                        assertThat(split2).contains("\"" + (100.0 + i * 10) + 
"\"");
-
-                        assertThat(split2).contains("\"" + j + "\"");
-                        assertThat(split2).contains("\"" + "STOCK_" + j + 
"\"");
-                        assertThat(split2).contains("\"" + (100.0 + j * 10) + 
"\"");
-                    }
-                }
-        );
-
     }
 
+    private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
+                                                TopicName topicName,
+                                                boolean isBatch) throws 
PulsarClientException {
+        @Cleanup
+        Producer<ByteBuffer> producer = 
pulsarClient.newProducer(Schema.BYTEBUFFER)
+                .topic(topicName.toString())
+                .enableBatching(isBatch)
+                .create();
 
+        for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+            producer.send(ByteBuffer.wrap(("bytes schema test" + 
i).getBytes()));
+        }
+        producer.flush();
+    }
 
-    @Override
-    protected int prepareData(TopicName topicName, boolean isBatch, boolean 
useNsOffloadPolices) throws Exception {
+    private void prepareDataForStringSchema(PulsarClient pulsarClient,
+                                            TopicName topicName,
+                                            boolean isBatch) throws 
PulsarClientException {
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder()
-                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                .build();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName.toString())
+                .enableBatching(isBatch)
+                .create();
 
+        for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+            producer.send("string" + i);
+        }
+        producer.flush();
+    }
+
+    private void prepareDataForStructSchema(PulsarClient pulsarClient,
+                                            TopicName topicName,
+                                            boolean isBatch,
+                                            Schema<Stock> schema) throws 
Exception {
         @Cleanup
-        Producer<Stock> producer = 
pulsarClient.newProducer(JSONSchema.of(Stock.class))
+        Producer<Stock> producer = pulsarClient.newProducer(schema)
                 .topic(topicName.toString())
                 .enableBatching(isBatch)
                 .create();
@@ -143,7 +187,71 @@ public class TestBasicPresto extends TestPulsarSQLBase {
             producer.send(stock);
         }
         producer.flush();
-        return NUM_OF_STOCKS;
+    }
+
+    private void prepareDataForKeyValueSchema(PulsarClient pulsarClient,
+                                              TopicName topicName,
+                                              Schema<KeyValue<Stock, Stock>> 
schema) throws Exception {
+        @Cleanup
+        Producer<KeyValue<Stock,Stock>> producer = 
pulsarClient.newProducer(schema)
+                .topic(topicName.toString())
+                .create();
+
+        for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+            int j = 100 * i;
+            final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
+            final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
+            producer.send(new KeyValue<>(stock1, stock2));
+        }
+    }
+
+    @Override
+    protected void validateContent(int messageNum, String[] contentArr, Schema 
schema) {
+        switch (schema.getSchemaInfo().getType()) {
+            case BYTES:
+                log.info("Skip validate content for BYTES schema type.");
+                break;
+            case STRING:
+                validateContentForStringSchema(messageNum, contentArr);
+                log.info("finish validate content for STRING schema type.");
+                break;
+            case JSON:
+            case AVRO:
+                validateContentForStructSchema(messageNum, contentArr);
+                log.info("finish validate content for {} schema type.", 
schema.getSchemaInfo().getType());
+                break;
+            case KEY_VALUE:
+                validateContentForKeyValueSchema(messageNum, contentArr);
+                log.info("finish validate content for KEY_VALUE {} schema 
type.",
+                        ((KeyValueSchema) schema).getKeyValueEncodingType());
+        }
+    }
+
+    private void validateContentForStringSchema(int messageNum, String[] 
contentArr) {
+        for (int i = 0; i < messageNum; i++) {
+            assertThat(contentArr).contains("\"string" + i + "\"");
+        }
+    }
+
+    private void validateContentForStructSchema(int messageNum, String[] 
contentArr) {
+        for (int i = 0; i < messageNum; ++i) {
+            assertThat(contentArr).contains("\"" + i + "\"");
+            assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
+            assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
+        }
+    }
+
+    private void validateContentForKeyValueSchema(int messageNum, String[] 
contentArr) {
+        for (int i = 0; i < messageNum; ++i) {
+            int j = 100 * i;
+            assertThat(contentArr).contains("\"" + i + "\"");
+            assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
+            assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
+
+            assertThat(contentArr).contains("\"" + j + "\"");
+            assertThat(contentArr).contains("\"" + "STOCK_" + j + "\"");
+            assertThat(contentArr).contains("\"" + (100.0 + j * 10) + "\"");
+        }
     }
 
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 80374e9..e7c3cd4 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.presto;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -43,7 +45,7 @@ import org.testng.annotations.Test;
 
 
 /**
- * Test presto query from tiered storage.
+ * Test presto query from tiered storage, the Pulsar SQL is cluster mode.
  */
 @Slf4j
 public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
@@ -106,18 +108,21 @@ public class TestPrestoQueryTieredStorage extends 
TestPulsarSQLBase {
     public void testQueryTieredStorage1() throws Exception {
         TopicName topicName = TopicName.get(
                 TopicDomain.persistent.value(), TENANT, NAMESPACE, 
"stocks_ts_nons_" + randomName(5));
-        pulsarSQLBasicTest(topicName, false, false);
+        pulsarSQLBasicTest(topicName, false, false, 
JSONSchema.of(Stock.class));
     }
 
     @Test
     public void testQueryTieredStorage2() throws Exception {
         TopicName topicName = TopicName.get(
                 TopicDomain.persistent.value(), TENANT, NAMESPACE, 
"stocks_ts_ns_" + randomName(5));
-        pulsarSQLBasicTest(topicName, false, true);
+        pulsarSQLBasicTest(topicName, false, true, JSONSchema.of(Stock.class));
     }
 
     @Override
-    protected int prepareData(TopicName topicName, boolean isBatch, boolean 
useNsOffloadPolices) throws Exception {
+    protected int prepareData(TopicName topicName,
+                              boolean isBatch,
+                              boolean useNsOffloadPolices,
+                              Schema schema) throws Exception {
         @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -206,4 +211,13 @@ public class TestPrestoQueryTieredStorage extends 
TestPulsarSQLBase {
         }
     }
 
+    @Override
+    protected void validateContent(int messageNum, String[] contentArr, Schema 
schema) {
+        for (int i = 0; i < messageNum; ++i) {
+            assertThat(contentArr).contains("\"" + i + "\"");
+            assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
+            assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
+        }
+    }
+
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 5781549..91186f4 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -31,7 +31,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarSQLTestSuite;
@@ -42,19 +44,30 @@ import org.testcontainers.shaded.okhttp3.Request;
 import org.testcontainers.shaded.okhttp3.Response;
 import org.testng.Assert;
 
+
+/**
+ * Pulsar SQL test base.
+ */
 @Slf4j
 public class TestPulsarSQLBase extends PulsarSQLTestSuite {
 
-    protected void pulsarSQLBasicTest(TopicName topic, boolean isBatch, 
boolean useNsOffloadPolices) throws Exception {
+    protected void pulsarSQLBasicTest(TopicName topic,
+                                      boolean isBatch,
+                                      boolean useNsOffloadPolices,
+                                      Schema schema) throws Exception {
+        log.info("Pulsar SQL basic test. topic: {}", topic);
+
         waitPulsarSQLReady();
 
         log.info("start prepare data for query. topic: {}", topic);
-        int messageCnt = prepareData(topic, isBatch, useNsOffloadPolices);
+        int messageCnt = prepareData(topic, isBatch, useNsOffloadPolices, 
schema);
         log.info("finish prepare data for query. topic: {}, messageCnt: {}", 
topic, messageCnt);
 
         validateMetadata(topic);
 
-        validateData(topic, messageCnt);
+        validateData(topic, messageCnt, schema);
+
+        log.info("Finish Pulsar SQL basic test. topic: {}", topic);
     }
 
     public void waitPulsarSQLReady() throws Exception {
@@ -98,7 +111,10 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
         }
     }
 
-    protected int prepareData(TopicName topicName, boolean isBatch, boolean 
useNsOffloadPolices) throws Exception {
+    protected int prepareData(TopicName topicName,
+                              boolean isBatch,
+                              boolean useNsOffloadPolices,
+                              Schema schema) throws Exception {
         throw new Exception("Unsupported operation prepareData.");
     }
 
@@ -122,24 +138,31 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite 
{
         );
     }
 
-    public void validateData(TopicName topicName, int messageNum) throws 
Exception {
+    protected void validateContent(int messageNum, String[] contentArr, Schema 
schema) throws Exception {
+        throw new Exception("Unsupported operation validateContent.");
+    }
+
+    private void validateData(TopicName topicName, int messageNum, Schema 
schema) throws Exception {
         String namespace = topicName.getNamespace();
         String topic = topicName.getLocalName();
 
+        final String queryAllDataSql;
+        if (schema.getSchemaInfo().getType().isStruct()
+                || 
schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
+            queryAllDataSql = String.format("select * from 
pulsar.\"%s\".\"%s\" order by entryid;", namespace, topic);
+        } else {
+            queryAllDataSql = String.format("select * from 
pulsar.\"%s\".\"%s\";", namespace, topic);
+        }
+
         Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
                 () -> {
-                    ContainerExecResult containerExecResult = execQuery(
-                            String.format("select * from pulsar.\"%s\".\"%s\" 
order by entryid;", namespace, topic));
+                    ContainerExecResult containerExecResult = 
execQuery(queryAllDataSql);
                     assertThat(containerExecResult.getExitCode()).isEqualTo(0);
                     log.info("select sql query output \n{}", 
containerExecResult.getStdout());
                     String[] split = 
containerExecResult.getStdout().split("\n");
                     assertThat(split.length).isEqualTo(messageNum);
-                    String[] split2 = 
containerExecResult.getStdout().split("\n|,");
-                    for (int i = 0; i < messageNum; ++i) {
-                        assertThat(split2).contains("\"" + i + "\"");
-                        assertThat(split2).contains("\"" + "STOCK_" + i + 
"\"");
-                        assertThat(split2).contains("\"" + (100.0 + i * 10) + 
"\"");
-                    }
+                    String[] contentArr = 
containerExecResult.getStdout().split("\n|,");
+                    validateContent(messageNum, contentArr, schema);
                 }
         );
 

Reply via email to