RYA-273-Construct Query Support. Closes #161.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/60090ad5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/60090ad5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/60090ad5 Branch: refs/heads/master Commit: 60090ad52de294d55e2bcea2a0629ee19bfb3827 Parents: 646d21b Author: Caleb Meier <[email protected]> Authored: Fri Apr 14 19:20:25 2017 -0700 Committer: Caleb Meier <[email protected]> Committed: Thu Jun 22 11:03:02 2017 -0700 ---------------------------------------------------------------------- common/rya.api/pom.xml | 6 +- .../org/apache/rya/api/domain/RyaSubGraph.java | 118 +++++++ .../kryo/RyaStatementSerializer.java | 162 +++++++++ .../kryo/RyaSubGraphSerializer.java | 84 +++++ ...AbstractAccumuloRdfConfigurationBuilder.java | 26 +- .../apache/rya/sail/config/RyaSailFactory.java | 40 +++ .../rya/indexing/pcj/fluo/api/CreatePcj.java | 71 +++- .../rya/indexing/pcj/fluo/api/DeletePcj.java | 11 +- .../indexing/pcj/fluo/app/ConstructGraph.java | 141 ++++++++ .../pcj/fluo/app/ConstructGraphSerializer.java | 52 +++ .../pcj/fluo/app/ConstructProjection.java | 266 ++++++++++++++ .../fluo/app/ConstructQueryResultUpdater.java | 91 +++++ .../pcj/fluo/app/FluoStringConverter.java | 51 ++- .../fluo/app/IncrementalUpdateConstants.java | 1 + .../rya/indexing/pcj/fluo/app/NodeType.java | 23 +- .../export/IncrementalBindingSetExporter.java | 69 ++++ .../IncrementalBindingSetExporterFactory.java | 104 ++++++ .../app/export/IncrementalResultExporter.java | 69 ---- .../IncrementalResultExporterFactory.java | 104 ------ .../export/IncrementalRyaSubGraphExporter.java | 39 ++ .../IncrementalRyaSubGraphExporterFactory.java | 47 +++ .../export/kafka/KafkaBindingSetExporter.java | 87 +++++ .../kafka/KafkaBindingSetExporterFactory.java | 64 ++++ .../app/export/kafka/KafkaResultExporter.java | 87 ----- .../kafka/KafkaResultExporterFactory.java | 64 ---- .../export/kafka/KafkaRyaSubGraphExporter.java | 81 +++++ .../kafka/KafkaRyaSubGraphExporterFactory.java | 62 ++++ .../app/export/kafka/RyaSubGraphKafkaSerDe.java | 100 ++++++ .../app/export/rya/RyaBindingSetExporter.java | 72 ++++ .../rya/RyaBindingSetExporterFactory.java | 77 ++++ .../app/export/rya/RyaExportParameters.java | 15 + .../fluo/app/export/rya/RyaResultExporter.java | 72 ---- .../export/rya/RyaResultExporterFactory.java | 77 ---- .../fluo/app/observers/BindingSetUpdater.java | 12 + .../observers/ConstructQueryResultObserver.java | 198 +++++++++++ .../fluo/app/observers/QueryResultObserver.java | 36 +- .../fluo/app/query/ConstructQueryMetadata.java | 192 ++++++++++ .../indexing/pcj/fluo/app/query/FluoQuery.java | 106 +++++- .../pcj/fluo/app/query/FluoQueryColumns.java | 33 ++ .../fluo/app/query/FluoQueryMetadataDAO.java | 181 +++++++--- .../pcj/fluo/app/query/QueryMetadata.java | 3 +- .../fluo/app/query/SparqlFluoQueryBuilder.java | 210 ++++++++--- .../pcj/fluo/app/ConstructGraphTest.java | 145 ++++++++ .../pcj/fluo/app/ConstructGraphTestUtils.java | 126 +++++++ .../pcj/fluo/app/ConstructProjectionTest.java | 112 ++++++ .../pcj/fluo/app/FluoStringConverterTest.java | 7 - .../pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java | 57 +++ .../export/rya/KafkaExportParametersTest.java | 4 +- .../fluo/client/util/QueryReportRenderer.java | 28 +- .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 180 +++++----- .../pcj/fluo/ConstructGraphTestUtils.java | 126 +++++++ .../indexing/pcj/fluo/KafkaExportITBase.java | 143 +++++--- .../rya/indexing/pcj/fluo/RyaExportITBase.java | 9 + .../indexing/pcj/fluo/api/GetQueryReportIT.java | 2 +- .../fluo/app/query/FluoQueryMetadataDAOIT.java | 94 ++++- .../pcj/fluo/integration/CreateDeleteIT.java | 1 + .../pcj/fluo/integration/KafkaExportIT.java | 36 +- .../integration/KafkaRyaSubGraphExportIT.java | 352 +++++++++++++++++++ .../pcj/functions/geo/FunctionAdapter.java | 2 - 59 files changed, 3986 insertions(+), 842 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/pom.xml ---------------------------------------------------------------------- diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml index f73c006..94f191d 100644 --- a/common/rya.api/pom.xml +++ b/common/rya.api/pom.xml @@ -70,7 +70,11 @@ under the License. <groupId>com.github.stephenc.jcip</groupId> <artifactId>jcip-annotations</artifactId> </dependency> - + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>2.24.0</version> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java new file mode 100644 index 0000000..f08eba4 --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java @@ -0,0 +1,118 @@ +package org.apache.rya.api.domain; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.HashSet; +import java.util.Set; + +import com.google.common.base.Objects; + +/** + * This class packages together a collection of {@link RyaStatement}s to form a subgraph + */ +public class RyaSubGraph { + + private String id; + private Set<RyaStatement> statements; + + /** + * Creates empty subgraph with given id + * @param id - id of the created subgraph + */ + public RyaSubGraph(String id) { + this.id = id; + this.statements = new HashSet<>(); + } + + /** + * Creates sugraph with specified id and statements + * @param id - id of the created subgraph + * @param statements - statements that make up subgraph + */ + public RyaSubGraph(String id, Set<RyaStatement> statements) { + this.id = id; + this.statements = statements; + } + + /** + * @return id of this subgraph + */ + public String getId() { + return id; + } + + /** + * @return RyaStatements representing this subgraph + */ + public Set<RyaStatement> getStatements() { + return statements; + } + + /** + * Sets id of subgraph + * @param id - id of subgraph + */ + public void setId(String id) { + this.id = id; + } + + /** + * Sets subgraph statements to specified statements + * @param statements - statements that will be set to subgraph statements + */ + public void setStatements(Set<RyaStatement> statements) { + this.statements = statements; + } + + + /** + * Adds statement to this subgraph + * @param statement - RyaStatement to be added to subgraph + */ + public void addStatement(RyaStatement statement){ + statements.add(statement); + } + + @Override + public boolean equals(Object other) { + + if(this == other) { + return true; + } + + if(other instanceof RyaSubGraph) { + RyaSubGraph bundle = (RyaSubGraph) other; + return Objects.equal(this.id, ((RyaSubGraph) other).id) && Objects.equal(this.statements,bundle.statements); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(this.id, this.statements); + } + + + @Override + public String toString() { + return new StringBuilder().append("Rya Subgraph {\n").append(" Rya Subgraph ID: " + id + "\n") + .append(" Rya Statements: " + statements + "\n").toString(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java new file mode 100644 index 0000000..6c0efd2 --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java @@ -0,0 +1,162 @@ +package org.apache.rya.api.domain.serialization.kryo; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.vocabulary.XMLSchema; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; + +/** + * Kryo Serializer for {@link RyaStatement}s + * + */ +public class RyaStatementSerializer extends Serializer<RyaStatement> { + + /** + * Uses Kryo to write RyaStatement to {@lin Output} + * @param kryo - writes statement to output + * @param output - output stream that statement is written to + * @param object - statement written to output + */ + public static void writeToKryo(Kryo kryo, Output output, RyaStatement object) { + Preconditions.checkNotNull(kryo); + Preconditions.checkNotNull(output); + Preconditions.checkNotNull(object); + output.writeString(object.getSubject().getData()); + output.writeString(object.getPredicate().getData()); + output.writeString(object.getObject().getDataType().toString()); + output.writeString(object.getObject().getData()); + boolean hasContext = object.getContext() != null; + output.writeBoolean(hasContext); + if(hasContext){ + output.writeString(object.getContext().getData()); + } + boolean shouldWrite = object.getColumnVisibility() != null; + output.writeBoolean(shouldWrite); + if(shouldWrite){ + output.writeInt(object.getColumnVisibility().length); + output.writeBytes(object.getColumnVisibility()); + } + shouldWrite = object.getQualifer() != null; + output.writeBoolean(shouldWrite); + if(shouldWrite){ + output.writeString(object.getQualifer()); + } + shouldWrite = object.getTimestamp() != null; + output.writeBoolean(shouldWrite); + if(shouldWrite){ + output.writeLong(object.getTimestamp()); + } + shouldWrite = object.getValue() != null; + output.writeBoolean(shouldWrite); + if(shouldWrite){ + output.writeBytes(object.getValue()); + } + } + + /** + * Uses Kryo to write RyaStatement to {@lin Output} + * @param kryo - writes statement to output + * @param output - output stream that statement is written to + * @param object - statement written to output + */ + @Override + public void write(Kryo kryo, Output output, RyaStatement object) { + writeToKryo(kryo, output, object); + } + + /** + * Uses Kryo to read a RyaStatement from {@link Input} + * @param kryo - reads statement from input + * @param input - Input stream that statement is read from + * @param type - Type read from input stream + * @return - statement read from input stream + */ + public static RyaStatement readFromKryo(Kryo kryo, Input input, Class<RyaStatement> type){ + return read(input); + } + + /** + * Reads RyaStatement from {@link Input} stream + * @param input - input stream that statement is read from + * @return - statement read from input stream + */ + public static RyaStatement read(Input input){ + Preconditions.checkNotNull(input); + String subject = input.readString(); + String predicate = input.readString(); + String objectType = input.readString(); + String objectValue = input.readString(); + RyaType value; + if (objectType.equals(XMLSchema.ANYURI.toString())){ + value = new RyaURI(objectValue); + } + else { + value = new RyaType(new URIImpl(objectType), objectValue); + } + RyaStatement statement = new RyaStatement(new RyaURI(subject), new RyaURI(predicate), value); + int length = 0; + boolean hasNextValue = input.readBoolean(); + if (hasNextValue){ + statement.setContext(new RyaURI(input.readString())); + } + hasNextValue = input.readBoolean(); + if (hasNextValue){ + length = input.readInt(); + statement.setColumnVisibility(input.readBytes(length)); + } + hasNextValue = input.readBoolean(); + if (hasNextValue){ + statement.setQualifer(input.readString()); + } + hasNextValue = input.readBoolean(); + if (hasNextValue){ + statement.setTimestamp(input.readLong()); + } + hasNextValue = input.readBoolean(); + if (hasNextValue){ + length = input.readInt(); + statement.setValue(input.readBytes(length)); + } + + return statement; + } + + /** + * Uses Kryo to read a RyaStatement from {@link Input} + * @param kryo - reads statement from input + * @param input - Input stream that statement is read from + * @param type - Type read from input stream + * @return - statement read from input stream + */ + @Override + public RyaStatement read(Kryo kryo, Input input, Class<RyaStatement> type) { + return readFromKryo(kryo, input, type); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java new file mode 100644 index 0000000..dbb6c3b --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java @@ -0,0 +1,84 @@ +package org.apache.rya.api.domain.serialization.kryo; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.domain.RyaStatement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Kryo based Serializer/Deserializer for {@link RyaSubGraph}. + * + */ +public class RyaSubGraphSerializer extends Serializer<RyaSubGraph>{ + static final Logger log = LoggerFactory.getLogger(RyaSubGraphSerializer.class); + + /** + * Use Kryo to write RyaSubGraph to {@link Output} stream + * @param kryo - used to write subgraph to output stream + * @param output - stream that subgraph is written to + * @param object - subgraph written to output stream + */ + @Override + public void write(Kryo kryo, Output output, RyaSubGraph object) { + output.writeString(object.getId()); + output.writeInt(object.getStatements().size()); + for (RyaStatement statement : object.getStatements()){ + RyaStatementSerializer.writeToKryo(kryo, output, statement); + } + } + + /** + * Reads RyaSubGraph from {@link Input} stream + * @param input - stream that subgraph is read from + * @return subgraph read from input stream + */ + public static RyaSubGraph read(Input input){ + RyaSubGraph bundle = new RyaSubGraph(input.readString()); + int numStatements = input.readInt(); + for (int i=0; i < numStatements; i++){ + bundle.addStatement(RyaStatementSerializer.read(input)); + } + return bundle; + } + + /** + * Uses Kryo to read RyaSubGraph from {@link Input} stream + * @param kryo - used to read subgraph from input stream + * @param input - stream that subgraph is read from + * @param type - class of object to be read from input stream (RyaSubgraph) + * @return subgraph read from input stream + */ + @Override + public RyaSubGraph read(Kryo kryo, Input input, Class<RyaSubGraph> type) { + RyaSubGraph bundle = new RyaSubGraph(input.readString()); + int numStatements = input.readInt(); + + for (int i=0; i < numStatements; i++){ + bundle.addStatement(RyaStatementSerializer.readFromKryo(kryo, input, RyaStatement.class)); + } + return bundle; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java index e342db2..d1422f6 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java @@ -44,19 +44,19 @@ public abstract class AbstractAccumuloRdfConfigurationBuilder<B extends Abstract private boolean useComposite = false; private boolean useSelectivity = false; - protected static final String ACCUMULO_USER = "accumulo.user"; - protected static final String ACCUMULO_PASSWORD = "accumulo.password"; - protected static final String ACCUMULO_INSTANCE = "accumulo.instance"; - protected static final String ACCUMULO_AUTHS = "accumulo.auths"; - protected static final String ACCUMULO_VISIBILITIES = "accumulo.visibilities"; - protected static final String ACCUMULO_ZOOKEEPERS = "accumulo.zookeepers"; - protected static final String ACCUMULO_RYA_PREFIX = "accumulo.rya.prefix"; - protected static final String USE_INFERENCE = "use.inference"; - protected static final String USE_DISPLAY_QUERY_PLAN = "use.display.plan"; - protected static final String USE_MOCK_ACCUMULO = "use.mock"; - protected static final String USE_PREFIX_HASHING = "use.prefix.hashing"; - protected static final String USE_COUNT_STATS = "use.count.stats"; - protected static final String USE_JOIN_SELECTIVITY = "use.join.selectivity"; + public static final String ACCUMULO_USER = "accumulo.user"; + public static final String ACCUMULO_PASSWORD = "accumulo.password"; + public static final String ACCUMULO_INSTANCE = "accumulo.instance"; + public static final String ACCUMULO_AUTHS = "accumulo.auths"; + public static final String ACCUMULO_VISIBILITIES = "accumulo.visibilities"; + public static final String ACCUMULO_ZOOKEEPERS = "accumulo.zookeepers"; + public static final String ACCUMULO_RYA_PREFIX = "accumulo.rya.prefix"; + public static final String USE_INFERENCE = "use.inference"; + public static final String USE_DISPLAY_QUERY_PLAN = "use.display.plan"; + public static final String USE_MOCK_ACCUMULO = "use.mock"; + public static final String USE_PREFIX_HASHING = "use.prefix.hashing"; + public static final String USE_COUNT_STATS = "use.count.stats"; + public static final String USE_JOIN_SELECTIVITY = "use.join.selectivity"; /** * Sets Accumulo user. This is a required parameter to connect to an http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java index bdb33ce..e156f86 100644 --- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java +++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java @@ -131,6 +131,19 @@ public class RyaSailFactory { return dao; } + /** + * Creates AccumuloRyaDAO without updating the AccumuloRdfConfiguration. This method does not force + * the user's configuration to be consistent with the Rya Instance configuration. As a result, new index + * tables might be created when using this method. This method does not require the {@link AccumuloRyaInstanceDetailsRepository} + * to exist. This is for internal use, backwards compatibility and testing purposes only. It is recommended that + * {@link RyaSailFactory#getAccumuloDAOWithUpdatedConfig(AccumuloRdfConfiguration)} be used for new installations of Rya. + * + * @param config - user configuration + * @return - AccumuloRyaDAO with Indexers configured according to user's specification + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws RyaDAOException + */ public static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException { final Connector connector = ConfigUtils.getConnector(config); final AccumuloRyaDAO dao = new AccumuloRyaDAO(); @@ -142,6 +155,33 @@ public class RyaSailFactory { dao.init(); return dao; } + + /** + * Creates an AccumuloRyaDAO after updating the AccumuloRdfConfiguration so that it is consistent + * with the configuration of the RyaInstance that the user is trying to connect to. This ensures + * that user configuration aligns with Rya instance configuration and prevents the creation of + * new index tables based on a user's query configuration. This method requires the {@link AccumuloRyaInstanceDetailsRepository} + * to exist. + * + * @param config - user's query configuration + * @return - AccumuloRyaDAO with an updated configuration that is consistent with the Rya instance configuration + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws RyaDAOException + */ + public static AccumuloRyaDAO getAccumuloDAOWithUpdatedConfig(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException { + + String ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); + Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration."+RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); + String user = config.get(AccumuloRdfConfiguration.CLOUDBASE_USER); + String pswd = config.get(AccumuloRdfConfiguration.CLOUDBASE_PASSWORD); + Objects.requireNonNull(user, "Accumulo user name is missing from configuration."+AccumuloRdfConfiguration.CLOUDBASE_USER); + Objects.requireNonNull(pswd, "Accumulo user password is missing from configuration."+AccumuloRdfConfiguration.CLOUDBASE_PASSWORD); + config.setTableLayoutStrategy( new TablePrefixLayoutStrategy(ryaInstance) ); + updateAccumuloConfig(config, user, pswd, ryaInstance); + + return getAccumuloDAO(config); + } public static void updateAccumuloConfig(final AccumuloRdfConfiguration config, final String user, final String pswd, final String ryaInstance) throws AccumuloException, AccumuloSecurityException { try { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index 1de0813..a17f02f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -113,9 +113,49 @@ public class CreatePcj { checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0."); this.spInsertBatchSize = spInsertBatchSize; } + + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method does not + * require a pcjId and does not require a PCJ table to have already been created via {@link PrecomputedJoinStorage}. + * This method only adds the metadata to the Fluo table to incrementally generate query results. Since there + * is no PCJ table, the incremental results must be exported to some external queuing service such as Kafka. + * This method currently only supports SPARQL COSNTRUCT queries, as they only export to Kafka by default. + * + * @param sparql - SPARQL query whose results will be updated in the Fluo table + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return The metadata that was written to the Fluo application for the PCJ. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws RuntimeException If SPARQL query is not a CONSTRUCT query. + */ + public FluoQuery createFluoPcj(final FluoClient fluo, String sparql) throws MalformedQueryException, PcjException { + requireNonNull(sparql); + requireNonNull(fluo); + + // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. + // We use these IDs later when scanning Rya for historic Statement Pattern matches + // as well as setting up automatic exports. + final NodeIds nodeIds = new NodeIds(); + final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); + final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + checkArgument(fluoQuery.getConstructQueryMetadata().isPresent(), "Sparql query: " + sparql + " must begin with a construct."); + + try (Transaction tx = fluo.newTransaction()) { + // Write the query's structure to Fluo. + new FluoQueryMetadataDAO().write(tx, fluoQuery); + tx.commit(); + } + + return fluoQuery; + } + + /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. + * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method requires that a + * PCJ table already exist for the query corresponding to the pcjId. Results will be exported + * to this table. * * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) * @param pcjStorage - Provides access to the PCJ index. (not null) @@ -146,12 +186,14 @@ public class CreatePcj { try (Transaction tx = fluo.newTransaction()) { // Write the query's structure to Fluo. new FluoQueryMetadataDAO().write(tx, fluoQuery); - - // The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ. - final String queryId = fluoQuery.getQueryMetadata().getNodeId(); - tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); - tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); - + + if (fluoQuery.getQueryMetadata().isPresent()) { + // If the query is not a construct query, + // the results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ. + final String queryId = fluoQuery.getQueryMetadata().get().getNodeId(); + tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); + tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); + } // Flush the changes to Fluo. tx.commit(); } @@ -165,7 +207,9 @@ public class CreatePcj { * This call scans Rya for Statement Pattern matches and inserts them into * the Fluo application. The Fluo application will then maintain the intermediate * results as new triples are inserted and export any new query results to the - * {@code pcjId} within the provided {@code pcjStorage}. + * {@code pcjId} within the provided {@code pcjStorage}. This method requires that a + * PCJ table already exist for the query corresponding to the pcjId. Results will be exported + * to this table. * * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) * @param pcjStorage - Provides access to the PCJ index. (not null) @@ -227,9 +271,14 @@ public class CreatePcj { } catch (final IOException e) { log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e); } - - // return queryId to the caller for later monitoring from the export. - return fluoQuery.getQueryMetadata().getNodeId(); + + //return queryId to the caller for later monitoring from the export + if(fluoQuery.getConstructQueryMetadata().isPresent()) { + return fluoQuery.getConstructQueryMetadata().get().getNodeId(); + } + + return fluoQuery.getQueryMetadata().get().getNodeId(); + } private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java index c11f9fb..87eb9cc 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java @@ -34,6 +34,7 @@ import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; @@ -139,6 +140,12 @@ public class DeletePcj { nodeIds.add(queryChild); getChildNodeIds(tx, queryChild, nodeIds); break; + case CONSTRUCT: + final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId); + final String constructChild = constructMeta.getChildNodeId(); + nodeIds.add(constructChild); + getChildNodeIds(tx, constructChild, nodeIds); + break; case JOIN: final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId); final String lchild = joinMeta.getLeftChildNodeId(); @@ -229,7 +236,7 @@ public class DeletePcj { /** - * Deletes all BindingSets associated with the specified nodeId. + * Deletes all results (BindingSets or Statements) associated with the specified nodeId. * * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null) * @param client - Used to delete the data. (not null) @@ -240,7 +247,7 @@ public class DeletePcj { final NodeType type = NodeType.fromNodeId(nodeId).get(); Transaction tx = client.newTransaction(); - while(deleteDataBatch(tx, getIterator(tx, nodeId, type.getBsColumn()), type.getBsColumn())) { + while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) { tx = client.newTransaction(); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java new file mode 100644 index 0000000..6c6f833 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java @@ -0,0 +1,141 @@ +package org.apache.rya.indexing.pcj.fluo.app; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.BNode; +import org.openrdf.model.Value; +import org.openrdf.model.impl.BNodeImpl; +import org.openrdf.query.algebra.StatementPattern; + +import com.google.common.base.Preconditions; + +/** + * Creates a construct query graph (represented as a Set of + * {@link RyaStatement}s with Binding names subject, predicate, object) from a + * given BindingSet and the underlying {@link ConstructProjection}s. + * + */ +public class ConstructGraph { + + private Set<ConstructProjection> projections; + private Set<String> bNodeNames; + + /** + * Creates a ConstructGraph from the specified collection of {@link ConstructProjection}s. + * @param projections - ConstructProjections used to create a ConstructGraph + */ + public ConstructGraph(Set<ConstructProjection> projections) { + Preconditions.checkNotNull(projections); + Preconditions.checkArgument(projections.size() > 0); + this.projections = projections; + this.bNodeNames = getBNodeNames(projections); + } + + /** + * Creates a ConstructGraph from the given Collection of {@link StatementPattern}s. + * @param patterns - StatementPatterns used to create a ConstructGraph + */ + public ConstructGraph(Collection<StatementPattern> patterns) { + Preconditions.checkNotNull(patterns); + Preconditions.checkArgument(patterns.size() > 0); + Set<ConstructProjection> projections = new HashSet<>(); + for(StatementPattern pattern: patterns) { + projections.add(new ConstructProjection(pattern)); + } + this.projections = projections; + this.bNodeNames = getBNodeNames(projections); + } + + private Set<String> getBNodeNames(Set<ConstructProjection> projections) { + Set<String> bNodeNames = new HashSet<>(); + for (ConstructProjection projection : projections) { + Optional<Value> optVal = projection.getSubjValue(); + if (optVal.isPresent() && optVal.get() instanceof BNode) { + bNodeNames.add(projection.getSubjectSourceName()); + } + } + return bNodeNames; + } + + private Map<String, BNode> getBNodeMap() { + Map<String, BNode> bNodeMap = new HashMap<>(); + for(String name: bNodeNames) { + bNodeMap.put(name, new BNodeImpl(UUID.randomUUID().toString())); + } + return bNodeMap; + } + + /** + * @return - the {@link ConstructProjection}s used to build the construct graph + * returned by {@link ConstructGraph#createGraphFromBindingSet(VisibilityBindingSet)}. + */ + public Set<ConstructProjection> getProjections() { + return projections; + } + + /** + * Creates a construct query graph represented as a Set of {@link RyaStatement}s + * @param bs - VisiblityBindingSet used to build statement BindingSets + * @return - Set of RyaStatements that represent a construct query graph. + */ + public Set<RyaStatement> createGraphFromBindingSet(VisibilityBindingSet bs) { + Set<RyaStatement> bSets = new HashSet<>(); + long ts = System.currentTimeMillis(); + Map<String, BNode> bNodes = getBNodeMap(); + for(ConstructProjection projection: projections) { + RyaStatement statement = projection.projectBindingSet(bs, bNodes); + //ensure that all RyaStatements in graph have the same timestamp + statement.setTimestamp(ts); + bSets.add(statement); + } + return bSets; + } + + @Override + public boolean equals(Object o) { + if(this == o) { + return true; + } + + if(o instanceof ConstructGraph) { + ConstructGraph graph = (ConstructGraph) o; + return this.projections.equals(graph.projections); + } + return false; + } + + @Override + public int hashCode() { + int hash = 17; + for(ConstructProjection projection: projections) { + hash += projection.hashCode(); + } + + return hash; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java new file mode 100644 index 0000000..82a6c6c --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java @@ -0,0 +1,52 @@ +package org.apache.rya.indexing.pcj.fluo.app; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.HashSet; +import java.util.Set; + +import com.google.common.base.Joiner; + +/** + * Converts {@link ConstructGraph}s to and from Strings for + * storage and retrieval from Fluo. + * + */ +public class ConstructGraphSerializer { + + public static final String SP_DELIM = "\u0002"; + + public static ConstructGraph toConstructGraph(String graphString) { + Set<ConstructProjection> projections = new HashSet<>(); + String[] spStrings = graphString.split(SP_DELIM); + for(String sp: spStrings) { + projections.add(new ConstructProjection(FluoStringConverter.toStatementPattern(sp))); + } + return new ConstructGraph(projections); + } + + public static String toConstructString(ConstructGraph graph) { + Set<ConstructProjection> projections = graph.getProjections(); + Set<String> spStrings = new HashSet<>(); + for(ConstructProjection projection: projections) { + spStrings.add(FluoStringConverter.toStatementPatternString(projection.getStatementPatternRepresentation())); + } + return Joiner.on(SP_DELIM).join(spStrings); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java new file mode 100644 index 0000000..6c1aa01 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java @@ -0,0 +1,266 @@ +package org.apache.rya.indexing.pcj.fluo.app; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.BNode; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.BNodeImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.Var; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * This class projects a VisibilityBindingSet onto a RyaStatement. The Binding + * {@link Value}s that get projected onto subject, predicate and object are + * indicated by the names {@link ConstructProjection#getSubjectSourceVar()}, + * {@link ConstructProjection#getPredicateSourceVar()} and + * {@link ConstructProjection#getObjectSourceVar()} and must satisfy standard + * RDF constraints for RDF subjects, predicates and objects. The purpose of + * projecting {@link BindingSet}s in this way is to provide functionality for + * SPARQL Construct queries which create RDF statements from query results. + * + */ +public class ConstructProjection { + + private static final Logger log = Logger.getLogger(ConstructProjection.class); + private String subjName; + private String predName; + private String objName; + private Optional<Value> subjValue; + private Optional<Value> predValue; + private Optional<Value> objValue; + private Var subjVar; + private Var predVar; + private Var objVar; + + public ConstructProjection(Var subjectVar, Var predicateVar, Var objectVar) { + Preconditions.checkNotNull(subjectVar); + Preconditions.checkNotNull(predicateVar); + Preconditions.checkNotNull(objectVar); + subjName = subjectVar.getName(); + predName = predicateVar.getName(); + objName = objectVar.getName(); + Preconditions.checkNotNull(subjName); + Preconditions.checkNotNull(predName); + Preconditions.checkNotNull(objName); + this.subjVar = subjectVar; + this.predVar = predicateVar; + this.objVar = objectVar; + if((subjVar.isAnonymous() || subjName.startsWith("-anon-")) && subjectVar.getValue() == null) { + subjValue = Optional.of(new BNodeImpl("")); + } else { + subjValue = Optional.ofNullable(subjectVar.getValue()); + } + predValue = Optional.ofNullable(predicateVar.getValue()); + objValue = Optional.ofNullable(objectVar.getValue()); + } + + public ConstructProjection(StatementPattern pattern) { + this(pattern.getSubjectVar(), pattern.getPredicateVar(), pattern.getObjectVar()); + } + + /** + * Returns a Var with info about the Value projected onto the RyaStatement + * subject. If the org.openrdf.query.algebra.Var returned by this method is + * not constant (as indicated by {@link Var#isConstant()}, then + * {@link Var#getName()} is the Binding name that gets projected. If the Var + * is constant, then {@link Var#getValue()} is assigned to the subject + * + * @return {@link org.openrdf.query.algebra.Var} containing info about + * Binding that gets projected onto the subject + */ + public String getSubjectSourceName() { + return subjName; + } + + /** + * Returns a Var with info about the Value projected onto the RyaStatement + * predicate. If the org.openrdf.query.algebra.Var returned by this method + * is not constant (as indicated by {@link Var#isConstant()}, then + * {@link Var#getName()} is the Binding name that gets projected. If the Var + * is constant, then {@link Var#getValue()} is assigned to the predicate + * + * @return {@link org.openrdf.query.algebra.Var} containing info about + * Binding that gets projected onto the predicate + */ + public String getPredicateSourceName() { + return predName; + } + + /** + * Returns a Var with info about the Value projected onto the RyaStatement + * object. If the org.openrdf.query.algebra.Var returned by this method is + * not constant (as indicated by {@link Var#isConstant()}, then + * {@link Var#getName()} is the Binding name that gets projected. If the Var + * is constant, then {@link Var#getValue()} is assigned to the object + * + * @return {@link org.openrdf.query.algebra.Var} containing info about + * Binding that gets projected onto the object + */ + public String getObjectSourceName() { + return objName; + } + + /** + * @return Value set for RyaStatement subject (if present) + */ + public Optional<Value> getSubjValue() { + return subjValue; + } + + /** + * @return Value set for RyaStatement predicate (if present) + */ + public Optional<Value> getPredValue() { + return predValue; + } + + /** + * @return Value set for RyaStatement object (if present) + */ + public Optional<Value> getObjValue() { + return objValue; + } + + + /** + * @return SubjectPattern representation of this ConstructProjection + * containing the {@link ConstructProjection#subjectSourceVar}, + * {@link ConstructProjection#predicateSourceVar}, + * {@link ConstructProjection#objectSourceVar} + */ + public StatementPattern getStatementPatternRepresentation() { + return new StatementPattern(subjVar, predVar, objVar); + } + + /** + * Projects a given BindingSet onto a RyaStatement. The subject, predicate, + * and object are extracted from the input VisibilityBindingSet (if the + * subjectSourceVar, predicateSourceVar, objectSourceVar is resp. + * non-constant) and from the Var Value itself (if subjectSourceVar, + * predicateSource, objectSourceVar is resp. constant). + * + * + * @param vBs + * - Visibility BindingSet that gets projected onto an RDF + * Statement BindingSet with Binding names subject, predicate and + * object + * @param bNodeMap - Optional Map used to pass {@link BNode}s for given variable names into + * multiple {@link ConstructProjection}s. This allows a ConstructGraph to create + * RyaStatements with the same BNode for a given variable name across multiple ConstructProjections. + * @return - RyaStatement whose values are determined by + * {@link ConstructProjection#getSubjectSourceVar()}, + * {@link ConstructProjection#getPredicateSourceVar()}, + * {@link ConstructProjection#getObjectSourceVar()}. + * + */ + public RyaStatement projectBindingSet(VisibilityBindingSet vBs, Map<String, BNode> bNodes) { + + Preconditions.checkNotNull(vBs); + Preconditions.checkNotNull(bNodes); + + Value subj = getValue(subjName, subjValue, vBs, bNodes); + Value pred = getValue(predName, predValue, vBs, bNodes); + Value obj = getValue(objName, objValue, vBs, bNodes); + + Preconditions.checkNotNull(subj); + Preconditions.checkNotNull(pred); + Preconditions.checkNotNull(obj); + Preconditions.checkArgument(subj instanceof Resource); + Preconditions.checkArgument(pred instanceof URI); + + RyaURI subjType = RdfToRyaConversions.convertResource((Resource) subj); + RyaURI predType = RdfToRyaConversions.convertURI((URI) pred); + RyaType objectType = RdfToRyaConversions.convertValue(obj); + + RyaStatement statement = new RyaStatement(subjType, predType, objectType); + try { + statement.setColumnVisibility(vBs.getVisibility().getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + log.trace("Unable to decode column visibility. RyaStatement being created without column visibility."); + } + return statement; + } + + private Value getValue(String name, Optional<Value> optValue, VisibilityBindingSet bs, Map<String, BNode> bNodes) { + Value returnValue = null; + if (optValue.isPresent()) { + Value tempValue = optValue.get(); + if(tempValue instanceof BNode) { + Preconditions.checkArgument(bNodes.containsKey(name)); + returnValue = bNodes.get(name); + } else { + returnValue = tempValue; + } + } else { + returnValue = bs.getValue(name); + } + return returnValue; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o instanceof ConstructProjection) { + ConstructProjection projection = (ConstructProjection) o; + return new EqualsBuilder().append(this.subjName, projection.subjName).append(this.predName, projection.predName) + .append(this.objName, projection.objName).append(this.subjValue, projection.subjValue) + .append(this.predValue, projection.predValue).append(this.objValue, projection.objValue).isEquals(); + } + return false; + + } + + @Override + public int hashCode() { + return Objects.hashCode(this.subjName, this.predName, this.objName, this.subjValue, this.predValue, this.objValue); + } + + @Override + public String toString() { + return new StringBuilder().append("Construct Projection {\n").append(" Subject Name: " + subjName + "\n") + .append(" Subject Value: " + subjValue + "\n").append(" Predicate Name: " + predName + "\n") + .append(" Predicate Value: " + predValue + "\n").append(" Object Name: " + objName + "\n") + .append(" Object Value: " + objValue + "\n").append("}").toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java new file mode 100644 index 0000000..d8d60b5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java @@ -0,0 +1,91 @@ +package org.apache.rya.indexing.pcj.fluo.app; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaSchema; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; +import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * This class creates results for the ConstructQuery. This class applies the {@link ConstructGraph} + * associated with the Construct Query to generate a collection of {@link RyaStatement}s. These statements + * are then used to form a {@link RyaSubGraph} that is serialized and stored as a value in the Column + * {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}. + * + */ +public class ConstructQueryResultUpdater { + + private static final Logger log = Logger.getLogger(ConstructQueryResultUpdater.class); + private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe(); + + /** + * Updates the Construct Query results by applying the {@link ConnstructGraph} to + * create a {@link RyaSubGraph} and then writing the subgraph to {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}. + * @param tx - transaction used to write the subgraph + * @param bs - BindingSet that the ConstructProjection expands into a subgraph + * @param metadata - metadata that the ConstructProjection is extracted from + */ + public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) { + + String nodeId = metadata.getNodeId(); + Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS; + ConstructGraph graph = metadata.getConstructGraph(); + + try { + Set<RyaStatement> statements = graph.createGraphFromBindingSet(bs); + RyaSubGraph subgraph = new RyaSubGraph(metadata.getNodeId(), statements); + String resultId = nodeId + "_" + getSubGraphId(subgraph); + tx.set(Bytes.of(resultId), column, Bytes.of(serializer.toBytes(subgraph))); + } catch (Exception e) { + log.trace("Unable to serialize RyaStatement generated by ConstructGraph: " + graph + " from BindingSet: " + bs ); + } + } + + /** + * Generates a simple hash used as an id for the subgraph. Id generated as hash as opposed + * to UUID to avoid the same subgraph result being stored under multiple UUID. + * @param subgraph - subgraph that an id is need for + * @return - hash of subgraph used as an id + */ + private int getSubGraphId(RyaSubGraph subgraph) { + int id = 17; + id = 31*id + subgraph.getId().hashCode(); + for(RyaStatement statement: subgraph.getStatements()) { + int statementId = 7; + if(!statement.getSubject().getData().startsWith(RyaSchema.BNODE_NAMESPACE)) { + statementId = 17*statementId + statement.getSubject().hashCode(); + } + statementId = 17*statementId + statement.getPredicate().hashCode(); + statementId = 17*statementId + statement.getObject().hashCode(); + id += statementId; + } + return Math.abs(id); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java index 5221c21..05a8d1c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java @@ -23,17 +23,25 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE; +import java.util.UUID; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; +import org.openrdf.model.BNode; import org.openrdf.model.Literal; import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.BNodeImpl; import org.openrdf.model.impl.LiteralImpl; import org.openrdf.model.impl.URIImpl; import org.openrdf.query.BindingSet; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.Var; +import com.google.common.base.Preconditions; + +import org.apache.rya.api.domain.RyaSchema; import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.resolver.RdfToRyaConversions; @@ -85,25 +93,33 @@ public class FluoStringConverter { */ public static Var toVar(final String varString) { checkNotNull(varString); - - if(varString.startsWith("-const-")) { - // The variable is a constant value. - final String[] varParts = varString.split(TYPE_DELIM); - final String name = varParts[0]; - final String valueString = name.substring("-const-".length()); - + final String[] varParts = varString.split(TYPE_DELIM); + final String name = varParts[0]; + + // The variable is a constant value. + if(varParts.length > 1) { final String dataTypeString = varParts[1]; if(dataTypeString.equals(URI_TYPE)) { // Handle a URI object. + Preconditions.checkArgument(varParts.length == 2); + final String valueString = name.substring("-const-".length()); final Var var = new Var(name, new URIImpl(valueString)); - var.setAnonymous(true); + var.setConstant(true); + return var; + } else if(dataTypeString.equals(RyaSchema.BNODE_NAMESPACE)) { + // Handle a BNode object + Preconditions.checkArgument(varParts.length == 3); + Var var = new Var(name); + var.setValue(new BNodeImpl(varParts[2])); return var; } else { - // Literal value. + // Handle a Literal Value. + Preconditions.checkArgument(varParts.length == 2); + final String valueString = name.substring("-const-".length()); final URI dataType = new URIImpl(dataTypeString); final Literal value = new LiteralImpl(valueString, dataType); final Var var = new Var(name, value); - var.setAnonymous(true); + var.setConstant(true); return var; } } else { @@ -126,19 +142,24 @@ public class FluoStringConverter { final Var subjVar = sp.getSubjectVar(); String subj = subjVar.getName(); - if(subjVar.isConstant()) { - subj = subj + TYPE_DELIM + URI_TYPE; - } + if(subjVar.getValue() != null) { + Value subjValue = subjVar.getValue(); + if (subjValue instanceof BNode ) { + subj = subj + TYPE_DELIM + RyaSchema.BNODE_NAMESPACE + TYPE_DELIM + ((BNode) subjValue).getID(); + } else { + subj = subj + TYPE_DELIM + URI_TYPE; + } + } final Var predVar = sp.getPredicateVar(); String pred = predVar.getName(); - if(predVar.isConstant()) { + if(predVar.getValue() != null) { pred = pred + TYPE_DELIM + URI_TYPE; } final Var objVar = sp.getObjectVar(); String obj = objVar.getName(); - if (objVar.isConstant()) { + if (objVar.getValue() != null) { final RyaType rt = RdfToRyaConversions.convertValue(objVar.getValue()); obj = obj + TYPE_DELIM + rt.getDataType().stringValue(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java index be4df71..f9d14b5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java @@ -33,6 +33,7 @@ public class IncrementalUpdateConstants { public static final String FILTER_PREFIX = "FILTER"; public static final String AGGREGATION_PREFIX = "AGGREGATION"; public static final String QUERY_PREFIX = "QUERY"; + public static final String CONSTRUCT_PREFIX = "CONSTRUCT"; public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI"; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java index 5365e30..b829b7e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java @@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.app; import static java.util.Objects.requireNonNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; @@ -30,7 +31,6 @@ import java.util.List; import org.apache.fluo.api.data.Column; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QueryNodeMetadataColumns; -import org.openrdf.query.BindingSet; import com.google.common.base.Optional; @@ -42,23 +42,24 @@ public enum NodeType { JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET), STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET), QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET), - AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET); + AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET), + CONSTRUCT(QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS); //Metadata Columns associated with given NodeType private QueryNodeMetadataColumns metadataColumns; - //Column where BindingSet results are stored for given NodeType - private Column bindingSetColumn; + //Column where results are stored for given NodeType + private Column resultColumn; /** * Constructs an instance of {@link NodeType}. * * @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null) - * @param bindingSetColumn - The {@link Column} used to store this {@link NodeType|'s {@link BindingSet}s. (not null) + * @param resultColumn - The {@link Column} used to store this {@link NodeType}'s results. (not null) */ - private NodeType(final QueryNodeMetadataColumns metadataColumns, final Column bindingSetColumn) { + private NodeType(QueryNodeMetadataColumns metadataColumns, Column resultColumn) { this.metadataColumns = requireNonNull(metadataColumns); - this.bindingSetColumn = requireNonNull(bindingSetColumn); + this.resultColumn = requireNonNull(resultColumn); } /** @@ -70,10 +71,10 @@ public enum NodeType { /** - * @return The {@link Column} used to store this {@link NodeType|'s {@link BindingSet}s. + * @return The {@link Column} used to store this {@link NodeType}'s query results. */ - public Column getBsColumn() { - return bindingSetColumn; + public Column getResultColumn() { + return resultColumn; } /** @@ -98,6 +99,8 @@ public enum NodeType { type = QUERY; } else if(nodeId.startsWith(AGGREGATION_PREFIX)) { type = AGGREGATION; + } else if(nodeId.startsWith(CONSTRUCT_PREFIX)) { + type = CONSTRUCT; } return Optional.fromNullable(type); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java new file mode 100644 index 0000000..c2f4cb4 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Exports a single Binding Set that is a new result for a SPARQL query to some + * other location. + */ +@DefaultAnnotation(NonNull.class) +public interface IncrementalBindingSetExporter extends AutoCloseable { + + /** + * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause. + * + * @param tx - The Fluo transaction this export is a part of. (not null) + * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null) + * @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null) + * @throws ResultExportException The result could not be exported. + */ + public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException; + + /** + * A result could not be exported. + */ + public static class ResultExportException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link ResultExportException}. + * + * @param message - Explains why the exception was thrown. + */ + public ResultExportException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link ResultExportException}. + * + * @param message - Explains why the exception was thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public ResultExportException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java new file mode 100644 index 0000000..1bf492a --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +import com.google.common.base.Optional; + +import org.apache.fluo.api.observer.Observer.Context; + +/** + * Builds instances of {@link IncrementalBindingSetExporter} using the provided + * configurations. + */ +@DefaultAnnotation(NonNull.class) +public interface IncrementalBindingSetExporterFactory { + + /** + * Builds an instance of {@link IncrementalBindingSetExporter} using the + * configurations that are provided. + * + * @param context - Contains the host application's configuration values + * and any parameters that were provided at initialization. (not null) + * @return An exporter if configurations were found in the context; otherwise absent. + * @throws IncrementalExporterFactoryException A non-configuration related + * problem has occurred and the exporter could not be created as a result. + * @throws ConfigurationException Thrown if configuration values were + * provided, but an instance of the exporter could not be initialized + * using them. This could be because they were improperly formatted, + * a required field was missing, or some other configuration based problem. + */ + public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException; + + /** + * Indicates a {@link IncrementalBindingSetExporter} could not be created by a + * {@link IncrementalBindingSetExporterFactory}. + */ + public static class IncrementalExporterFactoryException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link }. + * + * @param message - Explains why this exception is being thrown. + */ + public IncrementalExporterFactoryException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link }. + * + * @param message - Explains why this exception is being thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public IncrementalExporterFactoryException(final String message, final Throwable t) { + super(message, t); + } + } + + /** + * The configuration could not be interpreted because required fields were + * missing or a value wasn't properly formatted. + */ + public static class ConfigurationException extends IncrementalExporterFactoryException { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link ConfigurationException}. + * + * @param message - Explains why this exception is being thrown. + */ + public ConfigurationException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link ConfigurationException}. + * + * @param message - Explains why this exception is being thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public ConfigurationException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java deleted file mode 100644 index 02dced7..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export; - -import org.apache.fluo.api.client.TransactionBase; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * Exports a single Binding Set that is a new result for a SPARQL query to some - * other location. - */ -@DefaultAnnotation(NonNull.class) -public interface IncrementalResultExporter extends AutoCloseable { - - /** - * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause. - * - * @param tx - The Fluo transaction this export is a part of. (not null) - * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null) - * @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null) - * @throws ResultExportException The result could not be exported. - */ - public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException; - - /** - * A result could not be exported. - */ - public static class ResultExportException extends Exception { - private static final long serialVersionUID = 1L; - - /** - * Constructs an instance of {@link ResultExportException}. - * - * @param message - Explains why the exception was thrown. - */ - public ResultExportException(final String message) { - super(message); - } - - /** - * Constructs an instance of {@link ResultExportException}. - * - * @param message - Explains why the exception was thrown. - * @param cause - The exception that caused this one to be thrown. - */ - public ResultExportException(final String message, final Throwable cause) { - super(message, cause); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java deleted file mode 100644 index f9fe2bd..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -import com.google.common.base.Optional; - -import org.apache.fluo.api.observer.Observer.Context; - -/** - * Builds instances of {@link IncrementalResultExporter} using the provided - * configurations. - */ -@DefaultAnnotation(NonNull.class) -public interface IncrementalResultExporterFactory { - - /** - * Builds an instance of {@link IncrementalResultExporter} using the - * configurations that are provided. - * - * @param context - Contains the host application's configuration values - * and any parameters that were provided at initialization. (not null) - * @return An exporter if configurations were found in the context; otherwise absent. - * @throws IncrementalExporterFactoryException A non-configuration related - * problem has occurred and the exporter could not be created as a result. - * @throws ConfigurationException Thrown if configuration values were - * provided, but an instance of the exporter could not be initialized - * using them. This could be because they were improperly formatted, - * a required field was missing, or some other configuration based problem. - */ - public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException; - - /** - * Indicates a {@link IncrementalResultExporter} could not be created by a - * {@link IncrementalResultExporterFactory}. - */ - public static class IncrementalExporterFactoryException extends Exception { - private static final long serialVersionUID = 1L; - - /** - * Constructs an instance of {@link }. - * - * @param message - Explains why this exception is being thrown. - */ - public IncrementalExporterFactoryException(final String message) { - super(message); - } - - /** - * Constructs an instance of {@link }. - * - * @param message - Explains why this exception is being thrown. - * @param cause - The exception that caused this one to be thrown. - */ - public IncrementalExporterFactoryException(final String message, final Throwable t) { - super(message, t); - } - } - - /** - * The configuration could not be interpreted because required fields were - * missing or a value wasn't properly formatted. - */ - public static class ConfigurationException extends IncrementalExporterFactoryException { - private static final long serialVersionUID = 1L; - - /** - * Constructs an instance of {@link ConfigurationException}. - * - * @param message - Explains why this exception is being thrown. - */ - public ConfigurationException(final String message) { - super(message); - } - - /** - * Constructs an instance of {@link ConfigurationException}. - * - * @param message - Explains why this exception is being thrown. - * @param cause - The exception that caused this one to be thrown. - */ - public ConfigurationException(final String message, final Throwable cause) { - super(message, cause); - } - } -} \ No newline at end of file
