Repository: kafka
Updated Branches:
  refs/heads/trunk e79d9af3c -> 667ff7ef7


KAFKA-3504; Log compaction for changelog partition

Author: Eno Thereska <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Guozhang Wang <[email protected]>

Closes #1203 from enothereska/KAFKA-3504-logcompaction


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/667ff7ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/667ff7ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/667ff7ef

Branch: refs/heads/trunk
Commit: 667ff7ef737612773c50908d2b3cc829bb5132c7
Parents: e79d9af
Author: Eno Thereska <[email protected]>
Authored: Tue Apr 12 17:38:20 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue Apr 12 17:38:20 2016 -0700

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         |  29 ++++-
 .../internals/StreamPartitionAssignor.java      | 120 ++++++++++---------
 .../internals/StreamPartitionAssignorTest.java  |   2 +-
 3 files changed, 86 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/667ff7ef/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 536a447..4477fb7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 public class InternalTopicManager {
 
@@ -45,6 +46,10 @@ public class InternalTopicManager {
     private static final String ZK_TOPIC_PATH = "/brokers/topics";
     private static final String ZK_BROKER_PATH = "/brokers/ids";
     private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+    private static final String ZK_ENTITY_CONFIG_PATH = "/config/topics";
+    // TODO: the following LogConfig dependency should be removed after KIP-4
+    private static final String CLEANUP_POLICY_PROP = "cleanup.policy";
+    private static final String COMPACT = "compact";
 
     private final ZkClient zkClient;
     private final int replicationFactor;
@@ -89,7 +94,7 @@ public class InternalTopicManager {
         this.replicationFactor = replicationFactor;
     }
 
-    public void makeReady(String topic, int numPartitions) {
+    public void makeReady(String topic, int numPartitions, boolean 
compactTopic) {
         boolean topicNotReady = true;
 
         while (topicNotReady) {
@@ -97,7 +102,7 @@ public class InternalTopicManager {
 
             if (topicMetadata == null) {
                 try {
-                    createTopic(topic, numPartitions, replicationFactor);
+                    createTopic(topic, numPartitions, replicationFactor, 
compactTopic);
                 } catch (ZkNodeExistsException e) {
                     // ignore and continue
                 }
@@ -158,9 +163,10 @@ public class InternalTopicManager {
         }
     }
 
-    private void createTopic(String topic, int numPartitions, int 
replicationFactor) throws ZkNodeExistsException {
+    private void createTopic(String topic, int numPartitions, int 
replicationFactor, boolean compactTopic) throws ZkNodeExistsException {
         log.debug("Creating topic {} with {} partitions from ZK in partition 
assignor.", topic, numPartitions);
-
+        Properties prop = new Properties();
+        ObjectMapper mapper = new ObjectMapper();
         List<Integer> brokers = getBrokers();
         int numBrokers = brokers.size();
         if (numBrokers < replicationFactor) {
@@ -178,14 +184,25 @@ public class InternalTopicManager {
             }
             assignment.put(i, brokerList);
         }
+        // write out config first just like in AdminUtils.scala 
createOrUpdateTopicPartitionAssignmentPathInZK()
+        if (compactTopic) {
+            prop.put(CLEANUP_POLICY_PROP, COMPACT);
+            try {
+                Map<String, Object> dataMap = new HashMap<>();
+                dataMap.put("version", 1);
+                dataMap.put("config", prop);
+                String data = mapper.writeValueAsString(dataMap);
+                zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic, 
data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+            } catch (JsonProcessingException e) {
+                throw new StreamsException("Error while creating topic config 
in ZK for internal topic " + topic, e);
+            }
+        }
 
         // try to write to ZK with open ACL
         try {
             Map<String, Object> dataMap = new HashMap<>();
             dataMap.put("version", 1);
             dataMap.put("partitions", assignment);
-
-            ObjectMapper mapper = new ObjectMapper();
             String data = mapper.writeValueAsString(dataMap);
 
             zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, 
ZooDefs.Ids.OPEN_ACL_UNSAFE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/667ff7ef/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index bc42c82..341e66a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -147,6 +147,63 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
+    /**
+     * Makes each given internal topic ready, then polls until the restore consumer reports its partitions.
+     * @param topicToTaskIds Map from the topic names to be created to the task ids of each topic
+     * @param compactTopic If true, the topic should be a compacted topic (usually change log topics).
+     * @param outPartitionInfo If true, collect the partitions of every prepared topic into the result
+     * @param postPartitionPhase If true, the partition count is the max TaskId.partition + 1; if false,
+     *                           it is the partition field of the (expected single) task of the topic.
+     *                           Set to true after the initial topic-to-partition assignment.
+     * @return Partition info keyed by topic partition; empty if outPartitionInfo is false or if no
+     *         internal topic manager is set (in that case no topic is prepared at all)
+     */
+    private Map<TopicPartition, PartitionInfo> prepareTopic(Map<String, 
Set<TaskId>> topicToTaskIds,
+                                                            boolean 
compactTopic,
+                                                            boolean 
outPartitionInfo,
+                                                            boolean 
postPartitionPhase) {
+        Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
+        // if ZK is specified, prepare the internal source topic before 
calling partition grouper
+        if (internalTopicManager != null) {
+            log.debug("Starting to validate internal topics in partition 
assignor.");
+
+            for (Map.Entry<String, Set<TaskId>> entry : 
topicToTaskIds.entrySet()) {
+                String topic = entry.getKey();
+                int numPartitions = 0;
+                if (postPartitionPhase) {
+                    // the expected number of partitions is the max value of 
TaskId.partition + 1
+                    for (TaskId task : entry.getValue()) {
+                        if (numPartitions < task.partition + 1)
+                            numPartitions = task.partition + 1;
+                    }
+                } else {
+                    // should have size 1 only
+                    numPartitions = -1;
+                    for (TaskId task : entry.getValue()) {
+                        numPartitions = task.partition;
+                    }
+                }
+
+                internalTopicManager.makeReady(topic, numPartitions, 
compactTopic);
+
+                // wait until the topic metadata has been propagated to all 
brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = 
streamThread.restoreConsumer.partitionsFor(topic);
+                } while (partitions == null || partitions.size() != 
numPartitions);
+
+                if (outPartitionInfo) {
+                    for (PartitionInfo partition : partitions)
+                        partitionInfos.put(new 
TopicPartition(partition.topic(), partition.partition()), partition);
+                }
+            }
+
+            log.info("Completed validating internal topics in partition 
assignor.");
+        }
+
+        return partitionInfos;
+    }
+
     @Override
     public Map<String, Assignment> assign(Cluster metadata, Map<String, 
Subscription> subscriptions) {
         // This assigns tasks to consumer clients in two steps.
@@ -227,35 +284,7 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
             }
         }
 
-        Map<TopicPartition, PartitionInfo> internalPartitionInfos = new 
HashMap<>();
-
-        // if ZK is specified, prepare the internal source topic before 
calling partition grouper
-        if (internalTopicManager != null) {
-            log.debug("Starting to validate internal source topics in 
partition assignor.");
-
-            for (Map.Entry<String, Set<TaskId>> entry : 
internalSourceTopicToTaskIds.entrySet()) {
-                String topic = entry.getKey();
-
-                // should have size 1 only
-                int numPartitions = -1;
-                for (TaskId task : entry.getValue()) {
-                    numPartitions = task.partition;
-                }
-
-                internalTopicManager.makeReady(topic, numPartitions);
-
-                // wait until the topic metadata has been propagated to all 
brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = 
streamThread.restoreConsumer.partitionsFor(topic);
-                } while (partitions == null || partitions.size() != 
numPartitions);
-
-                for (PartitionInfo partition : partitions)
-                    internalPartitionInfos.put(new 
TopicPartition(partition.topic(), partition.partition()), partition);
-            }
-
-            log.info("Completed validating internal source topics in partition 
assignor.");
-        }
+        Map<TopicPartition, PartitionInfo> internalPartitionInfos = 
prepareTopic(internalSourceTopicToTaskIds, false, true, false);
         internalSourceTopicToTaskIds.clear();
 
         Cluster metadataWithInternalTopics = metadata;
@@ -350,35 +379,10 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
             }
         }
 
-        // if ZK is specified, validate the internal source topics and the 
state changelog topics
-        if (internalTopicManager != null) {
-            log.debug("Starting to validate changelog topics in partition 
assignor.");
-
-            Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
-            topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
-            topicToTaskIds.putAll(internalSourceTopicToTaskIds);
-
-            for (Map.Entry<String, Set<TaskId>> entry : 
topicToTaskIds.entrySet()) {
-                String topic = entry.getKey();
-
-                // the expected number of partitions is the max value of 
TaskId.partition + 1
-                int numPartitions = 0;
-                for (TaskId task : entry.getValue()) {
-                    if (numPartitions < task.partition + 1)
-                        numPartitions = task.partition + 1;
-                }
-
-                internalTopicManager.makeReady(topic, numPartitions);
-
-                // wait until the topic metadata has been propagated to all 
brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = 
streamThread.restoreConsumer.partitionsFor(topic);
-                } while (partitions == null || partitions.size() != 
numPartitions);
-            }
-
-            log.info("Completed validating changelog topics in partition 
assignor.");
-        }
+        // if ZK is specified, validate the internal topics again
+        prepareTopic(internalSourceTopicToTaskIds, false /* compactTopic */, 
false, true);
+        // change log topics should be compacted
+        prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, 
false, true);
 
         return assignment;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/667ff7ef/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index be851bf..3e8b110 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -556,7 +556,7 @@ public class StreamPartitionAssignorTest {
         }
 
         @Override
-        public void makeReady(String topic, int numPartitions) {
+        public void makeReady(String topic, int numPartitions, boolean 
compactTopic) {
             readyTopics.put(topic, numPartitions);
 
             List<PartitionInfo> partitions = new ArrayList<>();

Reply via email to