noob-se7en commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2851384629
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -160,12 +160,15 @@ protected List<String> assignConsumingSegment(int
segmentPartitionId, InstancePa
instancesAssigned.add(instances.get(segmentPartitionId %
instances.size()));
}
} else {
- // Explicit partition:
- // Assign segment to the first instance within the partition.
-
+ // Explicit partition: instance partitions are keyed by stream
partition id (supports non-contiguous subset).
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int partitionId = segmentPartitionId % numPartitions;
- instancesAssigned.add(instancePartitions.getInstances(partitionId,
replicaGroupId).get(0));
+ List<String> instances =
instancePartitions.getInstances(segmentPartitionId, replicaGroupId);
Review Comment:
Do we no longer need `segmentPartitionId % numPartitions` here?
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -343,7 +407,26 @@ private List<PartitionInfo> fetchPartitionInfos(long
timeoutMillis) {
}
throw new TransientConsumerException(
- new RuntimeException(String.format("Failed to fetch partition
information for topic: %s", _topic)));
+ new RuntimeException("Failed to fetch partition information for topic:
" + _topic));
+ }
+
+ private void validatePartitionIds(List<Integer> subset) {
+ List<Integer> topicPartitionIds = new ArrayList<>();
+ List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ topicPartitionIds.add(partitionInfo.partition());
+ }
+ Collections.sort(topicPartitionIds);
Review Comment:
nit: No need to sort this list; only membership is checked below.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -343,7 +407,26 @@ private List<PartitionInfo> fetchPartitionInfos(long
timeoutMillis) {
}
throw new TransientConsumerException(
- new RuntimeException(String.format("Failed to fetch partition
information for topic: %s", _topic)));
+ new RuntimeException("Failed to fetch partition information for topic:
" + _topic));
+ }
+
+ private void validatePartitionIds(List<Integer> subset) {
+ List<Integer> topicPartitionIds = new ArrayList<>();
+ List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ topicPartitionIds.add(partitionInfo.partition());
+ }
+ Collections.sort(topicPartitionIds);
+ List<Integer> missingPartitionIds = new ArrayList<>();
+ for (Integer partitionId : subset) {
+ if (!topicPartitionIds.contains(partitionId)) {
Review Comment:
nit: `contains` on a list is a linear scan; let's use a set instead.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -289,16 +322,18 @@ private void
replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools;
int startIndex = Math.abs(tableNameHash % numPools);
- int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
- int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
+ _existingInstancePartitions != null ?
_existingInstancePartitions.getNumReplicaGroups() : 0;
+ int existingNumPartitions =
+ _existingInstancePartitions != null ?
_existingInstancePartitions.getNumPartitions() : 0;
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
// For each replica-group, gather number of existing instances within
each pool
Set<String> existingInstanceSet = new HashSet<>();
replicaGroupIdToExistingInstancesMap.add(existingInstanceSet);
Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
if (replicaGroupId < existingNumReplicaGroups) {
for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
Review Comment:
`existingNumPartitions` seems to come from `Integer.max(_numPartitions, partitionId + 1);`. Is this correct?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java:
##########
@@ -19,30 +19,35 @@
package org.apache.pinot.controller.helix.core.assignment.instance;
import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.TableConfig;
import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the
number of partitions from the stream
- * to determine the number of partitions in each replica group.
+ * Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the
number and IDs of partitions
+ * from the stream to determine the partitions in each replica group. When the
stream exposes partition IDs
+ * (e.g. Kafka with subset), instance partitions are keyed by those IDs so
non-contiguous subsets work.
*/
public class ImplicitRealtimeTablePartitionSelector extends
InstanceReplicaGroupPartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ImplicitRealtimeTablePartitionSelector.class);
Review Comment:
This `LOGGER` field seems unused.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]