This is an automated email from the ASF dual-hosted git repository.
boryas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new b3e1a4e5 Fix the coordinator stream creation workflow.
new 2827c96 Merge pull request #1252 from shanthoosh/fix-coordinator-streeam-ttl
b3e1a4e5 is described below
commit b3e1a4e50883ebd7fb895195519880893baecfb0
Author: Shanthoosh Venkataraman <[email protected]>
AuthorDate: Tue Jan 14 16:20:29 2020 -0800
Fix the coordinator stream creation workflow.
---
.../samza/system/kafka/KafkaSystemAdmin.java | 10 +++---
.../system/kafka/TestKafkaSystemAdminJava.java | 39 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 97229db..e5d6af1 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -464,19 +464,19 @@ public class KafkaSystemAdmin implements SystemAdmin {
LOG.info("Creating Kafka topic: {} on system: {}",
streamSpec.getPhysicalName(), streamSpec.getSystemName());
final String REPL_FACTOR = "replication.factor";
- KafkaStreamSpec kSpec = toKafkaSpec(streamSpec);
- String topicName = kSpec.getPhysicalName();
+ KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec);
+ String topicName = kafkaStreamSpec.getPhysicalName();
// create topic.
- NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());
+ NewTopic newTopic = new NewTopic(topicName, kafkaStreamSpec.getPartitionCount(), (short) kafkaStreamSpec.getReplicationFactor());
// specify the configs
- Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig());
+ Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig());
// HACK - replication.factor is invalid config for AdminClient.createTopics
if (streamConfig.containsKey(REPL_FACTOR)) {
String repl = streamConfig.get(REPL_FACTOR);
LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
- REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor());
+ REPL_FACTOR, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
streamConfig.remove(REPL_FACTOR);
}
newTopic.configs(new MapConfig(streamConfig));
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 1431600..7ca03f3 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,15 +19,22 @@
package org.apache.samza.system.kafka;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.samza.Partition;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
@@ -62,6 +69,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
private static final String TEST_OFFSET = "10";
@Test
+ public void testCreateStreamShouldCoordinatorStreamWithCorrectTopicProperties() throws Exception {
+ String coordinatorTopicName = String.format("topic-name-%s",
RandomStringUtils.randomAlphabetic(5));
+ StreamSpec coordinatorStreamSpec =
KafkaStreamSpec.createCoordinatorStreamSpec(coordinatorTopicName, SYSTEM());
+
+ boolean hasCreatedStream =
systemAdmin().createStream(coordinatorStreamSpec);
+
+ assertTrue(hasCreatedStream);
+
+ Map<String, String> coordinatorTopicProperties =
getTopicConfigFromKafkaBroker(coordinatorTopicName);
+
+ assertEquals("compact",
coordinatorTopicProperties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+ assertEquals("26214400",
coordinatorTopicProperties.get(TopicConfig.SEGMENT_BYTES_CONFIG));
+ assertEquals("86400000",
coordinatorTopicProperties.get(TopicConfig.DELETE_RETENTION_MS_CONFIG));
+ }
+
+ private static Map<String, String> getTopicConfigFromKafkaBroker(String topicName) throws Exception {
+ List<ConfigResource> configResourceList = ImmutableList.of(
+ new ConfigResource(ConfigResource.Type.TOPIC, topicName));
+ Map<ConfigResource, org.apache.kafka.clients.admin.Config>
configResourceConfigMap =
+ adminClient().describeConfigs(configResourceList).all().get();
+ Map<String, String> kafkaTopicConfig = new HashMap<>();
+
+ configResourceConfigMap.values().forEach(configEntry -> {
+ configEntry.entries().forEach(config -> {
+ kafkaTopicConfig.put(config.name(), config.value());
+ });
+ });
+
+ return kafkaTopicConfig;
+ }
+
+ @Test
public void testGetOffsetsAfter() {
SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, TOPIC, new
Partition(0));
SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM, TOPIC, new
Partition(1));