Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2887667668
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -81,18 +93,24 @@ private Boolean fetchSingleStream()
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
StreamConsumerFactory.getUniqueClientId(clientId))) {
- _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId, streamConfig,
- _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream));
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
+ streamMetadataProvider.computePartitionGroupMetadata(clientId, streamConfig,
+ _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream);
+ // Fetch partition count after computePartitionGroupMetadata() to avoid duplicate metadata calls,
+ // since the default computePartitionGroupMetadata() implementation calls fetchPartitionCount() internally.
+ int partitionCount = streamMetadataProvider.fetchPartitionCount(/*timeoutMillis=*/15000);
Review Comment:
`fetchSingleStream()` calls `computePartitionGroupMetadata(...)` and then
unconditionally calls `fetchPartitionCount(...)`. The default
`StreamMetadataProvider.computePartitionGroupMetadata(...)` already calls
`fetchPartitionCount(...)`, so this can result in two partition-count fetches
per stream (often a network call). Consider adjusting the API to return the
partition count along with the computed metadata, or avoid the second call when
using the default implementation (e.g., by deriving the count once or caching
it inside the provider).
```suggestion
// Derive partition count from the computed metadata to avoid an extra partition-count fetch.
int partitionCount = partitionGroupMetadataList.size();
```
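The other option mentioned above, returning the partition count together with the computed metadata, could look roughly like the sketch below. Everything in it is hypothetical: `CountingProvider`, `MetadataResult`, and `computeWithCount` are illustration-only names, not Pinot APIs, and the metadata entries are plain strings standing in for `PartitionGroupMetadata`. Note that deriving the count from the list size, as in the suggestion, is only equivalent when the provider returns exactly one metadata entry per partition; carrying the count explicitly avoids relying on that.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types are part of the Pinot SPI. */
public class PartitionCountSketch {

  /** Stand-in for a metadata provider whose count lookup may be a remote call. */
  public interface CountingProvider {
    int fetchPartitionCount();
  }

  /** Hypothetical result type carrying the count alongside the computed metadata. */
  public static final class MetadataResult {
    public final int partitionCount;
    public final List<String> partitionGroupMetadata;

    public MetadataResult(int partitionCount, List<String> partitionGroupMetadata) {
      this.partitionCount = partitionCount;
      this.partitionGroupMetadata = List.copyOf(partitionGroupMetadata);
    }
  }

  /** Fetches the count exactly once and reuses it for both the metadata and the result. */
  public static MetadataResult computeWithCount(CountingProvider provider) {
    int partitionCount = provider.fetchPartitionCount();
    List<String> metadata = new ArrayList<>(partitionCount);
    for (int i = 0; i < partitionCount; i++) {
      // Placeholder entry; the real code would build a PartitionGroupMetadata here.
      metadata.add("partition-" + i);
    }
    return new MetadataResult(partitionCount, metadata);
  }
}
```

With a shape like this, the caller reads `partitionCount` from the result and never issues a second fetch, regardless of which provider implementation is in use.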
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -51,23 +52,34 @@ public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
_pausedTopicIndices = pausedTopicIndices;
}
+ public List<StreamMetadata> getStreamMetadataList() {
+ return Collections.unmodifiableList(_streamMetadataList);
+ }
+
+ /**
+ * @deprecated after 1.5.0 release. Use {@link #getStreamMetadataList()} instead.
+ */
+ @Deprecated
public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
- return _newPartitionGroupMetadataList;
+ return _streamMetadataList.stream()
+ .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
+ .collect(Collectors.toList());
}
public Exception getException() {
return _exception;
}
/**
- * Callable to fetch the {@link PartitionGroupMetadata} list, from the stream.
+ * Callable to fetch the {@link StreamMetadata} list from the streams.
* The stream requires the list of {@link PartitionGroupConsumptionStatus} to compute the new
* {@link PartitionGroupMetadata}
*/
@Override
public Boolean call()
throws Exception {
- _newPartitionGroupMetadataList.clear();
+ _streamMetadataList.clear();
+ _exception = null;
Review Comment:
`call()` clears `_exception` at the start, but `fetchSingleStream()` /
`fetchMultipleStreams()` still check `if (_exception != null)` to log a
“succeeded now” message. With the reset, that condition can never be true, so
the info log block is dead code. Consider capturing the previous exception
state in a local boolean before resetting, or remove the unreachable branch.
```suggestion
```
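If the "succeeded now" log is worth keeping, the first option above (capturing the previous state before the reset) might be sketched as follows. This is a simplified stand-in, not the actual `PartitionGroupMetadataFetcher`: the class name `RecoveryLogSketch`, the injected `fetch` callable, and the `_recoveryCount` counter (which takes the place of the info log) are all assumptions made for illustration.

```java
import java.util.concurrent.Callable;

/** Hypothetical sketch of "remember the previous failure before clearing it". */
public class RecoveryLogSketch implements Callable<Boolean> {
  private final Callable<Boolean> _fetch; // stands in for fetchSingleStream()/fetchMultipleStreams()
  private Exception _exception;
  private int _recoveryCount; // stands in for the "succeeded now" info log

  public RecoveryLogSketch(Callable<Boolean> fetch) {
    _fetch = fetch;
  }

  public Exception getException() {
    return _exception;
  }

  public int getRecoveryCount() {
    return _recoveryCount;
  }

  @Override
  public Boolean call() {
    // Capture the outcome of the previous attempt before resetting the field.
    boolean failedPreviously = _exception != null;
    _exception = null;
    try {
      _fetch.call();
    } catch (Exception e) {
      _exception = e;
      return Boolean.FALSE;
    }
    if (failedPreviously) {
      // The real code would log that the fetch succeeded after an earlier failure.
      _recoveryCount++;
    }
    return Boolean.TRUE;
  }
}
```

The local boolean keeps the recovery branch reachable while still guaranteeing that `getException()` reflects only the most recent call.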
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java:
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+/**
+ * Groups partition metadata for a single stream/topic.
+ *
+ * <p>This replaces the flat {@code List<PartitionGroupMetadata>} pattern where partitions from all streams were mixed
+ * together and required partition ID padding to identify stream membership.
+ *
+ * <p>The {@link PartitionGroupMetadata} items within this container use Pinot-encoded partition IDs
+ * (i.e., {@code streamIndex * 10000 + streamPartitionId}) to maintain backward compatibility with segment names
+ * stored in ZooKeeper.
+ */
+public class StreamMetadata {
+
+ private final StreamConfig _streamConfig;
+ private final int _numPartitions;
+ private final List<PartitionGroupMetadata> _partitionGroupMetadataList;
+
+ public StreamMetadata(StreamConfig streamConfig, int numPartitions,
+ List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ _streamConfig = streamConfig;
+ _numPartitions = numPartitions;
+ _partitionGroupMetadataList = List.copyOf(partitionGroupMetadataList);
+ }
Review Comment:
The PR description mentions `StreamMetadata` also carrying a “stream config
index”, but the new `StreamMetadata` type only stores `StreamConfig`,
`numPartitions`, and the partition metadata list. If the index is required by
downstream logic, it should be added to the type; otherwise, the PR description
(and any related docs) should be updated to avoid implying it is available from
the object.
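For reference, the Javadoc in this file describes partition IDs as `streamIndex * 10000 + streamPartitionId`, so a stream index is in principle recoverable from an encoded ID even though `StreamMetadata` does not store it. The sketch below illustrates that arithmetic only. The class and method names are hypothetical (they are not taken from the PR), and the decoding assumes non-negative stream partition IDs below 10000.

```java
/** Hypothetical helper illustrating the encoding described in the StreamMetadata Javadoc. */
public final class PartitionIdEncodingSketch {
  // Padding factor taken from the Javadoc formula: streamIndex * 10000 + streamPartitionId.
  public static final int PADDING = 10000;

  private PartitionIdEncodingSketch() {
  }

  /** Encodes a stream index and a stream-local partition ID into one padded ID. */
  public static int encode(int streamIndex, int streamPartitionId) {
    return streamIndex * PADDING + streamPartitionId;
  }

  /** Recovers the stream index; assumes 0 <= streamPartitionId < PADDING at encode time. */
  public static int streamIndexOf(int encodedPartitionId) {
    return encodedPartitionId / PADDING;
  }

  /** Recovers the stream-local partition ID under the same assumption. */
  public static int streamPartitionIdOf(int encodedPartitionId) {
    return encodedPartitionId % PADDING;
  }
}
```

Whether downstream code should depend on decoding like this, or on an explicit index field in `StreamMetadata`, is the design question the comment above raises.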
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]