merlimat commented on a change in pull request #12536:
URL: https://github.com/apache/pulsar/pull/12536#discussion_r742145102



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
##########
@@ -77,6 +77,7 @@
     private int newEntriesCheckDelayInMillis = 10;
     private Clock clock = Clock.systemUTC();
     private ManagedLedgerInterceptor managedLedgerInterceptor;
+    private ManagedLedgerInterceptor managedLedgerPayloadProcessor;

Review comment:
       If we're using the same interface, couldn't we have a single 
`managedLedgerInterceptor` instance?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
##########
@@ -102,6 +108,78 @@ public void beforeSendMessage(Subscription subscription,
         }
     }
 
+    @Override
+    public void consumerCreated(ServerCnx cnx,
+                                 Consumer consumer,
+                                 Map<String, String> metadata) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {

Review comment:
       I know this is already the style of the other methods here, but we 
should avoid allocating an iterator instance when there are no interceptors, 
e.g.: 
   
   ```java
   if (interceptors.isEmpty()) {
     return;
   }
   
   for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
        // .... 
   }
   ```

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
##########
@@ -221,7 +221,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
                             Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
                             if (iterator.hasNext()) {
                                 LedgerEntry ledgerEntry = iterator.next();
-                                EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
+                                EntryImpl returnEntry = EntryImpl.create(ledgerEntry, ml.getConfig().getManagedLedgerPayloadProcessor());

Review comment:
       Maybe we can store the interceptor as a member variable of 
`EntryCacheImpl`.
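
A minimal sketch of that idea, using stand-in types rather than the real 
managed-ledger classes. `CacheConfig`, `PayloadInterceptor` and 
`SketchEntryCache` are hypothetical names used only for illustration, and the 
sketch assumes the interceptor does not change after the cache is constructed:

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the interceptor interface.
interface PayloadInterceptor extends UnaryOperator<String> {
}

// Hypothetical stand-in for the config object; counts how often it is queried.
final class CacheConfig {
    private final PayloadInterceptor interceptor;
    int lookups = 0;

    CacheConfig(PayloadInterceptor interceptor) {
        this.interceptor = interceptor;
    }

    PayloadInterceptor getInterceptor() {
        lookups++;
        return interceptor;
    }
}

// Hypothetical stand-in for the entry cache.
final class SketchEntryCache {
    // Resolved once in the constructor instead of on every read.
    private final PayloadInterceptor interceptor;

    SketchEntryCache(CacheConfig config) {
        this.interceptor = config.getInterceptor();
    }

    String read(String raw) {
        return interceptor == null ? raw : interceptor.apply(raw);
    }
}
```

The read path then touches only the field, so the config lookup is paid once 
per cache instance rather than once per entry.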

##########
File path: 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
##########
@@ -264,4 +264,11 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
      */
     PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader);
 
+    /**
+     *  Set extra headers.
+     * @param extraClientHeaders
+     * @return
+     */
+    PulsarAdminBuilder extraClientHeaders(String extraClientHeaders);

Review comment:
       I think this change should go in a separate PR since it's a bit 
different from the payload interceptor change.
   
   Also, it's not clear from an API perspective how the extra headers are 
going to be used or what the format of that string is.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
##########
@@ -307,6 +308,7 @@ public TransportCnx getCnx() {
     }
 
     private static final class MessagePublishContext implements PublishContext, Runnable {
+        Map<String, Object> propertyMap = new HashMap<>();

Review comment:
       It's not clear why we need to use the property map here and we should 
try to avoid the allocation if it's not being used.
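
One way to avoid the allocation is to create the map only on first use. The 
following is a rough sketch with a hypothetical `LazyPropertyContext` class 
standing in for `MessagePublishContext`; the `getProperty`, 
`hasAllocatedMap` and `reset` methods are assumptions for illustration, not 
part of the PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MessagePublishContext; only property handling is shown.
final class LazyPropertyContext {
    // Stays null until the first setProperty() call, so a publish that never
    // sets a property does not allocate a HashMap.
    private Map<String, Object> propertyMap;

    void setProperty(String key, Object value) {
        if (propertyMap == null) {
            propertyMap = new HashMap<>();
        }
        propertyMap.put(key, value);
    }

    Object getProperty(String key) {
        return propertyMap == null ? null : propertyMap.get(key);
    }

    boolean hasAllocatedMap() {
        return propertyMap != null;
    }

    // If the context object is pooled and reused, stale properties must not
    // leak into the next publish; dropping the map on reset is one option.
    void reset() {
        propertyMap = null;
    }
}
```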

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerPayloadProcessor.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import io.netty.buffer.ByteBuf;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.intercept.MessagePayloadProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManagedLedgerPayloadProcessor implements ManagedLedgerInterceptor {
+    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerPayloadProcessor.class);
+    private static final String INDEX = "index";
+    private ServiceConfiguration serviceConfig;
+
+
+    private final Set<MessagePayloadProcessor> brokerEntryPayloadProcessors;
+    public ManagedLedgerPayloadProcessor(ServiceConfiguration serviceConfig,
+                                         Set<MessagePayloadProcessor> brokerEntryPayloadProcessors) {
+        this.serviceConfig = serviceConfig;
+        this.brokerEntryPayloadProcessors = brokerEntryPayloadProcessors;
+    }
+    @Override
+    public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+        return null;
+    }
+    @Override
+    public ByteBuf beforeStoreEntryToLedger(OpAddEntry op, ByteBuf ledgerData) {
+        Map<String, Object> contextMap = new HashMap<>();
+        ByteBuf dataToStore = ledgerData;
+
+        for (MessagePayloadProcessor payloadProcessor : brokerEntryPayloadProcessors) {
+            dataToStore = payloadProcessor.interceptIn(
+                serviceConfig.getClusterName(),
+                dataToStore, contextMap);
+            if (op.getCtx() instanceof Topic.PublishContext){
+                Topic.PublishContext ctx = (Topic.PublishContext) op.getCtx();
+                contextMap.forEach((k, v) -> {
+                    ctx.setProperty(k, v);
+                });
+            }
+        }
+        return dataToStore;
+    }
+
+    @Override
+    public ByteBuf beforeCacheEntryFromLedger(ByteBuf ledgerData){
+        ByteBuf dataToCache = ledgerData;
+        for (MessagePayloadProcessor payloadProcessor : brokerEntryPayloadProcessors) {
+            dataToCache = payloadProcessor.interceptOut(dataToCache);
+        }
+        return dataToCache;
+    }
+
+    @Override
+    public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
+    }
+    @Override
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        promise.complete(null);
+        return promise;

Review comment:
       ```suggestion
           return CompletableFuture.completedFuture(null);
   ```

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
##########
@@ -58,4 +59,23 @@
      * @param propertiesMap  map of properties.
      */
     void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);
+
+    /**
+     * Intercept after entry is read from ledger, before it gets cached.
+     * @param ledgerData data from ledger
+     * @return data after processing, if any
+     */
+    default ByteBuf beforeCacheEntryFromLedger(ByteBuf ledgerData){

Review comment:
       I think we need to be very explicit (and careful :) ) about the 
reference-count expectations for the buffers here, to avoid memory leaks or 
use-after-release scenarios. 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1420,11 +1435,15 @@ protected void handleAck(CommandAck ack) {
         final long consumerId = ack.getConsumerId();
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+            Consumer consumer = consumerFuture.getNow(null);

Review comment:
       Since we have the `consumer` variable here, we can also change the next 
line to avoid doing `getNow()` again. 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
##########
@@ -102,6 +108,78 @@ public void beforeSendMessage(Subscription subscription,
         }
     }
 
+    @Override
+    public void consumerCreated(ServerCnx cnx,
+                                 Consumer consumer,
+                                 Map<String, String> metadata) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.consumerCreated(
+                    cnx,
+                    consumer,
+                    metadata);
+        }
+    }
+
+    @Override
+    public void producerCreated(ServerCnx cnx, Producer producer,
+                                 Map<String, String> metadata){
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.producerCreated(cnx, producer, metadata);
+        }
+    }
+
+    @Override
+    public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
+                                 long entryId, Rate rateIn,
+                                 Topic.PublishContext publishContext) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, rateIn, publishContext);
+        }
+    }
+
+    @Override
+    public  void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
+                                   long entryId, ByteBuf headersAndPayload) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+        }
+    }
+
+    @Override
+    public void messageAcked(ServerCnx cnx, Consumer consumer,
+                              CommandAck ackCmd) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.messageAcked(cnx, consumer, ackCmd);
+        }
+    }
+
+    @Override
+    public boolean delegateSuperUserCheck(){
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            if (value.delegateSuperUserCheck()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isSuperUser(PulsarService pulsarService, String appId, String originalPrincipal){

Review comment:
       I think this would be better suited to a custom authorization provider, 
which already has an `isSuperUser()` method. 

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
##########
@@ -47,12 +48,22 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
     ByteBuf data;
 
     public static EntryImpl create(LedgerEntry ledgerEntry) {
+        return create(ledgerEntry,null);
+    }
+
     public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor managedLedgerInterceptor) {
         EntryImpl entry = RECYCLER.get();
         entry.timestamp = System.nanoTime();
         entry.ledgerId = ledgerEntry.getLedgerId();
         entry.entryId = ledgerEntry.getEntryId();
         entry.data = ledgerEntry.getEntryBuffer();
-        entry.data.retain();
+        ByteBuf cacheBuf = entry.data;
+        if(managedLedgerInterceptor != null)
+            cacheBuf = managedLedgerInterceptor.beforeCacheEntryFromLedger(entry.data);
+        if(cacheBuf != null && cacheBuf != entry.data)
+            entry.data = cacheBuf;
+        else
+            entry.data.retain();

Review comment:
       Instead of checking whether the same buffer instance was returned, the 
interceptor should have well-defined semantics for the reference count of the 
returned buffer.
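
As an illustration only, here is one possible contract, sketched with a tiny 
stand-in for a reference-counted buffer instead of Netty's `ByteBuf`. The 
rule assumed here (not something the PR defines) is: the caller keeps 
ownership of the input, and the returned buffer always carries one reference 
that the caller must eventually release. `CountedBuf`, `CachingInterceptor` 
and `EntryFactory` are hypothetical names:

```java
// Minimal stand-in for a reference-counted buffer (ByteBuf in the real code).
final class CountedBuf {
    private int refCnt = 1;
    final String content;

    CountedBuf(String content) {
        this.content = content;
    }

    CountedBuf retain() {
        ensureLive();
        refCnt++;
        return this;
    }

    void release() {
        ensureLive();
        refCnt--;
    }

    int refCnt() {
        return refCnt;
    }

    private void ensureLive() {
        if (refCnt <= 0) {
            throw new IllegalStateException("buffer already released");
        }
    }
}

// Assumed contract: the input stays owned by the caller; the returned buffer
// carries exactly one reference owned by the caller. An implementation that
// returns the input unchanged must therefore call retain() on it.
interface CachingInterceptor {
    CountedBuf beforeCache(CountedBuf ledgerData);
}

final class EntryFactory {
    // With the contract above, no identity check on the result is needed.
    static CountedBuf dataForCache(CountedBuf ledgerData, CachingInterceptor interceptor) {
        if (interceptor == null) {
            return ledgerData.retain();
        }
        return interceptor.beforeCache(ledgerData);
    }
}
```

Under this rule the caller treats every result the same way, whether the 
interceptor passed the buffer through or produced a new one.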




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

