Repository: incubator-rya Updated Branches: refs/heads/master fc8d30ac6 -> 051472660
RYA-350 Added EmbeddedKafkaSingleton to help eliminate flaky ITs. Closes #214. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/82df3ad0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/82df3ad0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/82df3ad0 Branch: refs/heads/master Commit: 82df3ad0ba502ff8fafd184d318231510698342f Parents: fc8d30a Author: jdasch <[email protected]> Authored: Tue Aug 22 23:08:30 2017 -0400 Committer: Caleb Meier <[email protected]> Committed: Fri Aug 25 12:34:15 2017 -0700 ---------------------------------------------------------------------- .../rya/kafka/base/EmbeddedKafkaInstance.java | 143 +++++++++++++++++++ .../rya/kafka/base/EmbeddedKafkaSingleton.java | 87 +++++++++++ .../org/apache/rya/kafka/base/KafkaITBase.java | 58 ++------ .../rya/kafka/base/KafkaTestInstanceRule.java | 98 +++++++++++++ .../periodic.service.integration.tests/pom.xml | 119 ++++++++------- .../PeriodicNotificationExporterIT.java | 98 +++++++------ .../src/test/resources/log4j.properties | 37 +++++ 7 files changed, 493 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java new file mode 100644 index 0000000..97d8b90 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.base; + +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.fluo.core.util.PortUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.Time; +import kafka.zk.EmbeddedZookeeper; + +/** + * This class provides a {@link KafkaServer} and a dedicated + * {@link EmbeddedZookeeper} server for integtration testing. Both servers use a + * random free port, so it is necesssary to use the + * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()} + * methods to determine how to connect to them. + * + */ +public class EmbeddedKafkaInstance { + + private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class); + + private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1); + private static final String IPv4_LOOPBACK = "127.0.0.1"; + private static final String ZKHOST = IPv4_LOOPBACK; + private static final String BROKERHOST = IPv4_LOOPBACK; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private String brokerPort; + private String zookeperConnect; + + /** + * Starts the Embedded Kafka and Zookeeper Servers. + * @throws Exception - If an exeption occurs during startup. + */ + protected void startup() throws Exception { + // Setup the embedded zookeeper + logger.info("Starting up Embedded Zookeeper..."); + zkServer = new EmbeddedZookeeper(); + zookeperConnect = ZKHOST + ":" + zkServer.port(); + logger.info("Embedded Zookeeper started at: {}", zookeperConnect); + + // setup Broker + logger.info("Starting up Embedded Kafka..."); + brokerPort = Integer.toString(PortUtils.getRandomFreePort()); + final Properties brokerProps = new Properties(); + brokerProps.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), "0"); + brokerProps.setProperty(KafkaConfig$.MODULE$.HostNameProp(), BROKERHOST); + brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort); + brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect); + brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString()); + final KafkaConfig config = new KafkaConfig(brokerProps); + final Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort); + } + + /** + * Shutdown the Embedded Kafka and Zookeeper. + * @throws Exception + */ + protected void shutdown() throws Exception { + try { + if(kafkaServer != null) { + kafkaServer.shutdown(); + } + } finally { + if(zkServer != null) { + zkServer.shutdown(); + } + } + } + + /** + * @return A new Property object containing the correct value of + * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}, for + * connecting to this instance. + */ + public Properties createBootstrapServerConfig() { + final Properties config = new Properties(); + config.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + brokerPort); + return config; + } + + /** + * + * @return The host of the Kafka Broker. + */ + public String getBrokerHost() { + return BROKERHOST; + } + + /** + * + * @return The port of the Kafka Broker. + */ + public String getBrokerPort() { + return brokerPort; + } + + /** + * + * @return The Zookeeper Connect String. + */ + public String getZookeeperConnect() { + return zookeperConnect; + } + + /** + * + * @return A unique Kafka topic name for this instance. + */ + public String getUniqueTopicName() { + return "topic_" + KAFKA_TOPIC_COUNTER.getAndIncrement() + "_"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java new file mode 100644 index 0000000..933377b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.base; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a singleton instance of an {@link EmbeddedKafkaInstance} and + * includes a shutdown hook to ensure any open resources are closed on JVM exit. + * <p> + * This class is derived from MiniAccumuloSingleton. + */ +public class EmbeddedKafkaSingleton { + + public static EmbeddedKafkaInstance getInstance() { + return InstanceHolder.SINGLETON.instance; + } + + private EmbeddedKafkaSingleton() { + // hiding implicit default constructor + } + + private enum InstanceHolder { + + SINGLETON; + + private final Logger log; + private final EmbeddedKafkaInstance instance; + + InstanceHolder() { + this.log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class); + this.instance = new EmbeddedKafkaInstance(); + try { + this.instance.startup(); + + // JUnit does not have an overall lifecycle event for tearing down + // this kind of resource, but shutdown hooks work alright in practice + // since this should only be used during testing + + // The only other alternative for lifecycle management is to use a + // suite lifecycle to enclose the tests that need this resource. + // In practice this becomes unwieldy. + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + InstanceHolder.this.instance.shutdown(); + } catch (final Throwable t) { + // logging frameworks will likely be shut down + t.printStackTrace(System.err); + } + } + }); + + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Interrupted while starting EmbeddedKafkaInstance", e); + } catch (final IOException e) { + log.error("Unexpected error while starting EmbeddedKafkaInstance", e); + } catch (final Throwable e) { + // catching throwable because failure to construct an enum + // instance will lead to another error being thrown downstream + log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java index b9be828..da4526c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java @@ -18,61 +18,21 @@ */ package org.apache.rya.kafka.base; -import java.nio.file.Files; import java.util.Properties; -import org.I0Itec.zkclient.ZkClient; -import org.junit.After; -import org.junit.Before; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.Time; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; - +/** + * A class intended to be extended for Kafka Integration tests. + */ public class KafkaITBase { - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; - private KafkaServer kafkaServer; - private EmbeddedZookeeper zkServer; - private ZkClient zkClient; - - @Before - public void setupKafka() throws Exception { + private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance(); - // Setup Kafka. - zkServer = new EmbeddedZookeeper(); - final String zkConnect = ZKHOST + ":" + zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - ZkUtils.apply(zkClient, false); - - // setup Broker - final Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - final KafkaConfig config = new KafkaConfig(brokerProps); - final Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - } - /** - * Close all the Kafka mini server and mini-zookeeper - * - * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources() + * @return A new Property object containing the correct value for Kafka's + * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}. */ - @After - public void teardownKafka() { - kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); + protected Properties createBootstrapServerConfig() { + return embeddedKafka.createBootstrapServerConfig(); } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java new file mode 100644 index 0000000..a9ee7b5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.base; + +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + + +/** + * Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}. + * + */ +public class KafkaTestInstanceRule extends ExternalResource { + private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class); + private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance(); + private String kafkaTopicName; + private final boolean createTopic; + + /** + * @param createTopic - If true, a topic shall be created for the value + * provided by {@link #getKafkaTopicName()}. If false, no topics + * shall be created. + */ + public KafkaTestInstanceRule(final boolean createTopic) { + this.createTopic = createTopic; + } + + /** + * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as + * a prefix. + */ + public String getKafkaTopicName() { + if (kafkaTopicName == null) { + throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution."); + } + return kafkaTopicName; + } + + @Override + protected void before() throws Throwable { + // Get the next kafka topic name. + kafkaTopicName = kafkaInstance.getUniqueTopicName(); + + if(createTopic) { + createTopic(kafkaTopicName); + } + } + + @Override + protected void after() { + kafkaTopicName = null; + } + + /** + * Utility method to provide additional unique topics if they are required. + * @param topicName - The Kafka topic to create. + */ + public void createTopic(final String topicName) { + // Setup Kafka. + ZkUtils zkUtils = null; + try { + logger.info("Creating Kafka Topic: '{}'", topicName); + zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false); + AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + finally { + if(zkUtils != null) { + zkUtils.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml index 1b784a6..20a0647 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml +++ b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml @@ -1,62 +1,71 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <!-- Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with this - work for additional information regarding copyright ownership. The ASF licenses - this file to you under the Apache License, Version 2.0 (the "License"); you - may not use this file except in compliance with the License. You may obtain - a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the License. --> - <modelVersion>4.0.0</modelVersion> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service</artifactId> - <version>3.2.11-incubating-SNAPSHOT</version> - </parent> + <artifactId>rya.periodic.service.integration.tests</artifactId> - <artifactId>rya.periodic.service.integration.tests</artifactId> - - <name>Apache Rya Periodic Service Integration Tests</name> + <name>Apache Rya Periodic Service Integration Tests</name> <description>Integration Tests for Rya Periodic Service</description> - <dependencies> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.test.base</artifactId> - <exclusions> - <exclusion> - <artifactId>log4j-1.2-api</artifactId> - <groupId>org.apache.logging.log4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-api</artifactId> - <groupId>org.apache.logging.log4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-core</artifactId> - <groupId>org.apache.logging.log4j</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service.notification</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <artifactId>logback-classic</artifactId> - <groupId>ch.qos.logback</groupId> - </exclusion> - <exclusion> - <artifactId>logback-core</artifactId> - <groupId>ch.qos.logback</groupId> - </exclusion> - </exclusions> - </dependency> - </dependencies> + <dependencies> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.test.base</artifactId> + <exclusions> + <exclusion> + <artifactId>log4j-1.2-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-core</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.notification</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>logback-classic</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + <exclusion> + <artifactId>logback-core</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + </exclusions> + </dependency> + </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java index c0efc4f..c5dc809 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -34,8 +35,10 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.kafka.base.KafkaITBase; +import org.apache.rya.kafka.base.KafkaTestInstanceRule; import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; @@ -44,82 +47,91 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet; public class PeriodicNotificationExporterIT extends KafkaITBase { + + @Rule + public KafkaTestInstanceRule kafkaTestInstanceRule = new KafkaTestInstanceRule(false); + + private static final ValueFactory vf = new ValueFactoryImpl(); - + @Test public void testExporter() throws InterruptedException { - - BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>(); - Properties props = createKafkaConfig(); - - KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(props), 1, records); + + final String topic1 = kafkaTestInstanceRule.getKafkaTopicName() + "1"; + final String topic2 = kafkaTestInstanceRule.getKafkaTopicName() + "2"; + + kafkaTestInstanceRule.createTopic(topic1); + kafkaTestInstanceRule.createTopic(topic2); + + final BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>(); + + final KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(createKafkaProducerConfig()), 1, records); exporter.start(); - - QueryBindingSet bs1 = new QueryBindingSet(); + final QueryBindingSet bs1 = new QueryBindingSet(); bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L)); bs1.addBinding("name", vf.createURI("uri:Bob")); - BindingSetRecord record1 = new BindingSetRecord(bs1, "topic1"); - - QueryBindingSet bs2 = new QueryBindingSet(); + final BindingSetRecord record1 = new BindingSetRecord(bs1, topic1); + + final QueryBindingSet bs2 = new QueryBindingSet(); bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L)); bs2.addBinding("name", vf.createURI("uri:Joe")); - BindingSetRecord record2 = new BindingSetRecord(bs2, "topic2"); - + final BindingSetRecord record2 = new BindingSetRecord(bs2, topic2); + records.add(record1); records.add(record2); - - Set<BindingSet> expected1 = new HashSet<>(); + + final Set<BindingSet> expected1 = new HashSet<>(); expected1.add(bs1); - Set<BindingSet> expected2 = new HashSet<>(); + final Set<BindingSet> expected2 = new HashSet<>(); expected2.add(bs2); - - Set<BindingSet> actual1 = getBindingSetsFromKafka("topic1"); - Set<BindingSet> actual2 = getBindingSetsFromKafka("topic2"); - + + final Set<BindingSet> actual1 = getBindingSetsFromKafka(topic1); + final Set<BindingSet> actual2 = getBindingSetsFromKafka(topic2); + Assert.assertEquals(expected1, actual1); Assert.assertEquals(expected2, actual2); - + exporter.stop(); - } - - - private Properties createKafkaConfig() { - Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); - props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + + private Properties createKafkaProducerConfig() { + final Properties props = createBootstrapServerConfig(); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName()); + return props; + } + private Properties createKafkaConsumerConfig() { + final Properties props = createBootstrapServerConfig(); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName()); - return props; } - - - private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String TopicName) { + + + private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String topicName) { // setup consumer - final Properties consumerProps = createKafkaConfig(); - final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Arrays.asList(TopicName)); + final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(createKafkaConsumerConfig()); + consumer.subscribe(Arrays.asList(topicName)); return consumer; } - - private Set<BindingSet> getBindingSetsFromKafka(String topic) { + + private Set<BindingSet> getBindingSetsFromKafka(final String topicName) { KafkaConsumer<String, BindingSet> consumer = null; try { - consumer = makeBindingSetConsumer(topic); - ConsumerRecords<String, BindingSet> records = consumer.poll(5000); + consumer = makeBindingSetConsumer(topicName); + final ConsumerRecords<String, BindingSet> records = consumer.poll(20000); // Wait up to 20 seconds for a result to be published. - Set<BindingSet> bindingSets = new HashSet<>(); + final Set<BindingSet> bindingSets = new HashSet<>(); records.forEach(x -> bindingSets.add(x.value())); return bindingSets; - } catch (Exception e) { + } catch (final Exception e) { throw new RuntimeException(e); } finally { if (consumer != null) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties new file mode 100644 index 0000000..19cc13c --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Valid levels: +# TRACE, DEBUG, INFO, WARN, ERROR and FATAL +log4j.rootLogger=INFO, CONSOLE + +# Set independent logging levels +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.kafka=WARN +log4j.logger.org.apache.kafka=WARN + +# LOGFILE is set to be a File appender using a PatternLayout. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +#log4j.appender.CONSOLE.Threshold=DEBUG + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout +#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n \ No newline at end of file
