This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3b07def331 use MultiStageRequestIdGenerator in v1 engine (#10966)
3b07def331 is described below
commit 3b07def33122b30cf3820b28ea55397862022217
Author: mingmxu <[email protected]>
AuthorDate: Tue Jun 27 13:44:32 2023 -0700
use MultiStageRequestIdGenerator in v1 engine (#10966)
---
.../requesthandler/BaseBrokerRequestHandler.java | 6 +--
.../requesthandler/BrokerRequestIdGenerator.java | 49 ++++++++++++++++++++++
.../MultiStageBrokerRequestHandler.java | 36 +---------------
.../BaseBrokerRequestHandlerTest.java | 15 ++++---
.../LiteralOnlyBrokerRequestTest.java | 18 ++++----
5 files changed, 71 insertions(+), 53 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 406734812d..e64251493d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -115,7 +114,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
protected final TableCache _tableCache;
protected final BrokerMetrics _brokerMetrics;
- protected final AtomicLong _requestIdGenerator = new AtomicLong();
+ protected final BrokerRequestIdGenerator _brokerIdGenerator;
protected final QueryOptimizer _queryOptimizer = new QueryOptimizer();
protected final String _brokerId;
@@ -134,6 +133,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
_brokerId = brokerId;
+ _brokerIdGenerator = new BrokerRequestIdGenerator(brokerId);
_config = config;
_routingManager = routingManager;
_accessControlFactory = accessControlFactory;
@@ -231,7 +231,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
public BrokerResponse handleRequest(JsonNode request, @Nullable
SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
- long requestId = _requestIdGenerator.incrementAndGet();
+ long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java
new file mode 100644
index 0000000000..c97ee44a0a
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An ID generator to produce a global unique identifier for each query, used
in v1/v2 engine for tracking and
+ * inter-stage communication(v2 only). It's guaranteed by:
+ * <ol>
+ * <li>
+ * Using a mask computed using the hash-code of the broker-id to ensure
two brokers don't arrive at the same
+ * requestId. This mask becomes the most significant 9 digits (in base-10).
+ * </li>
+ * <li>
+ * Using a auto-incrementing counter for the least significant 9 digits
(in base-10).
+ * </li>
+ * </ol>
+ */
+public class BrokerRequestIdGenerator {
+ private static final long OFFSET = 1_000_000_000L;
+ private final long _mask;
+ private final AtomicLong _incrementingId = new AtomicLong(0);
+
+ public BrokerRequestIdGenerator(String brokerId) {
+ _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
+ }
+
+ public long get() {
+ long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) %
OFFSET;
+ return _mask + normalized;
+ }
+}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 85839ebe66..a27c772c0a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.commons.lang3.StringUtils;
@@ -77,7 +76,6 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;
- private final MultiStageRequestIdGenerator _multistageRequestIdGenerator;
public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerIdFromConfig,
BrokerRoutingManager routingManager, AccessControlFactory
accessControlFactory,
@@ -111,15 +109,13 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
// TODO: move this to a startUp() function.
_mailboxService.start();
-
- _multistageRequestIdGenerator = new
MultiStageRequestIdGenerator(brokerIdFromConfig);
}
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable
SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
- long requestId = _multistageRequestIdGenerator.get();
+ long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
@@ -322,34 +318,4 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
_queryDispatcher.shutdown();
_mailboxService.shutdown();
}
-
- /**
- * OpChains in Multistage queries are identified by the requestId and the
stage-id. v1 Engine uses an incrementing
- * long to generate requestId, so the requestIds are numbered [0, 1, 2,
...]. When running with multiple brokers,
- * it could be that two brokers end up generating the same requestId which
could lead to weird query errors. This
- * requestId generator addresses that by:
- * <ol>
- * <li>
- * Using a mask computed using the hash-code of the broker-id to ensure
two brokers don't arrive at the same
- * requestId. This mask becomes the most significant 9 digits (in
base-10).
- * </li>
- * <li>
- * Using a auto-incrementing counter for the least significant 9 digits
(in base-10).
- * </li>
- * </ol>
- */
- static class MultiStageRequestIdGenerator {
- private static final long OFFSET = 1_000_000_000L;
- private final long _mask;
- private final AtomicLong _incrementingId = new AtomicLong(0);
-
- public MultiStageRequestIdGenerator(String brokerId) {
- _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
- }
-
- public long get() {
- long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) %
OFFSET;
- return _mask + normalized;
- }
- }
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index d2ea5c9b7a..7f3cb79749 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -197,16 +197,18 @@ public class BaseBrokerRequestHandlerTest {
BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
when(routingManager.routingExists(anyString())).thenReturn(true);
RoutingTable rt = mock(RoutingTable.class);
- when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
- .singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
Collections.singletonList("segment01")));
+ when(rt.getServerInstanceToSegmentsMap()).thenReturn(
+ Collections.singletonMap(new ServerInstance(new
InstanceConfig("server01_9000")),
+ Collections.singletonList("segment01")));
when(routingManager.getRoutingTable(any(),
Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
+ final long[] testRequestId = {-1};
PinotConfiguration config =
new
PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation",
"true"));
BaseBrokerRequestHandler requestHandler =
- new BaseBrokerRequestHandler(config, null, routingManager, new
AllowAllAccessControlFactory(),
+ new BaseBrokerRequestHandler(config, "testBrokerId", routingManager,
new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true, Collections.emptySet())) {
@Override
@@ -225,6 +227,7 @@ public class BaseBrokerRequestHandlerTest {
@Nullable Map<ServerInstance, List<String>>
realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
RequestContext requestContext)
throws Exception {
+ testRequestId[0] = requestId;
latch.await();
return null;
}
@@ -239,12 +242,12 @@ public class BaseBrokerRequestHandlerTest {
throw new RuntimeException(e);
}
});
- TestUtils.waitForCondition((aVoid) ->
requestHandler.getRunningServers(1).size() == 1, 500, 5000,
+ TestUtils.waitForCondition((aVoid) ->
requestHandler.getRunningServers(testRequestId[0]).size() == 1, 500, 5000,
"Failed to submit query");
Map.Entry<Long, String> entry =
requestHandler.getRunningQueries().entrySet().iterator().next();
- Assert.assertEquals(entry.getKey().longValue(), 1);
+ Assert.assertEquals(entry.getKey().longValue(), testRequestId[0]);
Assert.assertTrue(entry.getValue().contains("select * from myTable_OFFLINE
limit 10"));
- Set<ServerInstance> servers = requestHandler.getRunningServers(1);
+ Set<ServerInstance> servers =
requestHandler.getRunningServers(testRequestId[0]);
Assert.assertEquals(servers.size(), 1);
Assert.assertEquals(servers.iterator().next().getHostname(), "server01");
Assert.assertEquals(servers.iterator().next().getPort(), 9000);
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index a9ee2a5dee..b85004c48f 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -181,9 +181,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandler()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, null, ACCESS_CONTROL_FACTORY, null,
- null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
- null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
+ null, null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+ null, null, mock(ServerRoutingStatsManager.class));
long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
@@ -209,9 +209,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandlerWithAsFunction()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, null, ACCESS_CONTROL_FACTORY, null,
- null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
- null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
+ null, null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+ null, null, mock(ServerRoutingStatsManager.class));
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC',
'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -416,9 +416,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testExplainPlanLiteralOnly()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, null, ACCESS_CONTROL_FACTORY, null,
- null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
- null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
+ null, null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+ null, null, mock(ServerRoutingStatsManager.class));
// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR
SELECT 1.5, 'test'\"}");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]