Repository: samza Updated Branches: refs/heads/master 279a2952d -> b815e6d91
SAMZA-1275: Kafka throws when users configure replication.factor for … …Kafka default stream Author: Jacob Maes <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Xinyu Liu <[email protected]> Closes #176 from jmakes/samza-1275 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b815e6d9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b815e6d9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b815e6d9 Branch: refs/heads/master Commit: b815e6d91c45171b98e8cda76bf25cf95967e285 Parents: 279a295 Author: Jacob Maes <[email protected]> Authored: Tue May 9 15:58:14 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Tue May 9 15:58:14 2017 -0700 ---------------------------------------------------------------------- .../samza/system/kafka/KafkaStreamSpec.java | 33 ++++++++++- .../samza/system/kafka/TestKafkaStreamSpec.java | 60 ++++++++++++++++++++ 2 files changed, 91 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b815e6d9/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java index 3255f70..8cf4923 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java @@ -22,14 +22,19 @@ package org.apache.samza.system.kafka; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import kafka.log.LogConfig; import org.apache.samza.config.KafkaConfig; import org.apache.samza.system.StreamSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Extends StreamSpec with 
the ability to easily get the topic replication factor. */ public class KafkaStreamSpec extends StreamSpec { + private static Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class); + private static final int DEFAULT_REPLICATION_FACTOR = 2; /** @@ -62,6 +67,30 @@ public class KafkaStreamSpec extends StreamSpec { } /** + * Filter out properties from the original config that are not supported by Kafka. + * For example, we allow users to set replication.factor as a property of the streams + * and then parse it out so we can pass it separately as Kafka requires. But Kafka + * will also throw if replication.factor is passed as a property on a new topic. + * + * @param originalConfig The original config to filter + * @return The filtered config + */ + private static Map<String, String> filterUnsupportedProperties(Map<String, String> originalConfig) { + Map<String, String> filteredConfig = new HashMap<>(); + for (Map.Entry<String, String> entry: originalConfig.entrySet()) { + // Kafka requires replication factor, but not as a property, so we have to filter it out. + if (!KafkaConfig.TOPIC_REPLICATION_FACTOR().equals(entry.getKey())) { + if (LogConfig.configNames().contains(entry.getKey())) { + filteredConfig.put(entry.getKey(), entry.getValue()); + } else { + LOG.warn("Property '{}' is not a valid Kafka topic config. It will be ignored.", entry.getKey()); + } + } + } + return filteredConfig; + } + + /** * Converts any StreamSpec to a KafkaStreamSpec. * If the original spec already is a KafkaStreamSpec, it is simply returned. * @@ -81,7 +110,7 @@ public class KafkaStreamSpec extends StreamSpec { originalSpec.getSystemName(), originalSpec.getPartitionCount(), replicationFactor, - mapToProperties(originalSpec.getConfig())); + mapToProperties(filterUnsupportedProperties(originalSpec.getConfig()))); } /** @@ -109,7 +138,7 @@ public class KafkaStreamSpec extends StreamSpec { * @param systemName The System name on which this stream will exist. 
Corresponds to a named implementation of the * Samza System abstraction. See {@link org.apache.samza.system.SystemFactory} * - * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned. + * @param partitionCount The number of partitions for the stream. A value of {@code 1} indicates unpartitioned. * * @param replicationFactor The number of topic replicas in the Kafka cluster for durability. * http://git-wip-us.apache.org/repos/asf/samza/blob/b815e6d9/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java new file mode 100644 index 0000000..69345a3 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system.kafka; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import java.util.Properties; +import org.apache.samza.system.StreamSpec; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * See also the general StreamSpec tests in {@link org.apache.samza.runtime.TestAbstractApplicationRunner} + */ +public class TestKafkaStreamSpec { + + @Test + public void testUnsupportedConfigStrippedFromProperties() { + StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7")); + + // First verify the original + assertEquals("7", original.get("replication.factor")); + assertEquals("4", original.get("segment.bytes")); + + Map<String, String> config = original.getConfig(); + assertEquals("7", config.get("replication.factor")); + assertEquals("4", config.get("segment.bytes")); + + + // Now verify the Kafka spec + KafkaStreamSpec spec = KafkaStreamSpec.fromSpec(original); + assertNull(spec.get("replication.factor")); + assertEquals("4", spec.get("segment.bytes")); + + Properties kafkaProperties = spec.getProperties(); + Map<String, String> kafkaConfig = spec.getConfig(); + assertNull(kafkaProperties.get("replication.factor")); + assertEquals("4", kafkaProperties.get("segment.bytes")); + + assertNull(kafkaConfig.get("replication.factor")); + assertEquals("4", kafkaConfig.get("segment.bytes")); + } +}
