hadesy opened a new issue #8449:
URL: https://github.com/apache/pulsar/issues/8449


   **Describe the bug**
   We created a Debezium source in a Kubernetes cluster, and the broker 
reports the following error:
   
   ```
   14:13:21.677 [DL-io-0] ERROR 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl - Unable to 
allocate memory
   io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 
byte(s) of direct memory (used: 5234491399, max: 5242880000)
        at 
io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:742)
 ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:697)
 ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758) 
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734) 
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245) 
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:227) 
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:147) 
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
 ~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
 ~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
 [org.apache.bookkeeper-bookkeeper-common-allocator-4.10.0.jar:4.10.0]
        at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158)
 [org.apache.bookkeeper-bookkeeper-common-allocator-4.10.0.jar:4.10.0]
        at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
 [io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
 [io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer0(AbstractEpollChannel.java:328)
 [io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:314)
 [io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:297)
 [io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.AbstractEpollStreamChannel.filterOutboundMessage(AbstractEpollStreamChannel.java:521)
 [io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:873) 
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
org.apache.bookkeeper.util.ByteBufList$Encoder.write(ByteBufList.java:328) 
[org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:697)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.codec.MessageToMessageEncoder.writePromiseCombiner(MessageToMessageEncoder.java:137)
 [io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:119)
 [io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
org.apache.bookkeeper.proto.BookieProtoEncoding$RequestEncoder.write(BookieProtoEncoding.java:391)
 [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115) 
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
org.apache.bookkeeper.proto.AuthHandler$ClientSideHandler.write(AuthHandler.java:336)
 [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
 [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
 [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
 [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:387) 
[io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
   ```
   
   **To Reproduce**
   Steps to reproduce the behavior:
   
   1. Create a cluster
   `helm upgrade --install pulsar pulsar`
   
   2. Create a task
   ```
   PulsarAdmin admin = PulsarAdmin.builder()
                       .serviceHttpUrl("http://172.16.82.42")
                       .build();
               Map<String, Object> config = new HashMap<>();
               config.put("database.hostname", "127.0.0.1");
               config.put("database.port", "3306");
                        config.put("database.user", "test");
                        config.put("database.password", "test");
                        config.put("database.server.id", "184054");
                        config.put("database.server.name", "dbserver1");
                        config.put("database.whitelist", "test");
                        config.put("database.history", 
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
                        config.put("database.history.pulsar.topic", "test");
                        config.put("database.history.pulsar.service.url", 
"pulsar://127.0.0.1:6650");
                        config.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
                        config.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
                        config.put("pulsar.service.url", 
"pulsar://127.0.0.1:6650");
                        config.put("offset.storage.topic", "offset");
   
               admin.sources().createSourceAsync(SourceConfig.builder()
                       .name("cdc-test")
                       .namespace("cdc")
                       .tenant("test")
                       .topicName("binlog_test")
                       .parallelism(1)
                       .archive("builtin://debezium-mysql")
                       .schemaType("json")
                       .configs(config)
                       .build(),null );
   ```
   3. The broker fails with the out-of-memory (OOM) error shown above
   
![1](https://user-images.githubusercontent.com/6346047/98123275-9a9c9400-1eec-11eb-8a31-5f5fc05dc6f8.jpg)
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to