This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d0b4f234e5 Make thrift deserializer threadsafe (#9299)
d0b4f234e5 is described below
commit d0b4f234e5f265f159ce607818c640ab11b2c3aa
Author: Saurabh Dubey <[email protected]>
AuthorDate: Tue Aug 30 22:32:38 2022 +0530
Make thrift deserializer threadsafe (#9299)
---
.../pinot/core/transport/InstanceRequestHandler.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 1c3e408744..43e9264007 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -73,7 +73,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
private final String _instanceName;
- private final TDeserializer _deserializer;
+ private final ThreadLocal<TDeserializer> _deserializer;
private final QueryScheduler _queryScheduler;
private final ServerMetrics _serverMetrics;
private final AccessControl _accessControl;
@@ -85,11 +85,14 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
_queryScheduler = queryScheduler;
_serverMetrics = serverMetrics;
_accessControl = accessControl;
- try {
- _deserializer = new TDeserializer(new TCompactProtocol.Factory());
- } catch (TTransportException e) {
- throw new RuntimeException("Failed to initialize Thrift Deserializer",
e);
- }
+ _deserializer = ThreadLocal.withInitial(() -> {
+ try {
+ return new TDeserializer(new TCompactProtocol.Factory());
+ } catch (TTransportException e) {
+ throw new RuntimeException("Failed to initialize Thrift Deserializer",
e);
+ }
+ });
+
if
(Boolean.parseBoolean(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION)))
{
_queryFuturesById = new ConcurrentHashMap<>();
LOGGER.info("Enable query cancellation");
@@ -135,7 +138,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
// Parse instance request into ServerQueryRequest.
msg.readBytes(requestBytes);
- _deserializer.deserialize(instanceRequest, requestBytes);
+ _deserializer.get().deserialize(instanceRequest, requestBytes);
queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics,
queryArrivalTimeMs);
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
queryArrivalTimeMs)
.stopAndRecord();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]