Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2768632079
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java:
##########
@@ -202,6 +202,86 @@ public void testGetPartitionCount() {
assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
}
+ @Test
+ public void testFetchPartitionCountWithSubset()
+ throws Exception {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ // TEST_TOPIC_2 has 2 partitions (0, 1). Subset "0" -> count 1.
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+
streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "0");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ try (KafkaStreamMetadataProvider provider = new
KafkaStreamMetadataProvider(clientId, streamConfig)) {
+ Assert.assertEquals(provider.fetchPartitionCount(10000L), 1);
+ }
+
+ // Subset "0,1" -> count 2.
+
streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "0,1");
+ streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+ try (KafkaStreamMetadataProvider provider = new
KafkaStreamMetadataProvider(clientId, streamConfig)) {
+ Assert.assertEquals(provider.fetchPartitionCount(10000L), 2);
+ }
+ }
+
+ @Test
+ public void testFetchPartitionIdsWithSubset()
+ throws Exception {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "clientId";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+
streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "1,0");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ try (KafkaStreamMetadataProvider provider = new
KafkaStreamMetadataProvider(clientId, streamConfig)) {
+ Set<Integer> ids = provider.fetchPartitionIds(10000L);
+ Assert.assertEquals(ids, Set.of(0, 1));
+ }
+ }
+
+ @Test
+ public void testPartitionSubsetDoesNotValidateAtRuntime()
Review Comment:
The test name `testPartitionSubsetDoesNotValidateAtRuntime` is misleading.
The test demonstrates that invalid partition IDs (partition 99 doesn't exist)
are accepted without validation. Consider renaming to
`testPartitionSubsetAcceptsInvalidPartitionIds` or
`testPartitionSubsetSkipsValidation` to more accurately describe the behavior
being tested.
```suggestion
public void testPartitionSubsetAcceptsInvalidPartitionIds()
```
##########
pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java:
##########
@@ -968,4 +987,66 @@ protected void runVectorQueryExamples(QuickstartRunner
runner)
printStatus(Quickstart.Color.YELLOW,
prettyPrintResponse(runner.runQuery(q7)));
printStatus(Quickstart.Color.GREEN,
"***************************************************");
}
+
+ protected void createFineFoodReviewsFederatedTable() {
+ if (!useDefaultBootstrapTableDir()) {
+ return;
+ }
+ Map<String, String> streamTableDirectories =
getDefaultStreamTableDirectories();
+ if (!streamTableDirectories.containsKey("fineFoodReviews-part-0")
+ || !streamTableDirectories.containsKey("fineFoodReviews-part-1")) {
+ return;
+ }
+ String logicalTableName = "fineFoodReviews-federated";
+ try {
+ Schema schema =
loadSchemaFromResource("/examples/stream/fineFoodReviews/fineFoodReviews_schema.json");
+ schema.setSchemaName(logicalTableName);
+ createSchemaOnController(schema, logicalTableName);
+
+ LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
+ logicalTableConfig.setTableName(logicalTableName);
+ logicalTableConfig.setBrokerTenant("DefaultTenant");
+
logicalTableConfig.setRefRealtimeTableName("fineFoodReviews-part-0_REALTIME");
+ logicalTableConfig.setPhysicalTableConfigMap(Map.of(
+ "fineFoodReviews-part-0_REALTIME", new PhysicalTableConfig(),
+ "fineFoodReviews-part-1_REALTIME", new PhysicalTableConfig()
+ ));
+
+ String logicalTableUrl = "http://localhost:" +
QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/logicalTables";
+ AbstractBaseAdminCommand.sendPostRequest(logicalTableUrl,
logicalTableConfig.toSingleLineJsonString());
+ printStatus(Quickstart.Color.GREEN,
+ "***** Logical table fineFoodReviews-federated created successfully
*****");
+ } catch (Exception e) {
+ printStatus(Quickstart.Color.YELLOW,
+ "***** Logical table fineFoodReviews-federated creation failed: " +
e.getMessage() + " *****");
+ }
+ }
+
+ private Schema loadSchemaFromResource(String resourcePath)
+ throws IOException {
+ try (InputStream inputStream =
getClass().getResourceAsStream(resourcePath)) {
+ if (inputStream == null) {
+ throw new IOException("Schema file not found: " + resourcePath);
+ }
+ String schemaJsonString = IOUtils.toString(inputStream,
StandardCharsets.UTF_8);
+ return Schema.fromString(schemaJsonString);
+ }
+ }
+
+ private void createSchemaOnController(Schema schema, String logicalTableName)
+ throws Exception {
+ File tempSchemaFile = File.createTempFile(logicalTableName + "_schema",
".json");
+ tempSchemaFile.deleteOnExit();
+ FileUtils.writeStringToFile(tempSchemaFile,
schema.toSingleLineJsonString(), StandardCharsets.UTF_8);
+ HttpEntity multipartEntity = MultipartEntityBuilder.create()
+ .addPart("schema", new FileBody(tempSchemaFile,
ContentType.APPLICATION_JSON, logicalTableName + ".json"))
+ .build();
+ HttpClient httpClient = new HttpClient();
+ String schemaUrl = "http://localhost:" +
QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/schemas?override=true"
+ + "&force=true";
+ SimpleHttpResponse response =
httpClient.sendPostRequest(URI.create(schemaUrl), multipartEntity, null, null);
+ if (response.getStatusCode() != 200) {
+ throw new RuntimeException("Schema creation response: " +
response.getResponse());
Review Comment:
The HttpClient instance should be closed after use to release resources
properly. Consider using try-with-resources if HttpClient implements
AutoCloseable, or ensure close() is called in a finally block.
```suggestion
try (HttpClient httpClient = new HttpClient()) {
String schemaUrl = "http://localhost:" +
QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/schemas?override=true"
+ "&force=true";
SimpleHttpResponse response =
httpClient.sendPostRequest(URI.create(schemaUrl), multipartEntity, null, null);
if (response.getStatusCode() != 200) {
throw new RuntimeException("Schema creation response: " +
response.getResponse());
}
```
##########
pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java:
##########
@@ -81,17 +94,19 @@ public abstract class QuickStartBase {
"examples/batch/testUnnest",
};
- protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES
= Map.of(
- "airlineStats", "examples/stream/airlineStats",
- "dailySales", "examples/stream/dailySales",
- "githubEvents", "examples/stream/githubEvents",
- "meetupRsvp", "examples/stream/meetupRsvp",
- "meetupRsvpJson", "examples/stream/meetupRsvpJson",
- "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType",
- "upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp",
- "upsertJsonMeetupRsvp", "examples/stream/upsertJsonMeetupRsvp",
- "upsertPartialMeetupRsvp", "examples/stream/upsertPartialMeetupRsvp",
- "fineFoodReviews", "examples/stream/fineFoodReviews");
+ protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES
= Map.ofEntries(
Review Comment:
The change from `Map.of()` to `Map.ofEntries()` is unnecessary for 10
entries. `Map.of()` supports up to 10 key-value pairs and is more concise.
`Map.ofEntries()` is typically used when you have more than 10 entries or need
to construct entries programmatically. Consider reverting to `Map.of()` for
better readability.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Utilities for parsing and validating Kafka partition subset configuration
+ * (stream.kafka.partition.ids) from stream config.
+ */
+public final class KafkaPartitionSubsetUtils {
+
+ private KafkaPartitionSubsetUtils() {
+ }
+
+ /**
+ * Reads the optional comma-separated partition ID list from the stream
config map.
+ * Returns a sorted, deduplicated list for stable ordering when used for
partition group metadata.
+ * Duplicate IDs in the config are silently removed.
+ *
+ * @param streamConfigMap table stream config map (e.g. from
+ * {@link
org.apache.pinot.spi.stream.StreamConfig#getStreamConfigsMap()})
+ * @return Sorted list of unique partition IDs when
stream.kafka.partition.ids is set and non-empty;
+ * null when not set or blank
+ * @throws IllegalArgumentException if the value contains invalid
(non-integer) entries
+ */
+ @Nullable
+ public static List<Integer> getPartitionIdsFromConfig(Map<String, String>
streamConfigMap) {
Review Comment:
The method documentation states that duplicate IDs are "silently removed"
but doesn't explain why this is the desired behavior. Consider adding a
sentence explaining the rationale (e.g., "This ensures stable ordering and
prevents duplicate processing of the same partition").
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -421,8 +459,9 @@ private void replicaGroupBasedMinimumMovement(Map<Integer,
List<InstanceConfig>>
List<List<String>> partitionIdToInstancesMap = new
ArrayList<>(numPartitions);
List<Set<String>> partitionIdToInstanceSetMap = new
ArrayList<>(numPartitions);
- List<List<String>> partitionIdToExistingInstancesMap = new
ArrayList<>(existingNumPartitions);
- for (int partitionId = 0; partitionId < numPartitions;
partitionId++) {
+ List<List<String>> partitionIdToExistingInstancesMap = new
ArrayList<>(partitionIds.size());
+ for (int idx = 0; idx < partitionIds.size(); idx++) {
+ int partitionId = partitionIds.get(idx);
Review Comment:
The code uses an index-based loop to iterate over `partitionIds` and
maintain parallel lists (`partitionIdToInstancesMap`,
`partitionIdToInstanceSetMap`, `partitionIdToExistingInstancesMap`). This
pattern is error-prone when the lists are accessed later. Consider using a
Map<Integer, PartitionData> structure where PartitionData holds instances,
instanceSet, and existingInstances keyed by partitionId for clearer intent and
safer access.
```suggestion
for (Integer partitionId : partitionIds) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]