This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e5619cf [Pulsar SQL] Handle message null schema version in
PulsarRecordCursor (#12809)
e5619cf is described below
commit e5619cffce702d9f446c27e69927148e45797b28
Author: ran <[email protected]>
AuthorDate: Tue Nov 16 13:16:58 2021 +0800
[Pulsar SQL] Handle message null schema version in PulsarRecordCursor
(#12809)
### Motivation
Currently, if the schema version of a message is null, Pulsar SQL
throws a NullPointerException (NPE).
### Modifications
Adjust the schema-resolution logic in `PulsarRecordCursor` so that a null schema version is handled:
1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER
schema.
3. If the schema version of the message is null, use the latest schema of
the topic (the schema info carried by the `PulsarSplit`).
4. If the schema version of the message is not null, get the specific
version schema by PulsarAdmin.
5. If the resolved schema is still null, throw a `RuntimeException`.
---
.../pulsar/sql/presto/PulsarRecordCursor.java | 42 ++++++--
.../sql/presto/PulsarSqlSchemaInfoProvider.java | 9 +-
.../pulsar/sql/presto/TestPulsarRecordCursor.java | 107 ++++++++++++++++++++-
3 files changed, 146 insertions(+), 12 deletions(-)
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index f1e2bdb..b1230d3 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -479,14 +480,7 @@ public class PulsarRecordCursor implements RecordCursor {
//start time for deseralizing record
metricsTracker.start_RECORD_DESERIALIZE_TIME();
- SchemaInfo schemaInfo =
getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
- try {
- if (schemaInfo == null) {
- schemaInfo =
schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
+ SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new
HashMap<>();
@@ -600,6 +594,38 @@ public class PulsarRecordCursor implements RecordCursor {
return true;
}
+    /**
+     * Resolves the {@link SchemaInfo} used to decode the current message.
+     *
+     * 1. If the schema type of pulsarSplit is NONE or BYTES, use the bytes schema returned by
+     *    getBytesSchemaInfo (which also receives the split's schema name and is responsible for
+     *    the BYTEBUFFER variant).
+     * 2. Otherwise, if the schema version of the current message is null, use the schema info of
+     *    pulsarSplit.
+     * 3. Otherwise, fetch the schema of that specific version through schemaInfoProvider
+     *    (backed by PulsarAdmin).
+     * 4. If no schema could be resolved, throw a RuntimeException.
+     *
+     * @param pulsarSplit the split the current message belongs to
+     * @return the resolved, non-null schema info
+     * @throws RuntimeException if the versioned lookup fails or is interrupted (the interrupt flag is
+     *         restored first), or if the resolved schema is null
+     */
+    private SchemaInfo getSchemaInfo(PulsarSplit pulsarSplit) {
+        SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
+        if (schemaInfo != null) {
+            return schemaInfo;
+        }
+        // Read the version once: it drives both the lookup and the error message below.
+        byte[] schemaVersion = this.currentMessage.getSchemaVersion();
+        try {
+            if (schemaVersion == null) {
+                // Messages without a schema version fall back to the schema carried by the split.
+                schemaInfo = pulsarSplit.getSchemaInfo();
+            } else {
+                schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion).get();
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        if (schemaInfo == null) {
+            String schemaVersionStr = schemaVersion == null
+                    ? "null" : BytesSchemaVersion.of(schemaVersion).toString();
+            throw new RuntimeException("The specific version (" + schemaVersionStr + ") schema of the table "
+                    + pulsarSplit.getTableName() + " is null");
+        }
+        return schemaInfo;
+    }
+
private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String
schemaName) {
if (!schemaType.equals(SchemaType.BYTES) &&
!schemaType.equals(SchemaType.NONE)) {
return null;
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index 3a9233c..828ceef 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -102,8 +102,13 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
ClassLoader originalContextLoader =
Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
- return pulsarAdmin.schemas()
- .getSchemaInfo(topicName.toString(),
ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+ long version = ByteBuffer.wrap(bytesSchemaVersion.get()).getLong();
+ SchemaInfo schemaInfo =
pulsarAdmin.schemas().getSchemaInfo(topicName.toString(), version);
+ if (schemaInfo == null) {
+ throw new RuntimeException(
+ "The specific version (" + version + ") schema of the
topic " + topicName + " is null");
+ }
+ return schemaInfo;
} finally {
Thread.currentThread().setContextClassLoader(originalContextLoader);
}
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 610a1c2..d60ff20 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -34,18 +34,31 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
@@ -56,6 +69,8 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -65,6 +80,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class TestPulsarRecordCursor extends TestPulsarConnector {
@@ -323,9 +339,14 @@ public class TestPulsarRecordCursor extends
TestPulsarConnector {
MessageMetadata messageMetadata =
new MessageMetadata()
-
.setProducerName("test-producer").setSequenceId(positions.get(topic))
+
.setProducerName("test-producer")
+
.setSequenceId(positions.get(topic))
.setPublishTime(System.currentTimeMillis());
+ if (i % 2 == 0) {
+ messageMetadata.setSchemaVersion(new
LongSchemaVersion(1L).bytes());
+ }
+
if
(KeyValueEncodingType.SEPARATED.equals(schema.getKeyValueEncodingType())) {
messageMetadata
.setPartitionKey(new
String(schema
@@ -380,7 +401,7 @@ public class TestPulsarRecordCursor extends
TestPulsarConnector {
PulsarSplit split = new PulsarSplit(0, pulsarConnectorId.toString(),
topicName.getNamespace(), topicName.getLocalName(),
topicName.getLocalName(),
entriesNum,
- new String(schema.getSchemaInfo().getSchema()),
+ new String(schema.getSchemaInfo().getSchema(), "ISO8859-1"),
schema.getSchemaInfo().getType(),
0, entriesNum,
0, 0, TupleDomain.all(),
@@ -416,4 +437,86 @@ public class TestPulsarRecordCursor extends
TestPulsarConnector {
private Double field3;
}
+    /**
+     * Exercises the private PulsarRecordCursor#getSchemaInfo (invoked via reflection) across the
+     * bytes-schema path, the versioned-lookup path, the null-version fallback to the split's schema,
+     * and the failure paths where no schema can be resolved.
+     */
+    @Test
+    public void testGetSchemaInfo() throws Exception {
+        String topic = "get-schema-test";
+        PulsarSplit pulsarSplit = Mockito.mock(PulsarSplit.class);
+        Mockito.when(pulsarSplit.getTableName()).thenReturn(TopicName.get(topic).getLocalName());
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn("public/default");
+        PulsarAdmin pulsarAdmin = Mockito.mock(PulsarAdmin.class);
+        Schemas schemas = Mockito.mock(Schemas.class);
+        Mockito.when(pulsarAdmin.schemas()).thenReturn(schemas);
+        PulsarConnectorConfig connectorConfig = spy(new PulsarConnectorConfig());
+        // connectorConfig is a spy: doReturn stubs it without invoking the real getPulsarAdmin().
+        doReturn(pulsarAdmin).when(connectorConfig).getPulsarAdmin();
+        PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+                new ArrayList<>(), pulsarSplit, connectorConfig, Mockito.mock(ManagedLedgerFactory.class),
+                new ManagedLedgerConfig(), null, null));
+
+        // getSchemaInfo and currentMessage are private, so reach them through reflection.
+        Class<PulsarRecordCursor> clazz = PulsarRecordCursor.class;
+        Method getSchemaInfo = clazz.getDeclaredMethod("getSchemaInfo", PulsarSplit.class);
+        getSchemaInfo.setAccessible(true);
+        Field currentMessage = clazz.getDeclaredField("currentMessage");
+        currentMessage.setAccessible(true);
+        RawMessage rawMessage = Mockito.mock(RawMessage.class);
+        currentMessage.set(pulsarRecordCursor, rawMessage);
+
+        // If the schemaType of pulsarSplit is NONE or BYTES, the bytes schema is used.
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.NONE);
+        SchemaInfo schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.BYTES);
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn(Schema.BYTEBUFFER.getSchemaInfo().getName());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        // If the schemaVersion of the message is not null, the schema of that version is fetched.
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.AVRO);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(0).bytes());
+        Mockito.when(schemas.getSchemaInfo(anyString(), eq(0L)))
+                .thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.AVRO, schemaInfo.getType());
+
+        String schemaTopic = "persistent://public/default/" + topic;
+
+        // If the schemaVersion of the message is null and the schema of pulsarSplit is null,
+        // a RuntimeException is thrown.
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(null);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The message schema version is null and the latest schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the table " + topic + " is null"));
+        }
+
+        // If the schemaVersion of the message is null, the schema of pulsarSplit (latest schema) is used.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+
+        // If the schema of the specific version is null, a RuntimeException is thrown.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(1L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 1)).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The specific version " + 1 + " schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the topic " + schemaTopic + " is null"));
+        }
+
+        // Get the schema of a specific version.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(2L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 2)).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+    }
+
}