hadesy opened a new issue #8449:
URL: https://github.com/apache/pulsar/issues/8449
**Describe the bug**
We create debezium source in the kubernetes cluster,the error on the broker
is:
```
14:13:21.677 [DL-io-0] ERROR
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl - Unable to
allocate memory
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216
byte(s) of direct memory (used: 5234491399, max: 5242880000)
at
io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:742)
~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:697)
~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
~[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
[org.apache.bookkeeper-bookkeeper-common-allocator-4.10.0.jar:4.10.0]
at
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158)
[org.apache.bookkeeper-bookkeeper-common-allocator-4.10.0.jar:4.10.0]
at
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
[io.netty-netty-buffer-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer0(AbstractEpollChannel.java:328)
[io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:314)
[io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:297)
[io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.epoll.AbstractEpollStreamChannel.filterOutboundMessage(AbstractEpollStreamChannel.java:521)
[io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:873)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
org.apache.bookkeeper.util.ByteBufList$Encoder.write(ByteBufList.java:328)
[org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:697)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.handler.codec.MessageToMessageEncoder.writePromiseCombiner(MessageToMessageEncoder.java:137)
[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:119)
[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
org.apache.bookkeeper.proto.BookieProtoEncoding$RequestEncoder.write(BookieProtoEncoding.java:391)
[org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
org.apache.bookkeeper.proto.AuthHandler$ClientSideHandler.write(AuthHandler.java:336)
[org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:387)
[io.netty-netty-transport-native-epoll-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
```
**To Reproduce**
Steps to reproduce the behavior:
1.Create a cluster
`helm upgrade --install pulsar pulsar`
2.Create a task
```
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl("http://172.16.82.42")
.build();
Map<String, Object> config = new HashMap<>();
config.put("database.hostname", "127.0.0.1");
config.put("database.port", "3306");
config.put("database.user", "test");
config.put("database.password", "test");
config.put("database.server.id", "184054");
config.put("database.server.name", "dbserver1");
config.put("database.whitelist", "test");
config.put("database.history",
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
config.put("database.history.pulsar.topic", "test");
config.put("database.history.pulsar.service.url",
"pulsar://127.0.0.1:6650");
config.put("key.converter",
"org.apache.kafka.connect.json.JsonConverter");
config.put("value.converter",
"org.apache.kafka.connect.json.JsonConverter");
config.put("pulsar.service.url",
"pulsar://127.0.0.1:6650");
config.put("offset.storage.topic", "offset");
admin.sources().createSourceAsync(SourceConfig.builder()
.name("cdc-test")
.namespace("cdc")
.tenant("test")
.topicName("binlog_test")
.parallelism(1)
.archive("builtin://debezium-mysql")
.schemaType("json")
.configs(config)
.build(),null );
```
3. OOM

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]