This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 33fad0a  [Pulsar sql] Pulsar sql key-value schema separated model 
support. (#9685)
33fad0a is described below

commit 33fad0a43be7b838f58d78608d61f39b363a0068
Author: congbo <[email protected]>
AuthorDate: Wed Feb 24 17:03:25 2021 +0800

    [Pulsar sql] Pulsar sql key-value schema separated model support. (#9685)
    
    ## Motivation
    Support key-value schemas that use the SEPARATED encoding type in Pulsar SQL on branch-2.7.
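    ## Implementation
    Add a `Type` (NONE/Key/Value) to `PulsarSqlSchemaInfoProvider` so that the key and value
    schema handlers each load their own part of the key-value schema, and add the tests.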
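    ### Verifying this change
    This change is covered by a new unit test (`testKeyValueSeparatedSchema`) and a new
    integration test (`testKeyValueSchema`).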
---
 .../pulsar/sql/presto/AvroSchemaHandler.java       |  5 +-
 .../pulsar/sql/presto/KeyValueSchemaHandler.java   |  4 +-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  3 +-
 .../pulsar/sql/presto/PulsarSchemaHandlers.java    |  5 +-
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    | 26 +++++++-
 .../presto/TestPulsarKeyValueSchemaHandler.java    | 74 +++++++++++++++++++++-
 .../presto/TestPulsarPrimitiveSchemaHandler.java   |  2 +-
 .../tests/integration/presto/TestBasicPresto.java  | 66 +++++++++++++++++++
 .../integration/presto/TestPulsarSQLBase.java      |  6 +-
 9 files changed, 177 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index db369cf..894deda 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -45,9 +45,10 @@ public class AvroSchemaHandler implements SchemaHandler {
     AvroSchemaHandler(TopicName topicName,
                       PulsarConnectorConfig pulsarConnectorConfig,
                       SchemaInfo schemaInfo,
-                      List<PulsarColumnHandle> columnHandles) throws 
PulsarClientException {
+                      List<PulsarColumnHandle> columnHandles,
+                      PulsarSqlSchemaInfoProvider.Type type) throws 
PulsarClientException {
         this(new PulsarSqlSchemaInfoProvider(topicName,
-                                pulsarConnectorConfig.getPulsarAdmin()), 
schemaInfo, columnHandles);
+                                pulsarConnectorConfig.getPulsarAdmin(), type), 
schemaInfo, columnHandles);
     }
 
     AvroSchemaHandler(PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider,
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
index 434dd44..c37228d 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
@@ -52,9 +52,9 @@ public class KeyValueSchemaHandler implements SchemaHandler {
         this.columnHandles = columnHandles;
         KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo = 
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
         keySchemaHandler = 
PulsarSchemaHandlers.newPulsarSchemaHandler(topicName, pulsarConnectorConfig,
-                kvSchemaInfo.getKey(), columnHandles);
+                kvSchemaInfo.getKey(), columnHandles, 
PulsarSqlSchemaInfoProvider.Type.Key);
         valueSchemaHandler = 
PulsarSchemaHandlers.newPulsarSchemaHandler(topicName, pulsarConnectorConfig,
-                kvSchemaInfo.getValue(), columnHandles);
+                kvSchemaInfo.getValue(), columnHandles, 
PulsarSqlSchemaInfoProvider.Type.Value);
         keyValueEncodingType = 
KeyValueSchemaInfo.decodeKeyValueEncodingType(schemaInfo);
     }
 
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index ddd03d6..73d9cec 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -156,7 +156,8 @@ public class PulsarRecordCursor implements RecordCursor {
 
         this.schemaHandler = PulsarSchemaHandlers
                 .newPulsarSchemaHandler(this.topicName,
-                        this.pulsarConnectorConfig, 
pulsarSplit.getSchemaInfo(), columnHandles);
+                        this.pulsarConnectorConfig, 
pulsarSplit.getSchemaInfo(),
+                        columnHandles, PulsarSqlSchemaInfoProvider.Type.NONE);
 
         log.info("Initializing split with parameters: %s", pulsarSplit);
 
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
index 05137e6..7e789f9 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
@@ -33,7 +33,8 @@ class PulsarSchemaHandlers {
     static SchemaHandler newPulsarSchemaHandler(TopicName topicName,
                                                 PulsarConnectorConfig 
pulsarConnectorConfig,
                                                 SchemaInfo schemaInfo,
-                                                List<PulsarColumnHandle> 
columnHandles) throws RuntimeException{
+                                                List<PulsarColumnHandle> 
columnHandles,
+                                                
PulsarSqlSchemaInfoProvider.Type type) throws RuntimeException{
         if (schemaInfo.getType().isPrimitive()) {
             return new PulsarPrimitiveSchemaHandler(schemaInfo);
         } else if (schemaInfo.getType().isStruct()) {
@@ -42,7 +43,7 @@ class PulsarSchemaHandlers {
                     case JSON:
                         return new JSONSchemaHandler(columnHandles);
                     case AVRO:
-                        return new AvroSchemaHandler(topicName, 
pulsarConnectorConfig, schemaInfo, columnHandles);
+                        return new AvroSchemaHandler(topicName, 
pulsarConnectorConfig, schemaInfo, columnHandles, type);
                     default:
                         throw new PrestoException(NOT_SUPPORTED, "Not 
supported schema type: " + schemaInfo.getType());
                 }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index 41e49c0..996b5d7 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -29,7 +29,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -45,10 +47,18 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSqlSchemaInfoProvider.class);
 
+    public enum Type{
+        NONE,
+        Key,
+        Value,
+    }
+
     private final TopicName topicName;
 
     private final PulsarAdmin pulsarAdmin;
 
+    private final Type type;
+
     private final LoadingCache<BytesSchemaVersion, SchemaInfo> cache = 
CacheBuilder.newBuilder().maximumSize(100000)
             .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<BytesSchemaVersion, SchemaInfo>() {
                 @Override
@@ -57,9 +67,10 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
                 }
             });
 
-    PulsarSqlSchemaInfoProvider(TopicName topicName, PulsarAdmin pulsarAdmin) {
+    PulsarSqlSchemaInfoProvider(TopicName topicName, PulsarAdmin pulsarAdmin, 
Type type) {
         this.topicName = topicName;
         this.pulsarAdmin = pulsarAdmin;
+        this.type = type;
     }
 
     @Override
@@ -94,8 +105,19 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
     }
 
     private SchemaInfo loadSchema(BytesSchemaVersion bytesSchemaVersion) 
throws PulsarAdminException {
-        return pulsarAdmin.schemas()
+        SchemaInfo schemaInfo = pulsarAdmin.schemas()
                 .getSchemaInfo(topicName.toString(), 
ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+        switch (type) {
+            case NONE:
+                return schemaInfo;
+            case Key:
+                return 
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo).getKey();
+            case Value:
+                return 
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo).getValue();
+            default:
+                throw new PulsarAdminException(new PulsarClientException
+                        .NotSupportedException("PulsarSqlSchemaInfoProvider 
don't support this Type : " + type));
+        }
     }
 
 }
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
index 1fe881e..3964601 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
@@ -29,6 +29,9 @@ import java.util.Objects;
 import java.util.Optional;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
@@ -37,11 +40,15 @@ import org.apache.pulsar.common.api.raw.RawMessageImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 
@@ -73,6 +80,9 @@ public class TestPulsarKeyValueSchemaHandler {
 
     private final Integer KEY_FIELD_NAME_PREFIX_LENGTH = 
PulsarColumnMetadata.KEY_SCHEMA_COLUMN_PREFIX.length();
 
+    private Schema<KeyValue<Boo, Foo>> schema5 =
+            Schema.KeyValue(Schema.AVRO(Boo.class), Schema.AVRO(Foo.class), 
KeyValueEncodingType.SEPARATED);
+
     static {
         foo = new Foo();
         foo.field1 = "field1-value";
@@ -108,7 +118,7 @@ public class TestPulsarKeyValueSchemaHandler {
         List<PulsarColumnHandle> columnHandleList = 
getColumnHandlerList(columnMetadataList);
 
         KeyValueSchemaHandler keyValueSchemaHandler =
-                new KeyValueSchemaHandler(null, null,schema1.getSchemaInfo(), 
columnHandleList);
+                new KeyValueSchemaHandler(null, null, schema1.getSchemaInfo(), 
columnHandleList);
 
         RawMessageImpl message = mock(RawMessageImpl.class);
         Mockito.when(message.getData()).thenReturn(
@@ -222,6 +232,68 @@ public class TestPulsarKeyValueSchemaHandler {
     }
 
     @Test
+    public void testKeyValueSeparatedSchema() throws IOException, 
PulsarAdminException {
+        final Boo boo = new Boo();
+        boo.field1 = "field1-value";
+        boo.field2 = true;
+        boo.field3 = 10.2;
+
+        final Foo foo = new Foo();
+        foo.field1 = "file2-value";
+        foo.field2 = 200;
+
+        List<ColumnMetadata> columnMetadataList =
+                PulsarMetadata.getPulsarColumns(topicName, 
schema3.getSchemaInfo(),
+                        true, null);
+        int keyCount = 0;
+        int valueCount = 0;
+        for (ColumnMetadata columnMetadata : columnMetadataList) {
+            PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) 
columnMetadata;
+            if (pulsarColumnMetadata.isKey()) {
+                keyCount++;
+            } else if (pulsarColumnMetadata.isValue()) {
+                valueCount++;
+            }
+        }
+        Assert.assertEquals(keyCount, 3);
+        Assert.assertEquals(valueCount, 1);
+
+        List<PulsarColumnHandle> columnHandleList = 
getColumnHandlerList(columnMetadataList);
+
+        PulsarConnectorConfig pulsarConnectorConfig = 
mock(PulsarConnectorConfig.class);
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        Schemas schemas = mock(Schemas.class);
+        doReturn(admin).when(pulsarConnectorConfig).getPulsarAdmin();
+        doReturn(schemas).when(admin).schemas();
+        
doReturn(schema5.getSchemaInfo()).when(schemas).getSchemaInfo(anyString(), 
anyLong());
+        KeyValueSchemaHandler keyValueSchemaHandler =
+                new KeyValueSchemaHandler(topicName, pulsarConnectorConfig, 
schema5.getSchemaInfo(), columnHandleList);
+
+        RawMessage message = mock(RawMessage.class);
+        Mockito.when(message.getKeyBytes()).thenReturn(
+                Optional.of(Unpooled.wrappedBuffer(
+                        ((KeyValueSchema) schema5).getKeySchema().encode(boo)
+                ))
+        );
+        Mockito.when(message.getData()).thenReturn(
+                Unpooled.wrappedBuffer(schema5.encode(new KeyValue<>(boo, 
foo)))
+        );
+
+        KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = 
getKeyValueByteBuf(message, schema5);
+        Object object = 
keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), 
byteBufKeyValue.getValue(), new LongSchemaVersion(1).bytes());
+        Assert.assertEquals(keyValueSchemaHandler.extractField(0, 
object).toString(),
+                
boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
+        Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
+                
boo.getValue(columnHandleList.get(1).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
+        Assert.assertEquals(keyValueSchemaHandler.extractField(2, object),
+                
boo.getValue(columnHandleList.get(2).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
+        Assert.assertEquals(keyValueSchemaHandler.extractField(3, object),
+                foo.getValue(columnHandleList.get(3).getName()));
+        Assert.assertEquals(keyValueSchemaHandler.extractField(4, object),
+                foo.getValue(columnHandleList.get(4).getName()));
+    }
+
+    @Test
     public void testSchema4() throws IOException {
         List<ColumnMetadata> columnMetadataList =
                 PulsarMetadata.getPulsarColumns(topicName, 
schema4.getSchemaInfo(),
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
index fc33077..d8d23cb 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
@@ -141,7 +141,7 @@ public class TestPulsarPrimitiveSchemaHandler {
                 null,
                 null,
                 StringSchema.utf8().getSchemaInfo(),
-                null);
+                null, PulsarSqlSchemaInfoProvider.Type.NONE);
 
         String stringValue = "test";
         
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(StringSchema.utf8().encode(stringValue)));
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index c40002e..4d9c60a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -22,12 +22,22 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
 @Slf4j
 public class TestBasicPresto extends TestPulsarSQLBase {
 
@@ -57,6 +67,62 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         pulsarSQLBasicTest(topicName, false, false);
     }
 
+    @DataProvider(name = "keyValueEncodingType")
+    public Object[][] keyValueEncodingType() {
+        return new Object[][] { { KeyValueEncodingType.INLINE }, { 
KeyValueEncodingType.SEPARATED } };
+    }
+
+    @Test(dataProvider = "keyValueEncodingType")
+    public void testKeyValueSchema(KeyValueEncodingType type) throws Exception 
{
+        waitPulsarSQLReady();
+        TopicName topicName = TopicName.get("public/default/stocks" + 
randomName(20));
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup
+        Producer<KeyValue<Stock,Stock>> producer = 
pulsarClient.newProducer(Schema
+                .KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), 
type))
+                .topic(topicName.toString())
+                .create();
+
+        for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+            int j = 100 * i;
+            final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
+            final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
+            producer.send(new KeyValue<>(stock1, stock2));
+        }
+
+        producer.flush();
+
+        validateMetadata(topicName);
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+                () -> {
+                    ContainerExecResult containerExecResult = execQuery(
+                            String.format("select * from pulsar.\"%s\".\"%s\" 
order by entryid;",
+                                    topicName.getNamespace(), 
topicName.getLocalName()));
+                    assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+                    log.info("select sql query output \n{}", 
containerExecResult.getStdout());
+                    String[] split = 
containerExecResult.getStdout().split("\n");
+                    assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
+                    String[] split2 = 
containerExecResult.getStdout().split("\n|,");
+                    for (int i = 0; i < NUM_OF_STOCKS; ++i) {
+                        int j = 100 * i;
+                        assertThat(split2).contains("\"" + i + "\"");
+                        assertThat(split2).contains("\"" + "STOCK_" + i + 
"\"");
+                        assertThat(split2).contains("\"" + (100.0 + i * 10) + 
"\"");
+
+                        assertThat(split2).contains("\"" + j + "\"");
+                        assertThat(split2).contains("\"" + "STOCK_" + j + 
"\"");
+                        assertThat(split2).contains("\"" + (100.0 + j * 10) + 
"\"");
+                    }
+                }
+        );
+
+    }
+
     @Override
     protected int prepareData(TopicName topicName, boolean isBatch, boolean 
useNsOffloadPolices) throws Exception {
         @Cleanup
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index b4caa9f..adf40c6 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -57,7 +57,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
         validateData(topic, messageCnt);
     }
 
-    private void waitPulsarSQLReady() throws Exception {
+    public void waitPulsarSQLReady() throws Exception {
         // wait until presto worker started
         ContainerExecResult result;
         do {
@@ -102,7 +102,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
         throw new Exception("Unsupported operation prepareData.");
     }
 
-    private void validateMetadata(TopicName topicName) throws Exception {
+    public void validateMetadata(TopicName topicName) throws Exception {
         ContainerExecResult result = execQuery("show schemas in pulsar;");
         assertThat(result.getExitCode()).isEqualTo(0);
         assertThat(result.getStdout()).contains(topicName.getNamespace());
@@ -122,7 +122,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
         );
     }
 
-    private void validateData(TopicName topicName, int messageNum) throws 
Exception {
+    public void validateData(TopicName topicName, int messageNum) throws 
Exception {
         String namespace = topicName.getNamespace();
         String topic = topicName.getLocalName();
 

Reply via email to