This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 33dd4f2a7e8 [schema][client][improve] Add decode InputStream for
Schema (#16659)
33dd4f2a7e8 is described below
commit 33dd4f2a7e8c78ff96ed2795fef4fd4ab35e3d89
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Aug 2 11:37:47 2022 +0800
[schema][client][improve] Add decode InputStream for Schema (#16659)
---
.../pulsar/broker/transaction/TransactionTest.java | 1 +
.../client/impl/BrokerClientIntegrationTest.java | 7 ++++---
.../java/org/apache/pulsar/schema/SchemaTest.java | 11 +++-------
.../java/org/apache/pulsar/client/api/Schema.java | 16 ++++++++++++++-
.../org/apache/pulsar/client/impl/MessageImpl.java | 12 +++++++++--
.../client/impl/schema/AbstractStructSchema.java | 24 ++++++++++++++++++++++
.../client/impl/schema/AutoConsumeSchema.java | 9 ++++++++
.../integration/functions/PulsarFunctionsTest.java | 9 ++++----
8 files changed, 71 insertions(+), 18 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 8ef53590fde..7a3eabdb318 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1319,6 +1319,7 @@ public class TransactionTest extends TransactionTestBase {
// Mock brokerService.
BrokerService brokerService = mock(BrokerService.class);
when(brokerService.getPulsar()).thenReturn(pulsar);
+ when(brokerService.pulsar()).thenReturn(pulsar);
// Mock topic.
PersistentTopic topic = mock(PersistentTopic.class);
when(topic.getBrokerService()).thenReturn(brokerService);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 6d6bb9e3f1f..0039fc92cb8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
+import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -788,7 +789,7 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
TestMessageObject object = new TestMessageObject();
SchemaReader<TestMessageObject> reader =
Mockito.mock(SchemaReader.class);
SchemaWriter<TestMessageObject> writer =
Mockito.mock(SchemaWriter.class);
- Mockito.when(reader.read(Mockito.any(byte[].class),
Mockito.any(byte[].class))).thenReturn(object);
+ Mockito.when(reader.read(Mockito.any(InputStream.class),
Mockito.any(byte[].class))).thenReturn(object);
Mockito.when(writer.write(Mockito.any(TestMessageObject.class))).thenReturn("fake
data".getBytes(StandardCharsets.UTF_8));
SchemaDefinition<TestMessageObject> schemaDefinition = new
SchemaDefinitionBuilderImpl<TestMessageObject>()
.withPojo(TestMessageObject.class)
@@ -809,7 +810,7 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
TestMessageObject testObject = consumer.receive().getValue();
Assert.assertEquals(object.getValue(), testObject.getValue());
Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
- Mockito.verify(reader,
Mockito.times(1)).read(Mockito.any(byte[].class), Mockito.any(byte[].class));
+ Mockito.verify(reader,
Mockito.times(1)).read(Mockito.any(InputStream.class),
Mockito.any(byte[].class));
}
}
@@ -844,7 +845,7 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
Assert.assertEquals(object.getValue(), testObject.getValue());
Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
- Mockito.verify(reader,
Mockito.times(1)).read(Mockito.any(byte[].class));
+ Mockito.verify(reader,
Mockito.times(1)).read(Mockito.any(InputStream.class));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index a9a1efe2ced..46e0f5dfc73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
@@ -1253,14 +1254,8 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
Message<User> message1 = consumer.receive();
Assert.assertEquals(test, message1.getValue());
Message<User> message2 = consumer.receive();
- try {
- message2.getValue();
- } catch (SchemaSerializationException e) {
- final String schemaString =
- new
String(Schema.AVRO(User.class).getSchemaInfo().getSchema(),
StandardCharsets.UTF_8);
- Assert.assertTrue(e.getMessage().contains(schemaString));
- Assert.assertTrue(e.getMessage().contains("payload (4 bytes)"));
- }
+
+ assertThrows(SchemaSerializationException.class, message2::getValue);
}
@EqualsAndHashCode
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 0692c48f151..e1cbb629757 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -46,7 +46,7 @@ import org.apache.pulsar.common.schema.SchemaType;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public interface Schema<T> extends Cloneable{
+public interface Schema<T> extends Cloneable {
/**
* Check if the message is a valid object for this schema.
@@ -120,6 +120,20 @@ public interface Schema<T> extends Cloneable{
return decode(bytes);
}
+ /**
+ * Decode a ByteBuffer into an object. <br/>
+ *
+ * @param data
+ * the ByteBuffer to decode
+ * @return the deserialized object
+ */
+ default T decode(ByteBuffer data) {
+ if (data == null) {
+ return null;
+ }
+ return decode(getBytes(data));
+ }
+
/**
* Decode a ByteBuffer into an object using a given version. <br/>
*
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 2fb9311b4b5..4970f95b7fc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -488,11 +488,19 @@ public class MessageImpl<T> implements Message<T> {
if (value != null) {
return value;
}
+
if (null == schemaVersion) {
- return schema.decode(getData());
+ return schema.decode(getByteBuffer());
} else {
- return schema.decode(getData(), schemaVersion);
+ return schema.decode(getByteBuffer(), schemaVersion);
+ }
+ }
+
+ private ByteBuffer getByteBuffer() {
+ if (msgMetadata.isNullValue()) {
+ return null;
}
+ return this.payload.nioBuffer();
}
private T getKeyValueBySchemaVersion() {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
index c61dd3c8d65..106d05af185 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
@@ -20,9 +20,14 @@ package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import org.apache.avro.util.ByteBufferInputStream;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
@@ -66,6 +71,25 @@ public abstract class AbstractStructSchema<T> extends
AbstractSchema<T> {
return reader.read(bytes, schemaVersion);
}
+ @Override
+ public T decode(ByteBuffer buffer) {
+ if (buffer == null) {
+ return null;
+ }
+ List<ByteBuffer> buffers = Collections.singletonList(buffer);
+ return this.reader.read(new ByteBufferInputStream(buffers));
+ }
+
+ @Override
+ public T decode(ByteBuffer buffer, byte[] schemaVersion) {
+ if (buffer == null) {
+ return null;
+ }
+ List<ByteBuffer> buffers = Collections.singletonList(buffer);
+ InputStream input = new ByteBufferInputStream(buffers);
+ return this.reader.read(input, schemaVersion);
+ }
+
@Override
public T decode(ByteBuf byteBuf) {
return reader.read(new ByteBufInputStream(byteBuf));
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 5e7568b8e18..cbc27183c36 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.avro.Schema.Type.RECORD;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -120,6 +121,14 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
return adapt(schemaMap.get(sv).decode(bytes, schemaVersion),
schemaVersion);
}
+ @Override
+ public GenericRecord decode(ByteBuffer buffer, byte[] schemaVersion) {
+ SchemaVersion sv = getSchemaVersion(schemaVersion);
+ fetchSchemaIfNeeded(sv);
+ ensureSchemaInitialized(sv);
+ return adapt(schemaMap.get(sv).decode(buffer, schemaVersion),
schemaVersion);
+ }
+
@Override
public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
this.schemaInfoProvider = schemaInfoProvider;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index e313c32d4d2..2b837b55f49 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1598,11 +1598,12 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
Json.pretty(pulsarAdmin.topics().getInternalStats(inputTopic, true)));
log.info("Output topic internal-stats: {}",
Json.pretty(pulsarAdmin.topics().getInternalStats(outputTopic, true)));
+ } else {
+ String logMsg = new String(msg.getValue(), UTF_8);
+ log.info("Received message: '{}'", logMsg);
+ assertTrue(expectedMessages.contains(logMsg), "Message '" +
logMsg + "' not expected");
+ expectedMessages.remove(logMsg);
}
- String logMsg = new String(msg.getValue(), UTF_8);
- log.info("Received message: '{}'", logMsg);
- assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg
+ "' not expected");
- expectedMessages.remove(logMsg);
}
consumer.close();