Repository: incubator-rya
Updated Branches:
  refs/heads/master ea91e26a5 -> af736749a


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/rya.manual/src/site/markdown/kafka-connect-integration.md
----------------------------------------------------------------------
diff --git a/extras/rya.manual/src/site/markdown/kafka-connect-integration.md b/extras/rya.manual/src/site/markdown/kafka-connect-integration.md
new file mode 100644
index 0000000..ed31c33
--- /dev/null
+++ b/extras/rya.manual/src/site/markdown/kafka-connect-integration.md
@@ -0,0 +1,493 @@
+<!--
+[comment]: # Licensed to the Apache Software Foundation (ASF) under one
+[comment]: # or more contributor license agreements.  See the NOTICE file
+[comment]: # distributed with this work for additional information
+[comment]: # regarding copyright ownership.  The ASF licenses this file
+[comment]: # to you under the Apache License, Version 2.0 (the
+[comment]: # "License"); you may not use this file except in compliance
+[comment]: # with the License.  You may obtain a copy of the License at
+[comment]: # 
+[comment]: #   http://www.apache.org/licenses/LICENSE-2.0
+[comment]: # 
+[comment]: # Unless required by applicable law or agreed to in writing,
+[comment]: # software distributed under the License is distributed on an
+[comment]: # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+[comment]: # KIND, either express or implied.  See the License for the
+[comment]: # specific language governing permissions and limitations
+[comment]: # under the License.
+-->
+
+# Kafka Connect Integration #
+
+Introduced in 4.0.0
+
+# Table of Contents #
+- [Introduction](#introduction)
+- [An Important Note About Deploying the Plugins](#an-important-note-about-deploying-the-plugins)
+- [Statement Serialization Format](#statement-serialization-format)
+- [Quick Start](#quick-start)
+- [Future Work](#future-work)
+
+<div id='introduction'/>
+
+## Introduction ##
+
+[Kafka Connect](https://kafka.apache.org/documentation/#connect) is a system 
+that is able to pull data from data sources into Kafka topics as well as write 
+data from Kafka topics into data sinks. This project implements a Kafka Connect
+sink connector for both Accumulo-backed and Mongo-backed instances of Rya.
+
+<div id='an-important-note-about-deploying-the-plugins'/>
+
+## An Important Note About Deploying the Plugins ##
+
+While testing the application with both the Mongo Rya Sink and Accumulo Rya Sink
+uber jars installed, we saw ClassCastExceptions thrown when code tried to cast
+a ContextStatement into a Statement. Normally, this cast wouldn't cause a
+problem. Within Connect, however, both uber jars contained a copy of the
+ContextStatement and Statement classes, a different classloader loaded each
+copy, and the subtype relationship between them was lost.
+
+For now, it's important that you only deploy one of the uber jars at a time.
+
+<div id='statement-serialization-format'/>
+
+## Statement Serialization Format ##
+
+Applications that would like to write to a Kafka topic in a format the sink
+recognizes must write ```Set<Statement>``` objects by using the
+[StatementsSerializer](../../../../../extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java).
+
+Rather than using the Confluent Schema Registry and Avro to serialize Statements,
+we use RDF4J's Rio Binary Format. You may read more about how that format is
+implemented [here](http://docs.rdf4j.org/rdf4j-binary/).
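+
+Below is a minimal sketch of a producer that publishes a ```Set<Statement>``` to
+the "statements" topic. It assumes the serializer implements Kafka's
+```Serializer<Set<Statement>>``` interface and that a broker is listening on
+localhost:9092; the class name is only illustrative.
+
+```
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+
+public class StatementsProducerExample {
+    public static void main(final String[] args) {
+        // Standard producer configuration. The value serializer is the
+        // StatementsSerializer described above (assumed to implement
+        // Kafka's Serializer<Set<Statement>>).
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());
+
+        // Build a small set of Statements to publish.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set<Statement> statements = new HashSet<>();
+        statements.add(vf.createStatement(
+                vf.createIRI("urn:Alice"),
+                vf.createIRI("urn:talksTo"),
+                vf.createIRI("urn:Bob")));
+
+        // Write the whole set to the "statements" topic as a single record.
+        try (KafkaProducer<String, Set<Statement>> producer = new KafkaProducer<>(props)) {
+            producer.send(new ProducerRecord<>("statements", statements));
+        }
+    }
+}
+```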
+
+<div id='quick-start'/>
+
+## Quick Start ##
+
+This tutorial demonstrates how to install and start the Accumulo Rya Sink and 
+the Mongo Rya Sink by using the Open Source version of the Confluent platform.
+You can download it [here](https://www.confluent.io/download/). We're going to
+use the standalone version of Connect, but in a production environment you may
+want to use the distributed mode if there is a lot of data that needs to be 
+inserted.
+
+We suggest you go through the
+[Confluent Platform Open Source Quick Start](https://docs.confluent.io/current/quickstart/cos-quickstart.html),
+so that you can ensure the Confluent platform is installed and ready for use.
+We're using Confluent 4.1.0 in this tutorial, so be aware some things may change
+when using newer versions of the platform. You may also find it beneficial to
+go through the [Kafka Connect Quick Start](https://docs.confluent.io/current/connect/quickstart.html)
+as well to see how Kafka Connect works in general.
+
+### Step 1: Download the applications ###
+
+You can fetch the artifacts you need to follow this Quick Start from our
+[downloads page](http://rya.apache.org/download/). Click on the release of
+interest and follow the "Central repository for Maven and other dependency
+managers" URL.
+
+Fetch the following four artifacts:
+
+Artifact Id | Type 
+--- | ---
+rya.shell | shaded jar
+rya.kafka.connect.client | shaded jar
+rya.kafka.connect.accumulo | shaded jar
+rya.kafka.connect.mongo | shaded jar
+
+### Step 2: Load statements into a Kafka topic ###
+
+The sink connector that we will be demonstrating reads Statements from
+a Kafka topic and loads them into an instance of Rya. To keep things simple,
+let's create a topic that only has a single partition and is unreplicated. Within
+a production environment, you will want to tune these values based on how many
+concurrent workers you would like to use when processing input. 
+
+```
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic statements
+```
+
+Next we need to create a file that contains the statements we will load into the topic.
+Name the file "quickstart-statements.nt" and use a text editor to write the following lines to it:
+
+```
+<urn:Alice> <urn:talksTo> <urn:Bob> .
+<urn:Bob> <urn:talksTo> <urn:Alice> .
+<urn:Bob> <urn:talksTo> <urn:Charlie> .
+<urn:Charlie> <urn:talksTo> <urn:Alice> .
+<urn:David> <urn:talksTo> <urn:Eve> .
+<urn:Eve> <urn:listensTo> <urn:Bob> .
+```
+
+Use the ```rya.kafka.connect.client``` to write the file's contents to the topic we just made.
+
+```
+java -jar rya.kafka.connect.client-4.0.0-incubating-shaded.jar write -f quickstart-statements.nt -t statements
+```
+ 
+You may verify the statements were written by using the read command.
+
+```
+java -jar rya.kafka.connect.client-4.0.0-incubating-shaded.jar read -t statements
+```
+
+At this point you need to decide whether you are going to use an Accumulo or
+MongoDB backed instance of Rya. The following steps are largely the same
+for both backends, but they require different jars and commands. To follow
+the Accumulo set of steps, start with __Accumulo Step 3__ and go through
+__Accumulo Step 5__. To follow the Mongo set of steps, skip ahead
+to __Mongo Step 3__ and go through __Mongo Step 5__.
+
+### Accumulo Step 3: Installing a Rya instance ###
+
+The sink needs a place to put the Statements that we just wrote to the Kafka topic.
+We're going to have it write to a Rya instance named "quickstart" on your Accumulo
+cluster. To do this, you'll need to use the Rya Shell. Here's roughly what an
+installation session should look like.
+
+```
+java -jar rya.shell-4.0.0-incubating-shaded.jar
+
+ _____                _____ _          _ _
+|  __ \              / ____| |        | | |
+| |__) |   _  __ _  | (___ | |__   ___| | |
+|  _  / | | |/ _` |  \___ \| '_ \ / _ \ | |
+| | \ \ |_| | (_| |  ____) | | | |  __/ | |
+|_|  \_\__, |\__,_| |_____/|_| |_|\___|_|_|
+        __/ |
+       |___/
+4.0.0-incubating
+
+Welcome to the Rya Shell.
+
+Execute one of the connect commands to start interacting with an instance of Rya.
+You may press tab at any time to see which of the commands are available.
+
+rya> connect-accumulo --zookeepers localhost --instanceName quickstart_instance --username quickstart
+Password: *******
+
+rya/quickstart_instance> install
+Rya Instance Name [default: rya_]: quickstart
+Use Shard Balancing (improves streamed input write speeds) [default: false]: f
+Use Entity Centric Indexing [default: true]: f
+Use Free Text Indexing [default: true]: f
+Use Temporal Indexing [default: true]: f
+Use Precomputed Join Indexing [default: true]: f
+
+A Rya instance will be installed using the following values:
+   Instance Name: quickstart
+   Use Shard Balancing: false
+   Use Entity Centric Indexing: false
+   Use Free Text Indexing: false
+   Use Temporal Indexing: false
+   Use Precomputed Join Indexing: false
+
+Continue with the install? (y/n) y
+The Rya instance named 'quickstart' has been installed.
+
+```
+
+We also want to ensure the instance we just installed does not have any Statements
+yet. We can do this using that same shell instance.
+
+```
+# Verify that no data has been inserted yet.
+rya/quickstart_instance> connect-rya --instance quickstart
+rya/quickstart_instance:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o .}\e
+Executing Query...
+Query Results:
+
+rya/quickstart_instance:quickstart> 
+```
+
+### Accumulo Step 4: Installing and running the Accumulo Rya Sink ###
+
+At this point we have a Kafka topic that is filled with RDF Statements that
+need to be loaded into Rya. We also have a Rya instance for them to be written
+to. All that is left is to install the Accumulo Rya Sink, configure it to 
+use those two endpoints, and then load it.
+
+The version of the Confluent platform we used for this quick start doesn't seem
+to be able to find new connector installs dynamically, so start by shutting 
+everything down.
+
+```
+confluent stop
+```
+
+Install the shaded jar that contains the Accumulo Rya Sink connector.
+
+```
+mkdir confluent-4.1.0/share/java/kafka-connect-rya-accumulo
+cp rya.kafka.connect.accumulo-4.0.0-incubating-shaded.jar confluent-4.1.0/share/java/kafka-connect-rya-accumulo
+```
+
+Then we need to configure the connector to read from the "statements" topic,
+specify which Accumulo cluster is hosting the Rya Instance, specify which Rya
+Instance to write to, and specify the classes that define the Connector and the
+Converter to use. This file has clear text passwords in it, so ensure it has
+appropriate access restrictions applied to it.
+
+```
+touch confluent-4.1.0/etc/kafka/connect-rya-accumulo-sink.properties
+```
+
+And then use your favorite text editor to fill in the following values:
+
+```
+name=rya-accumulo-sink
+connector.class=org.apache.rya.kafka.connect.accumulo.AccumuloRyaSinkConnector
+tasks.max=1
+value.converter=org.apache.rya.kafka.connect.api.StatementsConverter
+topics=statements
+accumulo.zookeepers=127.0.0.1
+accumulo.cluster.name=<your cluster name here>
+accumulo.username=<your username here>
+accumulo.password=<your password here>
+rya.instance.name=quickstart
+```
+
+Start the Confluent platform:
+
+```
+confluent start
+```
+
+Even after the start command says everything is started, it may take a moment for
+the load command to work. Rerun this command until you get a response from the 
+REST service printed to the screen and confluent reports it as loaded:
+
+```
+confluent load rya-accumulo-sink -d confluent-4.1.0/etc/kafka/connect-rya-accumulo-sink.properties
+```
+
+The connector will automatically start workers that load the data from the 
+configured topic into the configured Rya instance.
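+
+If you would like to confirm that the connector is running, you can query the
+Kafka Connect REST service, which reports each connector's status (assuming the
+Connect worker uses its default REST port of 8083):
+
+```
+curl http://localhost:8083/connectors/rya-accumulo-sink/status
+```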
+
+### Accumulo Step 5: Verify statements were written to Rya ###
+
+At this point you should be able to rerun the query from __Accumulo Step 3__ and
+see that Statements have been added to the Rya instance.
+
+```
+rya/quickstart_instance:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o . }\e
+Executing Query...
+Query Results:
+p,s,o
+urn:talksTo,urn:Alice,urn:Bob
+urn:talksTo,urn:Bob,urn:Alice
+urn:talksTo,urn:Bob,urn:Charlie
+urn:talksTo,urn:Charlie,urn:Alice
+urn:talksTo,urn:David,urn:Eve
+urn:listensTo,urn:Eve,urn:Bob
+Done.
+```
+
+### Mongo Step 3: Installing a Rya instance ###
+
+The sink needs a place to put the Statements that we just wrote to the Kafka topic.
+We're going to have it write to a Rya instance named "quickstart" within your 
+Mongo database. To do this, you'll need to use the Rya Shell. Here's roughly 
+what an installation session should look like.
+
+```
+java -jar rya.shell-4.0.0-incubating-shaded.jar
+ _____                _____ _          _ _
+|  __ \              / ____| |        | | |
+| |__) |   _  __ _  | (___ | |__   ___| | |
+|  _  / | | |/ _` |  \___ \| '_ \ / _ \ | |
+| | \ \ |_| | (_| |  ____) | | | |  __/ | |
+|_|  \_\__, |\__,_| |_____/|_| |_|\___|_|_|
+        __/ |
+       |___/
+4.0.0-incubating
+
+Welcome to the Rya Shell.
+
+Execute one of the connect commands to start interacting with an instance of Rya.
+You may press tab at any time to see which of the commands are available.
+
+rya> connect-mongo --hostname localhost --port 27017
+Connected. You must select a Rya instance to interact with next.
+
+rya/localhost> install
+Rya Instance Name [default: rya_]: quickstart
+Use Free Text Indexing [default: true]: f
+Use Temporal Indexing [default: true]: f
+Use PCJ Indexing [default: true]: f
+
+A Rya instance will be installed using the following values:
+   Instance Name: quickstart
+   Use Free Text Indexing: false
+   Use Temporal Indexing: false
+   Use PCJ Indexing: false
+
+Continue with the install? (y/n) y
+The Rya instance named 'quickstart' has been installed.
+```
+
+We also want to ensure the instance we just installed does not have any Statements
+yet. We can do this using that same shell instance.
+
+```
+rya/localhost> connect-rya --instance quickstart
+rya/localhost:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o .}\e
+Executing Query...
+No Results Found.
+Done.
+rya/localhost:quickstart> 
+```
+
+### Mongo Step 4: Installing and running the Mongo Rya Sink ###
+
+At this point we have a Kafka topic that is filled with RDF Statements that
+need to be loaded into Rya. We also have a Rya instance for them to be written
+to. All that is left is to install the Mongo Rya Sink, configure it to 
+use those two endpoints, and then load it.
+
+The version of the Confluent platform we used for this quick start doesn't seem
+to be able to find new connector installs dynamically, so start by shutting
+everything down.
+
+```
+confluent stop
+```
+
+Install the shaded jar that contains the Mongo Rya Sink connector.
+
+```
+mkdir confluent-4.1.0/share/java/kafka-connect-rya-mongo
+cp rya.kafka.connect.mongo-4.0.0-incubating-shaded.jar confluent-4.1.0/share/java/kafka-connect-rya-mongo
+```
+
+Then we need to configure the connector to read from the "statements" topic,
+specify which Mongo database is hosting the Rya Instance, specify which Rya
+Instance to write to, and specify the classes that define the Connector and the
+Converter to use. This file has clear text passwords in it, so ensure it has
+appropriate access restrictions applied to it.
+
+```
+touch confluent-4.1.0/etc/kafka/connect-rya-mongo-sink.properties
+```
+
+And then use your favorite text editor to fill in the following values:
+
+```
+name=rya-mongo-sink
+connector.class=org.apache.rya.kafka.connect.mongo.MongoRyaSinkConnector
+tasks.max=1
+value.converter=org.apache.rya.kafka.connect.api.StatementsConverter
+topics=statements
+mongo.hostname=127.0.0.1
+mongo.port=27017
+mongo.username=
+mongo.password=
+rya.instance.name=quickstart
+```
+
+Start the Confluent platform:
+
+```
+confluent start
+```
+
+Even after the start command says everything is started, it may take a moment for
+the load command to work. Rerun this command until you get a response from the 
+REST service printed to the screen and confluent reports it as loaded:
+
+```
+confluent load rya-mongo-sink -d confluent-4.1.0/etc/kafka/connect-rya-mongo-sink.properties
+```
+
+The connector will automatically start workers that load the data from the 
+configured topic into the configured Rya instance.
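+
+If you would like to confirm that the connector is running, you can query the
+Kafka Connect REST service (again assuming the worker's default REST port of 8083):
+
+```
+curl http://localhost:8083/connectors/rya-mongo-sink/status
+```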
+
+### Mongo Step 5: Verify statements were written to Rya ###
+
+At this point you should be able to rerun the query from __Mongo Step 3__ and
+see that statements have been added to the Rya instance.
+
+```
+rya/localhost:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o . }\e
+Executing Query...
+Query Results:
+p,s,o
+urn:talksTo,urn:Alice,urn:Bob
+urn:talksTo,urn:Bob,urn:Charlie
+urn:talksTo,urn:Charlie,urn:Alice
+urn:talksTo,urn:Bob,urn:Alice
+urn:talksTo,urn:David,urn:Eve
+urn:listensTo,urn:Eve,urn:Bob
+Done.
+```
+
+<div id='future-work'/>
+
+## Future Work ##
+
+### Remove passwords from connector configuration files ###
+
+It's a security flaw that the connector's passwords for connecting to Accumulo
+and Mongo are in clear text within the configuration files. The log files hide
+them when they log the configuration, but the passwords are still written to
+standard out when you use the confluent command to load the connector. There
+should be another way for the connector to receive the credentials required to
+connect.
+
+### Support both Mongo and Accumulo connectors at the same time ###
+
+Currently, you can only use the Mongo Rya Sink or the Accumulo Rya Sink because
+of the problem mentioned in
+[An Important Note About Deploying the Plugins](#an-important-note-about-deploying-the-plugins).
+
+We could have a single uber jar plugin that supports both backends. We could also
+skip uber jars entirely and include all of the jars that the plugins depend on
+in a single folder.
+
+### Visibility Statement Support ###
+
+The sinks write plain Statement objects, which means none of those statements
+may carry visibility expressions. If they do, then the visibilities will be
+dropped when the statement is inserted.
+
+It would be nice if the Rya implementation of the RDF4J RepositoryConnection
+were able to figure out when a Statement is actually a VisibilityStatement. It
+could then retain the visibility expression when it is persisted.
+
+### Visibility Binding Set Sink ###
+
+The Visibility Binding Sets that are stored within the Precomputed Join index
+could be generated by an external system (such as Rya Streams) and written to
+a Kafka topic. It would be convenient to also have a connector that is able to
+write those values to the index.
+
+### Rya Kafka Connect Source ###
+
+Rya Streams would benefit from having a Kafka Connect Source that is able to
+determine when new Statements have been added to the core tables, and then write
+those Visibility Statements to a Kafka topic.
+
+### Geo Indexing ###
+
+It's difficult to get the Geo indexing into the Sail object that represents
+Rya because the geo project is optional, and we don't use dependency injection
+to swap the GeoRyaSailFactory into the application in place of the normal Rya
+Sail Factory. An improvement to this project would be to resolve that problem
+so that it may do geo indexing while inserting statements.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/rya.manual/src/site/site.xml
----------------------------------------------------------------------
diff --git a/extras/rya.manual/src/site/site.xml b/extras/rya.manual/src/site/site.xml
index 2af3373..a102902 100644
--- a/extras/rya.manual/src/site/site.xml
+++ b/extras/rya.manual/src/site/site.xml
@@ -48,7 +48,8 @@ under the License.
         <item name="MapReduce Interface" href="mapreduce.html"/>
         <item name="Shell Interface" href="shell.html"/>
         <item name="Incremental Join Maintenance" href="pcj-updater.html"/>
-        <item name="Rya Streams" href="rya-streams.html"/>
+        <item name="Rya Streams" href="rya-streams.html"/>
+        <item name="Kafka Connect Integration" href="kafka-connect-integration.html"/>
     </menu>
 
     <menu name="Samples">

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 699fccf..b46110f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@ under the License.
         <jcip.version>1.0-1</jcip.version>
         <kafka.version>0.10.0.1</kafka.version>
         <kryo.version>3.0.3</kryo.version>
+        <jcabi-manifests.version>1.1</jcabi-manifests.version>
         
         <!-- set profile property defaults -->
         <skip.rya.it>true</skip.rya.it>  <!-- modified by  -P enable-it  -->
@@ -555,6 +556,21 @@ under the License.
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.kafka.connect.api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.kafka.connect.accumulo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.kafka.connect.mongo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.accumulo</groupId>
                 <artifactId>accumulo-core</artifactId>
                 <version>${accumulo.version}</version>
@@ -564,11 +580,6 @@ under the License.
                 <artifactId>accumulo-start</artifactId>
                 <version>${accumulo.version}</version>
             </dependency>
-           <dependency>
-                <groupId>org.eclipse.rdf4j</groupId>
-                <artifactId>rdf4j-runtime-osgi</artifactId>
-                <version>${org.eclipse.rdf4j.version}</version>
-            </dependency>
             <dependency>
                 <groupId>org.eclipse.rdf4j</groupId>
                 <artifactId>rdf4j-runtime</artifactId>
@@ -604,6 +615,22 @@ under the License.
                 <artifactId>rdf4j-queryresultio-text</artifactId>
                 <version>${org.eclipse.rdf4j.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.eclipse.rdf4j</groupId>
+                <artifactId>rdf4j-rio-api</artifactId>
+                <version>${org.eclipse.rdf4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.rdf4j</groupId>
+                <artifactId>rdf4j-rio-binary</artifactId>
+                <version>${org.eclipse.rdf4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.rdf4j</groupId>
+                <artifactId>rdf4j-rio-datatypes</artifactId>
+                <version>${org.eclipse.rdf4j.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.eclipse.rdf4j</groupId>
                 <artifactId>rdf4j-rio-nquads</artifactId>
@@ -709,6 +736,11 @@ under the License.
             </dependency>
             <dependency>
                 <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-simple</artifactId>
+                <version>${slf4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-log4j12</artifactId>
                 <version>${slf4j.version}</version>
             </dependency>
@@ -1075,6 +1107,11 @@ under the License.
             </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-api</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka_2.11</artifactId>
                 <version>${kafka.version}</version>
                 <classifier>test</classifier>
@@ -1084,6 +1121,11 @@ under the License.
                 <artifactId>kryo</artifactId>
                 <version>${kryo.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.jcabi</groupId>
+                <artifactId>jcabi-manifests</artifactId>
+                <version>${jcabi-manifests.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
