[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352103#comment-15352103
 ] 

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

I will just shoot a quick reply now, time somehow became sparse recently. 
Anyhow. The bottom line of our misunderstandings is always the same thing. My 
bad that I didn't see the wiki page, if that Range-query interface is addressed 
that's nice :D.

Point 3 is the one that causes the most confusion I guess. In the repartition 
case we follow different pathes, where I am not sure that I was able to 
communicate mine well enough. I <3 the idea of having everything a derived 
store. ITE all this is beeing used to tail -F mysql-XXXX.bin | kafka | XXX | 
redis, therefore Redis become a derived store of mysql wich can be used for 
NoSql style reads. I infact am such a great fan of this concept that I tend to 
treat everything a derived store. For me this means a repartitioned topic is a 
derived store of the source topic. This stands in contrast to make a changelog 
out of it and materialize the changelog in say RocksDb. This leads to the 
"problem" that the changelog topic is not a derived store anymore. Wich gives 
me a personally bad feeling, it just pushes me out of my comfort zone. 
Confluent peeps seem to be in their comfort zone with change logging topics. In 
my narrative shit hits the fan when the property of beeing a derived store is 
lost. It leads to all the nasty things like beeing in the need of change 
logging your say RocksDbs as the intermidate topic wont hold stuff forever. 

In contrast to having a change-logging topic that I re-materialize and then 
changecapture again, I prefer todo the change capturing first and only maintain 
the state to wich downstream partitions a record is currently published. This 
works clean and nicely but brings with it what I call "key widening". Say I 
have KTable A and i want to repartition it to A' so that the topic containing 
A' is a derived store & logcompacted. Then I cant use Key<A> todo this for 2 
reasons. The Stream partition, can only access the key to determine the 
partition to delete from  (deletes come as null values), wich means the fields 
going to determine the partitions need to be in the key no matter what. Snippet:
{code:java}

                topology.addSink(name, repartitionTopicName, new 
StreamPartitioner<K, VR>(){
                        private Serializer<KL> intermediateSerializer = 
intermediateSerde.serializer();
                        @Override
                        public Integer partition(K key, VR value, int 
numPartitions) {
                                KL newKey = intermideateKeyExtractor.apply(key);
                                //Copied from Default Partitioner, didn't want 
to create a CLUSTER object here to reuse it.
                                return 
(Utils.murmur2(intermediateSerializer.serialize(repartitionTopicName, newKey)) 
% numPartitions )& 0x7fffffff;
                        }
                        
                }, repartitionProcessorName);
{code}

As you can see the result Key K contains the KL ( the key of the not 
repatitioned table).

the second reason why this key must be there is that one needs to be able to 
build a derived stream A''. But since in A' a record can "move" from partition 
X to Y there is a race condition between the "insert" in Y and the delete in X. 
The repartitioner Processor repartitioning for A'' needs to treat them as 
different keys. If it would be the same key the delete would wipe the new value 
maybe. This puts downstream consumers of A'' also in the wired position that at 
any point in time there can be as many A-keys with the same value as there are 
A' partitons -1 or a specific A key might vanish completly and then reappear. 
Wich is sometimes wanky to work around in the end application. But there is 
enough strategies to solve at least the multiple Akeys case, not so much for 
the complete fanish case. I hope this clarrifies stuff. 





> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> make the two joining streams to be on the same key. This is a draw-back from 
> programability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to