[
https://issues.apache.org/jira/browse/CAMEL-19619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744639#comment-17744639
]
Kartik edited comment on CAMEL-19619 at 7/19/23 2:37 PM:
---------------------------------------------------------
{code:java}
// code placeholder
from(aEndpoint)
.routeId(aRouteId)
.autoStartup(false)
.setProperty(ICamelConstants.CAMEL_ROUTE_CONFIG_MAP,
constant(aRouteConfigMap))
.throttle(new ExpressionAdapter()
{
/**
* @param exchange
* @return Allowed count
*/
@Override
public Object evaluate(Exchange exchange)
{
return
OnethrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_COUNT, 5000);
}
}, new ExpressionAdapter()
{
/**
* @param exchange Camel exchange
* @return for correlation
*/
@Override
public Object evaluate(Exchange exchange)
{
return aRouteConfigMap.getOrDefault(ICamelConstants.USER,
"default");
}
})
.asyncDelayed((Boolean)
OnethrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_IS_ASYNC_DELAYED,
true))
.timePeriodMillis((Integer)
OnethrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_TIME_PERIOD_MILLIS,
1000 * 60))
.throttle(new ExpressionAdapter()
{
@Override
public Object evaluate(Exchange exchange)
{
return
TwothrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_COUNT, 100);
}
})
.asyncDelayed((Boolean)
TwothrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_IS_ASYNC_DELAYED,
true))
.timePeriodMillis((Integer)
TwothrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_TIME_PERIOD_MILLIS,
1000 * 60))
.process((AsyncProcessor)
aRouteConfigMap.get(ICamelConstants.IDEMPOTENCY_HEADER_GENERATE_PROCESSOR))
.idempotentConsumer(header(ICamelConstants.MESSAGE_ID),
(IdempotentRepository)
aRouteConfigMap.get(ICamelConstants.IDEMPOTENT_REPOSITORY)); {code}
"aEndpoint" is a function argument that is constructed using "Kafka" component
by passing all topic and broker information.
So we have 2 throttles injected in route one is at the user level which uses
"username" to correlate and group it and another at each connection.
!image-2023-07-19-20-05-52-306.png|width=429,height=210!
We can have multiple users who starts multiple routes so to provide fair
resource to all, we have 2 level throttle first one at user level which is
limited to 5000 messages and each route under this is given 100 message.
was (Author: kartikvk1996):
{code:java}
// code placeholder
from(aEndpoint)
.routeId(aRouteId)
.autoStartup(false)
.setProperty(ICamelConstants.CAMEL_ROUTE_CONFIG_MAP,
constant(aRouteConfigMap))
.throttle(new ExpressionAdapter()
{
/**
* @param exchange
* @return Allowed count
*/
@Override
public Object evaluate(Exchange exchange)
{
return
OnethrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_COUNT, 5000);
}
}, new ExpressionAdapter()
{
/**
* @param exchange Camel exchange
* @return for correlation
*/
@Override
public Object evaluate(Exchange exchange)
{
return aRouteConfigMap.getOrDefault(ICamelConstants.USER,
"default");
}
})
.asyncDelayed((Boolean)
OnethrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_IS_ASYNC_DELAYED,
true))
.timePeriodMillis((Integer)
OnethrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_TIME_PERIOD_MILLIS,
1000 * 60))
.throttle(new ExpressionAdapter()
{
@Override
public Object evaluate(Exchange exchange)
{
return
TwothrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_COUNT, 100);
}
})
.asyncDelayed((Boolean)
TwothrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_IS_ASYNC_DELAYED,
true))
.timePeriodMillis((Integer)
TwothrottlerProperties.getOrDefault(ICamelConstants.THROTTLE_TIME_PERIOD_MILLIS,
1000 * 60))
.process((AsyncProcessor)
aRouteConfigMap.get(ICamelConstants.IDEMPOTENCY_HEADER_GENERATE_PROCESSOR))
.idempotentConsumer(header(ICamelConstants.MESSAGE_ID),
(IdempotentRepository)
aRouteConfigMap.get(ICamelConstants.IDEMPOTENT_REPOSITORY)); {code}
So we have 2 throttles injected in route one is at the user level which uses
"username" to correlate and group it and another at each connection.
> Throttle when used in camel-kafka route creates thread leak
> -----------------------------------------------------------
>
> Key: CAMEL-19619
> URL: https://issues.apache.org/jira/browse/CAMEL-19619
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 3.14.9
> Reporter: Kartik
> Priority: Critical
> Attachments: image-2023-07-19-19-33-10-591.png,
> image-2023-07-19-19-33-33-151.png, image-2023-07-19-20-05-52-306.png
>
>
> I have used throttle in the camel Kafka route to control the message flow and
> I have 10 routes running initially there will be no throttle thread created
> as soon as the message comes to the Kafka topic and read huge number of
> throttle threads are created.
>
> For 10 routes I see 88 throttle "threads" threads created and are doing no
> work.
>
> !image-2023-07-19-19-33-10-591.png!
>
> !image-2023-07-19-19-33-33-151.png!
>
> Stack trace of each throttle thread.
> "Camel (InfaDefaultCamelContext) thread #51 - Throttle" #135 daemon prio=5
> os_prio=0 tid=0x000001dfee834000 nid=0x2dbc waiting on condition
> [0x000000d5a2dff000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073da9aaf0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
>
> Locked ownable synchronizers:
> - None
>
> "Camel (InfaDefaultCamelContext) thread #50 - Throttle" #134 daemon prio=5
> os_prio=0 tid=0x000001dfee835800 nid=0x71d4 waiting on condition
> [0x000000d5a2cfe000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073db8c0c0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
>
> Locked ownable synchronizers:
> - None
>
> "Camel (InfaDefaultCamelContext) thread #49 - Throttle" #133 daemon prio=5
> os_prio=0 tid=0x000001dfee82b000 nid=0x5574 waiting on condition
> [0x000000d5a2bfe000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073de977d8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
>
> Locked ownable synchronizers:
> - None
>
> "Camel (InfaDefaultCamelContext) thread #48 - Throttle" #132 daemon prio=5
> os_prio=0 tid=0x000001dff3b5e800 nid=0x60cc waiting on condition
> [0x000000d5a2aff000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073e02bf60> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
>
> Locked ownable synchronizers:
> - None
>
> Any idea if this is a leak or configuration issue
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)