This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bfb604a883 fix grpc query server not setting max inbound msg size
(#9126)
bfb604a883 is described below
commit bfb604a883a1b63f85fce35fb8fb559cd073a728
Author: Rong Rong <[email protected]>
AuthorDate: Mon Aug 1 16:14:38 2022 -0700
fix grpc query server not setting max inbound msg size (#9126)
* fix grpc query server not setting max inbound msg size
* fix compilation issue for non-default enable profiles
* adjusting config definition to allow pinotconfig to be populated into
grpc config
Co-authored-by: Rong Rong <[email protected]>
---
.../requesthandler/GrpcBrokerRequestHandler.java | 14 ++--
.../org/apache/pinot/common/config/GrpcConfig.java | 77 ++++++++++++++++++++++
.../pinot/common/utils/grpc/GrpcQueryClient.java | 61 ++---------------
.../presto/grpc/PinotStreamingQueryClient.java | 5 +-
.../pinot/core/transport/grpc/GrpcQueryServer.java | 9 ++-
.../OfflineSecureGRPCServerIntegrationTest.java | 5 +-
.../pinot/server/starter/ServerInstance.java | 3 +-
7 files changed, 101 insertions(+), 73 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 0b3744f018..9c30bacbe7 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcBrokerRequestHandler.class);
- private final GrpcQueryClient.Config _grpcConfig;
+ private final GrpcConfig _grpcConfig;
private final StreamingReduceService _streamingReduceService;
private final PinotStreamingQueryClient _streamingQueryClient;
@@ -63,7 +64,7 @@ public class GrpcBrokerRequestHandler extends
BaseBrokerRequestHandler {
BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
super(config, routingManager, accessControlFactory, queryQuotaManager,
tableCache, brokerMetrics);
LOGGER.info("Using Grpc BrokerRequestHandler.");
- _grpcConfig = buildGrpcQueryClientConfig(config);
+ _grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);
// create streaming query client
_streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig);
@@ -125,16 +126,11 @@ public class GrpcBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
}
- // return empty config for now
- private GrpcQueryClient.Config buildGrpcQueryClientConfig(PinotConfiguration
config) {
- return new GrpcQueryClient.Config();
- }
-
public static class PinotStreamingQueryClient {
private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new
ConcurrentHashMap<>();
- private final GrpcQueryClient.Config _config;
+ private final GrpcConfig _config;
- public PinotStreamingQueryClient(GrpcQueryClient.Config config) {
+ public PinotStreamingQueryClient(GrpcConfig config) {
_config = config;
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
new file mode 100644
index 0000000000..3a5c8cdf9e
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+public class GrpcConfig {
+ public static final String GRPC_TLS_PREFIX = "tls";
+ public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
+ public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE =
"maxInboundMessageSizeBytes";
+ // Default max message size to 128MB
+ public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024
* 1024;
+ // Default use plain text for transport
+ private static final String DEFAULT_IS_USE_PLAIN_TEXT = "true";
+
+ private final TlsConfig _tlsConfig;
+ private final PinotConfiguration _pinotConfig;
+
+ public static GrpcConfig buildGrpcQueryConfig(PinotConfiguration
pinotConfig) {
+ return new GrpcConfig(pinotConfig);
+ }
+
+ public GrpcConfig(PinotConfiguration pinotConfig) {
+ _pinotConfig = pinotConfig;
+ _tlsConfig = TlsUtils.extractTlsConfig(_pinotConfig, GRPC_TLS_PREFIX);
+ }
+
+ public GrpcConfig(Map<String, Object> configMap) {
+ this(new PinotConfiguration(configMap));
+ }
+
+ public GrpcConfig(int maxInboundMessageSizeBytes, boolean usePlainText) {
+ this(ImmutableMap.of(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE,
maxInboundMessageSizeBytes, CONFIG_USE_PLAIN_TEXT,
+ usePlainText));
+ }
+
+ // Allow get customized configs.
+ public Object get(String key) {
+ return _pinotConfig.getProperty(key);
+ }
+
+ public int getMaxInboundMessageSizeBytes() {
+ return _pinotConfig.getProperty(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE,
DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE);
+ }
+
+ public boolean isUsePlainText() {
+ return
Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT,
DEFAULT_IS_USE_PLAIN_TEXT));
+ }
+
+ public TlsConfig getTlsConfig() {
+ return _tlsConfig;
+ }
+
+ public PinotConfiguration getPinotConfig() {
+ return _pinotConfig;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index df074d2581..88611f427d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -18,24 +18,22 @@
*/
package org.apache.pinot.common.utils.grpc;
-import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
-import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.PinotQueryServerGrpc;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.TlsUtils;
-import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,10 +46,10 @@ public class GrpcQueryClient {
private final PinotQueryServerGrpc.PinotQueryServerBlockingStub
_blockingStub;
public GrpcQueryClient(String host, int port) {
- this(host, port, new Config());
+ this(host, port, new GrpcConfig(Collections.emptyMap()));
}
- public GrpcQueryClient(String host, int port, Config config) {
+ public GrpcQueryClient(String host, int port, GrpcConfig config) {
if (config.isUsePlainText()) {
_managedChannel =
ManagedChannelBuilder.forAddress(host,
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
@@ -99,55 +97,4 @@ public class GrpcQueryClient {
}
}
}
-
- public static class Config {
- public static final String GRPC_TLS_PREFIX = "tls";
- public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
- public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE =
"maxInboundMessageSizeBytes";
- // Default max message size to 128MB
- public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 *
1024 * 1024;
-
- private final int _maxInboundMessageSizeBytes;
- private final boolean _usePlainText;
- private final TlsConfig _tlsConfig;
- private final PinotConfiguration _pinotConfig;
-
- public Config() {
- this(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE, true);
- }
-
- public Config(int maxInboundMessageSizeBytes, boolean usePlainText) {
- this(ImmutableMap.of(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE,
maxInboundMessageSizeBytes, CONFIG_USE_PLAIN_TEXT,
- usePlainText));
- }
-
- public Config(Map<String, Object> configMap) {
- _pinotConfig = new PinotConfiguration(configMap);
- _maxInboundMessageSizeBytes =
- _pinotConfig.getProperty(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE,
DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE);
- _usePlainText =
Boolean.valueOf(configMap.get(CONFIG_USE_PLAIN_TEXT).toString());
- _tlsConfig = TlsUtils.extractTlsConfig(_pinotConfig, GRPC_TLS_PREFIX);
- }
-
- // Allow get customized configs.
- public Object get(String key) {
- return _pinotConfig.getProperty(key);
- }
-
- public int getMaxInboundMessageSizeBytes() {
- return _maxInboundMessageSizeBytes;
- }
-
- public boolean isUsePlainText() {
- return _usePlainText;
- }
-
- public TlsConfig getTlsConfig() {
- return _tlsConfig;
- }
-
- public PinotConfiguration getPinotConfig() {
- return _pinotConfig;
- }
- }
}
diff --git
a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
index a5658bba5d..f6a9ccceb3 100644
---
a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
+++
b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
@@ -21,6 +21,7 @@ package org.apache.pinot.connector.presto.grpc;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
@@ -31,9 +32,9 @@ import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
*/
public class PinotStreamingQueryClient {
private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new
HashMap<>();
- private final GrpcQueryClient.Config _config;
+ private final GrpcConfig _config;
- public PinotStreamingQueryClient(GrpcQueryClient.Config config) {
+ public PinotStreamingQueryClient(GrpcConfig config) {
_config = config;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 6115244641..bf8f2aa1c9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -31,6 +31,7 @@ import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -60,13 +61,15 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
private final AccessControl _accessControl;
- public GrpcQueryServer(int port, TlsConfig tlsConfig, QueryExecutor
queryExecutor, ServerMetrics serverMetrics,
- AccessControl accessControl) {
+ public GrpcQueryServer(int port, GrpcConfig config, TlsConfig tlsConfig,
QueryExecutor queryExecutor,
+ ServerMetrics serverMetrics, AccessControl accessControl) {
_queryExecutor = queryExecutor;
_serverMetrics = serverMetrics;
if (tlsConfig != null) {
try {
- _server =
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig)).addService(this).build();
+ _server =
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig))
+ .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
+ .addService(this).build();
} catch (Exception e) {
throw new RuntimeException("Failed to start secure grpcQueryServer",
e);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
index 8df08e4fdf..48d6d3b3b7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineSecureGRPCServerIntegrationTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.integration.tests;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
+import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Server;
@@ -56,7 +57,9 @@ public class OfflineSecureGRPCServerIntegrationTest extends
OfflineGRPCServerInt
configMap.put("tls.truststore.password", PASSWORD);
configMap.put("tls.truststore.type", JKS);
configMap.put("tls.ssl.provider", JDK);
- GrpcQueryClient.Config config = new GrpcQueryClient.Config(configMap);
+ PinotConfiguration brokerConfig = new PinotConfiguration(configMap);
+ // This mimics how pinot broker instantiates GRPCQueryClient.
+ GrpcConfig config = GrpcConfig.buildGrpcQueryConfig(brokerConfig);
return new GrpcQueryClient("localhost", Server.DEFAULT_GRPC_PORT, config);
}
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index ec1a32b3bf..7ed0a57989 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.function.FunctionRegistry;
@@ -132,7 +133,7 @@ public class ServerInstance {
if (serverConf.isEnableGrpcServer()) {
int grpcPort = serverConf.getGrpcPort();
LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
- _grpcQueryServer = new GrpcQueryServer(grpcPort,
+ _grpcQueryServer = new GrpcQueryServer(grpcPort,
GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()),
serverConf.isGrpcTlsServerEnabled() ?
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null,
_queryExecutor, _serverMetrics, _accessControl);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org