Copilot commented on code in PR #17169: URL: https://github.com/apache/pinot/pull/17169#discussion_r2511531296
########## pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java: ##########
@@ -0,0 +1,1512 @@
+  private static final class InMemoryQueryLogStore implements QueryLogStore {
+    private final Deque<QueryLogRecord> _records = new ArrayDeque<>();
+    private final int _maxEntries;
+    private final long _retentionMs;
+
+    InMemoryQueryLogStore(int maxEntries, long retentionMs) {
+      _maxEntries = Math.max(1, maxEntries);
+      _retentionMs = retentionMs;
+    }
+
+    @Override
+    public void append(QueryLogRecord record) {
+      if (_retentionMs > 0) {
+        long cutoff = record.getLogTimestampMs() - _retentionMs;
+        while (!_records.isEmpty() && _records.peekFirst().getLogTimestampMs() < cutoff) {
+          _records.removeFirst();
+        }
+      }
+      _records.addLast(record);
+      while (_records.size() > Math.max(1, _maxEntries)) {

Review Comment:
   The retention logic uses `Math.max(1, _maxEntries)`, so if `_maxEntries` is configured as 0 (to disable row-based retention) the store still keeps at least one record. This conflicts with the intended behavior, where `_maxEntries <= 0` should disable row-based pruning. Remove the `Math.max` wrapper and check `_maxEntries > 0` before the while loop instead.
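   A minimal sketch of `append` with that guard in place, assuming the constructor stores the configured `maxEntries` unmodified so that `<= 0` genuinely disables row-based retention:

   ```java
   @Override
   public void append(QueryLogRecord record) {
     if (_retentionMs > 0) {
       long cutoff = record.getLogTimestampMs() - _retentionMs;
       // Drop records that have aged out of the time-based retention window.
       while (!_records.isEmpty() && _records.peekFirst().getLogTimestampMs() < cutoff) {
         _records.removeFirst();
       }
     }
     _records.addLast(record);
     // Row-based pruning only applies when a positive cap is configured.
     if (_maxEntries > 0) {
       while (_records.size() > _maxEntries) {
         _records.removeFirst();
       }
     }
   }
   ```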
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java: ##########
@@ -576,6 +648,44 @@ private StreamingOutput sendRequestToBroker(String query, String instanceId, Str
     return sendRequestRaw(url, "POST", query, requestJson, headers);
   }
 
+  private StreamingOutput aggregateQueryLogResponses(List<InstanceConfig> brokerInstanceConfigs,
+      ObjectNode requestJson, Map<String, String> headers) {
+    String protocol = _controllerConf.getControllerBrokerProtocol();
+    String payload = requestJson.toString();
+    return outputStream -> {
+      long startTime = System.currentTimeMillis();
+      List<BrokerResponseNative> responses = new ArrayList<>(brokerInstanceConfigs.size());
+      List<QueryProcessingException> fetchExceptions = new ArrayList<>();
+      for (InstanceConfig brokerConfig : brokerInstanceConfigs) {
+        String hostName = getHost(brokerConfig);
+        int port = getPort(brokerConfig);
+        String url = getQueryURL(protocol, hostName, port);
+        try {
+          responses.add(fetchBrokerQueryLog(url, payload, headers));
+        } catch (Exception e) {
+          fetchExceptions.add(new QueryProcessingException(QueryErrorCode.BROKER_REQUEST_SEND,
+              String.format("Failed to fetch query log from broker %s: %s", brokerConfig.getInstanceName(),
+                  e.getMessage())));
+        }
+      }
+      BrokerResponseNative aggregated = QueryLogResponseAggregator.aggregate(responses);
+      List<QueryProcessingException> allExceptions = new ArrayList<>(aggregated.getExceptions());
+      allExceptions.addAll(fetchExceptions);
+      aggregated.setExceptions(allExceptions);
+      aggregated.setNumServersQueried(brokerInstanceConfigs.size());
+      aggregated.setTimeUsedMs(System.currentTimeMillis() - startTime);
+      aggregated.setBrokerId("controller");
+      aggregated.toOutputStream(outputStream);
+    };
+  }
+
+  private BrokerResponseNative fetchBrokerQueryLog(String url, String payload, Map<String, String> headers)
+      throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    sendRequestRaw(url, "POST", payload, headers, outputStream);

Review Comment:
   The known `sendRequestRaw` overload takes (url, method, requestStr, requestJson, headers), but this call passes a different five-argument shape: the fourth argument is `headers` (a Map) and the fifth is `outputStream` (an OutputStream). This looks like a method signature mismatch. Verify that the correct `sendRequestRaw` overload is being used, or whether the arguments are in the wrong order.
   ```suggestion
       ObjectNode requestJson = (ObjectNode) JsonUtils.stringToJsonNode(payload);
       StreamingOutput streamingOutput = sendRequestRaw(url, "POST", null, requestJson, headers);
       ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
       streamingOutput.write(outputStream);
   ```
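   If the suggested overload is adopted, the fetch helper would look roughly like the sketch below. This is a sketch under assumptions, not the PR's code: it assumes the (url, method, requestStr, requestJson, headers) overload returns a `StreamingOutput` (as `sendRequestToBroker` above indicates) and that the broker's JSON body deserializes back into `BrokerResponseNative` via `JsonUtils`:

   ```java
   private BrokerResponseNative fetchBrokerQueryLog(String url, String payload, Map<String, String> headers)
       throws Exception {
     // Re-parse the payload so the known five-argument overload can be used.
     ObjectNode requestJson = (ObjectNode) JsonUtils.stringToJsonNode(payload);
     StreamingOutput streamingOutput = sendRequestRaw(url, "POST", null, requestJson, headers);
     // Buffer the streamed broker response, then decode it for aggregation.
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
     streamingOutput.write(outputStream);
     String body = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
     // Assumption: BrokerResponseNative round-trips through its Jackson bindings.
     return JsonUtils.stringToObject(body, BrokerResponseNative.class);
   }
   ```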
########## pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java: ##########
@@ -0,0 +1,1512 @@
+  private List<QueryLogRecord> getFilteredRecords(@Nullable Predicate<QueryLogRecord> predicate)
+      throws IOException {
+    if (_store == null) {
+      return Collections.emptyList();
+    }
+    _lock.readLock().lock();
+    try {
+      List<QueryLogRecord> records = _store.getRecords();
+      if (records.isEmpty()) {
+        return Collections.emptyList();
+      }
+      long cutoff = _retentionMs > 0 ? System.currentTimeMillis() - _retentionMs : Long.MIN_VALUE;
+      List<QueryLogRecord> filtered = new ArrayList<>(records.size());
+      for (QueryLogRecord record : records) {
+        if (record.getLogTimestampMs() < cutoff) {
+          continue;
+        }
+        filtered.add(record);

Review Comment:
   Time-based filtering is performed in-memory on every query execution, even though disk-backed storage already performs time-based pruning at append time (line 420). For disk-backed storage this in-memory filtering is redundant, since segments older than the retention window have already been deleted. Consider skipping this filtering step when using disk storage, or optimize by checking segment timestamps before reading all records.
   ```suggestion
         if (_store.isDiskBacked()) {
           // Disk-backed storage already prunes records at append time, so skip time-based filtering.
           filtered.addAll(records);
         } else {
           for (QueryLogRecord record : records) {
             if (record.getLogTimestampMs() < cutoff) {
               continue;
             }
             filtered.add(record);
           }
   ```
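   Note that `isDiskBacked()` does not exist on the `QueryLogStore` interface shown in this PR; it is hypothetical. If this route is taken, a default method is one way to add it without touching the in-memory implementation (a sketch, names assumed):

   ```java
   private interface QueryLogStore extends AutoCloseable {
     void append(QueryLogRecord record)
         throws IOException;

     List<QueryLogRecord> getRecords()
         throws IOException;

     // Hypothetical addition: lets callers skip in-memory retention filtering
     // when the backing store already prunes at append time.
     default boolean isDiskBacked() {
       return false;
     }

     @Override
     void close()
         throws IOException;
   }
   ```

   `DiskBackedQueryLogStore` would then override `isDiskBacked()` to return true.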
########## pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java: ##########
@@ -0,0 +1,1512 @@
+      if (filtered.isEmpty()) {
+        return Collections.emptyList();
+      }
+      if (_maxEntries > 0 && filtered.size() > _maxEntries) {
+        filtered = new ArrayList<>(filtered.subList(filtered.size() - _maxEntries, filtered.size()));
+      } else {
+        filtered = new ArrayList<>(filtered);

Review Comment:
   Row-based filtering creates a new ArrayList copy of the filtered records on every query, even when the row count is already below the limit. For disk-backed storage, row-based pruning happens at append time (line 421), making this in-memory check redundant. Consider skipping this step for disk storage, or only copying when the limit is actually exceeded.
   ```suggestion
   ```
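   Since `filtered` is already a fresh list built inside this method, the unconditional copy in the else branch buys nothing; a sketch of the copy-only-when-trimming variant:

   ```java
   // Only rebuild the list when it actually needs trimming to the configured cap;
   // otherwise return the list constructed above as-is.
   if (_maxEntries > 0 && filtered.size() > _maxEntries) {
     filtered = new ArrayList<>(filtered.subList(filtered.size() - _maxEntries, filtered.size()));
   }
   ```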
########## pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java: ##########
@@ -0,0 +1,1512 @@
+  private static final class InMemoryQueryLogStore implements QueryLogStore {
+    private final Deque<QueryLogRecord> _records = new ArrayDeque<>();
+    private final int _maxEntries;
+    private final long _retentionMs;
+
+    InMemoryQueryLogStore(int maxEntries, long retentionMs) {
+      _maxEntries = Math.max(1, maxEntries);

Review Comment:
   Setting `_maxEntries = Math.max(1, maxEntries)` prevents users from disabling row-based retention by setting `maxEntries` to 0 or a negative value. The code at line 493 checks `if (_maxEntries <= 0)` to skip row-based pruning, but that check can never be true. Either remove the `Math.max(1, ...)` or update the pruning logic to be consistent.
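   A sketch of the consistent constructor, keeping the configured value unmodified so the `_maxEntries <= 0` check elsewhere can actually fire (this pairs with the guarded pruning loop sketched under the earlier comment):

   ```java
   InMemoryQueryLogStore(int maxEntries, long retentionMs) {
     // Keep the raw configured value; <= 0 now genuinely disables row-based retention.
     _maxEntries = maxEntries;
     _retentionMs = retentionMs;
   }
   ```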
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.querylog; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + + +/** + * Binary serializer/deserializer for {@link QueryLogRecord}. Records are compressed individually so they can be + * appended to arbitrary storage backends. + */ +final class QueryLogRecordSerDe { + private QueryLogRecordSerDe() { + } + + static byte[] serialize(QueryLogRecord record) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(baos); + DataOutputStream out = new DataOutputStream(gzip)) { + out.writeLong(record.getLogTimestampMs()); + out.writeLong(record.getRequestId()); + writeString(out, record.getTableName()); + writeString(out, record.getBrokerId()); + writeString(out, record.getClientIp()); + writeString(out, record.getQuery()); + writeString(out, record.getQueryEngine()); + out.writeLong(record.getRequestArrivalTimeMs()); + out.writeLong(record.getTimeMs()); + out.writeLong(record.getBrokerReduceTimeMs()); + out.writeLong(record.getNumDocsScanned()); + out.writeLong(record.getTotalDocs()); + out.writeLong(record.getNumEntriesScannedInFilter()); + out.writeLong(record.getNumEntriesScannedPostFilter()); + out.writeLong(record.getNumSegmentsQueried()); + out.writeLong(record.getNumSegmentsProcessed()); + out.writeLong(record.getNumSegmentsMatched()); + out.writeLong(record.getNumConsumingSegmentsQueried()); + out.writeLong(record.getNumConsumingSegmentsProcessed()); + out.writeLong(record.getNumConsumingSegmentsMatched()); + out.writeLong(record.getNumUnavailableSegments()); + out.writeLong(record.getMinConsumingFreshnessTimeMs()); + out.writeInt(record.getNumServersResponded()); + out.writeInt(record.getNumServersQueried()); + out.writeBoolean(record.isGroupsTrimmed()); + out.writeBoolean(record.isGroupLimitReached()); + out.writeBoolean(record.isGroupWarningLimitReached()); + out.writeLong(record.getNumExceptions()); + writeString(out, record.getExceptions()); + writeString(out, record.getServerStats()); + out.writeLong(record.getOfflineTotalCpuTimeNs()); + out.writeLong(record.getOfflineThreadCpuTimeNs()); + out.writeLong(record.getOfflineSystemActivitiesCpuTimeNs()); + out.writeLong(record.getOfflineResponseSerializationCpuTimeNs()); + out.writeLong(record.getRealtimeTotalCpuTimeNs()); + out.writeLong(record.getRealtimeThreadCpuTimeNs()); + out.writeLong(record.getRealtimeSystemActivitiesCpuTimeNs()); + out.writeLong(record.getRealtimeResponseSerializationCpuTimeNs()); + out.writeLong(record.getOfflineTotalMemAllocatedBytes()); + out.writeLong(record.getOfflineThreadMemAllocatedBytes()); + out.writeLong(record.getOfflineResponseSerMemAllocatedBytes()); + 
out.writeLong(record.getRealtimeTotalMemAllocatedBytes()); + out.writeLong(record.getRealtimeThreadMemAllocatedBytes()); + out.writeLong(record.getRealtimeResponseSerMemAllocatedBytes()); + writeIntArray(out, record.getPools()); + out.writeBoolean(record.isPartialResult()); + out.writeBoolean(record.isRlsFiltersApplied()); + out.writeInt(record.getNumRowsResultSet()); + writeStringArray(out, record.getTablesQueried()); + writeString(out, record.getTraceInfoJson()); + writeString(out, record.getFanoutType()); + writeString(out, record.getOfflineServerTenant()); + writeString(out, record.getRealtimeServerTenant()); + } + return baos.toByteArray(); + } + + static QueryLogRecord deserialize(byte[] payload) + throws IOException { + try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(payload)); + DataInputStream in = new DataInputStream(gzip)) { + long logTimestampMs = in.readLong(); + long requestId = in.readLong(); + String tableName = readString(in); + String brokerId = readString(in); + String clientIp = readString(in); + String query = readString(in); + String queryEngine = readString(in); + long requestArrivalTimeMs = in.readLong(); + long timeMs = in.readLong(); + long brokerReduceTimeMs = in.readLong(); + long numDocsScanned = in.readLong(); + long totalDocs = in.readLong(); + long numEntriesScannedInFilter = in.readLong(); + long numEntriesScannedPostFilter = in.readLong(); + long numSegmentsQueried = in.readLong(); + long numSegmentsProcessed = in.readLong(); + long numSegmentsMatched = in.readLong(); + long numConsumingSegmentsQueried = in.readLong(); + long numConsumingSegmentsProcessed = in.readLong(); + long numConsumingSegmentsMatched = in.readLong(); + long numUnavailableSegments = in.readLong(); + long minConsumingFreshnessTimeMs = in.readLong(); + int numServersResponded = in.readInt(); + int numServersQueried = in.readInt(); + boolean groupsTrimmed = in.readBoolean(); + boolean groupLimitReached = in.readBoolean(); + boolean groupWarningLimitReached = in.readBoolean(); + long numExceptions = in.readLong(); + String exceptions = readString(in); + String serverStats = readString(in); + long offlineTotalCpuTimeNs = in.readLong(); + long offlineThreadCpuTimeNs = in.readLong(); + long offlineSystemActivitiesCpuTimeNs = in.readLong(); + long offlineResponseSerializationCpuTimeNs = in.readLong(); + long realtimeTotalCpuTimeNs = in.readLong(); + long realtimeThreadCpuTimeNs = in.readLong(); + long realtimeSystemActivitiesCpuTimeNs = in.readLong(); + long realtimeResponseSerializationCpuTimeNs = in.readLong(); + long offlineTotalMemAllocatedBytes = in.readLong(); + long offlineThreadMemAllocatedBytes = in.readLong(); + long offlineResponseSerMemAllocatedBytes = in.readLong(); + long realtimeTotalMemAllocatedBytes = in.readLong(); + long realtimeThreadMemAllocatedBytes = in.readLong(); + long realtimeResponseSerMemAllocatedBytes = in.readLong(); + int[] pools = readIntArray(in); + boolean partialResult = in.readBoolean(); + boolean rlsFiltersApplied = in.readBoolean(); + int numRowsResultSet = in.readInt(); + String[] tablesQueried = readStringArray(in); + String traceInfoJson = readString(in); + String fanoutType = readString(in); + String offlineServerTenant = readString(in); + String realtimeServerTenant = readString(in); + + return new QueryLogRecord(logTimestampMs, requestId, tableName, brokerId, clientIp, query, queryEngine, + requestArrivalTimeMs, timeMs, brokerReduceTimeMs, numDocsScanned, totalDocs, numEntriesScannedInFilter, + numEntriesScannedPostFilter, 
numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, + numConsumingSegmentsQueried, numConsumingSegmentsProcessed, numConsumingSegmentsMatched, + numUnavailableSegments, minConsumingFreshnessTimeMs, numServersResponded, numServersQueried, groupsTrimmed, + groupLimitReached, groupWarningLimitReached, numExceptions, exceptions, serverStats, offlineTotalCpuTimeNs, + offlineThreadCpuTimeNs, offlineSystemActivitiesCpuTimeNs, offlineResponseSerializationCpuTimeNs, + realtimeTotalCpuTimeNs, realtimeThreadCpuTimeNs, realtimeSystemActivitiesCpuTimeNs, + realtimeResponseSerializationCpuTimeNs, offlineTotalMemAllocatedBytes, offlineThreadMemAllocatedBytes, + offlineResponseSerMemAllocatedBytes, realtimeTotalMemAllocatedBytes, realtimeThreadMemAllocatedBytes, + realtimeResponseSerMemAllocatedBytes, pools, partialResult, rlsFiltersApplied, numRowsResultSet, + tablesQueried, traceInfoJson, fanoutType, offlineServerTenant, realtimeServerTenant); + } + } + + private static void writeString(DataOutputStream out, String value) + throws IOException { + if (value == null) { + out.writeBoolean(false); + return; + } + out.writeBoolean(true); + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + out.writeInt(bytes.length); + out.write(bytes); + } + + private static String readString(DataInputStream in) + throws IOException { + boolean present = in.readBoolean(); + if (!present) { + return null; + } + int length = in.readInt(); + byte[] bytes = new byte[length]; + in.readFully(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + private static void writeStringArray(DataOutputStream out, String[] values) + throws IOException { + if (values == null) { + out.writeInt(-1); + return; + } + out.writeInt(values.length); + for (String value : values) { + writeString(out, value); + } + } + + private static String[] readStringArray(DataInputStream in) + throws IOException { + int length = in.readInt(); + if (length < 0) { + return new String[0]; + } + String[] values = new String[length]; + for (int i = 0; i < length; i++) { + values[i] = readString(in); + } + return values; + } + + private static void writeIntArray(DataOutputStream out, int[] values) + throws IOException { + if (values == null) { + out.writeInt(-1); + return; + } + out.writeInt(values.length); + for (int value : values) { + out.writeInt(value); + } + } + + private static int[] readIntArray(DataInputStream in) + throws IOException { + int length = in.readInt(); + if (length < 0) { + return new int[0];
Review Comment: When deserializing a null int array (indicated by length < 0), this returns an empty array instead of null. This is inconsistent with serialization at lines 225-227, where null arrays are written with length -1. Consider returning `null` here to maintain round-trip serialization consistency. (See the round-trip sketch after the second comment below.)
```suggestion
return null;
```
########## pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogRecordSerDe.java: ########## @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.querylog; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + + +/** + * Binary serializer/deserializer for {@link QueryLogRecord}. Records are compressed individually so they can be + * appended to arbitrary storage backends. + */ +final class QueryLogRecordSerDe { + private QueryLogRecordSerDe() { + } + + static byte[] serialize(QueryLogRecord record) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(baos); + DataOutputStream out = new DataOutputStream(gzip)) { + out.writeLong(record.getLogTimestampMs()); + out.writeLong(record.getRequestId()); + writeString(out, record.getTableName()); + writeString(out, record.getBrokerId()); + writeString(out, record.getClientIp()); + writeString(out, record.getQuery()); + writeString(out, record.getQueryEngine()); + out.writeLong(record.getRequestArrivalTimeMs()); + out.writeLong(record.getTimeMs()); + out.writeLong(record.getBrokerReduceTimeMs()); + out.writeLong(record.getNumDocsScanned()); + out.writeLong(record.getTotalDocs()); + out.writeLong(record.getNumEntriesScannedInFilter()); + out.writeLong(record.getNumEntriesScannedPostFilter()); + out.writeLong(record.getNumSegmentsQueried()); + out.writeLong(record.getNumSegmentsProcessed()); + out.writeLong(record.getNumSegmentsMatched()); + out.writeLong(record.getNumConsumingSegmentsQueried()); + out.writeLong(record.getNumConsumingSegmentsProcessed()); + out.writeLong(record.getNumConsumingSegmentsMatched()); + out.writeLong(record.getNumUnavailableSegments()); + out.writeLong(record.getMinConsumingFreshnessTimeMs()); + out.writeInt(record.getNumServersResponded()); + out.writeInt(record.getNumServersQueried()); + out.writeBoolean(record.isGroupsTrimmed()); + out.writeBoolean(record.isGroupLimitReached()); + out.writeBoolean(record.isGroupWarningLimitReached()); + out.writeLong(record.getNumExceptions()); + writeString(out, record.getExceptions()); + writeString(out, record.getServerStats()); + out.writeLong(record.getOfflineTotalCpuTimeNs()); + out.writeLong(record.getOfflineThreadCpuTimeNs()); + out.writeLong(record.getOfflineSystemActivitiesCpuTimeNs()); + out.writeLong(record.getOfflineResponseSerializationCpuTimeNs()); + out.writeLong(record.getRealtimeTotalCpuTimeNs()); + out.writeLong(record.getRealtimeThreadCpuTimeNs()); + out.writeLong(record.getRealtimeSystemActivitiesCpuTimeNs()); + out.writeLong(record.getRealtimeResponseSerializationCpuTimeNs()); + out.writeLong(record.getOfflineTotalMemAllocatedBytes()); + out.writeLong(record.getOfflineThreadMemAllocatedBytes()); + out.writeLong(record.getOfflineResponseSerMemAllocatedBytes()); + out.writeLong(record.getRealtimeTotalMemAllocatedBytes()); + out.writeLong(record.getRealtimeThreadMemAllocatedBytes()); + 
out.writeLong(record.getRealtimeResponseSerMemAllocatedBytes()); + writeIntArray(out, record.getPools()); + out.writeBoolean(record.isPartialResult()); + out.writeBoolean(record.isRlsFiltersApplied()); + out.writeInt(record.getNumRowsResultSet()); + writeStringArray(out, record.getTablesQueried()); + writeString(out, record.getTraceInfoJson()); + writeString(out, record.getFanoutType()); + writeString(out, record.getOfflineServerTenant()); + writeString(out, record.getRealtimeServerTenant()); + } + return baos.toByteArray(); + } + + static QueryLogRecord deserialize(byte[] payload) + throws IOException { + try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(payload)); + DataInputStream in = new DataInputStream(gzip)) { + long logTimestampMs = in.readLong(); + long requestId = in.readLong(); + String tableName = readString(in); + String brokerId = readString(in); + String clientIp = readString(in); + String query = readString(in); + String queryEngine = readString(in); + long requestArrivalTimeMs = in.readLong(); + long timeMs = in.readLong(); + long brokerReduceTimeMs = in.readLong(); + long numDocsScanned = in.readLong(); + long totalDocs = in.readLong(); + long numEntriesScannedInFilter = in.readLong(); + long numEntriesScannedPostFilter = in.readLong(); + long numSegmentsQueried = in.readLong(); + long numSegmentsProcessed = in.readLong(); + long numSegmentsMatched = in.readLong(); + long numConsumingSegmentsQueried = in.readLong(); + long numConsumingSegmentsProcessed = in.readLong(); + long numConsumingSegmentsMatched = in.readLong(); + long numUnavailableSegments = in.readLong(); + long minConsumingFreshnessTimeMs = in.readLong(); + int numServersResponded = in.readInt(); + int numServersQueried = in.readInt(); + boolean groupsTrimmed = in.readBoolean(); + boolean groupLimitReached = in.readBoolean(); + boolean groupWarningLimitReached = in.readBoolean(); + long numExceptions = in.readLong(); + String exceptions = readString(in); + String serverStats = readString(in); + long offlineTotalCpuTimeNs = in.readLong(); + long offlineThreadCpuTimeNs = in.readLong(); + long offlineSystemActivitiesCpuTimeNs = in.readLong(); + long offlineResponseSerializationCpuTimeNs = in.readLong(); + long realtimeTotalCpuTimeNs = in.readLong(); + long realtimeThreadCpuTimeNs = in.readLong(); + long realtimeSystemActivitiesCpuTimeNs = in.readLong(); + long realtimeResponseSerializationCpuTimeNs = in.readLong(); + long offlineTotalMemAllocatedBytes = in.readLong(); + long offlineThreadMemAllocatedBytes = in.readLong(); + long offlineResponseSerMemAllocatedBytes = in.readLong(); + long realtimeTotalMemAllocatedBytes = in.readLong(); + long realtimeThreadMemAllocatedBytes = in.readLong(); + long realtimeResponseSerMemAllocatedBytes = in.readLong(); + int[] pools = readIntArray(in); + boolean partialResult = in.readBoolean(); + boolean rlsFiltersApplied = in.readBoolean(); + int numRowsResultSet = in.readInt(); + String[] tablesQueried = readStringArray(in); + String traceInfoJson = readString(in); + String fanoutType = readString(in); + String offlineServerTenant = readString(in); + String realtimeServerTenant = readString(in); + + return new QueryLogRecord(logTimestampMs, requestId, tableName, brokerId, clientIp, query, queryEngine, + requestArrivalTimeMs, timeMs, brokerReduceTimeMs, numDocsScanned, totalDocs, numEntriesScannedInFilter, + numEntriesScannedPostFilter, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, + numConsumingSegmentsQueried, numConsumingSegmentsProcessed, 
numConsumingSegmentsMatched, + numUnavailableSegments, minConsumingFreshnessTimeMs, numServersResponded, numServersQueried, groupsTrimmed, + groupLimitReached, groupWarningLimitReached, numExceptions, exceptions, serverStats, offlineTotalCpuTimeNs, + offlineThreadCpuTimeNs, offlineSystemActivitiesCpuTimeNs, offlineResponseSerializationCpuTimeNs, + realtimeTotalCpuTimeNs, realtimeThreadCpuTimeNs, realtimeSystemActivitiesCpuTimeNs, + realtimeResponseSerializationCpuTimeNs, offlineTotalMemAllocatedBytes, offlineThreadMemAllocatedBytes, + offlineResponseSerMemAllocatedBytes, realtimeTotalMemAllocatedBytes, realtimeThreadMemAllocatedBytes, + realtimeResponseSerMemAllocatedBytes, pools, partialResult, rlsFiltersApplied, numRowsResultSet, + tablesQueried, traceInfoJson, fanoutType, offlineServerTenant, realtimeServerTenant); + } + } + + private static void writeString(DataOutputStream out, String value) + throws IOException { + if (value == null) { + out.writeBoolean(false); + return; + } + out.writeBoolean(true); + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + out.writeInt(bytes.length); + out.write(bytes); + } + + private static String readString(DataInputStream in) + throws IOException { + boolean present = in.readBoolean(); + if (!present) { + return null; + } + int length = in.readInt(); + byte[] bytes = new byte[length]; + in.readFully(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + private static void writeStringArray(DataOutputStream out, String[] values) + throws IOException { + if (values == null) { + out.writeInt(-1); + return; + } + out.writeInt(values.length); + for (String value : values) { + writeString(out, value); + } + } + + private static String[] readStringArray(DataInputStream in) + throws IOException { + int length = in.readInt(); + if (length < 0) { + return new String[0];
Review Comment: When deserializing a null string array (indicated by length < 0), this returns an empty array instead of null. This is inconsistent with serialization at lines 200-202, where null arrays are written with length -1. Consider returning `null` here to maintain round-trip serialization consistency.
```suggestion
return null;
```
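Both comments flag the same asymmetry, so a single sketch covers them. Below is a minimal, self-contained reproduction of the round trip: `writeIntArray` and `readIntArrayCurrent` mirror the helpers in the diff above, while `readIntArrayFixed`, the class name, and the `main` driver are illustrative only and not part of the PR. The GZIP wrapping used by `QueryLogRecordSerDe` is omitted here, since it does not change how the length prefix is encoded.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class NullArrayRoundTrip {

  // Same encoding as the PR helpers: a null array is written as length -1.
  static void writeIntArray(DataOutputStream out, int[] values) throws IOException {
    if (values == null) {
      out.writeInt(-1);
      return;
    }
    out.writeInt(values.length);
    for (int value : values) {
      out.writeInt(value);
    }
  }

  // Current behavior in the diff: length -1 decodes to an empty array, so
  // null and new int[0] are indistinguishable after a round trip.
  static int[] readIntArrayCurrent(DataInputStream in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      return new int[0];
    }
    int[] values = new int[length];
    for (int i = 0; i < length; i++) {
      values[i] = in.readInt();
    }
    return values;
  }

  // Suggested behavior: decode length -1 back to null, matching the writer.
  static int[] readIntArrayFixed(DataInputStream in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      return null;
    }
    int[] values = new int[length];
    for (int i = 0; i < length; i++) {
      values[i] = in.readInt();
    }
    return values;
  }

  public static void main(String[] args) throws IOException {
    // Serialize a null array once, then decode the same bytes with both readers.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(baos)) {
      writeIntArray(out, null);
    }
    byte[] payload = baos.toByteArray();

    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
      // Prints "[]": null silently became an empty array.
      System.out.println(Arrays.toString(readIntArrayCurrent(in)));
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
      // Prints "null": the round trip is lossless.
      System.out.println(readIntArrayFixed(in));
    }
  }
}
```

The same reasoning applies to `readStringArray`. One design note: taking the fix as suggested means callers of `QueryLogRecord#getPools()` and `#getTablesQueried()` can no longer assume a non-null array; the alternative that also restores consistency is to drop the -1 encoding and write null arrays as length 0, collapsing null and empty on the write side instead.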
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]