[ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13561871#comment-13561871 ]
Sriram Subramanian commented on KAFKA-631:
------------------------------------------

Good stuff. My feedback below:

1. Throttling

The current throttling scheme works only if Kafka uses a single physical disk, which I suspect will rarely be the case. With multiple disks, a single throttler cannot prevent individual disks from becoming saturated. A more accurate (but more complex) solution would be:

- On startup, query the number of physical disks on the machine and divide the cleaner's total bytes/sec budget by that number (this is the per-disk value).
- Create one throttler per disk.
- Run a background thread that refreshes the mapping from log directory to physical disk (to handle a log directory being moved to a different disk).
- Use the appropriate throttler based on that mapping.

This is one way to keep any single disk from becoming saturated.

2. Grouping segments by size

I don't think the current size-based grouping of segments solves the problem of preventing very small segments, because we decide the grouping before deduplicating. Presumably someone would choose dedup-based GC only if their workload is mostly updates rather than new records; in that scenario, all the old segments will eventually shrink toward zero after dedup. If you compute the total number of segments per group from a max size before dedup, you could end up with very few records in the new segment. A more deterministic way to ensure each segment has a minimum size is to check the size as you append to the segment: once it has crossed the max size and you are at the end of a segment boundary, do the swap with the segments read.

3. Determination of end offset

The code currently decides the end offset based on the map capacity. Consider the example you quoted in the wiki about user updates: very few active users would generate new events regularly, and the majority would have unique records in a given time period.
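The per-disk throttling idea in point 1 could be sketched roughly as follows. This is a minimal sketch, not Kafka's actual cleaner code; the class names (`PerDiskCleanerThrottle`, `DiskThrottler`) and the log-dir-to-disk map are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of point 1: split the cleaner's global byte budget
// across physical disks and throttle each disk independently.
public class PerDiskCleanerThrottle {

    // Simple rate limiter: callers record cleaner I/O bytes and sleep
    // whenever the observed rate exceeds this disk's per-second cap.
    static class DiskThrottler {
        private final long bytesPerSec;
        private long windowStartMs = System.currentTimeMillis();
        private long bytesInWindow = 0;

        DiskThrottler(long bytesPerSec) { this.bytesPerSec = bytesPerSec; }

        synchronized void maybeThrottle(long bytes) throws InterruptedException {
            bytesInWindow += bytes;
            long elapsedMs = System.currentTimeMillis() - windowStartMs;
            long allowed = bytesPerSec * Math.max(elapsedMs, 1) / 1000;
            if (bytesInWindow > allowed) {
                // Sleep long enough for the observed rate to fall back under the cap.
                long sleepMs = bytesInWindow * 1000 / bytesPerSec - elapsedMs;
                if (sleepMs > 0) Thread.sleep(sleepMs);
            }
            if (elapsedMs > 1000) { // reset the accounting window each second
                windowStartMs = System.currentTimeMillis();
                bytesInWindow = 0;
            }
        }
    }

    private final Map<String, DiskThrottler> throttlerByDisk = new HashMap<>();
    private final Map<String, String> diskByLogDir;

    // In the real scheme, diskByLogDir would be refreshed by a background
    // thread in case a log directory is moved to a different disk.
    PerDiskCleanerThrottle(long totalBytesPerSec, Map<String, String> diskByLogDir) {
        this.diskByLogDir = diskByLogDir;
        long disks = diskByLogDir.values().stream().distinct().count();
        long perDisk = totalBytesPerSec / Math.max(disks, 1);
        for (String disk : diskByLogDir.values())
            throttlerByDisk.putIfAbsent(disk, new DiskThrottler(perDisk));
    }

    // Route the cleaner's I/O accounting to the throttler for the disk
    // that currently backs this log directory.
    void recordCleanerIo(String logDir, long bytes) throws InterruptedException {
        throttlerByDisk.get(diskByLogDir.get(logDir)).maybeThrottle(bytes);
    }

    int diskCount() { return throttlerByDisk.size(); }
}
```

The point of routing through the mapping on every call is that a moved log directory starts charging its I/O against the correct disk as soon as the background refresh updates the map.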
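The append-time size check suggested in point 2 could look roughly like this. Again a sketch under assumed names (`CleanedSegmentWriter` is not a real Kafka class); the idea is simply to defer the swap until the cleaned segment has reached a size threshold and a source-segment boundary.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of point 2: rather than grouping source segments by
// size before dedup, track the size of the cleaned output as records are
// appended, and swap only at a source-segment boundary once the output
// has reached the threshold. This guarantees a minimum cleaned-segment
// size even when dedup discards most records.
public class CleanedSegmentWriter {
    private final long sizeThresholdBytes;
    private long bytesInCurrent = 0;
    private final List<Long> swappedSegmentSizes = new ArrayList<>();

    CleanedSegmentWriter(long sizeThresholdBytes) {
        this.sizeThresholdBytes = sizeThresholdBytes;
    }

    // Called for each record that survives deduplication.
    void append(long recordBytes) { bytesInCurrent += recordBytes; }

    // Called at the end of each source segment: swap only if the cleaned
    // output has crossed the threshold, otherwise keep accumulating.
    void onSegmentBoundary() {
        if (bytesInCurrent >= sizeThresholdBytes) {
            swappedSegmentSizes.add(bytesInCurrent);
            bytesInCurrent = 0;
        }
    }

    List<Long> swapped() { return swappedSegmentSizes; }
}
```

Because the check happens after dedup, a run of heavily-updated source segments that each shrink to almost nothing simply keeps accumulating into one output segment instead of producing many tiny ones.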
If many of the records within the dirty set are duplicates, you would not be making optimal use of your memory (the map would end up only partially filled). I don't have a good answer for this, but it is something to note. One option would be to keep reading the dirty set until you hit the map capacity or the beginning of the active segment.

4. Script to find actual deduplication

As part of your tests, do you plan to measure the expected dedup level vs. the actual gain? As long as the gain is close to the expected value it is fine, but we do not want it to be way off.

5. Integration tests

Should the integration tests use more than one cleaner thread to catch any corner cases? I could have missed it, but I did not find any test that does a sanity check of multiple cleaner threads functioning correctly.

6. Salt generation

Should the salt be randomly generated instead of monotonically increasing? Empirically I have found random salts to perform better in terms of providing a more uniform distribution over a given key namespace.

> Implement log compaction
> ------------------------
>
>                 Key: KAFKA-631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-631
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-631-v1.patch, KAFKA-631-v2.patch, KAFKA-631-v3.patch, KAFKA-631-v4.patch, KAFKA-631-v5.patch
>
>
> Currently Kafka has only one way to bound the space of the log, namely by deleting old segments. The policy that controls which segments are deleted can be configured based either on the number of bytes to retain or the age of the messages. This makes sense for event or log data which has no notion of primary key. However, lots of data has a primary key and consists of updates by primary key. For this data it would be nice to be able to ensure that the log contained at least the last version of every key.
> As an example, say that the Kafka topic contains a sequence of User Account messages, each capturing the current state of a given user account. Rather than simply discarding old segments, since the set of user accounts is finite, it might make more sense to delete individual records that have been made obsolete by a more recent update for the same key. This would ensure that the topic contained at least the current state of each record.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira