RYA-487 Closes #296, Implement Kafka Connect Sinks for Accumulo and MongoDB backed Rya.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/af736749 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/af736749 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/af736749 Branch: refs/heads/master Commit: af736749a375c54fc09efddbc749395f3e743937 Parents: ea91e26 Author: kchilton2 <[email protected]> Authored: Tue Apr 17 15:10:26 2018 -0400 Committer: Valiyil <[email protected]> Committed: Wed May 16 11:52:55 2018 -0400 ---------------------------------------------------------------------- .../rya/accumulo/AccumuloRdfConfiguration.java | 96 ++-- .../rya/mongodb/MongoDBRdfConfiguration.java | 8 +- .../rya/indexing/accumulo/ConfigUtils.java | 3 + extras/kafka.connect/README.md | 22 + extras/kafka.connect/accumulo-it/README.md | 19 + extras/kafka.connect/accumulo-it/pom.xml | 62 +++ .../connect/accumulo/AccumuloRyaSinkTaskIT.java | 100 ++++ extras/kafka.connect/accumulo/README.md | 23 + extras/kafka.connect/accumulo/pom.xml | 79 +++ .../connect/accumulo/AccumuloRyaSinkConfig.java | 97 ++++ .../accumulo/AccumuloRyaSinkConnector.java | 66 +++ .../connect/accumulo/AccumuloRyaSinkTask.java | 112 +++++ .../accumulo/AccumuloRyaSinkConfigTest.java | 42 ++ extras/kafka.connect/api/README.md | 20 + extras/kafka.connect/api/pom.xml | 96 ++++ .../kafka/connect/api/StatementsConverter.java | 62 +++ .../connect/api/StatementsDeserializer.java | 87 ++++ .../rya/kafka/connect/api/StatementsSerde.java | 57 +++ .../kafka/connect/api/StatementsSerializer.java | 77 +++ .../kafka/connect/api/sink/RyaSinkConfig.java | 67 +++ .../connect/api/sink/RyaSinkConnector.java | 69 +++ .../rya/kafka/connect/api/sink/RyaSinkTask.java | 145 ++++++ .../kafka/connect/api/StatementsSerdeTest.java | 84 ++++ .../kafka/connect/api/sink/RyaSinkTaskTest.java | 264 ++++++++++ .../src/test/resources/simplelogger.properties | 17 + extras/kafka.connect/client/README.md | 21 + extras/kafka.connect/client/pom.xml | 113 +++++ .../rya/kafka/connect/client/CLIDriver.java | 121 +++++ .../connect/client/RyaKafkaClientCommand.java | 115 +++++ .../client/command/ReadStatementsCommand.java | 120 +++++ .../client/command/WriteStatementsCommand.java | 187 +++++++ .../client/src/main/resources/log4j.properties | 27 + extras/kafka.connect/mongo-it/README.md | 19 + extras/kafka.connect/mongo-it/pom.xml | 62 +++ .../kafka/connect/mongo/MongoRyaSinkTaskIT.java | 95 ++++ extras/kafka.connect/mongo/README.md | 23 + extras/kafka.connect/mongo/pom.xml | 79 +++ .../kafka/connect/mongo/MongoRyaSinkConfig.java | 94 ++++ .../connect/mongo/MongoRyaSinkConnector.java | 63 +++ .../kafka/connect/mongo/MongoRyaSinkTask.java | 123 +++++ .../connect/mongo/MongoRyaSinkConfigTest.java | 42 ++ extras/kafka.connect/pom.xml | 66 +++ extras/pom.xml | 1 + extras/rya.manual/src/site/markdown/_index.md | 1 + extras/rya.manual/src/site/markdown/index.md | 1 + .../site/markdown/kafka-connect-integration.md | 493 +++++++++++++++++++ extras/rya.manual/src/site/site.xml | 3 +- pom.xml | 52 +- 48 files changed, 3637 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java 
b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java index ed76b4a..cbfe2ea 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java @@ -62,14 +62,14 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { super(); } - public AccumuloRdfConfiguration(Configuration other) { + public AccumuloRdfConfiguration(final Configuration other) { super(other); } - public AccumuloRdfConfigurationBuilder getBuilder() { + public static AccumuloRdfConfigurationBuilder getBuilder() { return new AccumuloRdfConfigurationBuilder(); } - + /** * Creates an AccumuloRdfConfiguration object from a Properties file. This method assumes * that all values in the Properties file are Strings and that the Properties file uses the keys below. @@ -94,26 +94,26 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { * @param props - Properties file containing Accumulo specific configuration parameters * @return AccumuloRdfConfiguration with properties set */ - - public static AccumuloRdfConfiguration fromProperties(Properties props) { + + public static AccumuloRdfConfiguration fromProperties(final Properties props) { return AccumuloRdfConfigurationBuilder.fromProperties(props).build(); } - + @Override public AccumuloRdfConfiguration clone() { return new AccumuloRdfConfiguration(this); } - + /** * Sets the Accumulo username from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. * */ - public void setAccumuloUser(String user) { + public void setAccumuloUser(final String user) { Preconditions.checkNotNull(user); set(CLOUDBASE_USER, user); } - + /** * Get the Accumulo username from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. @@ -121,19 +121,19 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { * @return The username if one could be found; otherwise {@code null}. */ public String getAccumuloUser(){ - return get(CLOUDBASE_USER); + return get(CLOUDBASE_USER); } - + /** * Sets the Accumulo password from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. * */ - public void setAccumuloPassword(String password) { + public void setAccumuloPassword(final String password) { Preconditions.checkNotNull(password); set(CLOUDBASE_PASSWORD, password); } - + /** * Get the Accumulo password from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. @@ -143,18 +143,18 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { public String getAccumuloPassword() { return get(CLOUDBASE_PASSWORD); } - + /** * Sets a comma delimited list of the names of the Zookeeper servers from * the configuration object that is meant to be used when connecting a * {@link Connector} to Accumulo.
* */ - public void setAccumuloZookeepers(String zookeepers) { + public void setAccumuloZookeepers(final String zookeepers) { Preconditions.checkNotNull(zookeepers); set(CLOUDBASE_ZOOKEEPERS, zookeepers); } - + /** * Get a comma delimited list of the names of the Zookeeper servers from * the configuration object that is meant to be used when connecting a @@ -165,17 +165,17 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { public String getAccumuloZookeepers() { return get(CLOUDBASE_ZOOKEEPERS); } - + /** * Sets the Accumulo instance name from the configuration object that is * meant to be used when connecting a {@link Connector} to Accumulo. * */ - public void setAccumuloInstance(String instance) { + public void setAccumuloInstance(final String instance) { Preconditions.checkNotNull(instance); set(CLOUDBASE_INSTANCE, instance); } - + /** * Get the Accumulo instance name from the configuration object that is * meant to be used when connecting a {@link Connector} to Accumulo. @@ -185,15 +185,15 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { public String getAccumuloInstance() { return get(CLOUDBASE_INSTANCE); } - + /** * Tells the Rya instance to use a Mock instance of Accumulo as its backing. * */ - public void setUseMockAccumulo(boolean useMock) { + public void setUseMockAccumulo(final boolean useMock) { setBoolean(USE_MOCK_INSTANCE, useMock); } - + /** * Indicates that a Mock instance of Accumulo is being used to back the Rya instance. * @@ -202,12 +202,12 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { public boolean getUseMockAccumulo() { return getBoolean(USE_MOCK_INSTANCE, false); } - + /** * @param enabled - {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}. */ - public void useMockInstance(boolean enabled) { + public void useMockInstance(final boolean enabled) { super.setBooleanIfUnset(USE_MOCK_INSTANCE, enabled); } @@ -224,7 +224,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { * @param username - The Accumulo username from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. */ - public void setUsername(String username) { + public void setUsername(final String username) { super.set(CLOUDBASE_USER, username); } @@ -242,7 +242,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { * @param password - The Accumulo password from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. */ - public void setPassword(String password) { + public void setPassword(final String password) { super.set(CLOUDBASE_PASSWORD, password); } @@ -260,7 +260,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { * @param instanceName - The Accumulo instance name from the configuration object that is * meant to be used when connecting a {@link Connector} to Accumulo. */ - public void setInstanceName(String instanceName) { + public void setInstanceName(final String instanceName) { super.set(CLOUDBASE_INSTANCE, instanceName); } @@ -279,7 +279,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { * the configuration object that is meant to be used when connecting a * {@link Connector} to Accumulo. 
*/ - public void setZookeepers(String zookeepers) { + public void setZookeepers(final String zookeepers) { super.set(CLOUDBASE_ZOOKEEPERS, zookeepers); } @@ -295,14 +295,14 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { } public Authorizations getAuthorizations() { - String[] auths = getAuths(); + final String[] auths = getAuths(); if (auths == null || auths.length == 0) { return AccumuloRdfConstants.ALL_AUTHORIZATIONS; } return new Authorizations(auths); } - public void setMaxRangesForScanner(Integer max) { + public void setMaxRangesForScanner(final Integer max) { setInt(MAXRANGES_SCANNER, max); } @@ -310,9 +310,9 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { return getInt(MAXRANGES_SCANNER, 2); } - public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) { - List<String> strs = Lists.newArrayList(); - for (Class<? extends AccumuloIndexer> ai : indexers){ + public void setAdditionalIndexers(final Class<? extends AccumuloIndexer>... indexers) { + final List<String> strs = Lists.newArrayList(); + for (final Class<? extends AccumuloIndexer> ai : indexers){ strs.add(ai.getName()); } @@ -326,25 +326,25 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { return getBoolean(CONF_FLUSH_EACH_UPDATE, true); } - public void setFlush(boolean flush){ + public void setFlush(final boolean flush){ setBoolean(CONF_FLUSH_EACH_UPDATE, flush); } - public void setAdditionalIterators(IteratorSetting... additionalIterators){ + public void setAdditionalIterators(final IteratorSetting... additionalIterators){ //TODO do we need to worry about cleaning up this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length)); int i = 0; - for(IteratorSetting iterator : additionalIterators) { + for(final IteratorSetting iterator : additionalIterators) { this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName()); this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass()); this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority())); - Map<String, String> options = iterator.getOptions(); + final Map<String, String> options = iterator.getOptions(); this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size())); - Iterator<Entry<String, String>> it = options.entrySet().iterator(); + final Iterator<Entry<String, String>> it = options.entrySet().iterator(); int j = 0; while(it.hasNext()) { - Entry<String, String> item = it.next(); + final Entry<String, String> item = it.next(); this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey()); this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue()); j++; @@ -354,22 +354,22 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { } public IteratorSetting[] getAdditionalIterators(){ - int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0")); + final int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0")); if(size == 0) { return new IteratorSetting[0]; } - IteratorSetting[] settings = new IteratorSetting[size]; + final IteratorSetting[] settings = new IteratorSetting[size]; for(int i = 0; i < size; i++) { - String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i)); - String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i)); - int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i))); + final String name = 
this.get(String.format(ITERATOR_SETTINGS_NAME, i)); + final String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i)); + final int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i))); - int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i))); - Map<String, String> options = new HashMap<>(optionsSize); + final int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i))); + final Map<String, String> options = new HashMap<>(optionsSize); for(int j = 0; j < optionsSize; j++) { - String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j)); - String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j)); + final String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j)); + final String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j)); options.put(key, value); } settings[i] = new IteratorSetting(priority, name, iteratorClass, options); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java index d49f2ee..b207d79 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java @@ -274,17 +274,17 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration { * on their child subtrees. * @param value whether to use aggregation pipeline optimization. 
*/ - public void setUseAggregationPipeline(boolean value) { + public void setUseAggregationPipeline(final boolean value) { setBoolean(USE_AGGREGATION_PIPELINE, value); } @Override public List<Class<QueryOptimizer>> getOptimizers() { - List<Class<QueryOptimizer>> optimizers = super.getOptimizers(); + final List<Class<QueryOptimizer>> optimizers = super.getOptimizers(); if (getUseAggregationPipeline()) { - Class<?> cl = AggregationPipelineQueryOptimizer.class; + final Class<?> cl = AggregationPipelineQueryOptimizer.class; @SuppressWarnings("unchecked") - Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl; + final Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl; optimizers.add(optCl); } return optimizers; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java index d2fe58a..77c77cd 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java @@ -438,6 +438,9 @@ public class ConfigUtils { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } + public static void setUseMongo(final Configuration conf, final boolean useMongo) { + conf.setBoolean(USE_MONGO, useMongo); + } public static boolean getUseMongo(final Configuration conf) { return conf.getBoolean(USE_MONGO, false); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/README.md ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/README.md b/extras/kafka.connect/README.md new file mode 100644 index 0000000..03b63c2 --- /dev/null +++ b/extras/kafka.connect/README.md @@ -0,0 +1,22 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +The parent project for all Rya Kafka Connect work. All projects that are part +of that system must use this project's pom as their parent pom. + +For more information about Rya's Kafka Connect support, see +[the manual](../rya.manual/src/site/markdown/kafka-connect-integration.md).
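To make the configuration surface touched above concrete, here is a minimal usage sketch of the AccumuloRdfConfiguration setters and the new ConfigUtils.setUseMongo helper. This sketch is not part of the commit; the credentials, host names, and iterator below are placeholder values.

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    import org.apache.rya.indexing.accumulo.ConfigUtils;

    public class AccumuloRdfConfigurationSketch {
        public static void main(final String[] args) {
            final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
            conf.setAccumuloUser("root");                      // placeholder credentials
            conf.setAccumuloPassword("secret");
            conf.setAccumuloInstance("dev");                   // placeholder instance name
            conf.setAccumuloZookeepers("zoo1:2181,zoo2:2181"); // placeholder hosts
            conf.setUseMockAccumulo(false);

            // Iterator settings round-trip through the configuration as flat key/value pairs.
            conf.setAdditionalIterators(new IteratorSetting(15, "vers",
                    "org.apache.accumulo.core.iterators.user.VersioningIterator"));
            final IteratorSetting[] restored = conf.getAdditionalIterators();
            assert restored.length == 1;

            // The new ConfigUtils helper flags a Hadoop Configuration as Mongo backed.
            final Configuration mongoConf = new Configuration();
            ConfigUtils.setUseMongo(mongoConf, true);
            assert ConfigUtils.getUseMongo(mongoConf);
        }
    }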
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo-it/README.md ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo-it/README.md b/extras/kafka.connect/accumulo-it/README.md new file mode 100644 index 0000000..abcc12d --- /dev/null +++ b/extras/kafka.connect/accumulo-it/README.md @@ -0,0 +1,19 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +This project contains integration tests that verify an Accumulo backed +implementation of the Rya Kafka Connect Sink is working properly. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo-it/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo-it/pom.xml b/extras/kafka.connect/accumulo-it/pom.xml new file mode 100644 index 0000000..af088a9 --- /dev/null +++ b/extras/kafka.connect/accumulo-it/pom.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.parent</artifactId> + <version>4.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.kafka.connect.accumulo.it</artifactId> + + <name>Apache Rya Kafka Connect - Accumulo Integration Tests</name> + <description>Tests the Kafka Connect Sink that writes to a Rya instance backed by Accumulo.</description> + + <dependencies> + <!-- 1st party dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.accumulo</artifactId> + </dependency> + + <!-- 3rd party dependencies. 
--> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-api</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.accumulo</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java new file mode 100644 index 0000000..1775a74 --- /dev/null +++ b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.junit.Test; + +/** + * Integration tests for the methods of {@link AccumuloRyaSinkTask}. + */ +public class AccumuloRyaSinkTaskIT extends AccumuloITBase { + + @Test + public void instanceExists() throws Exception { + // Install an instance of Rya. + final String ryaInstanceName = getRyaInstanceName(); + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableEntityCentricIndex(false) + .setEnableFreeTextIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(false) + .setEnableGeoIndex(false) + .build(); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + ryaClient.getInstall().install(ryaInstanceName, installConfig); + + // Create the task that will be tested. 
+ final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask(); + + try { + // Configure the task to use the embedded accumulo instance for Rya. + final Map<String, String> config = new HashMap<>(); + config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers()); + config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName()); + config.put(AccumuloRyaSinkConfig.USERNAME, getUsername()); + config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword()); + config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, ryaInstanceName); + + // This will pass because the Rya instance exists. + task.start(config); + + } finally { + task.stop(); + } + } + + @Test(expected = ConnectException.class) + public void instanceDoesNotExist() throws Exception { + // Create the task that will be tested. + final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask(); + + try { + // Configure the task to use the embedded accumulo instance for Rya. + final Map<String, String> config = new HashMap<>(); + config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers()); + config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName()); + config.put(AccumuloRyaSinkConfig.USERNAME, getUsername()); + config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword()); + config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, getRyaInstanceName()); + + // Starting the task will fail because the Rya instance does not exist. + task.start(config); + + } finally { + task.stop(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/README.md ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo/README.md b/extras/kafka.connect/accumulo/README.md new file mode 100644 index 0000000..eecfd21 --- /dev/null +++ b/extras/kafka.connect/accumulo/README.md @@ -0,0 +1,23 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +This project is the Rya Kafka Connect Sink that writes to Accumulo backed +instances of Rya. + +This project produces a shaded jar that may be installed into Kafka Connect. +For more information about how to install and configure this connector, see +[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md). \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo/pom.xml b/extras/kafka.connect/accumulo/pom.xml new file mode 100644 index 0000000..54188db --- /dev/null +++ b/extras/kafka.connect/accumulo/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.parent</artifactId> + <version>4.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.kafka.connect.accumulo</artifactId> + + <name>Apache Rya Kafka Connect - Accumulo</name> + <description>A Kafka Connect Sink that writes to a Rya instance backed by Accumulo.</description> + + <dependencies> + <!-- 1st party dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + + <!-- 3rd party dependencies. --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-api</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Build the uber jar that may be deployed to Kafka Connect. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java new file mode 100644 index 0000000..8db4f1c --- /dev/null +++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Connect configuration that is used to configure {@link AccumuloRyaSinkConnector}s + * and {@link AccumuloRyaSinkTask}s. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConfig extends RyaSinkConfig { + + public static final String ZOOKEEPERS = "accumulo.zookeepers"; + private static final String ZOOKEEPERS_DOC = "A comma delimited list of the Zookeeper server hostname/port pairs."; + + public static final String CLUSTER_NAME = "accumulo.cluster.name"; + private static final String CLUSTER_NAME_DOC = "The name of the Accumulo instance within Zookeeper."; + + public static final String USERNAME = "accumulo.username"; + private static final String USERNAME_DOC = "The Accumulo username the Sail connections will use."; + + public static final String PASSWORD = "accumulo.password"; + private static final String PASSWORD_DOC = "The Accumulo password the Sail connections will use."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ZOOKEEPERS, Type.STRING, Importance.HIGH, ZOOKEEPERS_DOC) + .define(CLUSTER_NAME, Type.STRING, Importance.HIGH, CLUSTER_NAME_DOC) + .define(USERNAME, Type.STRING, Importance.HIGH, USERNAME_DOC) + .define(PASSWORD, Type.PASSWORD, Importance.HIGH, PASSWORD_DOC); + static { + RyaSinkConfig.addCommonDefinitions(CONFIG_DEF); + } + + /** + * Constructs an instance of {@link AccumuloRyaSinkConfig}. + * + * @param originals - The key/value pairs that define the configuration. (not null) + */ + public AccumuloRyaSinkConfig(final Map<?, ?> originals) { + super(CONFIG_DEF, requireNonNull(originals)); + } + + /** + * @return A comma delimited list of the Zookeeper server hostname/port pairs. + */ + public String getZookeepers() { + return super.getString(ZOOKEEPERS); + } + + /** + * @return The name of the Accumulo instance within Zookeeper. + */ + public String getClusterName() { + return super.getString(CLUSTER_NAME); + } + + /** + * @return The Accumulo username the Sail connections will use. + */ + public String getUsername() { + return super.getString(USERNAME); + } + + /** + * @return The Accumulo password the Sail connections will use. 
+ */ + public String getPassword() { + return super.getPassword(PASSWORD).value(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java new file mode 100644 index 0000000..eeb3d75 --- /dev/null +++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConnector extends RyaSinkConnector { + + @Nullable + private AccumuloRyaSinkConfig config = null; + + @Override + public void start(final Map<String, String> props) { + requireNonNull(props); + this.config = new AccumuloRyaSinkConfig( props ); + } + + @Override + protected AbstractConfig getConfig() { + if(config == null) { + throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first."); + } + return config; + } + + @Override + public Class<? 
extends Task> taskClass() { + return AccumuloRyaSinkTask.class; + } + + @Override + public ConfigDef config() { + return AccumuloRyaSinkConfig.CONFIG_DEF; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java new file mode 100644 index 0000000..7d19f29 --- /dev/null +++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.log.LogUtils; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + + @Override + protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { + requireNonNull(taskConfig); + + // Parse the configuration object. + final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + + // Connect to the instance of Accumulo. 
+ final Connector connector; + try { + final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); + connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); + } catch (final AccumuloException | AccumuloSecurityException e) { + throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); + } + + // Use a RyaClient to see if the configured instance exists. + try { + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + config.getUsername(), + config.getPassword().toCharArray(), + config.getClusterName(), + config.getZookeepers()); + final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + + if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { + throw new ConnectException("The Rya Instance named " + + LogUtils.clean(config.getRyaInstanceName()) + " has not been installed."); + } + + } catch (final RyaClientException e) { + throw new ConnectException("Unable to determine if the Rya Instance named " + + LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e); + } + } + + @Override + protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException { + requireNonNull(taskConfig); + + // Parse the configuration object. + final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + + // Move the configuration into a Rya Configuration object. + final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration(); + ryaConfig.setTablePrefix( config.getRyaInstanceName() ); + ryaConfig.setAccumuloZookeepers( config.getZookeepers() ); + ryaConfig.setAccumuloInstance( config.getClusterName() ); + ryaConfig.setAccumuloUser( config.getUsername() ); + ryaConfig.setAccumuloPassword( config.getPassword() ); + + // Create the Sail object. + try { + return RyaSailFactory.getInstance(ryaConfig); + } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java new file mode 100644 index 0000000..66ecd87 --- /dev/null +++ b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; +import org.junit.Test; + +/** + * Unit tests the methods of {@link AccumuloRyaSinkConfig}. + */ +public class AccumuloRyaSinkConfigTest { + + @Test + public void parses() { + final Map<String, String> properties = new HashMap<>(); + properties.put(AccumuloRyaSinkConfig.ZOOKEEPERS, "zoo1:2181,zoo2"); + properties.put(AccumuloRyaSinkConfig.CLUSTER_NAME, "test"); + properties.put(AccumuloRyaSinkConfig.USERNAME, "alice"); + properties.put(AccumuloRyaSinkConfig.PASSWORD, "alice1234!@"); + properties.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya_"); + new AccumuloRyaSinkConfig(properties); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/README.md ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/api/README.md b/extras/kafka.connect/api/README.md new file mode 100644 index 0000000..777fd2a --- /dev/null +++ b/extras/kafka.connect/api/README.md @@ -0,0 +1,20 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +This project contains the common components of a Rya Kafka Connect Sink. Each +backend database that Rya is built on top of must have an implementation using +this project's components. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/api/pom.xml b/extras/kafka.connect/api/pom.xml new file mode 100644 index 0000000..3727394 --- /dev/null +++ b/extras/kafka.connect/api/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.kafka.connect.parent</artifactId> + <version>4.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.kafka.connect.api</artifactId> + + <name>Apache Rya Kafka Connect - API</name> + <description>Contains common components used when implementing a Kafka Connect Sink + that writes to a Rya instance.</description> + + <dependencies> + <!-- 1st party dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.api.model</artifactId> + </dependency> + + <!-- 3rd party dependencies. --> + <dependency> + <groupId>org.eclipse.rdf4j</groupId> + <artifactId>rdf4j-rio-api</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.rdf4j</groupId> + <artifactId>rdf4j-rio-binary</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.rdf4j</groupId> + <artifactId>rdf4j-rio-datatypes</artifactId> + </dependency> + <dependency> + <groupId>com.github.stephenc.findbugs</groupId> + <artifactId>findbugs-annotations</artifactId> + </dependency> + <dependency> + <groupId>com.jcabi</groupId> + <artifactId>jcabi-manifests</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.eclipse.rdf4j</groupId> + <artifactId>rdf4j-runtime</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java new file mode 100644 index 0000000..eb4b611 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.eclipse.rdf4j.model.Statement; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A plugin into the Kafka Connect platform that converts {@link Set}s of {@link Statement}s + * to/from byte[]s by using a {@link StatementsSerializer} and a {@link StatementsDeserializer}. + * <p/> + * This converter does not use Kafka's Schema Registry. + */ +@DefaultAnnotation(NonNull.class) +public class StatementsConverter implements Converter { + + private static final StatementsSerializer SERIALIZER = new StatementsSerializer(); + private static final StatementsDeserializer DESERIALIZER = new StatementsDeserializer(); + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // This converter's behavior can not be tuned with configurations. + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + requireNonNull(value); + return SERIALIZER.serialize(topic, (Set<Statement>) value); + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + requireNonNull(value); + return new SchemaAndValue(null, DESERIALIZER.deserialize(topic, value)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java new file mode 100644 index 0000000..fb03347 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Deserializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory; +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize a set of {@link Statement}s that was + * serialized using the RDF4J Rio Binary format. + */ +@DefaultAnnotation(NonNull.class) +public class StatementsDeserializer implements Deserializer<Set<Statement>> { + private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class); + + private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory(); + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Nothing to do. + } + + @Override + public Set<Statement> deserialize(final String topic, final byte[] data) { + if(data == null || data.length == 0) { + // Return null because that is the contract of this method. + return null; + } + + try { + final RDFParser parser = PARSER_FACTORY.getParser(); + final Set<Statement> statements = new HashSet<>(); + + parser.setRDFHandler(new AbstractRDFHandler() { + @Override + public void handleStatement(final Statement statement) throws RDFHandlerException { + log.debug("Statement: " + statement); + statements.add( statement ); + } + }); + + parser.parse(new ByteArrayInputStream(data), null); + return statements; + + } catch(final RDFParseException | RDFHandlerException | IOException e) { + log.error("Could not deserialize a Set of Statement objects using the RDF4J Rio Binary format.", e); + return null; + } + } + + @Override + public void close() { + // Nothing to do. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java ---------------------------------------------------------------------- diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java new file mode 100644 index 0000000..f2101d6 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
new file mode 100644
index 0000000..f2101d6
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a {@link Serializer} and a {@link Deserializer} for {@link Set}s of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerde implements Serde<Set<Statement>> {
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public Serializer<Set<Statement>> serializer() {
+        return new StatementsSerializer();
+    }
+
+    @Override
+    public Deserializer<Set<Statement>> deserializer() {
+        return new StatementsDeserializer();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file
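A minimal usage sketch of the serde, assuming statements is a Set<Statement> built elsewhere; the full round trip is exercised by the unit test at the end of this patch:

    // Hypothetical round trip through the serde, reusing the imports shown above.
    try(final Serde<Set<Statement>> serde = new StatementsSerde()) {
        final byte[] bytes = serde.serializer().serialize("topic", statements);
        final Set<Statement> roundTripped = serde.deserializer().deserialize("topic", bytes);
    }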
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
new file mode 100644
index 0000000..893df0c
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that serializes a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerializer implements Serializer<Set<Statement>> {
+    private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
+
+    private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Set<Statement> data) {
+        if(data == null) {
+            // Return null because that is the contract of this method.
+            return null;
+        }
+
+        // Write the statements using a Binary RDF Writer.
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final RDFWriter writer = WRITER_FACTORY.getWriter(baos);
+        writer.startRDF();
+
+        for(final Statement stmt : data) {
+            // Write the statement.
+            log.debug("Writing Statement: {}", stmt);
+            writer.handleStatement(stmt);
+        }
+        writer.endRDF();
+
+        // Return the byte[] version of the data.
+        return baos.toByteArray();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file
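A minimal sketch of serializing a small set of statements directly, mirroring the style of the unit test at the end of this patch; this snippet is illustrative only:

    // Hypothetical direct use of the serializer, reusing the RDF4J imports shown above.
    final ValueFactory vf = SimpleValueFactory.getInstance();
    final Set<Statement> statements = Sets.newHashSet(
            vf.createStatement(vf.createIRI("urn:alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:bob")));
    final byte[] bytes = new StatementsSerializer().serialize("topic", statements);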
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
new file mode 100644
index 0000000..5c3e2cc
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Contains common configuration fields for Rya Sinks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaSinkConfig extends AbstractConfig {
+
+    public static final String RYA_INSTANCE_NAME = "rya.instance.name";
+    private static final String RYA_INSTANCE_NAME_DOC = "The name of the Rya instance that will be connected to.";
+
+    /**
+     * @param configDef - The configuration schema definition that will be updated to include
+     *   this configuration's fields. (not null)
+     */
+    public static void addCommonDefinitions(final ConfigDef configDef) {
+        requireNonNull(configDef);
+        configDef.define(RYA_INSTANCE_NAME, Type.STRING, Importance.HIGH, RYA_INSTANCE_NAME_DOC);
+    }
+
+    /**
+     * Constructs an instance of {@link RyaSinkConfig}.
+     *
+     * @param definition - Defines the schema of the configuration. (not null)
+     * @param originals - The key/value pairs that define the configuration. (not null)
+     */
+    public RyaSinkConfig(final ConfigDef definition, final Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    /**
+     * @return The name of the Rya instance that will be connected to.
+     */
+    public String getRyaInstanceName() {
+        return super.getString(RYA_INSTANCE_NAME);
+    }
+}
\ No newline at end of file
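To illustrate the intended extension pattern, a hedged sketch of a backend-specific subclass; FooRyaSinkConfig and its foo.hostname field are hypothetical and not part of this patch:

    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;

    public class FooRyaSinkConfig extends RyaSinkConfig {
        public static final String FOO_HOSTNAME = "foo.hostname";
        private static final String FOO_HOSTNAME_DOC = "The hostname of the hypothetical Foo server.";

        private static final ConfigDef CONFIG = new ConfigDef()
                .define(FOO_HOSTNAME, Type.STRING, Importance.HIGH, FOO_HOSTNAME_DOC);
        static {
            // Pull in the rya.instance.name definition shared by all Rya Sinks.
            RyaSinkConfig.addCommonDefinitions(CONFIG);
        }

        public FooRyaSinkConfig(final Map<?, ?> originals) {
            super(CONFIG, originals);
        }

        public String getFooHostname() {
            return super.getString(FOO_HOSTNAME);
        }
    }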
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
new file mode 100644
index 0000000..f288af2
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Handles the common components required to configure {@link RyaSinkTask}s that write to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkConnector extends SinkConnector {
+
+    /**
+     * Gets the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked.
+     * <p/>
+     * Only called after start has been invoked.
+     *
+     * @return The configuration object for the connector.
+     * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet.
+     */
+    protected abstract AbstractConfig getConfig() throws IllegalStateException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(final int maxTasks) {
+        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+        for(int i = 0; i < maxTasks; i++) {
+            configs.add( getConfig().originalsStrings() );
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since the RyaSinkConnector has no background monitoring.
+    }
+}
\ No newline at end of file
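A hedged sketch of a concrete connector built on this base class; FooRyaSinkConfig and FooRyaSinkTask are hypothetical placeholders for a backend-specific pair such as the Accumulo or Mongo implementations elsewhere in this patch:

    import java.util.Map;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.Task;

    public class FooRyaSinkConnector extends RyaSinkConnector {
        private FooRyaSinkConfig config = null;

        @Override
        public void start(final Map<String, String> props) {
            this.config = new FooRyaSinkConfig(props);
        }

        @Override
        protected AbstractConfig getConfig() throws IllegalStateException {
            if(config == null) {
                throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
            }
            return config;
        }

        @Override
        public Class<? extends Task> taskClass() {
            return FooRyaSinkTask.class;
        }

        @Override
        public ConfigDef config() {
            final ConfigDef configDef = new ConfigDef();
            RyaSinkConfig.addCommonDefinitions(configDef);
            return configDef;
        }
    }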
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
new file mode 100644
index 0000000..5ff118a
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+    @Nullable
+    private SailRepository sailRepo = null;
+
+    @Nullable
+    private SailRepositoryConnection conn = null;
+
+    /**
+     * Throws an exception if the configured Rya Instance is not already installed
+     * within the configured database.
+     *
+     * @param taskConfig - The configuration values that were provided to the task. (not null)
+     * @throws ConnectException The configured Rya Instance is not installed to the configured database,
+     *   or we were unable to determine whether it is installed.
+     */
+    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+    /**
+     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+     * Rya Instance.
+     *
+     * @param taskConfig - Configures how the Sail object will be created. (not null)
+     * @return The created Sail object.
+     * @throws ConnectException The Sail object could not be made.
+     */
+    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+    }
+
+    @Override
+    public void start(final Map<String, String> props) throws ConnectException {
+        requireNonNull(props);
+
+        // Ensure the configured Rya Instance is installed within the configured database.
+        checkRyaInstanceExists(props);
+
+        // Create the Sail object that is connected to the Rya Instance.
+        final Sail sail = makeSail(props);
+        sailRepo = new SailRepository(sail);
+        conn = sailRepo.getConnection();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        requireNonNull(records);
+
+        // Return immediately if there are no records to handle.
+        if(records.isEmpty()) {
+            return;
+        }
+
+        // If a transaction has not been started yet, then start one.
+        if(!conn.isActive()) {
+            conn.begin();
+        }
+
+        // Iterate through the records and write them to the Sail object.
+        for(final SinkRecord record : records) {
+            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+            conn.add((Set<? extends Statement>) record.value());
+        }
+    }
+
+    @Override
+    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        requireNonNull(currentOffsets);
+        // Commit the current transaction.
+        conn.commit();
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if(conn != null) {
+                conn.close();
+            }
+        } catch(final Exception e) {
+            log.error("Could not close the Sail Repository Connection.", e);
+        }
+
+        try {
+            if(sailRepo != null) {
+                sailRepo.shutDown();
+            }
+        } catch(final Exception e) {
+            log.error("Could not shut down the Sail Repository.", e);
+        }
+    }
+}
\ No newline at end of file
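For orientation, the order in which the Connect framework drives this task; task stands in for a hypothetical concrete subclass, and this sketch is illustrative only:

    // Illustrative lifecycle; the framework, not user code, makes these calls.
    task.start(props);           // verifies the Rya Instance exists, then opens a Sail connection
    task.put(records);           // begins a transaction if needed and stages the statements
    task.flush(currentOffsets);  // commits the staged statements to Rya
    task.stop();                 // closes the connection and shuts down the repository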
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
new file mode 100644
index 0000000..01e5b76
--- /dev/null
+++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link StatementsSerde}.
+ */
+public class StatementsSerdeTest {
+
+    @Test
+    public void serializeAndDeserialize() {
+        // Create the object that will be serialized.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+
+        final Set<Statement> original = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:alice"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:testGraph")),
+                vf.createStatement(
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:graph2")),
+                vf.createStatement(
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:graph2")),
+                vf.createStatement(
+                        vf.createIRI("urn:alice"),
+                        vf.createIRI("urn:listensTo"),
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:testGraph")));
+
+        // Serialize it.
+        try(final Serde<Set<Statement>> serde = new StatementsSerde()) {
+            final byte[] bytes = serde.serializer().serialize("topic", original);
+
+            // Deserialize it.
+            final Set<Statement> deserialized = serde.deserializer().deserialize("topic", bytes);
+
+            // Show the deserialized value matches the original.
+            assertEquals(original, deserialized);
+        }
+    }
+
+    @Test
+    public void deserializeEmptyData() {
+        try(final Serde<Set<Statement>> serde = new StatementsSerde()) {
+            assertNull( serde.deserializer().deserialize("topic", new byte[0]) );
+        }
+    }
+}
\ No newline at end of file
