RYA-467 responding to code review. Closes #286
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/dc238970 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/dc238970 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/dc238970 Branch: refs/heads/master Commit: dc238970bc6ed0ffccd2cdcf836ce7474e93ab20 Parents: ba1e7e1 Author: Andrew Smith <[email protected]> Authored: Mon Apr 2 14:20:13 2018 -0400 Committer: Valiyil <[email protected]> Committed: Mon Apr 16 14:53:23 2018 -0400 ---------------------------------------------------------------------- .../streams/client/command/RunQueryCommand.java | 4 +- .../interactor/KafkaTopicPropertiesBuilder.java | 36 ++++++++++---- .../KafkaTopicPropertiesBuilderTest.java | 49 ++++++++++++++++++++ 3 files changed, 79 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dc238970/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java index edcc252..ed513ec 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java @@ -19,7 +19,6 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; -import static org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder.CLEANUP_POLICY_COMPACT; import java.util.HashSet; import java.util.Optional; @@ -47,6 +46,7 @@ import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; +import kafka.log.LogConfig; /** * A command that runs a Rya Streams processing topology on the node the client is executed on until it has finished. @@ -141,7 +141,7 @@ public class RunQueryCommand implements RyaStreamsCommand { topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, queryId) ); final Properties topicProps = new KafkaTopicPropertiesBuilder() - .setCleanupPolicy(CLEANUP_POLICY_COMPACT) + .setCleanupPolicy(LogConfig.Compact()) .build(); KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1, Optional.of(topicProps)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dc238970/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java index 7df9891..b5001dd 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.rya.streams.kafka.interactor; import static java.util.Objects.requireNonNull; @@ -5,6 +23,10 @@ import static java.util.Objects.requireNonNull; import java.util.Optional; import java.util.Properties; +import org.apache.kafka.common.config.ConfigException; + +import kafka.log.LogConfig; + /** * Properties builder to be used when creating new Kafka Topics. * @@ -12,15 +34,11 @@ import java.util.Properties; * {@link https://kafka.apache.org/documentation/#topicconfigs} */ public class KafkaTopicPropertiesBuilder { - /*----- Cleanup Policy -----*/ - public static final String CLEANUP_POLICY_KEY = "cleanup.policy"; - public static final String CLEANUP_POLICY_DELETE = "cleanup.policy"; - public static final String CLEANUP_POLICY_COMPACT = "cleanup.policy"; - - private Optional<String> cleanupPolicy; /** * Sets the cleanup.policy of the Kafka Topic. + * Valid properties are: + * {@link LogConfig#compact()} and {@link LogConfig#Delete()} * * @param policy - The cleanup policy to use. * @return The builder. @@ -33,14 +51,16 @@ public class KafkaTopicPropertiesBuilder { /** * Builds the Kafka topic properties. * @return The {@link Properties} of the Kafka Topic. + * @throws ConfigException - If any of the properties are misconfigured. */ - public Properties build() { + public Properties build() throws ConfigException { final Properties props = new Properties(); if(cleanupPolicy.isPresent()) { - props.setProperty(CLEANUP_POLICY_KEY, cleanupPolicy.get()); + props.setProperty(LogConfig.CleanupPolicyProp(), cleanupPolicy.get()); } + LogConfig.validate(props); return props; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dc238970/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java new file mode 100644 index 0000000..1550a16 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.interactor; + +import static org.junit.Assert.assertEquals; + +import java.util.Properties; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.Test; + +import kafka.log.LogConfig; + +public class KafkaTopicPropertiesBuilderTest { + + @Test(expected=ConfigException.class) + public void invalidProperty() { + new KafkaTopicPropertiesBuilder() + .setCleanupPolicy("invalid") + .build(); + } + + @Test + public void validProperty() { + final Properties props = new KafkaTopicPropertiesBuilder() + .setCleanupPolicy(LogConfig.Compact()) + .build(); + + final Properties expected = new Properties(); + expected.setProperty(LogConfig.CleanupPolicyProp(), LogConfig.Compact()); + assertEquals(expected, props); + } +}
