This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2eeecc5b6d3 Fix GRPC_BYTES_SENT broker metric for gRPC query server
(#17422)
2eeecc5b6d3 is described below
commit 2eeecc5b6d3847a925e69aec7d0ee4ec63f4fd80
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Dec 25 06:46:54 2025 -0800
Fix GRPC_BYTES_SENT broker metric for gRPC query server (#17422)
---
.../apache/pinot/broker/grpc/BrokerGrpcServer.java | 9 +
.../pinot/broker/grpc/BrokerGrpcServerTest.java | 229 +++++++++++++++++++++
2 files changed, 238 insertions(+)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
index b4ba3fcdd02..aa1e8c37eee 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
@@ -242,6 +242,7 @@ public class BrokerGrpcServer extends
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
}
brokerResponse.emitBrokerResponseMetrics(_brokerMetrics);
responseObserver.onNext(errorBlock);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_BYTES_SENT,
errorBlock.getSerializedSize());
responseObserver.onCompleted();
return;
}
@@ -278,10 +279,14 @@ public class BrokerGrpcServer extends
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
throw new RuntimeException(e);
}
responseObserver.onNext(emptyOrErrorBlock);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_BYTES_SENT,
emptyOrErrorBlock.getSerializedSize());
responseObserver.onCompleted();
return;
}
// Handle query response:
+ // Track total bytes sent for metrics
+ long totalBytesSent = 0;
+
// First block is metadata
try {
Broker.BrokerResponse metadataBlock =
@@ -289,6 +294,7 @@ public class BrokerGrpcServer extends
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
.setPayload(ByteString.copyFrom(brokerResponse.toMetadataJsonString().getBytes()))
.build();
responseObserver.onNext(metadataBlock);
+ totalBytesSent += metadataBlock.getSerializedSize();
} catch (IOException e) {
responseObserver.onError(e);
throw new RuntimeException(e);
@@ -299,6 +305,7 @@ public class BrokerGrpcServer extends
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
Broker.BrokerResponse.newBuilder().setPayload(ByteString.copyFrom(resultTable.getDataSchema().toBytes()))
.build();
responseObserver.onNext(schemaBlock);
+ totalBytesSent += schemaBlock.getSerializedSize();
} catch (IOException e) {
responseObserver.onError(e);
throw new RuntimeException(e);
@@ -334,11 +341,13 @@ public class BrokerGrpcServer extends
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
.putMetadata("encoding", encodingAlgorithm)
.build();
responseObserver.onNext(dataBlock);
+ totalBytesSent += dataBlock.getSerializedSize();
} catch (Exception e) {
responseObserver.onError(e);
throw new RuntimeException(e);
}
}
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_BYTES_SENT,
totalBytesSent);
responseObserver.onCompleted();
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/grpc/BrokerGrpcServerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/grpc/BrokerGrpcServerTest.java
new file mode 100644
index 00000000000..649b7dee418
--- /dev/null
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/grpc/BrokerGrpcServerTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.grpc;
+
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+
+public class BrokerGrpcServerTest {
+
+ @Mock
+ private BrokerRequestHandler _brokerRequestHandler;
+ @Mock
+ private BrokerMetrics _brokerMetrics;
+
+ private BrokerGrpcServer _brokerGrpcServer;
+ private int _grpcPort;
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ MockitoAnnotations.openMocks(this);
+
+ // Find an available port
+ try (ServerSocket socket = new ServerSocket(0)) {
+ _grpcPort = socket.getLocalPort();
+ }
+
+ // Create config with gRPC port so server is created
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(CommonConstants.Broker.Grpc.KEY_OF_GRPC_PORT,
_grpcPort);
+ _brokerGrpcServer = new BrokerGrpcServer(config, "testBroker",
_brokerMetrics, _brokerRequestHandler);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ if (_brokerGrpcServer != null) {
+ _brokerGrpcServer.shutdown();
+ }
+ }
+
+ @Test
+ public void testGrpcBytesSentMetricOnSqlParsingError() {
+ // Create a request with invalid SQL that will fail parsing
+ Broker.BrokerRequest request = Broker.BrokerRequest.newBuilder()
+ .setSql("INVALID SQL @@@ SYNTAX ERROR")
+ .build();
+
+ // Create a mock StreamObserver to capture responses
+ List<Broker.BrokerResponse> responses = new ArrayList<>();
+ StreamObserver<Broker.BrokerResponse> responseObserver =
createMockStreamObserver(responses);
+
+ // Submit the request - this will fail SQL parsing
+ _brokerGrpcServer.submit(request, responseObserver);
+
+ // Verify that one response was sent
+ assertEquals(responses.size(), 1, "Should have sent exactly one error
response");
+
+ // Verify GRPC_BYTES_SENT metric was recorded
+ ArgumentCaptor<Long> sizeCaptor = ArgumentCaptor.forClass(Long.class);
+
verify(_brokerMetrics).addMeteredGlobalValue(eq(BrokerMeter.GRPC_BYTES_SENT),
sizeCaptor.capture());
+
+ // Verify the recorded size matches the actual response size
+ long expectedSize = responses.get(0).getSerializedSize();
+ assertEquals(sizeCaptor.getValue().longValue(), expectedSize,
+ "GRPC_BYTES_SENT should record the actual response size");
+ assertTrue(expectedSize > 0, "Response size should be greater than 0");
+ }
+
+ @Test
+ public void testGrpcBytesSentMetricOnEmptyResult()
+ throws Exception {
+ // Create a valid SQL request
+ Broker.BrokerRequest request = Broker.BrokerRequest.newBuilder()
+ .setSql("SELECT * FROM testTable")
+ .build();
+
+ // Mock the request handler to return an empty result (no ResultTable)
+ BrokerResponseNative emptyResponse = new BrokerResponseNative();
+ when(_brokerRequestHandler.handleRequest(any(), any(), any(), any(),
any()))
+ .thenReturn(emptyResponse);
+
+ // Create a mock StreamObserver to capture responses
+ List<Broker.BrokerResponse> responses = new ArrayList<>();
+ StreamObserver<Broker.BrokerResponse> responseObserver =
createMockStreamObserver(responses);
+
+ // Submit the request
+ _brokerGrpcServer.submit(request, responseObserver);
+
+ // Verify that one response was sent (empty/null result case)
+ assertEquals(responses.size(), 1, "Should have sent exactly one response
for empty result");
+
+ // Verify GRPC_BYTES_SENT metric was recorded
+ ArgumentCaptor<Long> sizeCaptor = ArgumentCaptor.forClass(Long.class);
+
verify(_brokerMetrics).addMeteredGlobalValue(eq(BrokerMeter.GRPC_BYTES_SENT),
sizeCaptor.capture());
+
+ // Verify the recorded size matches the actual response size
+ long expectedSize = responses.get(0).getSerializedSize();
+ assertEquals(sizeCaptor.getValue().longValue(), expectedSize,
+ "GRPC_BYTES_SENT should record the actual response size");
+ assertTrue(expectedSize > 0, "Response size should be greater than 0");
+ }
+
+ @Test
+ public void testGrpcBytesSentMetricOnSuccessfulQuery()
+ throws Exception {
+ // Create a valid SQL request
+ Broker.BrokerRequest request = Broker.BrokerRequest.newBuilder()
+ .setSql("SELECT col1, col2 FROM testTable")
+ .build();
+
+ // Mock the request handler to return a response with data
+ BrokerResponseNative brokerResponse = new BrokerResponseNative();
+ DataSchema dataSchema = new DataSchema(
+ new String[]{"col1", "col2"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT}
+ );
+ List<Object[]> rows = new ArrayList<>();
+ rows.add(new Object[]{"value1", 100});
+ rows.add(new Object[]{"value2", 200});
+ rows.add(new Object[]{"value3", 300});
+ brokerResponse.setResultTable(new ResultTable(dataSchema, rows));
+
+ when(_brokerRequestHandler.handleRequest(any(), any(), any(), any(),
any()))
+ .thenReturn(brokerResponse);
+
+ // Create a mock StreamObserver to capture responses
+ List<Broker.BrokerResponse> responses = new ArrayList<>();
+ StreamObserver<Broker.BrokerResponse> responseObserver =
createMockStreamObserver(responses);
+
+ // Submit the request
+ _brokerGrpcServer.submit(request, responseObserver);
+
+ // Verify that multiple responses were sent (metadata + schema + data
blocks)
+ assertTrue(responses.size() >= 3,
+ "Should have sent at least 3 responses: metadata, schema, and data
block(s)");
+
+ // Calculate total expected size
+ long expectedTotalSize = 0;
+ for (Broker.BrokerResponse response : responses) {
+ expectedTotalSize += response.getSerializedSize();
+ }
+
+ // Verify GRPC_BYTES_SENT metric was recorded with total size
+ ArgumentCaptor<Long> sizeCaptor = ArgumentCaptor.forClass(Long.class);
+
verify(_brokerMetrics).addMeteredGlobalValue(eq(BrokerMeter.GRPC_BYTES_SENT),
sizeCaptor.capture());
+
+ assertEquals(sizeCaptor.getValue().longValue(), expectedTotalSize,
+ "GRPC_BYTES_SENT should record the total response size across all
blocks");
+ assertTrue(expectedTotalSize > 0, "Total response size should be greater
than 0");
+ }
+
+ @Test
+ public void testGrpcBytesReceivedMetric()
+ throws Exception {
+ // Create a request
+ Broker.BrokerRequest request = Broker.BrokerRequest.newBuilder()
+ .setSql("SELECT * FROM testTable")
+ .build();
+
+ // Mock empty response
+ when(_brokerRequestHandler.handleRequest(any(), any(), any(), any(),
any()))
+ .thenReturn(new BrokerResponseNative());
+
+ List<Broker.BrokerResponse> responses = new ArrayList<>();
+ StreamObserver<Broker.BrokerResponse> responseObserver =
createMockStreamObserver(responses);
+
+ // Submit the request
+ _brokerGrpcServer.submit(request, responseObserver);
+
+ // Verify both GRPC_BYTES_RECEIVED and GRPC_BYTES_SENT are tracked
+ verify(_brokerMetrics).addMeteredGlobalValue(
+ eq(BrokerMeter.GRPC_BYTES_RECEIVED), eq((long)
request.getSerializedSize()));
+
verify(_brokerMetrics).addMeteredGlobalValue(eq(BrokerMeter.GRPC_BYTES_SENT),
anyLong());
+ }
+
+ /**
+ * Helper method to create a mock StreamObserver that captures all responses.
+ */
+ private StreamObserver<Broker.BrokerResponse>
createMockStreamObserver(List<Broker.BrokerResponse> responses) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<Broker.BrokerResponse> observer =
mock(StreamObserver.class);
+ doAnswer(invocation -> {
+ responses.add(invocation.getArgument(0));
+ return null;
+ }).when(observer).onNext(any());
+ return observer;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]