This is an automated email from the ASF dual-hosted git repository. lhaiesp pushed a commit to branch 1.3.1 in repository https://gitbox.apache.org/repos/asf/samza.git
commit 3c2863c1e2074f3132feaa6c315ec109726198d0 Author: Shanthoosh Venkataraman <[email protected]> AuthorDate: Tue Jan 14 16:20:29 2020 -0800 Fix the coordinator stream creation workflow. --- .../samza/system/kafka/KafkaSystemAdmin.java | 10 +++--- .../system/kafka/TestKafkaSystemAdminJava.java | 39 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index 97229db..e5d6af1 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -464,19 +464,19 @@ public class KafkaSystemAdmin implements SystemAdmin { LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName()); final String REPL_FACTOR = "replication.factor"; - KafkaStreamSpec kSpec = toKafkaSpec(streamSpec); - String topicName = kSpec.getPhysicalName(); + KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec); + String topicName = kafkaStreamSpec.getPhysicalName(); // create topic. - NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor()); + NewTopic newTopic = new NewTopic(topicName, kafkaStreamSpec.getPartitionCount(), (short) kafkaStreamSpec.getReplicationFactor()); // specify the configs - Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig()); + Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig()); // HACK - replication.factor is invalid config for AdminClient.createTopics if (streamConfig.containsKey(REPL_FACTOR)) { String repl = streamConfig.get(REPL_FACTOR); LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}", - REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor()); + REPL_FACTOR, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor()); streamConfig.remove(REPL_FACTOR); } newTopic.configs(new MapConfig(streamConfig)); diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index 1431600..7ca03f3 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -19,15 +19,22 @@ package org.apache.samza.system.kafka; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; import org.apache.samza.Partition; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; @@ -62,6 +69,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { private static final String TEST_OFFSET = "10"; @Test + public void testCreateStreamShouldCoordinatorStreamWithCorrectTopicProperties() throws Exception { + String coordinatorTopicName = String.format("topic-name-%s", RandomStringUtils.randomAlphabetic(5)); + StreamSpec coordinatorStreamSpec = KafkaStreamSpec.createCoordinatorStreamSpec(coordinatorTopicName, SYSTEM()); + + boolean hasCreatedStream = systemAdmin().createStream(coordinatorStreamSpec); + + assertTrue(hasCreatedStream); + + Map<String, String> coordinatorTopicProperties = getTopicConfigFromKafkaBroker(coordinatorTopicName); + + assertEquals("compact", coordinatorTopicProperties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); + assertEquals("26214400", coordinatorTopicProperties.get(TopicConfig.SEGMENT_BYTES_CONFIG)); + assertEquals("86400000", coordinatorTopicProperties.get(TopicConfig.DELETE_RETENTION_MS_CONFIG)); + } + + private static Map<String, String> getTopicConfigFromKafkaBroker(String topicName) throws Exception { + List<ConfigResource> configResourceList = ImmutableList.of( + new ConfigResource(ConfigResource.Type.TOPIC, topicName)); + Map<ConfigResource, org.apache.kafka.clients.admin.Config> configResourceConfigMap = + adminClient().describeConfigs(configResourceList).all().get(); + Map<String, String> kafkaTopicConfig = new HashMap<>(); + + configResourceConfigMap.values().forEach(configEntry -> { + configEntry.entries().forEach(config -> { + kafkaTopicConfig.put(config.name(), config.value()); + }); + }); + + return kafkaTopicConfig; + } + + @Test public void testGetOffsetsAfter() { SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(0)); SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(1));
