Repository: kafka
Updated Branches:
  refs/heads/trunk e79d9af3c -> 667ff7ef7
KAFKA-3504; Log compaction for changelog partition Author: Eno Thereska <[email protected]> Reviewers: Ismael Juma <[email protected]>, Guozhang Wang <[email protected]> Closes #1203 from enothereska/KAFKA-3504-logcompaction Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/667ff7ef Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/667ff7ef Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/667ff7ef Branch: refs/heads/trunk Commit: 667ff7ef737612773c50908d2b3cc829bb5132c7 Parents: e79d9af Author: Eno Thereska <[email protected]> Authored: Tue Apr 12 17:38:20 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Apr 12 17:38:20 2016 -0700 ---------------------------------------------------------------------- .../internals/InternalTopicManager.java | 29 ++++- .../internals/StreamPartitionAssignor.java | 120 ++++++++++--------- .../internals/StreamPartitionAssignorTest.java | 2 +- 3 files changed, 86 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/667ff7ef/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 536a447..4477fb7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; public class InternalTopicManager { @@ -45,6 +46,10 @@ public class InternalTopicManager 
{ private static final String ZK_TOPIC_PATH = "/brokers/topics"; private static final String ZK_BROKER_PATH = "/brokers/ids"; private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics"; + private static final String ZK_ENTITY_CONFIG_PATH = "/config/topics"; + // TODO: the following LogConfig dependency should be removed after KIP-4 + private static final String CLEANUP_POLICY_PROP = "cleanup.policy"; + private static final String COMPACT = "compact"; private final ZkClient zkClient; private final int replicationFactor; @@ -89,7 +94,7 @@ public class InternalTopicManager { this.replicationFactor = replicationFactor; } - public void makeReady(String topic, int numPartitions) { + public void makeReady(String topic, int numPartitions, boolean compactTopic) { boolean topicNotReady = true; while (topicNotReady) { @@ -97,7 +102,7 @@ public class InternalTopicManager { if (topicMetadata == null) { try { - createTopic(topic, numPartitions, replicationFactor); + createTopic(topic, numPartitions, replicationFactor, compactTopic); } catch (ZkNodeExistsException e) { // ignore and continue } @@ -158,9 +163,10 @@ public class InternalTopicManager { } } - private void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException { + private void createTopic(String topic, int numPartitions, int replicationFactor, boolean compactTopic) throws ZkNodeExistsException { log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions); - + Properties prop = new Properties(); + ObjectMapper mapper = new ObjectMapper(); List<Integer> brokers = getBrokers(); int numBrokers = brokers.size(); if (numBrokers < replicationFactor) { @@ -178,14 +184,25 @@ public class InternalTopicManager { } assignment.put(i, brokerList); } + // write out config first just like in AdminUtils.scala createOrUpdateTopicPartitionAssignmentPathInZK() + if (compactTopic) { + prop.put(CLEANUP_POLICY_PROP, COMPACT); + try { + 
Map<String, Object> dataMap = new HashMap<>(); + dataMap.put("version", 1); + dataMap.put("config", prop); + String data = mapper.writeValueAsString(dataMap); + zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } catch (JsonProcessingException e) { + throw new StreamsException("Error while creating topic config in ZK for internal topic " + topic, e); + } + } // try to write to ZK with open ACL try { Map<String, Object> dataMap = new HashMap<>(); dataMap.put("version", 1); dataMap.put("partitions", assignment); - - ObjectMapper mapper = new ObjectMapper(); String data = mapper.writeValueAsString(dataMap); zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE); http://git-wip-us.apache.org/repos/asf/kafka/blob/667ff7ef/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index bc42c82..341e66a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -147,6 +147,63 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable return new Subscription(new ArrayList<>(topics), data.encode()); } + /** + * Internal helper function that creates a Kafka topic + * @param topicToTaskIds Map that contains the topic names to be created + * @param compactTopic If true, the topic should be a compacted topic. This is used for + * change log topics usually. 
+ * @param outPartitionInfo If true, compute and return all partitions created + * @param postPartitionPhase If true, the computation for calculating the number of partitions + * is slightly different. Set to true after the initial topic-to-partition + * assignment. + * @return + */ + private Map<TopicPartition, PartitionInfo> prepareTopic(Map<String, Set<TaskId>> topicToTaskIds, + boolean compactTopic, + boolean outPartitionInfo, + boolean postPartitionPhase) { + Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>(); + // if ZK is specified, prepare the internal source topic before calling partition grouper + if (internalTopicManager != null) { + log.debug("Starting to validate internal topics in partition assignor."); + + for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) { + String topic = entry.getKey(); + int numPartitions = 0; + if (postPartitionPhase) { + // the expected number of partitions is the max value of TaskId.partition + 1 + for (TaskId task : entry.getValue()) { + if (numPartitions < task.partition + 1) + numPartitions = task.partition + 1; + } + } else { + // should have size 1 only + numPartitions = -1; + for (TaskId task : entry.getValue()) { + numPartitions = task.partition; + } + } + + internalTopicManager.makeReady(topic, numPartitions, compactTopic); + + // wait until the topic metadata has been propagated to all brokers + List<PartitionInfo> partitions; + do { + partitions = streamThread.restoreConsumer.partitionsFor(topic); + } while (partitions == null || partitions.size() != numPartitions); + + if (outPartitionInfo) { + for (PartitionInfo partition : partitions) + partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition); + } + } + + log.info("Completed validating internal topics in partition assignor."); + } + + return partitionInfos; + } + @Override public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { // This assigns tasks 
to consumer clients in two steps. @@ -227,35 +284,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } - Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>(); - - // if ZK is specified, prepare the internal source topic before calling partition grouper - if (internalTopicManager != null) { - log.debug("Starting to validate internal source topics in partition assignor."); - - for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) { - String topic = entry.getKey(); - - // should have size 1 only - int numPartitions = -1; - for (TaskId task : entry.getValue()) { - numPartitions = task.partition; - } - - internalTopicManager.makeReady(topic, numPartitions); - - // wait until the topic metadata has been propagated to all brokers - List<PartitionInfo> partitions; - do { - partitions = streamThread.restoreConsumer.partitionsFor(topic); - } while (partitions == null || partitions.size() != numPartitions); - - for (PartitionInfo partition : partitions) - internalPartitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition); - } - - log.info("Completed validating internal source topics in partition assignor."); - } + Map<TopicPartition, PartitionInfo> internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, true, false); internalSourceTopicToTaskIds.clear(); Cluster metadataWithInternalTopics = metadata; @@ -350,35 +379,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } - // if ZK is specified, validate the internal source topics and the state changelog topics - if (internalTopicManager != null) { - log.debug("Starting to validate changelog topics in partition assignor."); - - Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>(); - topicToTaskIds.putAll(stateChangelogTopicToTaskIds); - topicToTaskIds.putAll(internalSourceTopicToTaskIds); - - for (Map.Entry<String, Set<TaskId>> entry : 
topicToTaskIds.entrySet()) { - String topic = entry.getKey(); - - // the expected number of partitions is the max value of TaskId.partition + 1 - int numPartitions = 0; - for (TaskId task : entry.getValue()) { - if (numPartitions < task.partition + 1) - numPartitions = task.partition + 1; - } - - internalTopicManager.makeReady(topic, numPartitions); - - // wait until the topic metadata has been propagated to all brokers - List<PartitionInfo> partitions; - do { - partitions = streamThread.restoreConsumer.partitionsFor(topic); - } while (partitions == null || partitions.size() != numPartitions); - } - - log.info("Completed validating changelog topics in partition assignor."); - } + // if ZK is specified, validate the internal topics again + prepareTopic(internalSourceTopicToTaskIds, false /* compactTopic */, false, true); + // change log topics should be compacted + prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, false, true); return assignment; } http://git-wip-us.apache.org/repos/asf/kafka/blob/667ff7ef/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index be851bf..3e8b110 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -556,7 +556,7 @@ public class StreamPartitionAssignorTest { } @Override - public void makeReady(String topic, int numPartitions) { + public void makeReady(String topic, int numPartitions, boolean compactTopic) { readyTopics.put(topic, numPartitions); List<PartitionInfo> partitions = new ArrayList<>();
