RYA-467 responding to code review.  Closes #286

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/dc238970
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/dc238970
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/dc238970

Branch: refs/heads/master
Commit: dc238970bc6ed0ffccd2cdcf836ce7474e93ab20
Parents: ba1e7e1
Author: Andrew Smith <smith...@gmail.com>
Authored: Mon Apr 2 14:20:13 2018 -0400
Committer: Valiyil <puja.vali...@parsons.com>
Committed: Mon Apr 16 14:53:23 2018 -0400

----------------------------------------------------------------------
 .../streams/client/command/RunQueryCommand.java |  4 +-
 .../interactor/KafkaTopicPropertiesBuilder.java | 36 ++++++++++----
 .../KafkaTopicPropertiesBuilderTest.java        | 49 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dc238970/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index edcc252..ed513ec 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -19,7 +19,6 @@
 package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
-import static 
org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder.CLEANUP_POLICY_COMPACT;
 
 import java.util.HashSet;
 import java.util.Optional;
@@ -47,6 +46,7 @@ import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
+import kafka.log.LogConfig;
 
 /**
  * A command that runs a Rya Streams processing topology on the node the 
client is executed on until it has finished.
@@ -141,7 +141,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
                 topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, 
queryId) );
 
                 final Properties topicProps = new KafkaTopicPropertiesBuilder()
-                    .setCleanupPolicy(CLEANUP_POLICY_COMPACT)
+                    .setCleanupPolicy(LogConfig.Compact())
                     .build();
                 KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 
1, Optional.of(topicProps));
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dc238970/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
index 7df9891..b5001dd 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.rya.streams.kafka.interactor;
 
 import static java.util.Objects.requireNonNull;
@@ -5,6 +23,10 @@ import static java.util.Objects.requireNonNull;
 import java.util.Optional;
 import java.util.Properties;
 
+import org.apache.kafka.common.config.ConfigException;
+
+import kafka.log.LogConfig;
+
 /**
  * Properties builder to be used when creating new Kafka Topics.
  *
@@ -12,15 +34,11 @@ import java.util.Properties;
  * {@link https://kafka.apache.org/documentation/#topicconfigs}
  */
 public class KafkaTopicPropertiesBuilder {
-    /*----- Cleanup Policy -----*/
-    public static final String CLEANUP_POLICY_KEY = "cleanup.policy";
-    public static final String CLEANUP_POLICY_DELETE = "cleanup.policy";
-    public static final String CLEANUP_POLICY_COMPACT = "cleanup.policy";
-
-
     private Optional<String> cleanupPolicy;
     /**
      * Sets the cleanup.policy of the Kafka Topic.
+     * Valid properties are:
+     * {@link LogConfig#compact()} and {@link LogConfig#Delete()}
      *
      * @param policy - The cleanup policy to use.
      * @return The builder.
@@ -33,14 +51,16 @@ public class KafkaTopicPropertiesBuilder {
     /**
      * Builds the Kafka topic properties.
      * @return The {@link Properties} of the Kafka Topic.
+     * @throws ConfigException - If any of the properties are misconfigured.
      */
-    public Properties build() {
+    public Properties build() throws ConfigException {
         final Properties props = new Properties();
 
         if(cleanupPolicy.isPresent()) {
-            props.setProperty(CLEANUP_POLICY_KEY, cleanupPolicy.get());
+            props.setProperty(LogConfig.CleanupPolicyProp(), 
cleanupPolicy.get());
         }
 
+        LogConfig.validate(props);
         return props;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dc238970/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java
new file mode 100644
index 0000000..1550a16
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import kafka.log.LogConfig;
+
+public class KafkaTopicPropertiesBuilderTest {
+
+    @Test(expected=ConfigException.class)
+    public void invalidProperty() {
+        new KafkaTopicPropertiesBuilder()
+            .setCleanupPolicy("invalid")
+            .build();
+    }
+
+    @Test
+    public void validProperty() {
+        final Properties props = new KafkaTopicPropertiesBuilder()
+            .setCleanupPolicy(LogConfig.Compact())
+            .build();
+
+        final Properties expected = new Properties();
+        expected.setProperty(LogConfig.CleanupPolicyProp(), 
LogConfig.Compact());
+        assertEquals(expected, props);
+    }
+}

Reply via email to