RYA-402 Create Kafka reusable test code project. Closes #242.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/4089e706 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/4089e706 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/4089e706 Branch: refs/heads/master Commit: 4089e706ca17c54ced3723602672f1a69d2aa2be Parents: 6dd81bd Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Tue Oct 10 18:36:25 2017 -0400 Committer: jdasch <hcs...@gmail.com> Committed: Thu Oct 12 12:51:38 2017 -0400 ---------------------------------------------------------------------- .../pcj/matching/AccumuloIndexSetProvider.java | 3 +- ...dicNotificationApplicationConfiguration.java | 2 +- .../exporter/KafkaExporterExecutor.java | 2 +- .../KafkaPeriodicBindingSetExporter.java | 2 +- .../notification/pruner/AccumuloBinPruner.java | 2 +- .../pruner/PeriodicQueryPruner.java | 2 +- extras/periodic.notification/tests/pom.xml | 4 + .../PeriodicNotificationApplicationIT.java | 288 ++++++++++--------- .../PeriodicNotificationExporterIT.java | 4 +- .../PeriodicCommandNotificationConsumerIT.java | 38 +-- .../app/batch/AbstractSpanBatchInformation.java | 2 +- .../fluo/app/batch/JoinBatchInformation.java | 2 +- .../export/kafka/KafkaExportParameterBase.java | 2 +- .../rya/kafka/base/EmbeddedKafkaInstance.java | 143 --------- .../rya/kafka/base/EmbeddedKafkaSingleton.java | 87 ------ .../org/apache/rya/kafka/base/KafkaITBase.java | 38 --- .../rya/kafka/base/KafkaTestInstanceRule.java | 98 ------- pom.xml | 11 + test/kafka/pom.xml | 81 ++++++ .../rya/test/kafka/EmbeddedKafkaInstance.java | 142 +++++++++ .../rya/test/kafka/EmbeddedKafkaSingleton.java | 87 ++++++ .../org/apache/rya/test/kafka/KafkaITBase.java | 38 +++ .../rya/test/kafka/KafkaTestInstanceRule.java | 98 +++++++ .../org/apache/rya/test/kafka/PortUtils.java | 44 +++ test/pom.xml | 39 +++ 25 files changed, 721 insertions(+), 538 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java index 1940e64..40e2c77 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java @@ -52,11 +52,10 @@ import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.sail.SailException; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import jline.internal.Preconditions; - /** * Implementation of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s. 
* This provider uses either user specified Accumulo configuration information or user a specified http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java index d69efe5..ff58979 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java @@ -22,7 +22,7 @@ import java.util.Properties; import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * Configuration object for creating a {@link PeriodicNotificationApplication}. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java index c2e5ebf..3b639e9 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java @@ -32,7 +32,7 @@ import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.LifeCycle; import org.openrdf.query.BindingSet; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java index 8a0322f..5397618 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java @@ -36,7 +36,7 @@ import org.apache.rya.periodic.notification.api.BindingSetRecordExportException; import org.openrdf.model.Literal; import org.openrdf.query.BindingSet; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * Object that exports {@link BindingSet}s to the Kafka topic indicated by http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java index 4dac64c..a9403c2 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java @@ -24,7 +24,7 @@ 
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.NodeBin; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * Deletes BindingSets from time bins in the indicated PCJ table http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java index 516690e..327154a 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java @@ -32,7 +32,7 @@ import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.NodeBin; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * Implementation of {@link BinPruner} that deletes old, already processed http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/pom.xml b/extras/periodic.notification/tests/pom.xml index 229a761..feb1f0f 100644 --- a/extras/periodic.notification/tests/pom.xml +++ b/extras/periodic.notification/tests/pom.xml @@ -26,6 +26,10 @@ <dependencies> <dependency> <groupId>org.apache.rya</groupId> + 
<artifactId>rya.test.kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> <artifactId>rya.pcj.fluo.test.base</artifactId> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java index 9109775..3b6062f 100644 --- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java @@ -18,6 +18,9 @@ */ package org.apache.rya.periodic.notification.application; +import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS; +import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC; + import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -60,14 +63,14 @@ import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; -import org.apache.rya.kafka.base.EmbeddedKafkaInstance; -import org.apache.rya.kafka.base.EmbeddedKafkaSingleton; -import org.apache.rya.kafka.base.KafkaTestInstanceRule; import 
org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.apache.rya.periodic.notification.notification.CommandNotification; import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.apache.rya.test.kafka.EmbeddedKafkaInstance; +import org.apache.rya.test.kafka.EmbeddedKafkaSingleton; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -85,10 +88,7 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - -import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC; -import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;; +import com.google.common.collect.Sets;; public class PeriodicNotificationApplicationIT extends RyaExportITBase { @@ -101,62 +101,64 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { private PeriodicNotificationApplicationConfiguration conf; private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance(); private static String bootstrapServers; - + @Rule public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false); - + @BeforeClass public static void initClass() { bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); } - + @Before public void init() throws Exception { - String topic = rule.getKafkaTopicName(); + final String topic = rule.getKafkaTopicName(); rule.createTopic(topic); - + //get user specified props and update with the embedded kafka 
bootstrap servers and rule generated topic props = getProps(); props.setProperty(NOTIFICATION_TOPIC, topic); props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); conf = new PeriodicNotificationApplicationConfiguration(props); - + //create Kafka Producer kafkaProps = getKafkaProperties(conf); producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer()); - + //extract kafka specific properties from application config app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props); registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer); } - + @Test public void periodicApplicationWithAggAndGroupByTest() throws Exception { - String sparql = "prefix function: <http://org.apache.rya/function#> " // n + final String sparql = "prefix function: <http://org.apache.rya/function#> " // n + "prefix time: <http://www.w3.org/2006/time#> " // n + "select ?type (count(?obs) as ?total) where {" // n + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + "?obs <uri:hasTime> ?time. 
" // n + "?obs <uri:hasObsType> ?type } group by ?type"; // n - + //make data - int periodMult = 15; + final int periodMult = 15; final ValueFactory vf = new ValueFactoryImpl(); final DatatypeFactory dtf = DatatypeFactory.newInstance(); //Sleep until current time aligns nicely with period to makell //results more predictable - while(System.currentTimeMillis() % (periodMult*1000) > 500); - ZonedDateTime time = ZonedDateTime.now(); + while(System.currentTimeMillis() % (periodMult*1000) > 500) { + ; + } + final ZonedDateTime time = ZonedDateTime.now(); - ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); - String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); - String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); - String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); final Collection<Statement> statements = Sets.newHashSet( vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), @@ -174,26 +176,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile"))); - + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { - Connector connector = ConfigUtils.getConnector(conf); - 
PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); - CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); + final Connector connector = ConfigUtils.getConnector(conf); + final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); addData(statements); app.start(); - - Multimap<Long, BindingSet> actual = HashMultimap.create(); + + final Multimap<Long, BindingSet> actual = HashMultimap.create(); try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { consumer.subscribe(Arrays.asList(id)); - long end = System.currentTimeMillis() + 4*periodMult*1000; + final long end = System.currentTimeMillis() + 4*periodMult*1000; long lastBinId = 0L; long binId = 0L; - List<Long> ids = new ArrayList<>(); + final List<Long> ids = new ArrayList<>(); while (System.currentTimeMillis() < end) { - ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); - for(ConsumerRecord<String, BindingSet> record: records){ - BindingSet result = record.value(); + final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(final ConsumerRecord<String, BindingSet> record: records){ + final BindingSet result = record.value(); binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); if(lastBinId != binId) { lastBinId = binId; @@ -202,103 +204,105 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { actual.put(binId, result); } } - - 
Map<Long, Set<BindingSet>> expected = new HashMap<>(); - - Set<BindingSet> expected1 = new HashSet<>(); - QueryBindingSet bs1 = new QueryBindingSet(); + + final Map<Long, Set<BindingSet>> expected = new HashMap<>(); + + final Set<BindingSet> expected1 = new HashSet<>(); + final QueryBindingSet bs1 = new QueryBindingSet(); bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); bs1.addBinding("type", vf.createLiteral("airplane")); - - QueryBindingSet bs2 = new QueryBindingSet(); + + final QueryBindingSet bs2 = new QueryBindingSet(); bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); bs2.addBinding("type", vf.createLiteral("ship")); - - QueryBindingSet bs3 = new QueryBindingSet(); + + final QueryBindingSet bs3 = new QueryBindingSet(); bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); bs3.addBinding("type", vf.createLiteral("automobile")); - + expected1.add(bs1); expected1.add(bs2); expected1.add(bs3); - - Set<BindingSet> expected2 = new HashSet<>(); - QueryBindingSet bs4 = new QueryBindingSet(); + + final Set<BindingSet> expected2 = new HashSet<>(); + final QueryBindingSet bs4 = new QueryBindingSet(); bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); bs4.addBinding("type", vf.createLiteral("airplane")); - - QueryBindingSet bs5 = new QueryBindingSet(); + + final QueryBindingSet bs5 = new QueryBindingSet(); bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); bs5.addBinding("type", vf.createLiteral("ship")); - + expected2.add(bs4); expected2.add(bs5); - - 
Set<BindingSet> expected3 = new HashSet<>(); - QueryBindingSet bs6 = new QueryBindingSet(); + + final Set<BindingSet> expected3 = new HashSet<>(); + final QueryBindingSet bs6 = new QueryBindingSet(); bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); bs6.addBinding("type", vf.createLiteral("ship")); - - QueryBindingSet bs7 = new QueryBindingSet(); + + final QueryBindingSet bs7 = new QueryBindingSet(); bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); bs7.addBinding("type", vf.createLiteral("airplane")); - + expected3.add(bs6); expected3.add(bs7); - + expected.put(ids.get(0), expected1); expected.put(ids.get(1), expected2); expected.put(ids.get(2), expected3); - + Assert.assertEquals(3, actual.asMap().size()); - for(Long ident: ids) { + for(final Long ident: ids) { Assert.assertEquals(expected.get(ident), actual.get(ident)); } } - - Set<BindingSet> expectedResults = new HashSet<>(); + + final Set<BindingSet> expectedResults = new HashSet<>(); try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { results.forEachRemaining(x -> expectedResults.add(x)); Assert.assertEquals(0, expectedResults.size()); } } } - - + + @Test public void periodicApplicationWithAggTest() throws Exception { - String sparql = "prefix function: <http://org.apache.rya/function#> " // n + final String sparql = "prefix function: <http://org.apache.rya/function#> " // n + "prefix time: <http://www.w3.org/2006/time#> " // n + "select (count(?obs) as ?total) where {" // n + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + "?obs <uri:hasTime> ?time. 
" // n + "?obs <uri:hasId> ?id } "; // n - + //make data - int periodMult = 15; + final int periodMult = 15; final ValueFactory vf = new ValueFactoryImpl(); final DatatypeFactory dtf = DatatypeFactory.newInstance(); //Sleep until current time aligns nicely with period to make //results more predictable - while(System.currentTimeMillis() % (periodMult*1000) > 500); - ZonedDateTime time = ZonedDateTime.now(); + while(System.currentTimeMillis() % (periodMult*1000) > 500) { + ; + } + final ZonedDateTime time = ZonedDateTime.now(); - ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); - String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); - String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); - String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); final Collection<Statement> statements = Sets.newHashSet( vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), @@ -310,26 +314,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); - + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { - Connector connector = ConfigUtils.getConnector(conf); - PeriodicQueryResultStorage storage = new 
AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); - CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); + final Connector connector = ConfigUtils.getConnector(conf); + final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); addData(statements); app.start(); - - Multimap<Long, BindingSet> expected = HashMultimap.create(); + + final Multimap<Long, BindingSet> expected = HashMultimap.create(); try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { consumer.subscribe(Arrays.asList(id)); - long end = System.currentTimeMillis() + 4*periodMult*1000; + final long end = System.currentTimeMillis() + 4*periodMult*1000; long lastBinId = 0L; long binId = 0L; - List<Long> ids = new ArrayList<>(); + final List<Long> ids = new ArrayList<>(); while (System.currentTimeMillis() < end) { - ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); - for(ConsumerRecord<String, BindingSet> record: records){ - BindingSet result = record.value(); + final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(final ConsumerRecord<String, BindingSet> record: records){ + final BindingSet result = record.value(); binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); if(lastBinId != binId) { lastBinId = binId; @@ -338,21 +342,21 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { expected.put(binId, result); } } - + Assert.assertEquals(3, 
expected.asMap().size()); int i = 0; - for(Long ident: ids) { + for(final Long ident: ids) { Assert.assertEquals(1, expected.get(ident).size()); - BindingSet bs = expected.get(ident).iterator().next(); - Value val = bs.getValue("total"); - int total = Integer.parseInt(val.stringValue()); + final BindingSet bs = expected.get(ident).iterator().next(); + final Value val = bs.getValue("total"); + final int total = Integer.parseInt(val.stringValue()); Assert.assertEquals(3-i, total); i++; } } - - - Set<BindingSet> expectedResults = new HashSet<>(); + + + final Set<BindingSet> expectedResults = new HashSet<>(); try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { results.forEachRemaining(x -> expectedResults.add(x)); Assert.assertEquals(0, expectedResults.size()); @@ -360,35 +364,37 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { } } - - + + @Test public void periodicApplicationTest() throws Exception { - String sparql = "prefix function: <http://org.apache.rya/function#> " // n + final String sparql = "prefix function: <http://org.apache.rya/function#> " // n + "prefix time: <http://www.w3.org/2006/time#> " // n + "select ?obs ?id where {" // n + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + "?obs <uri:hasTime> ?time. 
" // n + "?obs <uri:hasId> ?id } "; // n - + //make data - int periodMult = 15; + final int periodMult = 15; final ValueFactory vf = new ValueFactoryImpl(); final DatatypeFactory dtf = DatatypeFactory.newInstance(); //Sleep until current time aligns nicely with period to make //results more predictable - while(System.currentTimeMillis() % (periodMult*1000) > 500); - ZonedDateTime time = ZonedDateTime.now(); + while(System.currentTimeMillis() % (periodMult*1000) > 500) { + ; + } + final ZonedDateTime time = ZonedDateTime.now(); - ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); - String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); - String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); - String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); final Collection<Statement> statements = Sets.newHashSet( vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), @@ -400,26 +406,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); - + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { - Connector connector = ConfigUtils.getConnector(conf); - PeriodicQueryResultStorage storage = new 
AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); - CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); + final Connector connector = ConfigUtils.getConnector(conf); + final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); addData(statements); app.start(); - - Multimap<Long, BindingSet> expected = HashMultimap.create(); + + final Multimap<Long, BindingSet> expected = HashMultimap.create(); try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { consumer.subscribe(Arrays.asList(id)); - long end = System.currentTimeMillis() + 4*periodMult*1000; + final long end = System.currentTimeMillis() + 4*periodMult*1000; long lastBinId = 0L; long binId = 0L; - List<Long> ids = new ArrayList<>(); + final List<Long> ids = new ArrayList<>(); while (System.currentTimeMillis() < end) { - ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); - for(ConsumerRecord<String, BindingSet> record: records){ - BindingSet result = record.value(); + final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(final ConsumerRecord<String, BindingSet> record: records){ + final BindingSet result = record.value(); binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); if(lastBinId != binId) { lastBinId = binId; @@ -428,17 +434,17 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { expected.put(binId, result); } } - + Assert.assertEquals(3, 
expected.asMap().size()); int i = 0; - for(Long ident: ids) { + for(final Long ident: ids) { Assert.assertEquals(3-i, expected.get(ident).size()); i++; } } - - - Set<BindingSet> expectedResults = new HashSet<>(); + + + final Set<BindingSet> expectedResults = new HashSet<>(); try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { results.forEachRemaining(x -> expectedResults.add(x)); Assert.assertEquals(0, expectedResults.size()); @@ -446,40 +452,40 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { } } - - + + @After public void shutdown() { registrar.close(); app.stop(); } - - private void addData(Collection<Statement> statements) throws DatatypeConfigurationException { + + private void addData(final Collection<Statement> statements) throws DatatypeConfigurationException { // add statements to Fluo try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { - InsertTriples inserter = new InsertTriples(); + final InsertTriples inserter = new InsertTriples(); statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); getMiniFluo().waitForObservers(); } } - private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { - Properties kafkaProps = new Properties(); + private static Properties getKafkaProperties(final PeriodicNotificationApplicationConfiguration conf) { + final Properties kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId()); kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return kafkaProps; } - + private Properties getProps() throws IOException { - - Properties props = new Properties(); + + final Properties props = new Properties(); try(InputStream in = new 
FileInputStream("src/test/resources/notification.properties")) { props.load(in); - } - - FluoConfiguration fluoConf = getFluoConfiguration(); + } + + final FluoConfiguration fluoConf = getFluoConfiguration(); props.setProperty("accumulo.user", getUsername()); props.setProperty("accumulo.password", getPassword()); props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java index 874e7e2..82338b9 100644 --- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java @@ -34,10 +34,10 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.kafka.base.KafkaITBase; -import org.apache.rya.kafka.base.KafkaTestInstanceRule; import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; +import org.apache.rya.test.kafka.KafkaITBase; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java index 522e69d..1fb6167 100644 --- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java @@ -30,13 +30,13 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.log4j.BasicConfigurator; -import org.apache.rya.kafka.base.KafkaITBase; -import org.apache.rya.kafka.base.KafkaTestInstanceRule; import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; import org.apache.rya.periodic.notification.notification.CommandNotification; import org.apache.rya.periodic.notification.notification.TimestampedNotification; import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.apache.rya.test.kafka.KafkaITBase; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -48,10 +48,10 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { 
private PeriodicNotificationCoordinatorExecutor coord; private KafkaNotificationProvider provider; private String bootstrapServer; - + @Rule public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false); - + @Before public void init() throws Exception { bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); @@ -62,12 +62,12 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { BasicConfigurator.configure(); - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - Properties props = createKafkaConfig(); - KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); - String topic = rule.getKafkaTopicName(); + final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + final Properties props = createKafkaConfig(); + final KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + final String topic = rule.getKafkaTopicName(); rule.createTopic(topic); - + registration = new KafkaNotificationRegistrationClient(topic, producer); coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); @@ -80,11 +80,11 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { registration.deleteNotification("1"); Thread.sleep(2000); - int size = notifications.size(); + final int size = notifications.size(); // sleep for 2 seconds to ensure no more messages being produced Thread.sleep(2000); Assert.assertEquals(size, notifications.size()); - + tearDown(); } @@ -93,12 +93,12 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { BasicConfigurator.configure(); - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - Properties props = createKafkaConfig(); - KafkaProducer<String, 
CommandNotification> producer = new KafkaProducer<>(props); - String topic = rule.getKafkaTopicName(); + final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + final Properties props = createKafkaConfig(); + final KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + final String topic = rule.getKafkaTopicName(); rule.createTopic(topic); - + registration = new KafkaNotificationRegistrationClient(topic, producer); coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); @@ -111,11 +111,11 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { registration.deleteNotification("1"); Thread.sleep(2000); - int size = notifications.size(); + final int size = notifications.size(); // sleep for 2 seconds to ensure no more messages being produced Thread.sleep(2000); Assert.assertEquals(size, notifications.size()); - + tearDown(); } @@ -126,7 +126,7 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { } private Properties createKafkaConfig() { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java index 498dd85..4933d57 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java @@ -23,7 +23,7 @@ import java.util.Objects; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * Abstract class for generating span based notifications. A spanned notification http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java index d049ff0..3354fdc 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java @@ -27,7 +27,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.openrdf.query.Binding; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * This class updates join results based on parameters specified for the join's 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java index aab3929..8686c85 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java @@ -26,7 +26,7 @@ import org.apache.fluo.api.observer.Observer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; -import jline.internal.Preconditions; +import com.google.common.base.Preconditions; /** * Provides read/write functions to the parameters map that is passed into an http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java deleted file mode 100644 index 97d8b90..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.kafka.base; - -import java.nio.file.Files; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.fluo.core.util.PortUtils; -import org.apache.kafka.clients.CommonClientConfigs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaConfig$; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.Time; -import kafka.zk.EmbeddedZookeeper; - -/** - * This class provides a {@link KafkaServer} and a dedicated - * {@link EmbeddedZookeeper} server for integtration testing. Both servers use a - * random free port, so it is necesssary to use the - * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()} - * methods to determine how to connect to them. 
- * - */ -public class EmbeddedKafkaInstance { - - private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class); - - private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1); - private static final String IPv4_LOOPBACK = "127.0.0.1"; - private static final String ZKHOST = IPv4_LOOPBACK; - private static final String BROKERHOST = IPv4_LOOPBACK; - private KafkaServer kafkaServer; - private EmbeddedZookeeper zkServer; - private String brokerPort; - private String zookeperConnect; - - /** - * Starts the Embedded Kafka and Zookeeper Servers. - * @throws Exception - If an exeption occurs during startup. - */ - protected void startup() throws Exception { - // Setup the embedded zookeeper - logger.info("Starting up Embedded Zookeeper..."); - zkServer = new EmbeddedZookeeper(); - zookeperConnect = ZKHOST + ":" + zkServer.port(); - logger.info("Embedded Zookeeper started at: {}", zookeperConnect); - - // setup Broker - logger.info("Starting up Embedded Kafka..."); - brokerPort = Integer.toString(PortUtils.getRandomFreePort()); - final Properties brokerProps = new Properties(); - brokerProps.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), "0"); - brokerProps.setProperty(KafkaConfig$.MODULE$.HostNameProp(), BROKERHOST); - brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort); - brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect); - brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString()); - final KafkaConfig config = new KafkaConfig(brokerProps); - final Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort); - } - - /** - * Shutdown the Embedded Kafka and Zookeeper. 
- * @throws Exception - */ - protected void shutdown() throws Exception { - try { - if(kafkaServer != null) { - kafkaServer.shutdown(); - } - } finally { - if(zkServer != null) { - zkServer.shutdown(); - } - } - } - - /** - * @return A new Property object containing the correct value of - * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}, for - * connecting to this instance. - */ - public Properties createBootstrapServerConfig() { - final Properties config = new Properties(); - config.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + brokerPort); - return config; - } - - /** - * - * @return The host of the Kafka Broker. - */ - public String getBrokerHost() { - return BROKERHOST; - } - - /** - * - * @return The port of the Kafka Broker. - */ - public String getBrokerPort() { - return brokerPort; - } - - /** - * - * @return The Zookeeper Connect String. - */ - public String getZookeeperConnect() { - return zookeperConnect; - } - - /** - * - * @return A unique Kafka topic name for this instance. - */ - public String getUniqueTopicName() { - return "topic_" + KAFKA_TOPIC_COUNTER.getAndIncrement() + "_"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java deleted file mode 100644 index 933377b..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.kafka.base; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides a singleton instance of an {@link EmbeddedKafkaInstance} and - * includes a shutdown hook to ensure any open resources are closed on JVM exit. - * <p> - * This class is derived from MiniAccumuloSingleton. - */ -public class EmbeddedKafkaSingleton { - - public static EmbeddedKafkaInstance getInstance() { - return InstanceHolder.SINGLETON.instance; - } - - private EmbeddedKafkaSingleton() { - // hiding implicit default constructor - } - - private enum InstanceHolder { - - SINGLETON; - - private final Logger log; - private final EmbeddedKafkaInstance instance; - - InstanceHolder() { - this.log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class); - this.instance = new EmbeddedKafkaInstance(); - try { - this.instance.startup(); - - // JUnit does not have an overall lifecycle event for tearing down - // this kind of resource, but shutdown hooks work alright in practice - // since this should only be used during testing - - // The only other alternative for lifecycle management is to use a - // suite lifecycle to enclose the tests that need this resource. - // In practice this becomes unwieldy. 
- - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - InstanceHolder.this.instance.shutdown(); - } catch (final Throwable t) { - // logging frameworks will likely be shut down - t.printStackTrace(System.err); - } - } - }); - - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Interrupted while starting EmbeddedKafkaInstance", e); - } catch (final IOException e) { - log.error("Unexpected error while starting EmbeddedKafkaInstance", e); - } catch (final Throwable e) { - // catching throwable because failure to construct an enum - // instance will lead to another error being thrown downstream - log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java deleted file mode 100644 index da4526c..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.kafka.base; - -import java.util.Properties; - -/** - * A class intended to be extended for Kafka Integration tests. - */ -public class KafkaITBase { - - private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance(); - - /** - * @return A new Property object containing the correct value for Kafka's - * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}. - */ - protected Properties createBootstrapServerConfig() { - return embeddedKafka.createBootstrapServerConfig(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java deleted file mode 100644 index a9ee7b5..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.kafka.base; - -import java.util.Properties; - -import org.I0Itec.zkclient.ZkClient; -import org.junit.rules.ExternalResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; - - -/** - * Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}. - * - */ -public class KafkaTestInstanceRule extends ExternalResource { - private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class); - private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance(); - private String kafkaTopicName; - private final boolean createTopic; - - /** - * @param createTopic - If true, a topic shall be created for the value - * provided by {@link #getKafkaTopicName()}. If false, no topics - * shall be created. - */ - public KafkaTestInstanceRule(final boolean createTopic) { - this.createTopic = createTopic; - } - - /** - * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as - * a prefix. 
- */ - public String getKafkaTopicName() { - if (kafkaTopicName == null) { - throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution."); - } - return kafkaTopicName; - } - - @Override - protected void before() throws Throwable { - // Get the next kafka topic name. - kafkaTopicName = kafkaInstance.getUniqueTopicName(); - - if(createTopic) { - createTopic(kafkaTopicName); - } - } - - @Override - protected void after() { - kafkaTopicName = null; - } - - /** - * Utility method to provide additional unique topics if they are required. - * @param topicName - The Kafka topic to create. - */ - public void createTopic(final String topicName) { - // Setup Kafka. - ZkUtils zkUtils = null; - try { - logger.info("Creating Kafka Topic: '{}'", topicName); - zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false); - AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - } - finally { - if(zkUtils != null) { - zkUtils.close(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 92cfba8..3dc7c68 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ under the License. <module>pig</module> <module>sail</module> <module>spark</module> + <module>test</module> <module>web</module> </modules> <properties> @@ -280,6 +281,16 @@ under the License. 
<version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.parent</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.kafka</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-core</artifactId> <version>${accumulo.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/test/kafka/pom.xml b/test/kafka/pom.xml new file mode 100644 index 0000000..44773a7 --- /dev/null +++ b/test/kafka/pom.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.parent</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.test.kafka</artifactId> + + <name>Apache Rya Test Kafka</name> + <description> + This module contains the Rya Test Kakfa components that help write Kafka + based integration tests. + </description> + + <dependencies> + <!-- Kafka dependencies. --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <classifier>test</classifier> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <classifier>test</classifier> + <scope>compile</scope> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- Testing dependencies. 
--> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java new file mode 100644 index 0000000..c7c5929 --- /dev/null +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.test.kafka; + +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.Time; +import kafka.zk.EmbeddedZookeeper; + +/** + * This class provides a {@link KafkaServer} and a dedicated + * {@link EmbeddedZookeeper} server for integration testing. Both servers use a + * random free port, so it is necessary to use the + * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()} + * methods to determine how to connect to them. + * + */ +public class EmbeddedKafkaInstance { + + private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class); + + private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1); + private static final String IPv4_LOOPBACK = "127.0.0.1"; + private static final String ZKHOST = IPv4_LOOPBACK; + private static final String BROKERHOST = IPv4_LOOPBACK; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private String brokerPort; + private String zookeperConnect; + + /** + * Starts the Embedded Kafka and Zookeeper Servers. Zookeeper is started first, followed by a + * single broker that stores its logs in a newly created temporary directory. + * @throws Exception - If an exception occurs during startup. 
+ */ + protected void startup() throws Exception { + // Setup the embedded zookeeper + logger.info("Starting up Embedded Zookeeper..."); + zkServer = new EmbeddedZookeeper(); + zookeperConnect = ZKHOST + ":" + zkServer.port(); + logger.info("Embedded Zookeeper started at: {}", zookeperConnect); + + // setup Broker + logger.info("Starting up Embedded Kafka..."); + brokerPort = Integer.toString(PortUtils.getRandomFreePort()); + final Properties brokerProps = new Properties(); + brokerProps.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), "0"); + brokerProps.setProperty(KafkaConfig$.MODULE$.HostNameProp(), BROKERHOST); + brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort); + brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect); + brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString()); + final KafkaConfig config = new KafkaConfig(brokerProps); + final Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort); + } + + /** + * Shuts down the Embedded Kafka server and then the Embedded Zookeeper server. + * Zookeeper is shut down even if the Kafka shutdown throws. + * @throws Exception - If an exception occurs during shutdown. + */ + protected void shutdown() throws Exception { + try { + if(kafkaServer != null) { + kafkaServer.shutdown(); + } + } finally { + if(zkServer != null) { + zkServer.shutdown(); + } + } + } + + /** + * @return A new {@link Properties} object containing the correct value of + * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}, for + * connecting to this instance. + */ + public Properties createBootstrapServerConfig() { + final Properties config = new Properties(); + config.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + brokerPort); + return config; + } + + /** + * + * @return The host of the Kafka Broker. + */ + public String getBrokerHost() { + return BROKERHOST; + } + + /** + * + * @return The port of the Kafka Broker, as a string. 
+ */ + public String getBrokerPort() { + return brokerPort; + } + + /** + * + * @return The Zookeeper connect string, in host:port form. + */ + public String getZookeeperConnect() { + return zookeperConnect; + } + + /** + * + * @return A Kafka topic name of the form topic_N_, where N is taken from a static counter so that + * each call yields a different name. + */ + public String getUniqueTopicName() { + return "topic_" + KAFKA_TOPIC_COUNTER.getAndIncrement() + "_"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java new file mode 100644 index 0000000..3a930ee --- /dev/null +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.test.kafka; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a singleton instance of an {@link EmbeddedKafkaInstance} and + * includes a shutdown hook to ensure any open resources are closed on JVM exit. + * <p> + * The instance is started when it is first requested through {@link #getInstance()}. + * A failure during startup is logged rather than rethrown, so callers may receive + * an instance that is not running; the shutdown hook is only registered after a + * successful startup. + * <p> + * This class is derived from MiniAccumuloSingleton. + */ +public class EmbeddedKafkaSingleton { + + public static EmbeddedKafkaInstance getInstance() { + return InstanceHolder.SINGLETON.instance; + } + + private EmbeddedKafkaSingleton() { + // hiding implicit default constructor + } + + private enum InstanceHolder { + + SINGLETON; + + private final Logger log; + private final EmbeddedKafkaInstance instance; + + InstanceHolder() { + this.log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class); + this.instance = new EmbeddedKafkaInstance(); + try { + this.instance.startup(); + + // JUnit does not have an overall lifecycle event for tearing down + // this kind of resource, but shutdown hooks work alright in practice + // since this should only be used during testing + + // The only other alternative for lifecycle management is to use a + // suite lifecycle to enclose the tests that need this resource. + // In practice this becomes unwieldy. 
+ + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + InstanceHolder.this.instance.shutdown(); + } catch (final Throwable t) { + // logging frameworks will likely be shut down + t.printStackTrace(System.err); + } + } + }); + + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Interrupted while starting EmbeddedKafkaInstance", e); + } catch (final IOException e) { + log.error("Unexpected error while starting EmbeddedKafkaInstance", e); + } catch (final Throwable e) { + // catching throwable because failure to construct an enum + // instance will lead to another error being thrown downstream + log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java new file mode 100644 index 0000000..ddafbcb --- /dev/null +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.test.kafka; + +import java.util.Properties; + +/** + * A base class intended to be extended by Kafka integration tests. It exposes the + * connection settings of the shared {@link EmbeddedKafkaSingleton} instance. + */ +public class KafkaITBase { + + private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance(); + + /** + * @return A new {@link Properties} object containing the correct value for Kafka's + * {@link org.apache.kafka.clients.CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}. + */ + protected Properties createBootstrapServerConfig() { + return embeddedKafka.createBootstrapServerConfig(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java new file mode 100644 index 0000000..5fe3c88 --- /dev/null +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.test.kafka; + +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + + +/** + * Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}. + * A new topic name is obtained in {@link #before()} and cleared in {@link #after()}. + * + */ +public class KafkaTestInstanceRule extends ExternalResource { + private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class); + private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance(); + private String kafkaTopicName; + private final boolean createTopic; + + /** + * @param createTopic - If true, a topic shall be created for the value + * provided by {@link #getKafkaTopicName()}. If false, no topics + * shall be created. + */ + public KafkaTestInstanceRule(final boolean createTopic) { + this.createTopic = createTopic; + } + + /** + * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as + * a prefix. + * @throws IllegalStateException If called outside of a test execution, when no topic name is set. + */ + public String getKafkaTopicName() { + if (kafkaTopicName == null) { + throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution."); + } + return kafkaTopicName; + } + + @Override + protected void before() throws Throwable { + // Get the next kafka topic name. + kafkaTopicName = kafkaInstance.getUniqueTopicName(); + + if(createTopic) { + createTopic(kafkaTopicName); + } + } + + @Override + protected void after() { + kafkaTopicName = null; + } + + /** + * Utility method to provide additional unique topics if they are required. + * The topic is created with one partition and a replication factor of one. + * @param topicName - The Kafka topic to create. + */ + public void createTopic(final String topicName) { + // Setup Kafka. 
+ ZkUtils zkUtils = null; + try { + logger.info("Creating Kafka Topic: '{}'", topicName); + zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false); + AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + finally { + if(zkUtils != null) { + zkUtils.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java new file mode 100644 index 0000000..7dad966 --- /dev/null +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.rya.test.kafka; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Random; + +public class PortUtils { + + private PortUtils() {} + /** + * Picks a port at random from the range 1024-65535 and checks that a {@link ServerSocket} can be + * opened on it, making up to 13 attempts. + * <p> + * NOTE(review): the probe socket is closed before the port is returned, so the port is not + * reserved for the caller and could be taken by another process in the meantime. + * NOTE(review): setReuseAddress is called after the socket is already bound; the ServerSocket + * documentation describes the effect of changing SO_REUSEADDR on a bound socket as undefined. + * + * @return A port on which a server socket could be opened at the time of the check. + * @throws RuntimeException If none of the attempts found a usable port. + */ + public static int getRandomFreePort() { + final Random r = new Random(); + int count = 0; + + while (count < 13) { + final int port = r.nextInt((1 << 16) - 1024) + 1024; + + try (ServerSocket so = new ServerSocket(port)) { + so.setReuseAddress(true); + return port; + } catch (final IOException e) { + // ignore - this port could not be used, so try another one + } + + count++; + } + + throw new RuntimeException("Unable to find port"); + } +}