vamossagar12 commented on code in PR #12803:
URL: https://github.com/apache/kafka/pull/12803#discussion_r1032731595


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -459,12 +461,23 @@ private List<StreamsMetadata> 
rebuildMetadataForSingleTopology(final Map<HostInf
         return rebuiltMetadata;
     }
 
+    private final Function<Optional<Set<Integer>>, Integer> getPartition = 
maybeMulticastPartitions -> {
+        if (!maybeMulticastPartitions.isPresent()) {
+            return null;
+        }
+        if (maybeMulticastPartitions.get().size() != 1) {
+            throw new IllegalArgumentException("The partitions returned by 
StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for 
key should be a singleton set");

Review Comment:
   @ableegoldman , I took a look at this again today and here's what I think:
   
   1) For IQv2, it definitely makes sense to expand the partition to 
partitions. Given that it is mostly to query data, users can pass in multiple 
partitions and would definitely want to get them back. There's one nit that I 
wanted to do point out here, which somehow is also related to multiple 
partitions. `StreamPartitioner#partition` method states that if it returns a 
null, then default partition should be used. However, that doesn't somehow get 
handled in the 2 `StreamsMetadatState#getKeyQueryMetadataForKey` methods. Wrote 
a simple test case to simulate this and it threw an NPE as expected:
   
   ```
       @Test
       public void 
shouldGetInstanceWithKeyAndCustomPartitionerUsingDefaultPartitioner() {
           final TopicPartition tp4 = new TopicPartition("topic-three", 1);
           hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
          final StreamPartitioner<String, Object> partitioner1 = (topic, key, 
value, numPartitions) -> null;
   
           metadataState.onChange(hostToActivePartitions, 
hostToStandbyPartitions,
                   cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
   
           final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, 
Collections.emptySet(), 1);
   
           final KeyQueryMetadata actual = 
metadataState.getKeyQueryMetadataForKey("table-three",
                   "the-key",
                   partitioner1);
           assertEquals(expected, actual);
           assertEquals(1, actual.partition());
       }
   ```
   
   which throws this:
   
   ```
   java.lang.NullPointerException
           at 
org.apache.kafka.streams.processor.internals.StreamsMetadataState.getKeyQueryMetadataForKey(StreamsMetadataState.java:486)
           at 
org.apache.kafka.streams.processor.internals.StreamsMetadataState.getKeyQueryMetadataForKey(StreamsMetadataState.java:276)
           at 
org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest.shouldGetInstanceWithKeyAndCustomPartitionerUsingDefaultPartitioner(StreamsMetadataS
   ```
   
   Is there an implicit assumption that `partition` method won't return null? 
If not what should be the behaviour in this case? Based on that I would expand 
it to the `partitions` method as well.
   
   2) I went through the FK join usecase again and I think we should not 
restrict it to single partition here again. Give that the FK would be used for 
partitioner for co-partitioning purposes, it's very well possible that the FK 
might be across multiple partitions and the composite key should also be 
produced across those partitions for effective joining. 
   
   
   I would update the KIP once we hear back from John/Adam. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to