Repository: incubator-rya
Updated Branches:
  refs/heads/master fc8d30ac6 -> 051472660


RYA-350 Added EmbeddedKafkaSingleton to help eliminate flaky ITs. Closes #214.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/82df3ad0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/82df3ad0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/82df3ad0

Branch: refs/heads/master
Commit: 82df3ad0ba502ff8fafd184d318231510698342f
Parents: fc8d30a
Author: jdasch <[email protected]>
Authored: Tue Aug 22 23:08:30 2017 -0400
Committer: Caleb Meier <[email protected]>
Committed: Fri Aug 25 12:34:15 2017 -0700

----------------------------------------------------------------------
 .../rya/kafka/base/EmbeddedKafkaInstance.java   | 143 +++++++++++++++++++
 .../rya/kafka/base/EmbeddedKafkaSingleton.java  |  87 +++++++++++
 .../org/apache/rya/kafka/base/KafkaITBase.java  |  58 ++------
 .../rya/kafka/base/KafkaTestInstanceRule.java   |  98 +++++++++++++
 .../periodic.service.integration.tests/pom.xml  | 119 ++++++++-------
 .../PeriodicNotificationExporterIT.java         |  98 +++++++------
 .../src/test/resources/log4j.properties         |  37 +++++
 7 files changed, 493 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
new file mode 100644
index 0000000..97d8b90
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.base;
+
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.fluo.core.util.PortUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * This class provides a {@link KafkaServer} and a dedicated
+ * {@link EmbeddedZookeeper} server for integration testing. Both servers use a
+ * random free port, so it is necessary to use the
+ * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()}
+ * methods to determine how to connect to them.
+ *
+ */
+public class EmbeddedKafkaInstance {
+
+    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
+
+    private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1);
+    private static final String IPv4_LOOPBACK = "127.0.0.1";
+    private static final String ZKHOST = IPv4_LOOPBACK;
+    private static final String BROKERHOST = IPv4_LOOPBACK;
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private String brokerPort;
+    private String zookeperConnect;
+
+    /**
+     * Starts the Embedded Kafka and Zookeeper Servers.
+     * @throws Exception - If an exception occurs during startup.
+     */
+    protected void startup() throws Exception {
+        // Setup the embedded zookeeper
+        logger.info("Starting up Embedded Zookeeper...");
+        zkServer = new EmbeddedZookeeper();
+        zookeperConnect = ZKHOST + ":" + zkServer.port();
+        logger.info("Embedded Zookeeper started at: {}", zookeperConnect);
+
+        // setup Broker
+        logger.info("Starting up Embedded Kafka...");
+        brokerPort = Integer.toString(PortUtils.getRandomFreePort());
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), "0");
+        brokerProps.setProperty(KafkaConfig$.MODULE$.HostNameProp(), BROKERHOST);
+        brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort);
+        brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect);
+        brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString());
+        final KafkaConfig config = new KafkaConfig(brokerProps);
+        final Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+        logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort);
+    }
+
+    /**
+     * Shutdown the Embedded Kafka and Zookeeper.
+     * @throws Exception
+     */
+    protected void shutdown() throws Exception {
+        try {
+            if(kafkaServer != null) {
+                kafkaServer.shutdown();
+            }
+        } finally {
+            if(zkServer != null) {
+                zkServer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * @return A new Properties object containing the correct value of
+     *         {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}, for
+     *         connecting to this instance.
+     */
+    public Properties createBootstrapServerConfig() {
+        final Properties config = new Properties();
+        config.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + brokerPort);
+        return config;
+    }
+
+    /**
+     *
+     * @return The host of the Kafka Broker.
+     */
+    public String getBrokerHost() {
+        return BROKERHOST;
+    }
+
+    /**
+     *
+     * @return The port of the Kafka Broker.
+     */
+    public String getBrokerPort() {
+        return brokerPort;
+    }
+
+    /**
+     *
+     * @return The Zookeeper Connect String.
+     */
+    public String getZookeeperConnect() {
+        return zookeperConnect;
+    }
+
+    /**
+     *
+     * @return A unique Kafka topic name for this instance.
+     */
+    public String getUniqueTopicName() {
+        return "topic_" + KAFKA_TOPIC_COUNTER.getAndIncrement() + "_";
+    }
+}
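
For reference, a minimal usage sketch (not part of this commit; the sketch class name, topic handling, and broker topic auto-creation are illustrative assumptions) showing how a test might obtain the embedded broker through EmbeddedKafkaSingleton and publish to a unique topic:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.rya.kafka.base.EmbeddedKafkaInstance;
    import org.apache.rya.kafka.base.EmbeddedKafkaSingleton;

    public class EmbeddedKafkaUsageSketch {   // hypothetical example class
        public static void main(final String[] args) {
            // The singleton starts Zookeeper and Kafka once per JVM and registers a shutdown hook.
            final EmbeddedKafkaInstance kafka = EmbeddedKafkaSingleton.getInstance();

            // bootstrap.servers points at the randomly chosen host:port of the embedded broker.
            final Properties props = kafka.createBootstrapServerConfig();
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // getUniqueTopicName() hands out a new name per call, so tests do not share topics.
                producer.send(new ProducerRecord<>(kafka.getUniqueTopicName(), "key", "value"));
            }
        }
    }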

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
new file mode 100644
index 0000000..933377b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.base;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a singleton instance of an {@link EmbeddedKafkaInstance} and
+ * includes a shutdown hook to ensure any open resources are closed on JVM exit.
+ * <p>
+ * This class is derived from MiniAccumuloSingleton.
+ */
+public class EmbeddedKafkaSingleton {
+
+    public static EmbeddedKafkaInstance getInstance() {
+        return InstanceHolder.SINGLETON.instance;
+    }
+
+    private EmbeddedKafkaSingleton() {
+        // hiding implicit default constructor
+    }
+
+    private enum InstanceHolder {
+
+        SINGLETON;
+
+        private final Logger log;
+        private final EmbeddedKafkaInstance instance;
+
+        InstanceHolder() {
+            this.log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
+            this.instance = new EmbeddedKafkaInstance();
+            try {
+                this.instance.startup();
+
+                // JUnit does not have an overall lifecycle event for tearing down
+                // this kind of resource, but shutdown hooks work alright in practice
+                // since this should only be used during testing
+
+                // The only other alternative for lifecycle management is to use a
+                // suite lifecycle to enclose the tests that need this resource.
+                // In practice this becomes unwieldy.
+
+                Runtime.getRuntime().addShutdownHook(new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            InstanceHolder.this.instance.shutdown();
+                        } catch (final Throwable t) {
+                            // logging frameworks will likely be shut down
+                            t.printStackTrace(System.err);
+                        }
+                    }
+                });
+
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Interrupted while starting EmbeddedKafkaInstance", e);
+            } catch (final IOException e) {
+                log.error("Unexpected error while starting EmbeddedKafkaInstance", e);
+            } catch (final Throwable e) {
+                // catching throwable because failure to construct an enum
+                // instance will lead to another error being thrown downstream
+                log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
index b9be828..da4526c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
@@ -18,61 +18,21 @@
  */
 package org.apache.rya.kafka.base;
 
-import java.nio.file.Files;
 import java.util.Properties;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.After;
-import org.junit.Before;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
+/**
+ * A class intended to be extended for Kafka Integration tests.
+ */
 public class KafkaITBase {
 
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private KafkaServer kafkaServer;
-    private EmbeddedZookeeper zkServer;
-    private ZkClient zkClient;
-    
-    @Before
-    public void setupKafka() throws Exception {
+    private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
 
-        // Setup Kafka.
-        zkServer = new EmbeddedZookeeper();
-        final String zkConnect = ZKHOST + ":" + zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-        ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        final Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
-        final KafkaConfig config = new KafkaConfig(brokerProps);
-        final Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-    }
-    
     /**
-     * Close all the Kafka mini server and mini-zookeeper
-     *
-     * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources()
+     * @return A new Properties object containing the correct value for Kafka's
+     *         {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
      */
-    @After
-    public void teardownKafka() {
-        kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
+    protected Properties createBootstrapServerConfig() {
+        return embeddedKafka.createBootstrapServerConfig();
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
new file mode 100644
index 0000000..a9ee7b5
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.base;
+
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+
+/**
+ * Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}.
+ *
+ */
+public class KafkaTestInstanceRule extends ExternalResource {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class);
+    private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance();
+    private String kafkaTopicName;
+    private final boolean createTopic;
+
+    /**
+     * @param createTopic - If true, a topic shall be created for the value
+     *            provided by {@link #getKafkaTopicName()}. If false, no topics
+     *            shall be created.
+     */
+    public KafkaTestInstanceRule(final boolean createTopic) {
+        this.createTopic = createTopic;
+    }
+
+    /**
+     * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as
+     *         a prefix.
+     */
+    public String getKafkaTopicName() {
+        if (kafkaTopicName == null) {
+            throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution.");
+        }
+        return kafkaTopicName;
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        // Get the next kafka topic name.
+        kafkaTopicName = kafkaInstance.getUniqueTopicName();
+
+        if(createTopic) {
+            createTopic(kafkaTopicName);
+        }
+    }
+
+    @Override
+    protected void after() {
+        kafkaTopicName = null;
+    }
+
+    /**
+     * Utility method to provide additional unique topics if they are required.
+     * @param topicName - The Kafka topic to create.
+     */
+    public void createTopic(final String topicName) {
+        // Setup Kafka.
+        ZkUtils zkUtils = null;
+        try {
+            logger.info("Creating Kafka Topic: '{}'", topicName);
+            zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
+            AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+        }
+        finally {
+            if(zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
+
+}
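
To tie the pieces together, here is a hedged sketch (not part of this commit; the test class name is hypothetical) of how an integration test might combine KafkaITBase with the new rule:

    import java.util.Properties;

    import org.apache.rya.kafka.base.KafkaITBase;
    import org.apache.rya.kafka.base.KafkaTestInstanceRule;
    import org.junit.Rule;
    import org.junit.Test;

    public class ExampleKafkaIT extends KafkaITBase {   // hypothetical example test

        // Passing true asks the rule to pre-create a topic named getKafkaTopicName() before each test.
        @Rule
        public KafkaTestInstanceRule kafkaRule = new KafkaTestInstanceRule(true);

        @Test
        public void topicIsAvailable() throws Exception {
            // Each test method sees a fresh, unique topic name on the shared embedded broker.
            final String topic = kafkaRule.getKafkaTopicName();

            // Bootstrap properties for clients come from the inherited KafkaITBase helper.
            final Properties props = createBootstrapServerConfig();

            // ... build a producer/consumer from props and exercise the topic ...
        }
    }

Because every test draws a unique topic (and, as in PeriodicNotificationExporterIT below, a random consumer group id), tests no longer collide on a shared broker, which is the flakiness RYA-350 targets.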

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
index 1b784a6..20a0647 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
@@ -1,62 +1,71 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-       <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
-               contributor license agreements. See the NOTICE file distributed with this
-               work for additional information regarding copyright ownership. The ASF licenses
-               this file to you under the Apache License, Version 2.0 (the "License"); you
-               may not use this file except in compliance with the License. You may obtain
-               a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
-               required by applicable law or agreed to in writing, software distributed
-               under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
-               OR CONDITIONS OF ANY KIND, either express or implied. See the License for
-               the specific language governing permissions and limitations under the License. -->
-       <modelVersion>4.0.0</modelVersion>
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.service</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
 
-       <parent>
-               <groupId>org.apache.rya</groupId>
-               <artifactId>rya.periodic.service</artifactId>
-               <version>3.2.11-incubating-SNAPSHOT</version>
-       </parent>
+    <artifactId>rya.periodic.service.integration.tests</artifactId>
 
-       <artifactId>rya.periodic.service.integration.tests</artifactId>
-       
-       <name>Apache Rya Periodic Service Integration Tests</name>
+    <name>Apache Rya Periodic Service Integration Tests</name>
     <description>Integration Tests for Rya Periodic Service</description>
 
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.rya</groupId>
-                       <artifactId>rya.pcj.fluo.test.base</artifactId>
-                       <exclusions>
-                               <exclusion>
-                                       <artifactId>log4j-1.2-api</artifactId>
-                                       <groupId>org.apache.logging.log4j</groupId>
-                               </exclusion>
-                               <exclusion>
-                                       <artifactId>log4j-api</artifactId>
-                                       <groupId>org.apache.logging.log4j</groupId>
-                               </exclusion>
-                               <exclusion>
-                                       <artifactId>log4j-core</artifactId>
-                                       <groupId>org.apache.logging.log4j</groupId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.rya</groupId>
-                       <artifactId>rya.periodic.service.notification</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <artifactId>logback-classic</artifactId>
-                                       <groupId>ch.qos.logback</groupId>
-                               </exclusion>
-                               <exclusion>
-                                       <artifactId>logback-core</artifactId>
-                                       <groupId>ch.qos.logback</groupId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-       </dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.test.base</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log4j-1.2-api</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-api</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-core</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.service.notification</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>logback-classic</artifactId>
+                    <groupId>ch.qos.logback</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>logback-core</artifactId>
+                    <groupId>ch.qos.logback</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
index c0efc4f..c5dc809 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -34,8 +35,10 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.kafka.base.KafkaITBase;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
 import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
@@ -44,82 +47,91 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet;
 
 public class PeriodicNotificationExporterIT extends KafkaITBase {
 
+
+    @Rule
+    public KafkaTestInstanceRule kafkaTestInstanceRule = new KafkaTestInstanceRule(false);
+
+
     private static final ValueFactory vf = new ValueFactoryImpl();
-    
+
     @Test
     public void testExporter() throws InterruptedException {
-        
-        BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>();
-        Properties props = createKafkaConfig();
-        
-        KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(props), 1, records);
+
+        final String topic1 = kafkaTestInstanceRule.getKafkaTopicName() + "1";
+        final String topic2 = kafkaTestInstanceRule.getKafkaTopicName() + "2";
+
+        kafkaTestInstanceRule.createTopic(topic1);
+        kafkaTestInstanceRule.createTopic(topic2);
+
+        final BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>();
+
+        final KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(createKafkaProducerConfig()), 1, records);
         exporter.start();
-        
-        QueryBindingSet bs1 = new QueryBindingSet();
+        final QueryBindingSet bs1 = new QueryBindingSet();
         bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L));
         bs1.addBinding("name", vf.createURI("uri:Bob"));
-        BindingSetRecord record1 = new BindingSetRecord(bs1, "topic1");
-        
-        QueryBindingSet bs2 = new QueryBindingSet();
+        final BindingSetRecord record1 = new BindingSetRecord(bs1, topic1);
+
+        final QueryBindingSet bs2 = new QueryBindingSet();
         bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L));
         bs2.addBinding("name", vf.createURI("uri:Joe"));
-        BindingSetRecord record2 = new BindingSetRecord(bs2, "topic2");
-        
+        final BindingSetRecord record2 = new BindingSetRecord(bs2, topic2);
+
         records.add(record1);
         records.add(record2);
-        
-        Set<BindingSet> expected1 = new HashSet<>();
+
+        final Set<BindingSet> expected1 = new HashSet<>();
         expected1.add(bs1);
-        Set<BindingSet> expected2 = new HashSet<>();
+        final Set<BindingSet> expected2 = new HashSet<>();
         expected2.add(bs2);
-        
-        Set<BindingSet> actual1 = getBindingSetsFromKafka("topic1");
-        Set<BindingSet> actual2 = getBindingSetsFromKafka("topic2");
-        
+
+        final Set<BindingSet> actual1 = getBindingSetsFromKafka(topic1);
+        final Set<BindingSet> actual2 = getBindingSetsFromKafka(topic2);
+
         Assert.assertEquals(expected1, actual1);
         Assert.assertEquals(expected2, actual2);
-        
+
         exporter.stop();
-        
     }
-    
-    
-    private Properties createKafkaConfig() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+
+    private Properties createKafkaProducerConfig() {
+        final Properties props = createBootstrapServerConfig();
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
+        return props;
+    }
+    private Properties createKafkaConsumerConfig() {
+        final Properties props = createBootstrapServerConfig();
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
-
         return props;
     }
-    
-    
-    private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String TopicName) {
+
+
+    private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String topicName) {
         // setup consumer
-        final Properties consumerProps = createKafkaConfig();
-        final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(consumerProps);
-        consumer.subscribe(Arrays.asList(TopicName));
+        final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(createKafkaConsumerConfig());
+        consumer.subscribe(Arrays.asList(topicName));
         return consumer;
     }
-    
-    private Set<BindingSet> getBindingSetsFromKafka(String topic) {
+
+    private Set<BindingSet> getBindingSetsFromKafka(final String topicName) {
         KafkaConsumer<String, BindingSet> consumer = null;
 
         try {
-            consumer = makeBindingSetConsumer(topic);
-            ConsumerRecords<String, BindingSet> records = consumer.poll(5000);
+            consumer = makeBindingSetConsumer(topicName);
+            final ConsumerRecords<String, BindingSet> records = consumer.poll(20000);  // Wait up to 20 seconds for a result to be published.
 
-            Set<BindingSet> bindingSets = new HashSet<>();
+            final Set<BindingSet> bindingSets = new HashSet<>();
             records.forEach(x -> bindingSets.add(x.value()));
 
             return bindingSets;
 
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RuntimeException(e);
         } finally {
             if (consumer != null) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/82df3ad0/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..19cc13c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Valid levels:
+# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
+log4j.rootLogger=INFO, CONSOLE
+
+# Set independent logging levels
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.kafka=WARN
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+#log4j.appender.CONSOLE.Threshold=DEBUG
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
+#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
\ No newline at end of file
