This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e5619cf  [Pulsar SQL] Handle message null schema version in 
PulsarRecordCursor (#12809)
e5619cf is described below

commit e5619cffce702d9f446c27e69927148e45797b28
Author: ran <[email protected]>
AuthorDate: Tue Nov 16 13:16:58 2021 +0800

    [Pulsar SQL] Handle message null schema version in PulsarRecordCursor 
(#12809)
    
    ### Motivation
    
    Currently, if the schema version of a message is null, Pulsar SQL fails 
with a NullPointerException (NPE).
    
    ### Modifications
    
    Adjust logic for null schema version in `PulsarRecordCursor`.
    
    1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
    2. If the schema name of pulsarSplit is that of the BYTEBUFFER schema 
(with schema type NONE or BYTES), use the BYTEBUFFER schema.
    3. If the schema version of the message is null, use the schema info of 
pulsarSplit (the latest schema of the topic).
    4. If the schema version of the message is not null, get the specific 
version schema by PulsarAdmin.
    5. If the final schema is null, throw a runtime exception.
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  42 ++++++--
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    |   9 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 107 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index f1e2bdb..b1230d3 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -479,14 +480,7 @@ public class PulsarRecordCursor implements RecordCursor {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        SchemaInfo schemaInfo = 
getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
-        try {
-            if (schemaInfo == null) {
-                schemaInfo =  
schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
 
         Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new 
HashMap<>();
 
@@ -600,6 +594,38 @@ public class PulsarRecordCursor implements RecordCursor {
         return true;
     }
 
+    /**
+     * Get the schemaInfo of the message.
+     *
+     * 1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES 
schema.
+     * 2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER 
schema.
+     * 3. If the schema version of the message is null, use the schema info of 
pulsarSplit.
+     * 4. If the schema version of the message is not null, get the specific 
version schema by PulsarAdmin.
+     * 5. If the final schema is null throw a runtime exception.
+     */
+    private SchemaInfo getSchemaInfo(PulsarSplit pulsarSplit) {
+        SchemaInfo schemaInfo = 
getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
+        if (schemaInfo != null) {
+            return schemaInfo;
+        }
+        try {
+            if (this.currentMessage.getSchemaVersion() == null) {
+                schemaInfo = pulsarSplit.getSchemaInfo();
+            } else {
+                schemaInfo =  
schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        if (schemaInfo == null) {
+            String schemaVersion = this.currentMessage.getSchemaVersion() == 
null
+                    ? "null" : 
BytesSchemaVersion.of(this.currentMessage.getSchemaVersion()).toString();
+            throw new RuntimeException("The specific version (" + 
schemaVersion + ") schema of the table "
+                    + pulsarSplit.getTableName() + " is null");
+        }
+        return schemaInfo;
+    }
+
     private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String 
schemaName) {
         if (!schemaType.equals(SchemaType.BYTES) && 
!schemaType.equals(SchemaType.NONE)) {
             return null;
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index 3a9233c..828ceef 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -102,8 +102,13 @@ public class PulsarSqlSchemaInfoProvider implements 
SchemaInfoProvider {
         ClassLoader originalContextLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             
Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
-            return pulsarAdmin.schemas()
-                    .getSchemaInfo(topicName.toString(), 
ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+            long version = ByteBuffer.wrap(bytesSchemaVersion.get()).getLong();
+            SchemaInfo schemaInfo = 
pulsarAdmin.schemas().getSchemaInfo(topicName.toString(), version);
+            if (schemaInfo == null) {
+                throw new RuntimeException(
+                        "The specific version (" + version + ") schema of the 
topic " + topicName + " is null");
+            }
+            return schemaInfo;
         } finally {
             
Thread.currentThread().setContextClassLoader(originalContextLoader);
         }
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 610a1c2..d60ff20 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -34,18 +34,31 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -56,6 +69,8 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -65,6 +80,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
@@ -323,9 +339,14 @@ public class TestPulsarRecordCursor extends 
TestPulsarConnector {
 
                                     MessageMetadata messageMetadata =
                                             new MessageMetadata()
-                                                    
.setProducerName("test-producer").setSequenceId(positions.get(topic))
+                                                    
.setProducerName("test-producer")
+                                                    
.setSequenceId(positions.get(topic))
                                                     
.setPublishTime(System.currentTimeMillis());
 
+                                    if (i % 2 == 0) {
+                                        messageMetadata.setSchemaVersion(new 
LongSchemaVersion(1L).bytes());
+                                    }
+
                                     if 
(KeyValueEncodingType.SEPARATED.equals(schema.getKeyValueEncodingType())) {
                                         messageMetadata
                                                 .setPartitionKey(new 
String(schema
@@ -380,7 +401,7 @@ public class TestPulsarRecordCursor extends 
TestPulsarConnector {
         PulsarSplit split = new PulsarSplit(0, pulsarConnectorId.toString(),
                 topicName.getNamespace(), topicName.getLocalName(), 
topicName.getLocalName(),
                 entriesNum,
-                new String(schema.getSchemaInfo().getSchema()),
+                new String(schema.getSchemaInfo().getSchema(),  "ISO8859-1"),
                 schema.getSchemaInfo().getType(),
                 0, entriesNum,
                 0, 0, TupleDomain.all(),
@@ -416,4 +437,86 @@ public class TestPulsarRecordCursor extends 
TestPulsarConnector {
         private Double field3;
     }
 
+    @Test
+    public void testGetSchemaInfo() throws Exception {
+        String topic = "get-schema-test";
+        PulsarSplit pulsarSplit = Mockito.mock(PulsarSplit.class);
+        
Mockito.when(pulsarSplit.getTableName()).thenReturn(TopicName.get(topic).getLocalName());
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn("public/default");
+        PulsarAdmin pulsarAdmin = Mockito.mock(PulsarAdmin.class);
+        Schemas schemas = Mockito.mock(Schemas.class);
+        Mockito.when(pulsarAdmin.schemas()).thenReturn(schemas);
+        PulsarConnectorConfig connectorConfig = spy(new 
PulsarConnectorConfig());
+        Mockito.when(connectorConfig.getPulsarAdmin()).thenReturn(pulsarAdmin);
+        PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+                new ArrayList<>(), pulsarSplit, connectorConfig, 
Mockito.mock(ManagedLedgerFactory.class),
+                new ManagedLedgerConfig(), null, null));
+
+        Class<PulsarRecordCursor> clazz =  PulsarRecordCursor.class;
+        Method getSchemaInfo = clazz.getDeclaredMethod("getSchemaInfo", 
PulsarSplit.class);
+        getSchemaInfo.setAccessible(true);
+        Field currentMessage = clazz.getDeclaredField("currentMessage");
+        currentMessage.setAccessible(true);
+        RawMessage rawMessage = Mockito.mock(RawMessage.class);
+        currentMessage.set(pulsarRecordCursor, rawMessage);
+
+        // If the schemaType of pulsarSplit is NONE or BYTES, using bytes 
schema
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.NONE);
+        SchemaInfo schemaInfo = (SchemaInfo) 
getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.BYTES);
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        
Mockito.when(pulsarSplit.getSchemaName()).thenReturn(Schema.BYTEBUFFER.getSchemaInfo().getName());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        // If the schemaVersion of the message is not null, try to get the 
schema.
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.AVRO);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new 
LongSchemaVersion(0).bytes());
+        Mockito.when(schemas.getSchemaInfo(anyString(), eq(0L)))
+                .thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
+        assertEquals(SchemaType.AVRO, schemaInfo.getType());
+
+        String schemaTopic = "persistent://public/default/" + topic;
+
+        // If the schemaVersion of the message is null and the schema of 
pulsarSplit is null, throw runtime exception.
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(null);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
+            fail("The message schema version is null and the latest schema is 
null, should fail.");
+        } catch (InvocationTargetException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the table 
" + topic + " is null"));
+        }
+
+        // If the schemaVersion of the message is null, try to get the latest 
schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        
Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+
+        // If the specific version schema is null, throw runtime exception.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new 
LongSchemaVersion(1L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 1)).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
+            fail("The specific version " + 1 + " schema is null, should 
fail.");
+        } catch (InvocationTargetException e) {
+            String schemaVersion = BytesSchemaVersion.of(new 
LongSchemaVersion(1L).bytes()).toString();
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the topic 
" + schemaTopic + " is null"));
+        }
+
+        // Get the specific version schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new 
LongSchemaVersion(2L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 
2)).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, 
pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+    }
+
 }

Reply via email to