poorbarcode commented on code in PR #21247:
URL: https://github.com/apache/pulsar/pull/21247#discussion_r1371074368


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -89,7 +89,7 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl
                         localTopicName + "-->" + remoteTopicName,
                 StringUtils.equals(localCluster, remoteCluster) ? localCluster : localCluster + "-->" + remoteCluster
         );
-        this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
+        this.producerBuilder = replicationClient.newProducer(Schema.BYTES) //

Review Comment:
   This looks like an issue in the Schema Service. @liangyepianzhou, could you take a look?
   
   <img width="761" alt="Screenshot 2023-10-25 at 10 44 44" src="https://github.com/apache/pulsar/assets/25195800/2d4b83d8-3829-4e9f-8944-94b8e97a4d57">
   
   ```
   2023-10-25T10:46:27,178 - WARN  - [pulsar-io-105-14:AbstractReplicator@151] - [persistent://pulsar/ns/testReplicatorProducerByBytesSchema-0a1e1c3c-c8f0-4fd6-9121-fac5f109abda | r2-->r1] Failed to create remote producer (org.apache.avro.AvroRuntimeException: Not a record: "string"), retrying in 0.198 s
   java.util.concurrent.CompletionException: org.apache.avro.AvroRuntimeException: Not a record: "string"
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1159)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$getSchema$11(BinaryProtoLookupService.java:282)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.pulsar.client.impl.ClientCnx.handleGetSchemaResponse(ClientCnx.java:923)
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1471)
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1345)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1385)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: org.apache.avro.AvroRuntimeException: Not a record: "string"
        at org.apache.avro.Schema.getFields(Schema.java:283)
        at org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl.<init>(GenericSchemaImpl.java:43)
        at org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema.<init>(GenericJsonSchema.java:39)
        at org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl.of(GenericSchemaImpl.java:79)
        at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.getSchema(AutoConsumeSchema.java:260)
        at org.apache.pulsar.client.impl.PulsarClientImplementationBindingImpl.getSchema(PulsarClientImplementationBindingImpl.java:256)
        at org.apache.pulsar.client.api.Schema.getSchema(Schema.java:463)
        at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$createProducerAsync$0(PulsarClientImpl.java:361)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
        ... 43 more
   ```
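
Reading the `Caused by` frames, one plausible explanation (an inference from the trace, not confirmed in this thread) is that the auto-schema path fetched the remote topic's schema, took the generic JSON branch (`GenericJsonSchema`), and then failed in Avro's `Schema.getFields()` because the stored definition is the primitive `"string"` rather than a record. The sketch below is a dependency-free model of that failure mode only. `NotARecordSketch`, `ParsedSchema`, `GenericWrapper`, and `tryWrap` are hypothetical names, not Avro or Pulsar classes, and `IllegalStateException` stands in for `AvroRuntimeException`.

```java
import java.util.List;

// Hypothetical model of the failure seen in the stack trace above.
// None of these types are the real Avro or Pulsar classes; they only mirror
// the shape "generic schema constructor eagerly calls getFields()".
final class NotARecordSketch {

    /** Stand-in for a parsed schema definition: either a record or a primitive. */
    static final class ParsedSchema {
        private final String name;
        private final List<String> fields; // null marks a non-record (primitive) schema

        private ParsedSchema(String name, List<String> fields) {
            this.name = name;
            this.fields = fields;
        }

        static ParsedSchema record(String name, List<String> fields) {
            return new ParsedSchema(name, List.copyOf(fields));
        }

        static ParsedSchema primitive(String name) {
            return new ParsedSchema(name, null);
        }

        /** Only records have fields; anything else is rejected. */
        List<String> getFields() {
            if (fields == null) {
                throw new IllegalStateException("Not a record: \"" + name + "\"");
            }
            return fields;
        }
    }

    /** Stand-in for a generic struct schema that reads the field list in its constructor. */
    static final class GenericWrapper {
        final List<String> fieldNames;

        GenericWrapper(ParsedSchema schema) {
            this.fieldNames = schema.getFields(); // throws for primitive definitions
        }
    }

    /** Returns null when wrapping succeeds, otherwise the failure message. */
    static String tryWrap(ParsedSchema schema) {
        try {
            new GenericWrapper(schema);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A record definition wraps cleanly.
        System.out.println(tryWrap(ParsedSchema.record("User", List.of("id", "name"))));
        // A primitive definition fails in the constructor, as in the trace.
        System.out.println(tryWrap(ParsedSchema.primitive("string")));
    }
}
```

If this reading is right, the open question is why the struct-style generic schema path is selected for a topic whose definition is a primitive, which would be consistent with the Schema Service suspicion raised above.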


