Repository: incubator-rya Updated Branches: refs/heads/master b6c94e41e -> dc238970b
RYA-467 update topic cleanup.policy Added a topic properties builder. Only one property is currently in the builder, but adding others should be easy. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/ba1e7e17 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/ba1e7e17 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/ba1e7e17 Branch: refs/heads/master Commit: ba1e7e17ce0922322f4d156cdbbdec569df2867b Parents: b6c94e4 Author: Andrew Smith <smith...@gmail.com> Authored: Mon Apr 2 12:32:40 2018 -0400 Committer: Valiyil <puja.vali...@parsons.com> Committed: Mon Apr 16 14:53:06 2018 -0400 ---------------------------------------------------------------------- .../streams/client/command/RunQueryCommand.java | 9 +++- .../apache/rya/streams/kafka/KafkaTopics.java | 6 ++- .../kafka/interactor/CreateKafkaTopic.java | 8 +++- .../interactor/KafkaTopicPropertiesBuilder.java | 46 ++++++++++++++++++++ .../querymanager/kafka/LocalQueryExecutor.java | 5 ++- 5 files changed, 68 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ba1e7e17/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java index f9a9458..edcc252 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java @@ -19,9 +19,11 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; +import static org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder.CLEANUP_POLICY_COMPACT; import java.util.HashSet; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -33,6 +35,7 @@ import org.apache.rya.streams.api.queries.QueryRepository; import org.apache.rya.streams.client.RyaStreamsCommand; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.interactor.KafkaRunQuery; +import org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -136,7 +139,11 @@ public class RunQueryCommand implements RyaStreamsCommand { final Set<String> topics = new HashSet<>(); topics.add( KafkaTopics.statementsTopic(params.ryaInstance) ); topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, queryId) ); - KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1); + + final Properties topicProps = new KafkaTopicPropertiesBuilder() + .setCleanupPolicy(CLEANUP_POLICY_COMPACT) + .build(); + KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1, Optional.of(topicProps)); // Run the query that uses those topics. final KafkaRunQuery runQuery = new KafkaRunQuery( http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ba1e7e17/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java index f0cc842..e445f47 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java @@ -127,12 +127,14 @@ public class KafkaTopics { * @param topicNames - The topics that will be created. (not null) * @param partitions - The number of partitions that each of the topics will have. * @param replicationFactor - The replication factor of the topics that are created. + * @param topicProperties - The optional properties of the topics to create. */ public static void createTopics( final String zookeeperServers, final Set<String> topicNames, final int partitions, - final int replicationFactor) { + final int replicationFactor, + final Optional<Properties> topicProperties) { requireNonNull(zookeeperServers); requireNonNull(topicNames); @@ -141,7 +143,7 @@ public class KafkaTopics { zkUtils = ZkUtils.apply(new ZkClient(zookeeperServers, 30000, 30000, ZKStringSerializer$.MODULE$), false); for(final String topicName : topicNames) { if(!AdminUtils.topicExists(zkUtils, topicName)) { - AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, topicProperties.orElse(new Properties()), RackAwareMode.Disabled$.MODULE$); } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ba1e7e17/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java index 771e1c8..eb6c9c4 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java @@ -20,6 +20,8 @@ package org.apache.rya.streams.kafka.interactor; import static java.util.Objects.requireNonNull; +import java.util.Optional; +import java.util.Properties; import java.util.Set; import org.apache.rya.streams.kafka.KafkaTopics; @@ -50,11 +52,13 @@ public class CreateKafkaTopic { * @param topicNames - The topics that will be created. (not null) * @param partitions - The number of partitions that each of the topics will have. * @param replicationFactor - The replication factor of the topics that are created. + * @param topicProperties - The optional properties of the topics to create. */ public void createTopics( final Set<String> topicNames, final int partitions, - final int replicationFactor) { - KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor); + final int replicationFactor, + final Optional<Properties> topicProperties) { + KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor, topicProperties); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ba1e7e17/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java new file mode 100644 index 0000000..7df9891 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java @@ -0,0 +1,46 @@ +package org.apache.rya.streams.kafka.interactor; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.Properties; + +/** + * Properties builder to be used when creating new Kafka Topics. + * + * Descriptions of properties can be found at + * {@link https://kafka.apache.org/documentation/#topicconfigs} + */ +public class KafkaTopicPropertiesBuilder { + /*----- Cleanup Policy -----*/ + public static final String CLEANUP_POLICY_KEY = "cleanup.policy"; + public static final String CLEANUP_POLICY_DELETE = "cleanup.policy"; + public static final String CLEANUP_POLICY_COMPACT = "cleanup.policy"; + + + private Optional<String> cleanupPolicy; + /** + * Sets the cleanup.policy of the Kafka Topic. + * + * @param policy - The cleanup policy to use. + * @return The builder. + */ + public KafkaTopicPropertiesBuilder setCleanupPolicy(final String policy) { + cleanupPolicy = Optional.of(requireNonNull(policy)); + return this; + } + + /** + * Builds the Kafka topic properties. + * @return The {@link Properties} of the Kafka Topic. + */ + public Properties build() { + final Properties props = new Properties(); + + if(cleanupPolicy.isPresent()) { + props.setProperty(CLEANUP_POLICY_KEY, cleanupPolicy.get()); + } + + return props; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ba1e7e17/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java index 4bd022a..2341739 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java @@ -24,6 +24,7 @@ import static java.util.Objects.requireNonNull; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; @@ -131,7 +132,9 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId())); // Make sure the Query Results topic exists for the query. - createKafkaTopic.createTopics(topics, 1, 1); + // Since this is running in the JVM, the properties are left empty + // so the cleanup.policy will default to delete to reduce memory usage. + createKafkaTopic.createTopics(topics, 1, 1, Optional.empty()); // Setup the Kafka Streams job that will execute. final KafkaStreams streams = streamsFactory.make(ryaInstance, query);