Copilot commented on code in PR #17291: URL: https://github.com/apache/pinot/pull/17291#discussion_r2745017784
########## pinot-plugins/pinot-system-table/src/main/java/org/apache/pinot/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,662 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.IntFunction; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.client.admin.PinotAdminClient; +import org.apache.pinot.client.admin.PinotAdminTransport; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.systemtable.SystemTable; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import 
org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final String SIZE_CACHE_TTL_MS_PROPERTY = "pinot.systemtable.tables.sizeCacheTtlMs"; + private static final long DEFAULT_SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + private static final long SIZE_CACHE_TTL_MS = getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY, + DEFAULT_SIZE_CACHE_TTL_MS); + + private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = "pinot.systemtable.tables.controllerTimeoutMs"; + private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = Duration.ofSeconds(5).toMillis(); + private static final long CONTROLLER_TIMEOUT_MS = getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY, + DEFAULT_CONTROLLER_TIMEOUT_MS); + + private static final long SIZE_FETCH_FAILURE_WARN_INTERVAL_MS = Duration.ofHours(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", 
FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _configuredControllerUrls; + private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>(); + private final Map<String, PinotAdminClient> _adminClientCache = new ConcurrentHashMap<>(); + private final AtomicLong _lastSizeFetchFailureWarnLogMs = new AtomicLong(); + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _configuredControllerUrls = 
controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public TableConfig getTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + } + + @Override + public void close() + throws Exception { + for (Map.Entry<String, PinotAdminClient> entry : _adminClientCache.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + LOGGER.debug("Failed to close admin client for {}: {}", entry.getKey(), e.toString()); + } + } + _adminClientCache.clear(); + } + + @Override + public IndexSegment getDataSource() { + if (_tableCache == null) { + return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, Collections.emptyMap()); + } + + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + List<String> controllerBaseUrls = getControllerBaseUrls(); + Function<String, TableSize> sizeFetcher = getSizeFetcher(); + class TableRow { + final String _tableNameWithType; + final TableType _tableType; + final String _rawTableName; + final @Nullable TableConfig _tableConfig; + private volatile @Nullable String _tableConfigJson; + private volatile @Nullable TableSize _tableSize; Review Comment: The volatile keyword combined with lazy initialization and synchronized blocks creates complex state management. Consider using AtomicReference or compute these values once during TableRow construction to simplify thread-safety and improve readability. 
########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/SystemTableRegistry.java: ########## @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.PinotReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Registry to hold and lifecycle-manage system table data providers. + */ +public final class SystemTableRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableRegistry.class); + + // Providers are registered once at broker startup. 
+ private static final Map<String, SystemTableProvider> PROVIDERS = new HashMap<>(); + + private SystemTableRegistry() { + throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); + } + + public static void register(SystemTableProvider provider) { + synchronized (PROVIDERS) { + PROVIDERS.put(normalize(provider.getTableName()), provider); + } + } + + @Nullable + public static SystemTableProvider get(String tableName) { + synchronized (PROVIDERS) { + return PROVIDERS.get(normalize(tableName)); + } + } + + public static boolean isRegistered(String tableName) { + synchronized (PROVIDERS) { + return PROVIDERS.containsKey(normalize(tableName)); + } + } + + public static Collection<SystemTableProvider> getProviders() { + synchronized (PROVIDERS) { + return Collections.unmodifiableCollection(new java.util.ArrayList<>(PROVIDERS.values())); + } + } + + /** + * Discover and register providers marked with {@link SystemTable} using the available dependencies. + * Follows the ScalarFunction pattern: any class annotated with @SystemTable under a "*.systemtable.*" package + * will be discovered via reflection and registered. 
+ */ + public static void registerAnnotatedProviders(TableCache tableCache, HelixAdmin helixAdmin, String clusterName, + @Nullable PinotConfiguration config) { + Set<Class<?>> classes = + PinotReflectionUtils.getClassesThroughReflection(".*\\.systemtable\\..*", SystemTable.class); + for (Class<?> clazz : classes) { + if (!SystemTableProvider.class.isAssignableFrom(clazz)) { + continue; + } + SystemTableProvider provider = + instantiateProvider(clazz.asSubclass(SystemTableProvider.class), tableCache, helixAdmin, clusterName, config); + if (provider == null) { + continue; + } + if (isRegistered(provider.getTableName())) { + continue; + } + LOGGER.info("Registering system table provider: {}", provider.getTableName()); + register(provider); + } + } + + public static void close() + throws Exception { + Exception firstException = null; + // Snapshot providers to avoid concurrent modifications and to close each provider at most once. + Map<SystemTableProvider, Boolean> providersToClose = new IdentityHashMap<>(); + synchronized (PROVIDERS) { + for (SystemTableProvider provider : PROVIDERS.values()) { + providersToClose.put(provider, Boolean.TRUE); + } + } + try { + for (SystemTableProvider provider : providersToClose.keySet()) { + try { + provider.close(); + } catch (Exception e) { + if (firstException == null) { + firstException = e; + } else { + firstException.addSuppressed(e); + } + } + } + } finally { + synchronized (PROVIDERS) { + PROVIDERS.clear(); + } + } + if (firstException != null) { + throw firstException; + } + } + + /** + * Initialize and register all annotated system table providers. + */ + public static void init(TableCache tableCache, HelixAdmin helixAdmin, String clusterName) { + init(tableCache, helixAdmin, clusterName, null); + } + + /** + * Initialize and register all annotated system table providers. 
+ */ + public static void init(TableCache tableCache, HelixAdmin helixAdmin, String clusterName, + @Nullable PinotConfiguration config) { + registerAnnotatedProviders(tableCache, helixAdmin, clusterName, config); + } + + private static String normalize(String tableName) { + return tableName.toLowerCase(Locale.ROOT); + } + + @Nullable + private static SystemTableProvider instantiateProvider(Class<? extends SystemTableProvider> clazz, + TableCache tableCache, HelixAdmin helixAdmin, String clusterName, @Nullable PinotConfiguration config) { + try { + // Prefer the most specific constructor available. + // System table providers may declare any of the following constructors, in decreasing order of specificity: + // (TableCache, HelixAdmin, String, PinotConfiguration), (TableCache, HelixAdmin, String), + // (TableCache, HelixAdmin), (TableCache), (PinotConfiguration), or a no-arg constructor. + // The registry will select the most specific available constructor so providers can opt in to the dependencies + // they need without forcing all implementations to depend on Helix or cluster metadata. + class ConstructorSpec { + final Class<?>[] _paramTypes; + final Object[] _args; Review Comment: The nested `ConstructorSpec` class is defined inside a method, making the method logic harder to follow. Consider extracting this as a private static class at the class level to improve readability and maintainability. ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SystemTableIntegrationTest.java: ########## @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.net.URI; +import java.util.Iterator; +import java.util.List; +import org.apache.pinot.client.admin.PinotAdminClient; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Integration test to ensure system tables can be queried through the broker. + */ +public class SystemTableIntegrationTest extends BaseClusterIntegrationTestSet { + + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + // Provide controller URL so system.tables provider can call size API. 
+ brokerConf.setProperty(CommonConstants.Broker.CONTROLLER_URL, getControllerBaseApiUrl()); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + startZk(); + startController(); + startBroker(); + startServer(); + + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + List<File> avroFiles = unpackAvroData(_tempDir); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + waitForAllDocsLoaded(600_000L); + } + + @AfterClass + public void tearDown() + throws Exception { + stopServer(); + stopBroker(); + stopController(); + stopZk(); + } + + @Test + public void testSystemTablesSize() + throws Exception { + JsonNode controllerSize = JsonUtils.stringToJsonNode( + sendGetRequest(getControllerBaseApiUrl() + "/tables/mytable/size?verbose=true&includeReplacedSegments=false")); Review Comment: URL construction using string concatenation is fragile. Consider using `UriBuilder` or similar utility to construct URLs with proper encoding and parameter handling. ```suggestion URI controllerBaseUri = URI.create(getControllerBaseApiUrl()); URI sizeUri = controllerBaseUri.resolve("/tables/mytable/size?verbose=true&includeReplacedSegments=false"); JsonNode controllerSize = JsonUtils.stringToJsonNode(sendGetRequest(sizeUri.toString())); ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java: ########## @@ -0,0 +1,554 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.hc.core5.util.Timeout; +import 
org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.querylog.QueryLogger; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTableFactory; +import org.apache.pinot.common.datatable.DataTableImplV4; +import org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.SystemTableRegistry; +import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.routing.MultiClusterRoutingContext; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.transport.ServerRoutingInstance; 
+import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.accounting.ThreadAccountant; +import org.apache.pinot.spi.auth.AuthorizationResult; +import org.apache.pinot.spi.auth.broker.RequesterIdentity; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broker request handler for system tables (handled entirely on the broker). 
+ */ +public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class); + private static final String SYSTEM_TABLE_PSEUDO_HOST = "localhost"; + private static final int SYSTEM_TABLE_PSEUDO_PORT = 0; + private static final String SYSTEM_TABLE_DATATABLE_API_PATH = "/query/systemTable/datatable"; + private static final Set<String> HOP_BY_HOP_HEADERS_TO_SKIP = + Set.of("connection", "content-length", "transfer-encoding", "host"); + + private final BrokerReduceService _brokerReduceService; + private final PlanMaker _planMaker; + private final ExecutorService _executorService; + private final ExecutorService _scatterGatherExecutorService; + private final PoolingHttpClientConnectionManager _scatterGatherConnMgr; + private final CloseableHttpClient _scatterGatherHttpClient; + @Nullable + private final HelixManager _helixManager; + + public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId, + BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + ThreadAccountant threadAccountant, @Nullable MultiClusterRoutingContext multiClusterRoutingContext, + @Nullable HelixManager helixManager) { + super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache, + threadAccountant, multiClusterRoutingContext); + _brokerReduceService = new BrokerReduceService(_config); + _planMaker = new InstancePlanMakerImplV2(); + _planMaker.init(_config); + _helixManager = helixManager; + int executorPoolSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_SYSTEM_TABLE_EXECUTOR_POOL_SIZE, + CommonConstants.Broker.DEFAULT_SYSTEM_TABLE_EXECUTOR_POOL_SIZE); + executorPoolSize = Math.max(1, executorPoolSize); + _executorService = 
QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-query-executor"))); + _scatterGatherExecutorService = + QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-scatter-gather-executor"))); + _scatterGatherConnMgr = PoolingHttpClientConnectionManagerHelper.createWithSocketFactory(); + Timeout timeout = Timeout.of(_brokerTimeoutMs, TimeUnit.MILLISECONDS); + RequestConfig defaultRequestConfig = + RequestConfig.custom().setConnectionRequestTimeout(timeout).setResponseTimeout(timeout).build(); + _scatterGatherHttpClient = + HttpClients.custom().setConnectionManager(_scatterGatherConnMgr).setDefaultRequestConfig(defaultRequestConfig) + .build(); + } + + @Override + public void start() { + } + + @Override + public void shutDown() { + _executorService.shutdownNow(); + _scatterGatherExecutorService.shutdownNow(); + try { + _scatterGatherHttpClient.close(); + } catch (Exception e) { + LOGGER.debug("Failed to close system table scatter-gather http client: {}", e.toString()); + } + try { + _scatterGatherConnMgr.close(); + } catch (Exception e) { + LOGGER.debug("Failed to close system table scatter-gather connection manager: {}", e.toString()); + } + _brokerReduceService.shutDown(); + } + + public boolean canHandle(String tableName) { + return isSystemTable(tableName) && SystemTableRegistry.isRegistered(tableName); + } + + @Override + protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) + throws Exception { + long startTimeMs = requestContext.getRequestArrivalTimeMillis(); + long deadlineMs = startTimeMs + _brokerTimeoutMs; + QueryExecutionContext executionContext = + new 
QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId), + QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs, + _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH); + try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) { + PinotQuery pinotQuery; + try { + pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); + } catch (Exception e) { + requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); + return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage()); + } + + Set<String> tableNames = RequestUtils.getTableNames(pinotQuery); + if (tableNames == null || tableNames.isEmpty()) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name"); + } + if (tableNames.size() != 1) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins"); + } + String tableName = tableNames.iterator().next(); + if (!isSystemTable(tableName)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Not a system table query"); + } + AuthorizationResult authorizationResult = + hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders); + if (!authorizationResult.hasAccess()) { + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + return new BrokerResponseNative(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage()); + } + + boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query); + return handleSystemTableQuery(request, pinotQuery, tableName, requestContext, requesterIdentity, query, + httpHeaders, queryWasLogged); + } + } + + @Override + 
protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) { + return false; + } + + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) + throws Exception { + return false; + } + + @Override + public Map<Long, String> getRunningQueries() { + return Collections.emptyMap(); + } + + @Override + public OptionalLong getRequestIdByClientId(String clientQueryId) { + return OptionalLong.empty(); + } + + private boolean isSystemTable(String tableName) { + return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system."); + } + + /** + * Executes a system table query against the local broker and returns the raw {@link DataTable} results. + * <p> + * This method is used by the internal broker-to-broker scatter-gather endpoint and must never perform fanout. 
+ */ + public DataTable handleSystemTableDataTableRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, + RequestContext requestContext, @Nullable HttpHeaders httpHeaders) { + long startTimeMs = requestContext.getRequestArrivalTimeMillis(); + if (startTimeMs <= 0) { + startTimeMs = System.currentTimeMillis(); + requestContext.setRequestArrivalTimeMillis(startTimeMs); + } + long requestId = _requestIdGenerator.get(); + long deadlineMs = startTimeMs + _brokerTimeoutMs; + + JsonNode sql = request.get(CommonConstants.Broker.Request.SQL); + if (sql == null || !sql.isTextual()) { + return exceptionDataTable(QueryErrorCode.JSON_PARSING, "Failed to find 'sql' in the request: " + request); + } + String query = sql.textValue(); + requestContext.setQuery(query); + + SqlNodeAndOptions sqlNodeAndOptions; + try { + sqlNodeAndOptions = RequestUtils.parseQuery(query, request); + } catch (Exception e) { + requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); + return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage()); + } + + QueryExecutionContext executionContext = + new QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId), + QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs, + _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH); + try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) { + AccessControl accessControl = _accessControlFactory.create(); + AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity); + if (!authorizationResult.hasAccess()) { + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage()); + } + + PinotQuery pinotQuery; + try { + pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); + } catch (Exception e) { + 
requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); + return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage()); + } + + Set<String> tableNames = RequestUtils.getTableNames(pinotQuery); + if (tableNames == null || tableNames.isEmpty()) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name"); + } + if (tableNames.size() != 1) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins"); + } + String tableName = tableNames.iterator().next(); + if (!isSystemTable(tableName)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Not a system table query"); + } + + AuthorizationResult tableAuthorizationResult = + hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders); + if (!tableAuthorizationResult.hasAccess()) { + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, tableAuthorizationResult.getFailureMessage()); + } + + SystemTableProvider provider = SystemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + return exceptionDataTable(QueryErrorCode.TABLE_DOES_NOT_EXIST, "System table does not exist: " + tableName); + } + + try { + return executeLocalSystemTableQuery(pinotQuery, provider); + } catch (BadQueryRequestException e) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Caught exception while handling system table datatable query {}: {}", tableName, e.getMessage(), + e); + 
requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); + return exceptionDataTable(QueryErrorCode.QUERY_EXECUTION, e.getMessage()); + } + } + } + + private BrokerResponse handleSystemTableQuery(JsonNode request, PinotQuery pinotQuery, String tableName, + RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query, + @Nullable HttpHeaders httpHeaders, boolean queryWasLogged) { + if (pinotQuery.isExplain()) { + return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + } + SystemTableProvider provider = SystemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + return BrokerResponseNative.TABLE_DOES_NOT_EXIST; + } + try { + Map<ServerRoutingInstance, DataTable> dataTableMap; + if (provider.getExecutionMode() == SystemTableProvider.ExecutionMode.BROKER_SCATTER_GATHER) { + dataTableMap = scatterGatherSystemTableDataTables(provider, pinotQuery, tableName, request, httpHeaders); + } else { + dataTableMap = new HashMap<>(1); + // Use a synthetic routing instance for broker-local execution of system table queries. 
+ dataTableMap.put(new ServerRoutingInstance(SYSTEM_TABLE_PSEUDO_HOST, SYSTEM_TABLE_PSEUDO_PORT, + TableType.OFFLINE), executeLocalSystemTableQuery(pinotQuery, provider)); + } + + BrokerResponseNative brokerResponse; + BrokerRequest brokerRequest = new BrokerRequest(); + QuerySource querySource = new QuerySource(); + querySource.setTableName(tableName); + brokerRequest.setQuerySource(querySource); + brokerRequest.setPinotQuery(pinotQuery); + brokerResponse = _brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, + _brokerTimeoutMs, _brokerMetrics); + brokerResponse.setTablesQueried(Set.of(TableNameBuilder.extractRawTableName(tableName))); + brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis()); + _queryLogger.logQueryCompleted( + new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, + QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null), + queryWasLogged); + return brokerResponse; + } catch (BadQueryRequestException e) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e); + requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); + return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage()); + } + } + + private DataTable executeLocalSystemTableQuery(PinotQuery pinotQuery, SystemTableProvider provider) + throws Exception { + IndexSegment dataSource = provider.getDataSource(); + try { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); + queryContext.setSchema(provider.getSchema()); + queryContext.setEndTimeMs(System.currentTimeMillis() + _brokerTimeoutMs); + + // Pass null for 
serverMetrics because system table queries run broker-local against an in-memory IndexSegment. + Plan plan = _planMaker.makeInstancePlan(List.of(new SegmentContext(dataSource)), queryContext, _executorService, + null); + InstanceResponseBlock instanceResponse = plan.execute(); + return instanceResponse.toDataTable(); + } finally { + dataSource.destroy(); + } + } + + static final class BrokerTarget { Review Comment: The nested class `BrokerTarget` is marked as package-private (`static final`) but appears to only be used within `SystemTableBrokerRequestHandler`. Consider making it `private` to better encapsulate its usage scope. ```suggestion private static final class BrokerTarget { ``` ########## pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java: ########## @@ -52,20 +54,25 @@ public static class QueryParameters { private static final String SELECT_PATH = "/select"; private static final String METADATA_PATH = "/metadata"; + private static String encodePathSegment(String pathSegment) { + return URLEncoder.encode(pathSegment, StandardCharsets.UTF_8).replace("+", "%20"); Review Comment: The `encodePathSegment` method is duplicated in both `PinotSegmentApiClient` and `PinotSegmentAdminClient`. Consider extracting this to a shared utility class to avoid duplication. ########## pinot-plugins/pinot-system-table/src/main/java/org/apache/pinot/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,662 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.IntFunction; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.client.admin.PinotAdminClient; +import org.apache.pinot.client.admin.PinotAdminTransport; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.systemtable.SystemTable; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final String SIZE_CACHE_TTL_MS_PROPERTY = "pinot.systemtable.tables.sizeCacheTtlMs"; + private static final long DEFAULT_SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + private static final long SIZE_CACHE_TTL_MS = getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY, + DEFAULT_SIZE_CACHE_TTL_MS); Review Comment: This automated suggestion is self-contradictory: it claims to correct the variable name but names the same constant `SIZE_CACHE_TTL_MS` before and after. The constant already follows the naming pattern of `SIZE_CACHE_TTL_MS_PROPERTY` and `DEFAULT_SIZE_CACHE_TTL_MS`, and its initialization from `getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY, DEFAULT_SIZE_CACHE_TTL_MS)` looks correct, so no rename appears necessary; please verify before acting on this comment. ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java: ########## @@ -0,0 +1,554 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.hc.core5.util.Timeout; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.querylog.QueryLogger; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTableFactory; +import org.apache.pinot.common.datatable.DataTableImplV4; +import 
org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.SystemTableRegistry; +import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.routing.MultiClusterRoutingContext; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.accounting.ThreadAccountant; +import org.apache.pinot.spi.auth.AuthorizationResult; +import org.apache.pinot.spi.auth.broker.RequesterIdentity; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.query.QueryExecutionContext; +import 
org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broker request handler for system tables (handled entirely on the broker). + */ +public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class); + private static final String SYSTEM_TABLE_PSEUDO_HOST = "localhost"; + private static final int SYSTEM_TABLE_PSEUDO_PORT = 0; + private static final String SYSTEM_TABLE_DATATABLE_API_PATH = "/query/systemTable/datatable"; + private static final Set<String> HOP_BY_HOP_HEADERS_TO_SKIP = + Set.of("connection", "content-length", "transfer-encoding", "host"); Review Comment: The hop-by-hop headers list is incomplete. According to RFC 7230, headers like "keep-alive", "proxy-authenticate", "proxy-authorization", "te", "trailer", and "upgrade" should also be excluded. Consider adding these or documenting why they're intentionally omitted. ```suggestion // Hop-by-hop headers as defined in RFC 7230, Section 6.1, plus Host which is specific to each hop. private static final Set<String> HOP_BY_HOP_HEADERS_TO_SKIP = Set.of( "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", "te", "trailer", "transfer-encoding", "upgrade", "host"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
