noob-se7en commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2851384629


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -160,12 +160,15 @@ protected List<String> assignConsumingSegment(int 
segmentPartitionId, InstancePa
           instancesAssigned.add(instances.get(segmentPartitionId % 
instances.size()));
         }
       } else {
-        // Explicit partition:
-        // Assign segment to the first instance within the partition.
-
+        // Explicit partition: instance partitions are keyed by stream 
partition id (supports non-contiguous subset).
         for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-          int partitionId = segmentPartitionId % numPartitions;
-          instancesAssigned.add(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(0));
+          List<String> instances = 
instancePartitions.getInstances(segmentPartitionId, replicaGroupId);

Review Comment:
   Don't we still need to do `segmentPartitionId % numPartitions` here?



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -343,7 +407,26 @@ private List<PartitionInfo> fetchPartitionInfos(long 
timeoutMillis) {
     }
 
     throw new TransientConsumerException(
-        new RuntimeException(String.format("Failed to fetch partition 
information for topic: %s", _topic)));
+        new RuntimeException("Failed to fetch partition information for topic: 
" + _topic));
+  }
+
+  private void validatePartitionIds(List<Integer> subset) {
+    List<Integer> topicPartitionIds = new ArrayList<>();
+    List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+    for (PartitionInfo partitionInfo : partitionInfos) {
+      topicPartitionIds.add(partitionInfo.partition());
+    }
+    Collections.sort(topicPartitionIds);

Review Comment:
   nit: No need to sort here; the list is only used for membership checks.



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -343,7 +407,26 @@ private List<PartitionInfo> fetchPartitionInfos(long 
timeoutMillis) {
     }
 
     throw new TransientConsumerException(
-        new RuntimeException(String.format("Failed to fetch partition 
information for topic: %s", _topic)));
+        new RuntimeException("Failed to fetch partition information for topic: 
" + _topic));
+  }
+
+  private void validatePartitionIds(List<Integer> subset) {
+    List<Integer> topicPartitionIds = new ArrayList<>();
+    List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+    for (PartitionInfo partitionInfo : partitionInfos) {
+      topicPartitionIds.add(partitionInfo.partition());
+    }
+    Collections.sort(topicPartitionIds);
+    List<Integer> missingPartitionIds = new ArrayList<>();
+    for (Integer partitionId : subset) {
+      if (!topicPartitionIds.contains(partitionId)) {

Review Comment:
   nit: `contains` on a list is a linear scan; let's use a set instead.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -289,16 +322,18 @@ private void 
replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
     int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools;
     int startIndex = Math.abs(tableNameHash % numPools);
 
-    int existingNumReplicaGroups = 
_existingInstancePartitions.getNumReplicaGroups();
-    int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+    int existingNumReplicaGroups =
+        _existingInstancePartitions != null ? 
_existingInstancePartitions.getNumReplicaGroups() : 0;
+    int existingNumPartitions =
+        _existingInstancePartitions != null ? 
_existingInstancePartitions.getNumPartitions() : 0;
     for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
       // For each replica-group, gather number of existing instances within 
each pool
       Set<String> existingInstanceSet = new HashSet<>();
       replicaGroupIdToExistingInstancesMap.add(existingInstanceSet);
       Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
       if (replicaGroupId < existingNumReplicaGroups) {
         for (int partitionId = 0; partitionId < existingNumPartitions; 
partitionId++) {

Review Comment:
   existingNumPartitions seems to come from `Integer.max(_numPartitions, 
partitionId + 1)`. Is this correct?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java:
##########
@@ -19,30 +19,35 @@
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.TableConfig;
 import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the 
number of partitions from the stream
- * to determine the number of partitions in each replica group.
+ * Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the 
number and IDs of partitions
+ * from the stream to determine the partitions in each replica group. When the 
stream exposes partition IDs
+ * (e.g. Kafka with subset), instance partitions are keyed by those IDs so 
non-contiguous subsets work.
  */
 public class ImplicitRealtimeTablePartitionSelector extends 
InstanceReplicaGroupPartitionSelector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ImplicitRealtimeTablePartitionSelector.class);

Review Comment:
   This logger seems to be unused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to