http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java new file mode 100644 index 0000000..e90042d --- /dev/null +++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static org.junit.Assert.assertEquals; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.connect.sink.SinkRecord; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailConnection; +import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.memory.MemoryStore; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link RyaSinkTask}. + */ +public class RyaSinkTaskTest { + + /** + * A {@link RyaSinkTask} used to test against an in memory Sail instance. + */ + private static final class InMemoryRyaSinkTask extends RyaSinkTask { + + private Sail sail = null; + + @Override + protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException { + // Do nothing. Always assume the Rya Instance exists. + } + + @Override + protected Sail makeSail(final Map<String, String> taskConfig) { + if(sail == null) { + sail = new MemoryStore(); + sail.initialize(); + } + return sail; + } + } + + @Test(expected = IllegalStateException.class) + public void start_ryaInstanceDoesNotExist() { + // Create the task that will be tested. + final RyaSinkTask task = new RyaSinkTask() { + @Override + protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException { + throw new IllegalStateException("It doesn't exist."); + } + + @Override + protected Sail makeSail(final Map<String, String> taskConfig) { return null; } + }; + + // Since the rya instance does not exist, this will throw an exception. 
+ task.start(new HashMap<>()); + } + + @Test + public void singleRecord() { + // Create the Statements that will be put by the task. + final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set<Statement> statements = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Alice"), + vf.createIRI("urn:WorksAt"), + vf.createIRI("urn:Taco Shop"), + vf.createIRI("urn:graph1")), + vf.createStatement( + vf.createIRI("urn:Bob"), + vf.createIRI("urn:TalksTo"), + vf.createIRI("urn:Charlie"), + vf.createIRI("urn:graph2")), + vf.createStatement( + vf.createIRI("urn:Eve"), + vf.createIRI("urn:ListensTo"), + vf.createIRI("urn:Alice"), + vf.createIRI("urn:graph1"))); + + // Create the task that will be tested. + final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask(); + + // Setup the properties that will be used to configure the task. We don't actually need to set anything + // here since we're always returning true for ryaInstanceExists(...) and use an in memory RDF store. + final Map<String, String> props = new HashMap<>(); + + try { + // Start the task. + task.start(props); + + // Put the statements as a SinkRecord. + task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, statements, 0)) ); + + // Flush the statements. + task.flush(new HashMap<>()); + + // Fetch the stored Statements to show they match the original set. + final Set<Statement> fetched = new HashSet<>(); + + final Sail sail = task.makeSail(props); + try(SailConnection conn = sail.getConnection(); + CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) { + while(it.hasNext()) { + fetched.add( it.next() ); + } + } + + assertEquals(statements, fetched); + + } finally { + // Stop the task. + task.stop(); + } + } + + @Test + public void multipleRecords() { + // Create the Statements that will be put by the task. 
+ final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set<Statement> batch1 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Alice"), + vf.createIRI("urn:WorksAt"), + vf.createIRI("urn:Taco Shop"), + vf.createIRI("urn:graph1")), + vf.createStatement( + vf.createIRI("urn:Bob"), + vf.createIRI("urn:TalksTo"), + vf.createIRI("urn:Charlie"), + vf.createIRI("urn:graph2"))); + + final Set<Statement> batch2 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Eve"), + vf.createIRI("urn:ListensTo"), + vf.createIRI("urn:Alice"), + vf.createIRI("urn:graph1"))); + + // Create the task that will be tested. + final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask(); + + // Setup the properties that will be used to configure the task. We don't actually need to set anything + // here since we're always returning true for ryaInstanceExists(...) and use an in memory RDF store. + final Map<String, String> props = new HashMap<>(); + + try { + // Start the task. + task.start(props); + + // Put the statements as SinkRecords. + final Collection<SinkRecord> records = Sets.newHashSet( + new SinkRecord("topic", 1, null, "key", null, batch1, 0), + new SinkRecord("topic", 1, null, "key", null, batch2, 1)); + task.put( records ); + + // Flush the statements. + task.flush(new HashMap<>()); + + // Fetch the stored Statements to show they match the original set. + final Set<Statement> fetched = new HashSet<>(); + + final Sail sail = task.makeSail(props); + try(SailConnection conn = sail.getConnection(); + CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) { + while(it.hasNext()) { + fetched.add( it.next() ); + } + } + + assertEquals(Sets.union(batch1, batch2), fetched); + + } finally { + // Stop the task. + task.stop(); + } + } + + @Test + public void flushBetweenPuts() { + // Create the Statements that will be put by the task. 
+ final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set<Statement> batch1 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Alice"), + vf.createIRI("urn:WorksAt"), + vf.createIRI("urn:Taco Shop"), + vf.createIRI("urn:graph1")), + vf.createStatement( + vf.createIRI("urn:Bob"), + vf.createIRI("urn:TalksTo"), + vf.createIRI("urn:Charlie"), + vf.createIRI("urn:graph2"))); + + final Set<Statement> batch2 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Eve"), + vf.createIRI("urn:ListensTo"), + vf.createIRI("urn:Alice"), + vf.createIRI("urn:graph1"))); + + // Create the task that will be tested. + final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask(); + + // Setup the properties that will be used to configure the task. We don't actually need to set anything + // here since we're always returning true for ryaInstanceExists(...) and use an in memory RDF store. + final Map<String, String> props = new HashMap<>(); + + try { + // Start the task. + task.start(props); + + // Put the statements with flushes between them. + task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch1, 0)) ); + task.flush(new HashMap<>()); + task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch2, 1)) ); + task.flush(new HashMap<>()); + + // Fetch the stored Statements to show they match the original set. + final Set<Statement> fetched = new HashSet<>(); + + final Sail sail = task.makeSail(props); + try(SailConnection conn = sail.getConnection(); + CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) { + while(it.hasNext()) { + fetched.add( it.next() ); + } + } + + assertEquals(Sets.union(batch1, batch2), fetched); + + } finally { + // Stop the task. + task.stop(); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/test/resources/simplelogger.properties ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/api/src/test/resources/simplelogger.properties b/extras/kafka.connect/api/src/test/resources/simplelogger.properties new file mode 100644 index 0000000..1b21312 --- /dev/null +++ b/extras/kafka.connect/api/src/test/resources/simplelogger.properties @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.slf4j.simpleLogger.defaultLogLevel=debug http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/README.md ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/client/README.md b/extras/kafka.connect/client/README.md new file mode 100644 index 0000000..c7b8963 --- /dev/null +++ b/extras/kafka.connect/client/README.md @@ -0,0 +1,21 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +This project creates a shaded executable jar that may be used to load and +read statements from a Kafka Topic in the format that the Rya Kafka Connect +Sinks expect. This tool is only meant to be used for testing/debugging Kafka +Connect integration. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/client/pom.xml b/extras/kafka.connect/client/pom.xml new file mode 100644 index 0000000..1ffc8d6 --- /dev/null +++ b/extras/kafka.connect/client/pom.xml @@ -0,0 +1,113 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.parent</artifactId> + <version>4.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.kafka.connect.client</artifactId> + + <name>Apache Rya Kafka Connect - Client</name> + <description>Contains a client that may be used to load Statements into + a Kafka topic to be read by Kafka Connect.</description> + + <dependencies> + <!-- 1st party dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.sail</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.api</artifactId> + </dependency> + + <!-- 3rd party dependencies. --> + <dependency> + <groupId>org.eclipse.rdf4j</groupId> + <artifactId>rdf4j-model</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </dependency> + <dependency> + <groupId>com.github.stephenc.findbugs</groupId> + <artifactId>findbugs-annotations</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> + + <!-- Testing dependencies. 
--> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Create an executable jar for the client application. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.rya.kafka.connect.client.CLIDriver</mainClass> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java new file mode 100644 index 0000000..7ebf083 --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException; +import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand; +import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand; +import org.eclipse.rdf4j.model.Statement; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format + * the Rya Kafka Connect Sinks expect. + */ +@DefaultAnnotation(NonNull.class) +public class CLIDriver { + + /** + * Maps from command strings to the object that performs the command. + */ + private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS; + static { + final Set<Class<? 
extends RyaKafkaClientCommand>> commandClasses = new HashSet<>(); + commandClasses.add(ReadStatementsCommand.class); + commandClasses.add(WriteStatementsCommand.class); + final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder(); + for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) { + try { + final RyaKafkaClientCommand command = commandClass.newInstance(); + builder.put(command.getCommand(), command); + } catch (InstantiationException | IllegalAccessException e) { + System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor."); + e.printStackTrace(); + } + } + COMMANDS = builder.build(); + } + + private static final String USAGE = makeUsage(COMMANDS); + + public static void main(final String[] args) { + // If no command provided or the command isn't recognized, then print the usage. + if (args.length == 0 || !COMMANDS.containsKey(args[0])) { + System.out.println(USAGE); + System.exit(1); + } + + // Fetch the command that will be executed. + final String command = args[0]; + final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); + final RyaKafkaClientCommand clientCommand = COMMANDS.get(command); + + // Print usage if the arguments are invalid for the command. + if(!clientCommand.validArguments(commandArgs)) { + System.out.println(clientCommand.getUsage()); + System.exit(1); + } + + // Execute the command. + try { + clientCommand.execute(commandArgs); + } catch (ArgumentsException | ExecutionException e) { + System.err.println("The command: " + command + " failed to execute properly."); + e.printStackTrace(); + System.exit(2); + } + } + + private static String makeUsage(final ImmutableMap<String, RyaKafkaClientCommand> commands) { + final StringBuilder usage = new StringBuilder(); + usage.append("Usage: ").append(CLIDriver.class.getSimpleName()).append(" <command> (<argument> ... 
)\n"); + usage.append("\n"); + usage.append("Possible Commands:\n"); + + // Sort and find the max width of the commands. + final List<String> sortedCommandNames = Lists.newArrayList(commands.keySet()); + Collections.sort(sortedCommandNames); + + int maxCommandLength = 0; + for (final String commandName : sortedCommandNames) { + maxCommandLength = commandName.length() > maxCommandLength ? commandName.length() : maxCommandLength; + } + + // Add each command to the usage. + final String commandFormat = " %-" + maxCommandLength + "s - %s\n"; + for (final String commandName : sortedCommandNames) { + final String commandDescription = commands.get(commandName).getDescription(); + usage.append(String.format(commandFormat, commandName, commandDescription)); + } + + return usage.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java new file mode 100644 index 0000000..8a69a07 --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.client; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A command that may be executed by the Rya Kafka Connect Client {@link CLIDriver}. + */ +@DefaultAnnotation(NonNull.class) +public interface RyaKafkaClientCommand { + + /** + * Command line parameters that are used by all commands that interact with Kafka. + */ + class KafkaParameters { + + @Parameter(names = { "--bootstrapServers", "-b" }, description = + "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.") + public String bootstrapServers = "localhost:9092"; + + @Parameter(names = { "--topic", "-t" }, required = true, description = "The Kafka topic that will be interacted with.") + public String topic; + } + + /** + * @return What a user would type into the command line to indicate + * they want to execute this command. + */ + public String getCommand(); + + /** + * @return Briefly describes what the command does. + */ + public String getDescription(); + + /** + * @return Describes what arguments may be provided to the command. + */ + default public String getUsage() { + final JCommander parser = new JCommander(new KafkaParameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + /** + * Validates a set of arguments that may be passed into the command. 
+ * + * @param args - The arguments that will be validated. (not null) + * @return {@code true} if the arguments are valid, otherwise {@code false}. + */ + public boolean validArguments(String[] args); + + /** + * Execute the command using the command line arguments. + * + * @param args - Command line arguments that configure how the command will execute. (not null) + * @throws ArgumentsException there was a problem with the provided arguments. + * @throws ExecutionException There was a problem while executing the command. + */ + public void execute(final String[] args) throws ArgumentsException, ExecutionException; + + /** + * A {@link RyaKafkaClientCommand} could not be executed because of a problem with + * the arguments that were provided to it. + */ + public static final class ArgumentsException extends Exception { + private static final long serialVersionUID = 1L; + + public ArgumentsException(final String message) { + super(message); + } + + public ArgumentsException(final String message, final Throwable cause) { + super(message, cause); + } + } + + /** + * A {@link RyaKafkaClientCommand} could not be executed. 
+ */ + public static final class ExecutionException extends Exception { + private static final long serialVersionUID = 1L; + + public ExecutionException(final String message) { + super(message); + } + + public ExecutionException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java new file mode 100644 index 0000000..bf7a647 --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.client.command; + +import static java.util.Objects.requireNonNull; + +import java.util.Collections; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.kafka.connect.api.StatementsDeserializer; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand; +import org.eclipse.rdf4j.model.Statement; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.ParameterException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Reads {@link Statement}s from a Kafka topic using the Rya Kafka Connect Sink format. + */ +@DefaultAnnotation(NonNull.class) +public class ReadStatementsCommand implements RyaKafkaClientCommand { + + @Override + public String getCommand() { + return "read"; + } + + @Override + public String getDescription() { + return "Reads Statements from the specified Kafka topic."; + } + + @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new KafkaParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override + public void execute(final String[] args) throws ArgumentsException, ExecutionException { + requireNonNull(args); + + // Parse the command line arguments. + final KafkaParameters params = new KafkaParameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not read the Statements from the topic because of invalid command line parameters.", e); + } + + // Set up the consumer. 
+ try(KafkaConsumer<String, Set<Statement>> consumer = makeConsumer(params)) { + // Subscribe to the configured topic. + consumer.subscribe(Collections.singleton(params.topic)); + + // Read the statements and write them to output. + for(final ConsumerRecord<String, Set<Statement>> record : consumer.poll(500)) { + for(final Statement stmt: record.value()) { + System.out.println( stmt ); + } + } + } + } + + private KafkaConsumer<String, Set<Statement>> makeConsumer(final KafkaParameters params) { + requireNonNull(params); + + // Configure which instance of Kafka to connect to. + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers); + + // Nothing meaningful is in the key and the values is a Set<BindingSet> object. + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StatementsDeserializer.class); + + // Use a UUID for the Group Id so that we never register as part of the same group as another consumer. + final String groupId = UUID.randomUUID().toString(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + // Set a client id so that server side logging can be traced. + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Kafka-Connect-Client-" + groupId); + + // These consumers always start at the beginning and move forwards until the caller is finished with + // the returned stream, so never commit the consumer's progress. 
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + return new KafkaConsumer<>(props); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java new file mode 100644 index 0000000..83311f5 --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.rya.kafka.connect.client.command;

import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.kafka.connect.api.StatementsSerializer;
import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.RDFParser;
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
 * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
 */
@DefaultAnnotation(NonNull.class)
public class WriteStatementsCommand implements RyaKafkaClientCommand {
    private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);

    // How many Statements are bundled into each Kafka record before it is sent.
    private static final int BATCH_SIZE = 5;

    /**
     * Command line parameters that are used by this command to configure itself.
     */
    public static class WriteParameters extends KafkaParameters {
        @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
        public String statementsFile;
    }

    @Override
    public String getCommand() {
        return "write";
    }

    @Override
    public String getDescription() {
        return "Writes Statements to the specified Kafka topic.";
    }

    @Override
    public boolean validArguments(final String[] args) {
        // JCommander throws if a required parameter is missing or a value is malformed.
        boolean valid = true;
        try {
            new JCommander(new WriteParameters(), args);
        } catch(final ParameterException e) {
            valid = false;
        }
        return valid;
    }

    /**
     * @return Describes what arguments may be provided to the command.
     */
    @Override
    public String getUsage() {
        final JCommander parser = new JCommander(new WriteParameters());

        final StringBuilder usage = new StringBuilder();
        parser.usage(usage);
        return usage.toString();
    }

    @Override
    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
        requireNonNull(args);

        // Parse the command line arguments.
        final WriteParameters params = new WriteParameters();
        try {
            new JCommander(params, args);
        } catch(final ParameterException e) {
            // Fixed: the previous message was copied from a query-streaming command and
            // misleadingly referred to "the query's results".
            throw new ArgumentsException("Could not write the statements to the Kafka topic because of invalid command line parameters.", e);
        }

        // Verify the configured statements file path.
        final Path statementsPath = Paths.get(params.statementsFile);
        if(!statementsPath.toFile().exists()) {
            throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " +
                    "file does not exist. Make sure you've entered the correct path.");
        }

        // Create an RDF Parser whose format is derived from the statementPath's file extension.
        final String filename = statementsPath.getFileName().toString();
        final RDFFormat format = RdfFormatUtils.forFileName(filename);
        if (format == null) {
            throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
        }
        final RDFParser parser = Rio.createParser(format);

        // Set up the producer.
        try(Producer<String, Set<Statement>> producer = makeProducer(params)) {
            // Set a handler that writes the statements to the specified kafka topic.
            // It writes batches of BATCH_SIZE Statements.
            parser.setRDFHandler(new AbstractRDFHandler() {

                // Statements that have been parsed, but not yet sent to Kafka.
                private Set<Statement> batch = new HashSet<>(BATCH_SIZE);

                @Override
                public void startRDF() throws RDFHandlerException {
                    log.trace("Starting loading statements.");
                }

                @Override
                public void handleStatement(final Statement stmnt) throws RDFHandlerException {
                    log.trace("Adding statement.");
                    batch.add(stmnt);

                    if(batch.size() == BATCH_SIZE) {
                        flushBatch();
                    }
                }

                @Override
                public void endRDF() throws RDFHandlerException {
                    // Send whatever partial batch remains once the file is exhausted.
                    if(!batch.isEmpty()) {
                        flushBatch();
                    }
                    log.trace("Done.");
                }

                private void flushBatch() {
                    log.trace("Flushing batch of size " + batch.size());
                    producer.send(new ProducerRecord<>(params.topic, null, batch));
                    batch = new HashSet<>(BATCH_SIZE);
                    producer.flush();
                }
            });

            // Do the parse and load. Fixed: the input stream used to be left open;
            // try-with-resources now closes it even when parsing fails.
            try(InputStream statementsStream = Files.newInputStream(statementsPath)) {
                parser.parse(statementsStream, "");
            } catch (RDFParseException | RDFHandlerException | IOException e) {
                throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
            }
        }
    }

    /**
     * Creates a Kafka {@link Producer} that writes {@link Set}s of {@link Statement}s
     * to the configured instance of Kafka.
     *
     * @param params - Configures which instance of Kafka to connect to. (not null)
     * @return A producer that may be used to write statements.
     */
    private static Producer<String, Set<Statement>> makeProducer(final KafkaParameters params) {
        requireNonNull(params);
        final Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo-it/README.md ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/mongo-it/README.md b/extras/kafka.connect/mongo-it/README.md new file mode 100644 index 0000000..b154b95 --- /dev/null +++ b/extras/kafka.connect/mongo-it/README.md @@ -0,0 +1,19 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +This project contains integration tests that verify a Mongo DB backed +implementation of the Rya Kafka Connect Sink is working properly. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo-it/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/mongo-it/pom.xml b/extras/kafka.connect/mongo-it/pom.xml new file mode 100644 index 0000000..ca439ea --- /dev/null +++ b/extras/kafka.connect/mongo-it/pom.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.parent</artifactId> + <version>4.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.kafka.connect.mongo.it</artifactId> + + <name>Apache Rya Kafka Connect - Mongo DB Integration Tests</name> + <description>Tests the Kafka Connect Sink that writes to a Rya instance backed by Mongo DB.</description> + + <dependencies> + <!-- 1st party dependencies. 
--> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.mongo</artifactId> + </dependency> + + <!-- 3rd party dependencies. --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-api</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.mongo</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java new file mode 100644 index 0000000..55e7603 --- /dev/null +++ b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
package org.apache.rya.kafka.connect.mongo;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.mongo.MongoConnectionDetails;
import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
import org.apache.rya.test.mongo.MongoITBase;
import org.junit.Test;

/**
 * Integration tests the methods of {@link MongoRyaSinkTask}.
 */
public class MongoRyaSinkTaskIT extends MongoITBase {

    @Test
    public void instanceExists() throws Exception {
        // Install an instance of Rya.
        final String ryaInstanceName = "rya";
        final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
                super.getMongoHostname(),
                super.getMongoPort(),
                Optional.empty(),
                Optional.empty());

        final InstallConfiguration installConfig = InstallConfiguration.builder()
                .setEnableTableHashPrefix(false)
                .setEnableEntityCentricIndex(false)
                .setEnableFreeTextIndex(false)
                .setEnableTemporalIndex(false)
                .setEnablePcjIndex(false)
                .setEnableGeoIndex(false)
                .build();

        final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
        ryaClient.getInstall().install(ryaInstanceName, installConfig);

        // Create the task that will be tested.
        final MongoRyaSinkTask task = new MongoRyaSinkTask();

        try {
            // Configure the task to use the embedded Mongo DB instance for Rya.
            final Map<String, String> config = new HashMap<>();
            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
            // Fixed: use the variable the instance was installed with instead of a
            // duplicated hard-coded "rya" literal.
            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, ryaInstanceName);

            // This will pass because the Rya instance exists.
            task.start(config);
        } finally {
            task.stop();
        }
    }

    @Test(expected = ConnectException.class)
    public void instanceDoesNotExist() throws Exception {
        // Create the task that will be tested.
        final MongoRyaSinkTask task = new MongoRyaSinkTask();

        try {
            // Configure the task to use the embedded Mongo DB instance for Rya.
            final Map<String, String> config = new HashMap<>();
            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");

            // Starting the task will fail because the Rya instance does not exist.
            task.start(config);
        } finally {
            task.stop();
        }
    }
}
--> + +This project is the Rya Kafka Connect Sink that writes to Mongo DB backed +instances of Rya. + +This project produces a shaded jar that may be installed into Kafka Connect. +For more information about how to install and configure this connector, see +[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md). \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/mongo/pom.xml b/extras/kafka.connect/mongo/pom.xml new file mode 100644 index 0000000..66eba1b --- /dev/null +++ b/extras/kafka.connect/mongo/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.parent</artifactId> + <version>4.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.kafka.connect.mongo</artifactId> + + <name>Apache Rya Kafka Connect - Mongo DB</name> + <description>A Kafka Connect Sink that writes to a Rya instance backed by Mongo DB.</description> + + <dependencies> + <!-- 1st party dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + + <!-- 3rd party dependencies. --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-api</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Build the uber jar that may be deployed to Kafka Connect. 
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java new file mode 100644 index 0000000..3b48556 --- /dev/null +++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.rya.kafka.connect.mongo;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
 * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
 */
@DefaultAnnotation(NonNull.class)
public class MongoRyaSinkConfig extends RyaSinkConfig {

    public static final String HOSTNAME = "mongo.hostname";
    private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections will use.";

    public static final String PORT = "mongo.port";
    private static final String PORT_DOC = "The Mongo DB port the Sail connections will use.";

    public static final String USERNAME = "mongo.username";
    private static final String USERNAME_DOC = "The Mongo DB username the Sail connections will use.";

    public static final String PASSWORD = "mongo.password";
    private static final String PASSWORD_DOC = "The Mongo DB password the Sail connections will use.";

    public static final ConfigDef CONFIG_DEF = buildConfigDef();

    /**
     * @return The {@link ConfigDef} for this sink: the Mongo DB specific definitions
     * plus the definitions every Rya sink shares.
     */
    private static ConfigDef buildConfigDef() {
        final ConfigDef configDef = new ConfigDef()
                .define(HOSTNAME, Type.STRING, Importance.HIGH, HOSTNAME_DOC)
                .define(PORT, Type.INT, Importance.HIGH, PORT_DOC)
                .define(USERNAME, Type.STRING, "", Importance.HIGH, USERNAME_DOC)
                .define(PASSWORD, Type.PASSWORD, "", Importance.HIGH, PASSWORD_DOC);
        RyaSinkConfig.addCommonDefinitions(configDef);
        return configDef;
    }

    /**
     * Constructs an instance of {@link MongoRyaSinkConfig}.
     *
     * @param originals - The key/value pairs that define the configuration. (not null)
     */
    public MongoRyaSinkConfig(final Map<?, ?> originals) {
        super(CONFIG_DEF, originals);
    }

    /**
     * @return The Mongo DB hostname the Sail connections will use.
     */
    public String getHostname() {
        return super.getString(HOSTNAME);
    }

    /**
     * @return The Mongo DB port the Sail connections will use.
     */
    public int getPort() {
        return super.getInt(PORT);
    }

    /**
     * @return The Mongo DB username the Sail connections will use.
     */
    public String getUsername() {
        return super.getString(USERNAME);
    }

    /**
     * @return The Mongo DB password the Sail connections will use.
     */
    public String getPassword() {
        return super.getPassword(PASSWORD).value();
    }
}
package org.apache.rya.kafka.connect.mongo;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

/**
 * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
 */
@DefaultAnnotation(NonNull.class)
public class MongoRyaSinkConnector extends RyaSinkConnector {

    // Set by start(Map); remains null until the connector has been started.
    @Nullable
    private MongoRyaSinkConfig config = null;

    @Override
    public void start(final Map<String, String> props) {
        this.config = new MongoRyaSinkConfig(props);
    }

    @Override
    protected AbstractConfig getConfig() {
        // Guard clause: the configuration is only available after start(Map) has run.
        if(config != null) {
            return config;
        }
        throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first.");
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MongoRyaSinkTask.class;
    }

    @Override
    public ConfigDef config() {
        return MongoRyaSinkConfig.CONFIG_DEF;
    }
}
package org.apache.rya.kafka.connect.mongo;

import static java.util.Objects.requireNonNull;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.client.mongo.MongoConnectionDetails;
import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
import org.apache.rya.api.log.LogUtils;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
import org.apache.rya.sail.config.RyaSailFactory;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailException;

import com.google.common.base.Strings;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

/**
 * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
 */
@DefaultAnnotation(NonNull.class)
public class MongoRyaSinkTask extends RyaSinkTask {

    /**
     * Checks that the Rya instance named in the task configuration has been installed
     * in the configured Mongo DB. Throws {@link ConnectException} (a RuntimeException)
     * when the instance is missing or the check itself fails.
     */
    @Override
    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
        requireNonNull(taskConfig);

        // Parse the configuration object. Empty username/password values are
        // normalized to null so "no credentials" is unambiguous below.
        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
        @Nullable
        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
        @Nullable
        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();

        // Connect a Mongo Client to the configured Mongo DB instance.
        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
        final boolean hasCredentials = username != null && password != null;

        // try-with-resources ensures the client is closed on every path.
        // NOTE(review): the credential is created against the Rya instance's database;
        // confirm deployments don't authenticate against the "admin" database instead.
        try(MongoClient mongoClient = hasCredentials ?
                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
                new MongoClient(serverAddr)) {
            // Use a RyaClient to see if the configured instance exists.
            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
                    config.getHostname(),
                    config.getPort(),
                    Optional.ofNullable(username),
                    Optional.ofNullable(password));

            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
            // LogUtils.clean is applied to the user-supplied instance name before logging.
            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
                throw new ConnectException("The Rya Instance named " +
                        LogUtils.clean(config.getRyaInstanceName()) + " has not been installed.");
            }
        } catch(final RyaClientException e) {
            // The existence check itself failed; preserve the cause for diagnosis.
            throw new ConnectException("Unable to determine if the Rya Instance named " +
                    LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e);
        }
    }

    /**
     * Builds the Mongo DB backed {@link Sail} the sink writes statements through,
     * configured from the task's key/value configuration.
     */
    @Override
    protected Sail makeSail(final Map<String, String> taskConfig) {
        requireNonNull(taskConfig);

        // Parse the configuration object.
        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);

        // Move the configuration into a Rya Configuration object.
        final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
        ConfigUtils.setUseMongo(ryaConfig, true);
        // The Rya instance name doubles as both the Mongo database name and the table prefix.
        ryaConfig.setMongoDBName( config.getRyaInstanceName() );
        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
        ryaConfig.setMongoHostname( config.getHostname() );
        ryaConfig.setMongoPort( "" + config.getPort() );

        // Only pass credentials through when both a username and a password were configured.
        if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
            ryaConfig.setMongoUser( config.getUsername() );
            ryaConfig.setMongoPassword( config.getPassword() );
        }

        // Create the Sail object. RyaSailFactory declares Accumulo exceptions as well,
        // so they must appear in the multi-catch even for the Mongo backend.
        try {
            return RyaSailFactory.getInstance(ryaConfig);
        } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
            throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e);
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; +import org.junit.Test; + +/** + * Unit tests the methods of {@link MongoRyaSinkConfig}. + */ +public class MongoRyaSinkConfigTest { + + @Test + public void parses() { + final Map<String, String> properties = new HashMap<>(); + properties.put(MongoRyaSinkConfig.HOSTNAME, "127.0.0.1"); + properties.put(MongoRyaSinkConfig.PORT, "27017"); + properties.put(MongoRyaSinkConfig.USERNAME, "alice"); + properties.put(MongoRyaSinkConfig.PASSWORD, "alice1234!@"); + properties.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya"); + new MongoRyaSinkConfig(properties); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/pom.xml b/extras/kafka.connect/pom.xml new file mode 100644 index 0000000..9a9702c --- /dev/null +++ b/extras/kafka.connect/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.extras</artifactId> + <version>4.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.kafka.connect.parent</artifactId> + + <name>Apache Rya Kafka Connect Parent</name> + <description>The parent pom file for any Rya Kafka Connect project.</description> + + <packaging>pom</packaging> + + <modules> + <module>api</module> + <module>accumulo</module> + <module>accumulo-it</module> + <module>mongo</module> + <module>mongo-it</module> + <module>client</module> + </modules> + + <properties> + <kafka.version>1.1.0</kafka.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Build-Version>${project.version}</Build-Version> + </manifestEntries> + </archive> + </configuration> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/pom.xml ---------------------------------------------------------------------- diff --git a/extras/pom.xml b/extras/pom.xml index bb6f914..65dd4cc 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -45,6 +45,7 @@ under the License. 
<module>rya.merger</module> <module>rya.streams</module> <module>rya.forwardchain</module> + <module>kafka.connect</module> </modules> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/rya.manual/src/site/markdown/_index.md ---------------------------------------------------------------------- diff --git a/extras/rya.manual/src/site/markdown/_index.md b/extras/rya.manual/src/site/markdown/_index.md index d0b61c4..07dfe50 100644 --- a/extras/rya.manual/src/site/markdown/_index.md +++ b/extras/rya.manual/src/site/markdown/_index.md @@ -33,6 +33,7 @@ This project contains documentation about Apache Rya, a scalable RDF triple stor - [Inferencing](infer.md) - [MapReduce Interface](mapreduce.md) - [Rya Streams](rya-streams.md) +- [Kafka Connect Integration](kafka-connect-integration.md) # Samples - [Typical First Steps](sm-firststeps.md) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/rya.manual/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/extras/rya.manual/src/site/markdown/index.md b/extras/rya.manual/src/site/markdown/index.md index 5372618..e686736 100644 --- a/extras/rya.manual/src/site/markdown/index.md +++ b/extras/rya.manual/src/site/markdown/index.md @@ -35,6 +35,7 @@ This project contains documentation about Apache Rya, a scalable RDF triple stor - [Shell Interface](shell.md) - [Incremental Join Maintenance Application (PCJ Updater)](pcj-updater.md) - [Rya Streams](rya-streams.md) +- [Kafka Connect Integration](kafka-connect-integration.md) # Samples - [Typical First Steps](sm-firststeps.md)