This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 33dd4f2a7e8 [schema][client][improve] Add decode InputStream for Schema (#16659)
33dd4f2a7e8 is described below

commit 33dd4f2a7e8c78ff96ed2795fef4fd4ab35e3d89
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Aug 2 11:37:47 2022 +0800

    [schema][client][improve] Add decode InputStream for Schema (#16659)
---
 .../pulsar/broker/transaction/TransactionTest.java |  1 +
 .../client/impl/BrokerClientIntegrationTest.java   |  7 ++++---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 11 +++-------
 .../java/org/apache/pulsar/client/api/Schema.java  | 16 ++++++++++++++-
 .../org/apache/pulsar/client/impl/MessageImpl.java | 12 +++++++++--
 .../client/impl/schema/AbstractStructSchema.java   | 24 ++++++++++++++++++++++
 .../client/impl/schema/AutoConsumeSchema.java      |  9 ++++++++
 .../integration/functions/PulsarFunctionsTest.java |  9 ++++----
 8 files changed, 71 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 8ef53590fde..7a3eabdb318 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1319,6 +1319,7 @@ public class TransactionTest extends TransactionTestBase {
         // Mock brokerService.
         BrokerService brokerService = mock(BrokerService.class);
         when(brokerService.getPulsar()).thenReturn(pulsar);
+        when(brokerService.pulsar()).thenReturn(pulsar);
         // Mock topic.
         PersistentTopic topic = mock(PersistentTopic.class);
         when(topic.getBrokerService()).thenReturn(brokerService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 6d6bb9e3f1f..0039fc92cb8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
+import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -788,7 +789,7 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         TestMessageObject object = new TestMessageObject();
         SchemaReader<TestMessageObject> reader = 
Mockito.mock(SchemaReader.class);
         SchemaWriter<TestMessageObject> writer = 
Mockito.mock(SchemaWriter.class);
-        Mockito.when(reader.read(Mockito.any(byte[].class), 
Mockito.any(byte[].class))).thenReturn(object);
+        Mockito.when(reader.read(Mockito.any(InputStream.class), 
Mockito.any(byte[].class))).thenReturn(object);
         
Mockito.when(writer.write(Mockito.any(TestMessageObject.class))).thenReturn("fake
 data".getBytes(StandardCharsets.UTF_8));
         SchemaDefinition<TestMessageObject> schemaDefinition = new 
SchemaDefinitionBuilderImpl<TestMessageObject>()
                 .withPojo(TestMessageObject.class)
@@ -809,7 +810,7 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
             TestMessageObject testObject = consumer.receive().getValue();
             Assert.assertEquals(object.getValue(), testObject.getValue());
             Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
-            Mockito.verify(reader, 
Mockito.times(1)).read(Mockito.any(byte[].class), Mockito.any(byte[].class));
+            Mockito.verify(reader, 
Mockito.times(1)).read(Mockito.any(InputStream.class), 
Mockito.any(byte[].class));
         }
     }
 
@@ -844,7 +845,7 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
             Assert.assertEquals(object.getValue(), testObject.getValue());
 
             Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
-            Mockito.verify(reader, 
Mockito.times(1)).read(Mockito.any(byte[].class));
+            Mockito.verify(reader, 
Mockito.times(1)).read(Mockito.any(InputStream.class));
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index a9a1efe2ced..46e0f5dfc73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
@@ -1253,14 +1254,8 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         Message<User> message1 = consumer.receive();
         Assert.assertEquals(test, message1.getValue());
         Message<User> message2 = consumer.receive();
-        try {
-            message2.getValue();
-        } catch (SchemaSerializationException e) {
-            final String schemaString =
-                    new 
String(Schema.AVRO(User.class).getSchemaInfo().getSchema(), 
StandardCharsets.UTF_8);
-            Assert.assertTrue(e.getMessage().contains(schemaString));
-            Assert.assertTrue(e.getMessage().contains("payload (4 bytes)"));
-        }
+
+        assertThrows(SchemaSerializationException.class, message2::getValue);
     }
 
     @EqualsAndHashCode
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 0692c48f151..e1cbb629757 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -46,7 +46,7 @@ import org.apache.pulsar.common.schema.SchemaType;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public interface Schema<T> extends Cloneable{
+public interface Schema<T> extends Cloneable {
 
     /**
      * Check if the message is a valid object for this schema.
@@ -120,6 +120,20 @@ public interface Schema<T> extends Cloneable{
         return decode(bytes);
     }
 
+    /**
+     * Decode a ByteBuffer into an object. <br/>
+     *
+     * @param data
+     *            the ByteBuffer to decode
+     * @return the deserialized object
+     */
+    default T decode(ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+        return decode(getBytes(data));
+    }
+
     /**
      * Decode a ByteBuffer into an object using a given version. <br/>
      *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 2fb9311b4b5..4970f95b7fc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -488,11 +488,19 @@ public class MessageImpl<T> implements Message<T> {
         if (value != null) {
             return value;
         }
+
         if (null == schemaVersion) {
-            return schema.decode(getData());
+            return schema.decode(getByteBuffer());
         } else {
-            return schema.decode(getData(), schemaVersion);
+            return schema.decode(getByteBuffer(), schemaVersion);
+        }
+    }
+
+    private ByteBuffer getByteBuffer() {
+        if (msgMetadata.isNullValue()) {
+            return null;
         }
+        return this.payload.nioBuffer();
     }
 
     private T getKeyValueBySchemaVersion() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
index c61dd3c8d65..106d05af185 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
@@ -20,9 +20,14 @@ package org.apache.pulsar.client.impl.schema;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import org.apache.avro.util.ByteBufferInputStream;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
@@ -66,6 +71,25 @@ public abstract class AbstractStructSchema<T> extends 
AbstractSchema<T> {
         return reader.read(bytes, schemaVersion);
     }
 
+    @Override
+    public T decode(ByteBuffer buffer) {
+        if (buffer == null) {
+            return null;
+        }
+        List<ByteBuffer> buffers = Collections.singletonList(buffer);
+        return this.reader.read(new ByteBufferInputStream(buffers));
+    }
+
+    @Override
+    public T decode(ByteBuffer buffer, byte[] schemaVersion) {
+        if (buffer == null) {
+            return null;
+        }
+        List<ByteBuffer> buffers = Collections.singletonList(buffer);
+        InputStream input = new ByteBufferInputStream(buffers);
+        return this.reader.read(input, schemaVersion);
+    }
+
     @Override
     public T decode(ByteBuf byteBuf) {
         return reader.read(new ByteBufInputStream(byteBuf));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 5e7568b8e18..cbc27183c36 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema;
 import static com.google.common.base.Preconditions.checkState;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.avro.Schema.Type.RECORD;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -120,6 +121,14 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
         return adapt(schemaMap.get(sv).decode(bytes, schemaVersion), 
schemaVersion);
     }
 
+    @Override
+    public GenericRecord decode(ByteBuffer buffer, byte[] schemaVersion) {
+        SchemaVersion sv = getSchemaVersion(schemaVersion);
+        fetchSchemaIfNeeded(sv);
+        ensureSchemaInitialized(sv);
+        return adapt(schemaMap.get(sv).decode(buffer, schemaVersion), 
schemaVersion);
+    }
+
     @Override
     public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
         this.schemaInfoProvider = schemaInfoProvider;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index e313c32d4d2..2b837b55f49 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1598,11 +1598,12 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                         
Json.pretty(pulsarAdmin.topics().getInternalStats(inputTopic, true)));
                 log.info("Output topic internal-stats: {}",
                         
Json.pretty(pulsarAdmin.topics().getInternalStats(outputTopic, true)));
+            } else {
+                String logMsg = new String(msg.getValue(), UTF_8);
+                log.info("Received message: '{}'", logMsg);
+                assertTrue(expectedMessages.contains(logMsg), "Message '" + 
logMsg + "' not expected");
+                expectedMessages.remove(logMsg);
             }
-            String logMsg = new String(msg.getValue(), UTF_8);
-            log.info("Received message: '{}'", logMsg);
-            assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg 
+ "' not expected");
-            expectedMessages.remove(logMsg);
         }
 
         consumer.close();

Reply via email to