This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 371b311 [Pulsar SQL] Fix Pulsar SQL query bytes schema data error
(#9631)
371b311 is described below
commit 371b311eac93ac1cf5d5a3ceeeb36f7ae11538a1
Author: ran <[email protected]>
AuthorDate: Thu Feb 25 16:49:53 2021 +0800
[Pulsar SQL] Fix Pulsar SQL query bytes schema data error (#9631)
### Motivation
Currently, querying data that was produced with a bytes schema through Pulsar SQL fails with an error.
*Reproduce*
1. Produce data with a bytes schema.
2. Query the data with Pulsar SQL.
3. Observe the error below in the log.
*Error log*
```
com.google.common.util.concurrent.UncheckedExecutionException: java.nio.BufferUnderflowException
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.getSchemaByVersion(PulsarSqlSchemaInfoProvider.java:76)
at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:471)
at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
at io.prestosql.operator.Driver.processInternal(Driver.java:379)
at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
at io.prestosql.operator.Driver.processFor(Driver.java:276)
at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
at io.prestosql.$gen.Presto_332__testversion____20210219_094906_2.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:509)
at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:415)
at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.loadSchema(PulsarSqlSchemaInfoProvider.java:106)
at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.access$000(PulsarSqlSchemaInfoProvider.java:49)
at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:61)
at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:58)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
... 18 more
```
### Modifications
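Add a check for the bytes schema: when the split's schema type is BYTES (or NONE), use the schema info of the matching built-in schema (BYTEBUFFER if the schema name is BYTEBUFFER, otherwise BYTES) directly, instead of looking up the schema by the message's schema version.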
### Verifying this change
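Add a new integration test that runs the basic Pulsar SQL query against several schemas: BYTES, BYTEBUFFER, STRING, AVRO, JSON, and KEY_VALUE with both INLINE and SEPARATED encoding.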
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
---
.../pulsar/sql/presto/PulsarRecordCursor.java | 19 +-
.../tests/integration/presto/TestBasicPresto.java | 216 +++++++++++++++------
.../presto/TestPrestoQueryTieredStorage.java | 22 ++-
.../integration/presto/TestPulsarSQLBase.java | 49 +++--
4 files changed, 233 insertions(+), 73 deletions(-)
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index c782de6..1171902 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
@@ -441,9 +442,11 @@ public class PulsarRecordCursor implements RecordCursor {
//start time for deseralizing record
metricsTracker.start_RECORD_DESERIALIZE_TIME();
- SchemaInfo schemaInfo;
+ SchemaInfo schemaInfo =
getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
try {
- schemaInfo =
schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+ if (schemaInfo == null) {
+ schemaInfo =
schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+ }
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -560,6 +563,18 @@ public class PulsarRecordCursor implements RecordCursor {
return true;
}
+ private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String
schemaName) {
+ if (!schemaType.equals(SchemaType.BYTES) &&
!schemaType.equals(SchemaType.NONE)) {
+ return null;
+ }
+ if (schemaName.equals(Schema.BYTES.getSchemaInfo().getName())) {
+ return Schema.BYTES.getSchemaInfo();
+ } else if
(schemaName.equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
+ return Schema.BYTEBUFFER.getSchemaInfo();
+ } else {
+ return Schema.BYTES.getSchemaInfo();
+ }
+ }
@Override
public boolean getBoolean(int field) {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 5597a95..fe8ba7e 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -18,26 +18,31 @@
*/
package org.apache.pulsar.tests.integration.presto;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.awaitility.Awaitility;
+import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.util.concurrent.TimeUnit;
-
-import static org.assertj.core.api.Assertions.assertThat;
+/**
+ * Test basic Pulsar SQL query, the Pulsar SQL is standalone mode.
+ */
@Slf4j
public class TestBasicPresto extends TestPulsarSQLBase {
@@ -55,85 +60,124 @@ public class TestBasicPresto extends TestPulsarSQLBase {
pulsarCluster.stopPrestoWorker();
}
+ @DataProvider(name = "schemaProvider")
+ public Object[][] schemaProvider() {
+ return new Object[][] {
+ { Schema.BYTES},
+ { Schema.BYTEBUFFER},
+ { Schema.STRING},
+ { AvroSchema.of(Stock.class)},
+ { JSONSchema.of(Stock.class)},
+ { Schema.KeyValue(Schema.AVRO(Stock.class),
Schema.AVRO(Stock.class), KeyValueEncodingType.INLINE) },
+ { Schema.KeyValue(Schema.AVRO(Stock.class),
Schema.AVRO(Stock.class), KeyValueEncodingType.SEPARATED) }
+ };
+ }
+
@Test
public void testSimpleSQLQueryBatched() throws Exception {
TopicName topicName = TopicName.get("public/default/stocks_batched_" +
randomName(5));
- pulsarSQLBasicTest(topicName, true, false);
+ pulsarSQLBasicTest(topicName, true, false, JSONSchema.of(Stock.class));
}
@Test
public void testSimpleSQLQueryNonBatched() throws Exception {
TopicName topicName =
TopicName.get("public/default/stocks_nonbatched_" + randomName(5));
- pulsarSQLBasicTest(topicName, false, false);
+ pulsarSQLBasicTest(topicName, false, false,
JSONSchema.of(Stock.class));
}
- @DataProvider(name = "keyValueEncodingType")
- public Object[][] keyValueEncodingType() {
- return new Object[][] { { KeyValueEncodingType.INLINE }, {
KeyValueEncodingType.SEPARATED } };
+ @Test(dataProvider = "schemaProvider")
+ public void testForSchema(Schema schema) throws Exception {
+ String schemaFlag;
+ if (schema.getSchemaInfo().getType().isStruct()) {
+ schemaFlag = schema.getSchemaInfo().getType().name();
+ } else
if(schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
+ schemaFlag = schema.getSchemaInfo().getType().name() + "_"
+ + ((KeyValueSchema) schema).getKeyValueEncodingType();
+ } else {
+ // Because some schema types are same(such as BYTES and
BYTEBUFFER), so use the schema name as flag.
+ schemaFlag = schema.getSchemaInfo().getName();
+ }
+ String topic = String.format("public/default/schema_%s_test_%s",
schemaFlag, randomName(5)).toLowerCase();
+ pulsarSQLBasicTest(TopicName.get(topic), false, false, schema);
}
- @Test(dataProvider = "keyValueEncodingType")
- public void testKeyValueSchema(KeyValueEncodingType type) throws Exception
{
- waitPulsarSQLReady();
- TopicName topicName = TopicName.get("public/default/stocks" +
randomName(20));
+ @Override
+ protected int prepareData(TopicName topicName,
+ boolean isBatch,
+ boolean useNsOffloadPolices,
+ Schema schema) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
+ if
(schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName()))
{
+ prepareDataForBytesSchema(pulsarClient, topicName, isBatch);
+ } else if
(schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName()))
{
+ prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch);
+ } else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING))
{
+ prepareDataForStringSchema(pulsarClient, topicName, isBatch);
+ } else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON)
+ || schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) {
+ prepareDataForStructSchema(pulsarClient, topicName, isBatch,
schema);
+ } else if
(schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
+ prepareDataForKeyValueSchema(pulsarClient, topicName, schema);
+ }
+
+ return NUM_OF_STOCKS;
+ }
+
+ private void prepareDataForBytesSchema(PulsarClient pulsarClient,
+ TopicName topicName,
+ boolean isBatch) throws
PulsarClientException {
@Cleanup
- Producer<KeyValue<Stock,Stock>> producer =
pulsarClient.newProducer(Schema
- .KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class),
type))
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName.toString())
+ .enableBatching(isBatch)
.create();
for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
- int j = 100 * i;
- final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
- final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
- producer.send(new KeyValue<>(stock1, stock2));
+ producer.send(("bytes schema test" + i).getBytes());
}
-
producer.flush();
-
- validateMetadata(topicName);
-
- Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
- () -> {
- ContainerExecResult containerExecResult = execQuery(
- String.format("select * from pulsar.\"%s\".\"%s\"
order by entryid;",
- topicName.getNamespace(),
topicName.getLocalName()));
- assertThat(containerExecResult.getExitCode()).isEqualTo(0);
- log.info("select sql query output \n{}",
containerExecResult.getStdout());
- String[] split =
containerExecResult.getStdout().split("\n");
- assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
- String[] split2 =
containerExecResult.getStdout().split("\n|,");
- for (int i = 0; i < NUM_OF_STOCKS; ++i) {
- int j = 100 * i;
- assertThat(split2).contains("\"" + i + "\"");
- assertThat(split2).contains("\"" + "STOCK_" + i +
"\"");
- assertThat(split2).contains("\"" + (100.0 + i * 10) +
"\"");
-
- assertThat(split2).contains("\"" + j + "\"");
- assertThat(split2).contains("\"" + "STOCK_" + j +
"\"");
- assertThat(split2).contains("\"" + (100.0 + j * 10) +
"\"");
- }
- }
- );
-
}
+ private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
+ TopicName topicName,
+ boolean isBatch) throws
PulsarClientException {
+ @Cleanup
+ Producer<ByteBuffer> producer =
pulsarClient.newProducer(Schema.BYTEBUFFER)
+ .topic(topicName.toString())
+ .enableBatching(isBatch)
+ .create();
+ for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+ producer.send(ByteBuffer.wrap(("bytes schema test" +
i).getBytes()));
+ }
+ producer.flush();
+ }
- @Override
- protected int prepareData(TopicName topicName, boolean isBatch, boolean
useNsOffloadPolices) throws Exception {
+ private void prepareDataForStringSchema(PulsarClient pulsarClient,
+ TopicName topicName,
+ boolean isBatch) throws
PulsarClientException {
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName.toString())
+ .enableBatching(isBatch)
+ .create();
+ for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+ producer.send("string" + i);
+ }
+ producer.flush();
+ }
+
+ private void prepareDataForStructSchema(PulsarClient pulsarClient,
+ TopicName topicName,
+ boolean isBatch,
+ Schema<Stock> schema) throws
Exception {
@Cleanup
- Producer<Stock> producer =
pulsarClient.newProducer(JSONSchema.of(Stock.class))
+ Producer<Stock> producer = pulsarClient.newProducer(schema)
.topic(topicName.toString())
.enableBatching(isBatch)
.create();
@@ -143,7 +187,71 @@ public class TestBasicPresto extends TestPulsarSQLBase {
producer.send(stock);
}
producer.flush();
- return NUM_OF_STOCKS;
+ }
+
+ private void prepareDataForKeyValueSchema(PulsarClient pulsarClient,
+ TopicName topicName,
+ Schema<KeyValue<Stock, Stock>>
schema) throws Exception {
+ @Cleanup
+ Producer<KeyValue<Stock,Stock>> producer =
pulsarClient.newProducer(schema)
+ .topic(topicName.toString())
+ .create();
+
+ for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+ int j = 100 * i;
+ final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
+ final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
+ producer.send(new KeyValue<>(stock1, stock2));
+ }
+ }
+
+ @Override
+ protected void validateContent(int messageNum, String[] contentArr, Schema
schema) {
+ switch (schema.getSchemaInfo().getType()) {
+ case BYTES:
+ log.info("Skip validate content for BYTES schema type.");
+ break;
+ case STRING:
+ validateContentForStringSchema(messageNum, contentArr);
+ log.info("finish validate content for STRING schema type.");
+ break;
+ case JSON:
+ case AVRO:
+ validateContentForStructSchema(messageNum, contentArr);
+ log.info("finish validate content for {} schema type.",
schema.getSchemaInfo().getType());
+ break;
+ case KEY_VALUE:
+ validateContentForKeyValueSchema(messageNum, contentArr);
+ log.info("finish validate content for KEY_VALUE {} schema
type.",
+ ((KeyValueSchema) schema).getKeyValueEncodingType());
+ }
+ }
+
+ private void validateContentForStringSchema(int messageNum, String[]
contentArr) {
+ for (int i = 0; i < messageNum; i++) {
+ assertThat(contentArr).contains("\"string" + i + "\"");
+ }
+ }
+
+ private void validateContentForStructSchema(int messageNum, String[]
contentArr) {
+ for (int i = 0; i < messageNum; ++i) {
+ assertThat(contentArr).contains("\"" + i + "\"");
+ assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
+ assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
+ }
+ }
+
+ private void validateContentForKeyValueSchema(int messageNum, String[]
contentArr) {
+ for (int i = 0; i < messageNum; ++i) {
+ int j = 100 * i;
+ assertThat(contentArr).contains("\"" + i + "\"");
+ assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
+ assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
+
+ assertThat(contentArr).contains("\"" + j + "\"");
+ assertThat(contentArr).contains("\"" + "STOCK_" + j + "\"");
+ assertThat(contentArr).contains("\"" + (100.0 + j * 10) + "\"");
+ }
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 80374e9..e7c3cd4 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.presto;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -28,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -43,7 +45,7 @@ import org.testng.annotations.Test;
/**
- * Test presto query from tiered storage.
+ * Test presto query from tiered storage, the Pulsar SQL is cluster mode.
*/
@Slf4j
public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
@@ -106,18 +108,21 @@ public class TestPrestoQueryTieredStorage extends
TestPulsarSQLBase {
public void testQueryTieredStorage1() throws Exception {
TopicName topicName = TopicName.get(
TopicDomain.persistent.value(), TENANT, NAMESPACE,
"stocks_ts_nons_" + randomName(5));
- pulsarSQLBasicTest(topicName, false, false);
+ pulsarSQLBasicTest(topicName, false, false,
JSONSchema.of(Stock.class));
}
@Test
public void testQueryTieredStorage2() throws Exception {
TopicName topicName = TopicName.get(
TopicDomain.persistent.value(), TENANT, NAMESPACE,
"stocks_ts_ns_" + randomName(5));
- pulsarSQLBasicTest(topicName, false, true);
+ pulsarSQLBasicTest(topicName, false, true, JSONSchema.of(Stock.class));
}
@Override
- protected int prepareData(TopicName topicName, boolean isBatch, boolean
useNsOffloadPolices) throws Exception {
+ protected int prepareData(TopicName topicName,
+ boolean isBatch,
+ boolean useNsOffloadPolices,
+ Schema schema) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -206,4 +211,13 @@ public class TestPrestoQueryTieredStorage extends
TestPulsarSQLBase {
}
}
+ @Override
+ protected void validateContent(int messageNum, String[] contentArr, Schema
schema) {
+ for (int i = 0; i < messageNum; ++i) {
+ assertThat(contentArr).contains("\"" + i + "\"");
+ assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
+ assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
+ }
+ }
+
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 5781549..91186f4 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -31,7 +31,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarSQLTestSuite;
@@ -42,19 +44,30 @@ import org.testcontainers.shaded.okhttp3.Request;
import org.testcontainers.shaded.okhttp3.Response;
import org.testng.Assert;
+
+/**
+ * Pulsar SQL test base.
+ */
@Slf4j
public class TestPulsarSQLBase extends PulsarSQLTestSuite {
- protected void pulsarSQLBasicTest(TopicName topic, boolean isBatch,
boolean useNsOffloadPolices) throws Exception {
+ protected void pulsarSQLBasicTest(TopicName topic,
+ boolean isBatch,
+ boolean useNsOffloadPolices,
+ Schema schema) throws Exception {
+ log.info("Pulsar SQL basic test. topic: {}", topic);
+
waitPulsarSQLReady();
log.info("start prepare data for query. topic: {}", topic);
- int messageCnt = prepareData(topic, isBatch, useNsOffloadPolices);
+ int messageCnt = prepareData(topic, isBatch, useNsOffloadPolices,
schema);
log.info("finish prepare data for query. topic: {}, messageCnt: {}",
topic, messageCnt);
validateMetadata(topic);
- validateData(topic, messageCnt);
+ validateData(topic, messageCnt, schema);
+
+ log.info("Finish Pulsar SQL basic test. topic: {}", topic);
}
public void waitPulsarSQLReady() throws Exception {
@@ -98,7 +111,10 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
}
}
- protected int prepareData(TopicName topicName, boolean isBatch, boolean
useNsOffloadPolices) throws Exception {
+ protected int prepareData(TopicName topicName,
+ boolean isBatch,
+ boolean useNsOffloadPolices,
+ Schema schema) throws Exception {
throw new Exception("Unsupported operation prepareData.");
}
@@ -122,24 +138,31 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite
{
);
}
- public void validateData(TopicName topicName, int messageNum) throws
Exception {
+ protected void validateContent(int messageNum, String[] contentArr, Schema
schema) throws Exception {
+ throw new Exception("Unsupported operation validateContent.");
+ }
+
+ private void validateData(TopicName topicName, int messageNum, Schema
schema) throws Exception {
String namespace = topicName.getNamespace();
String topic = topicName.getLocalName();
+ final String queryAllDataSql;
+ if (schema.getSchemaInfo().getType().isStruct()
+ ||
schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
+ queryAllDataSql = String.format("select * from
pulsar.\"%s\".\"%s\" order by entryid;", namespace, topic);
+ } else {
+ queryAllDataSql = String.format("select * from
pulsar.\"%s\".\"%s\";", namespace, topic);
+ }
+
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
() -> {
- ContainerExecResult containerExecResult = execQuery(
- String.format("select * from pulsar.\"%s\".\"%s\"
order by entryid;", namespace, topic));
+ ContainerExecResult containerExecResult =
execQuery(queryAllDataSql);
assertThat(containerExecResult.getExitCode()).isEqualTo(0);
log.info("select sql query output \n{}",
containerExecResult.getStdout());
String[] split =
containerExecResult.getStdout().split("\n");
assertThat(split.length).isEqualTo(messageNum);
- String[] split2 =
containerExecResult.getStdout().split("\n|,");
- for (int i = 0; i < messageNum; ++i) {
- assertThat(split2).contains("\"" + i + "\"");
- assertThat(split2).contains("\"" + "STOCK_" + i +
"\"");
- assertThat(split2).contains("\"" + (100.0 + i * 10) +
"\"");
- }
+ String[] contentArr =
containerExecResult.getStdout().split("\n|,");
+ validateContent(messageNum, contentArr, schema);
}
);