RYA-440 Added commands to Rya Shell used to interact with Rya Streams. Closes #267.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/59b20263 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/59b20263 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/59b20263 Branch: refs/heads/master Commit: 59b20263c7c574d46d8462985fd7be126e8c4ea1 Parents: 3d4a5d0 Author: kchilton2 <[email protected]> Authored: Tue Jan 16 17:40:08 2018 -0500 Committer: caleb <[email protected]> Committed: Mon Jan 29 14:01:11 2018 -0500 ---------------------------------------------------------------------- .../org/apache/rya/api/client/RyaClient.java | 10 + .../api/client/SetRyaStreamsConfiguration.java | 41 ++ .../org/apache/rya/api/instance/RyaDetails.java | 104 ++++- .../apache/rya/api/instance/RyaDetailsTest.java | 8 +- .../mongodb/instance/MongoDetailsAdapter.java | 87 +++- .../instance/MongoDetailsAdapterTest.java | 9 +- .../client/SetRyaStreamsConfigurationBase.java | 83 ++++ .../accumulo/AccumuloRyaClientFactory.java | 6 +- .../AccumuloSetRyaStreamsConfiguration.java | 59 +++ .../api/client/mongo/MongoRyaClientFactory.java | 1 + .../mongo/MongoSetRyaStreamsConfiguration.java | 60 +++ .../AccumuloSetRyaStreamsConfigurationIT.java | 81 ++++ .../MongoSetRyaStreamsConfigurationIT.java | 85 ++++ .../rya/streams/api/RyaStreamsClient.java | 137 ++++++ .../rya/streams/api/interactor/GetQuery.java | 44 ++ .../interactor/defaults/DefaultGetQuery.java | 55 +++ .../kafka/KafkaRyaStreamsClientFactory.java | 170 +++++++ extras/shell/pom.xml | 4 + .../apache/rya/shell/RyaConnectionCommands.java | 50 +- .../apache/rya/shell/RyaStreamsCommands.java | 297 ++++++++++++ .../org/apache/rya/shell/SharedShellState.java | 58 ++- .../rya/shell/util/StreamsQueryFormatter.java | 106 +++++ .../META-INF/spring/spring-shell-plugin.xml | 3 +- .../rya/shell/RyaStreamsCommandsTest.java | 461 +++++++++++++++++++ .../apache/rya/shell/SharedShellStateTest.java | 31 ++ .../shell/util/StreamsQueryFormatterTest.java | 102 ++++ .../src/test/resources/RyaShellTest-context.xml | 1 + 27 files changed, 2101 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java index 92b18a1..c122f43 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java @@ -45,6 +45,7 @@ public class RyaClient { private final ListInstances listInstances; private final Optional<AddUser> addUser; private final Optional<RemoveUser> removeUser; + private final SetRyaStreamsConfiguration setRyaStreamsConfig; private final Uninstall uninstall; private final LoadStatements loadStatements; private final LoadStatementsFile loadStatementsFile; @@ -65,6 +66,7 @@ public class RyaClient { final ListInstances listInstances, final Optional<AddUser> addUser, final Optional<RemoveUser> removeUser, + final SetRyaStreamsConfiguration setRyaStreamsConfig, final Uninstall uninstall, final LoadStatements loadStatements, final LoadStatementsFile loadStatementsFile, @@ -81,6 +83,7 @@ public class RyaClient { this.listInstances = requireNonNull(listInstances); this.addUser = requireNonNull(addUser); this.removeUser = requireNonNull(removeUser); + this.setRyaStreamsConfig = requireNonNull(setRyaStreamsConfig); this.uninstall = requireNonNull(uninstall); this.loadStatements = requireNonNull(loadStatements); this.loadStatementsFile = requireNonNull(loadStatementsFile); @@ -176,6 +179,13 @@ public class RyaClient { } /** + * @return An instance of {@link SetRyaStreamsConfiguration} that is connected to a Rya storage. + */ + public SetRyaStreamsConfiguration getSetRyaStreamsConfiguration() { + return setRyaStreamsConfig; + } + + /** * @return An instance of {@link Uninstall} that is connected to a Rya storage. */ public Uninstall getUninstall() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java new file mode 100644 index 0000000..5e75f06 --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client; + +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Update which Rya Streams subsystem a Rya instance is connected to. + */ +@DefaultAnnotation(NonNull.class) +public interface SetRyaStreamsConfiguration { + + /** + * Update which Rya Streams subsystem a Rya instance is connected to. + * + * @param instanceName - Indicates which Rya instance will have a Rya Streams subsystem assigned to it. (not null) + * @param streamsDetails - Indicates which Rya Streams subsystem the instance will use. (not null) + * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name. + * @throws RyaClientException Something caused the command to fail. + */ + public void setRyaStreamsConfiguration(String ryaInstance, RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java index 9d2c1e5..bda7390 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java @@ -29,25 +29,22 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import edu.umd.cs.findbugs.annotations.Nullable; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import net.jcip.annotations.Immutable; - -import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; -import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; - import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import net.jcip.annotations.Immutable; + /** * Details about how a Rya instance's state. */ @Immutable @DefaultAnnotation(NonNull.class) public class RyaDetails implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; // General metadata about the instance. private final String instanceName; @@ -68,6 +65,9 @@ public class RyaDetails implements Serializable { private final ProspectorDetails prospectorDetails; private final JoinSelectivityDetails joinSelectivityDetails; + // Rya Streams Details. + private final Optional<RyaStreamsDetails> ryaStreamsDetails; + /** * Private to prevent initialization through the constructor. To build * instances of this class, use the {@link Builder}. @@ -82,7 +82,8 @@ public class RyaDetails implements Serializable { final TemporalIndexDetails temporalDetails, final FreeTextIndexDetails freeTextDetails, final ProspectorDetails prospectorDetails, - final JoinSelectivityDetails joinSelectivityDetails) { + final JoinSelectivityDetails joinSelectivityDetails, + final Optional<RyaStreamsDetails> ryaStreamsDetails) { this.instanceName = requireNonNull(instanceName); this.version = requireNonNull(version); this.users = requireNonNull(users); @@ -93,6 +94,7 @@ public class RyaDetails implements Serializable { this.freeTextDetails = requireNonNull(freeTextDetails); this.prospectorDetails = requireNonNull(prospectorDetails); this.joinSelectivityDetails = requireNonNull(joinSelectivityDetails); + this.ryaStreamsDetails = requireNonNull(ryaStreamsDetails); } /** @@ -168,6 +170,13 @@ public class RyaDetails implements Serializable { return joinSelectivityDetails; } + /** + * @return Information about the instance's Rya Streams integration, if it was set. + */ + public Optional<RyaStreamsDetails> getRyaStreamsDetails() { + return ryaStreamsDetails; + } + @Override public int hashCode() { return Objects.hash( @@ -179,7 +188,8 @@ public class RyaDetails implements Serializable { temporalDetails, freeTextDetails, prospectorDetails, - joinSelectivityDetails); + joinSelectivityDetails, + ryaStreamsDetails); } @Override @@ -197,7 +207,8 @@ public class RyaDetails implements Serializable { Objects.equals(temporalDetails, details.temporalDetails) && Objects.equals(freeTextDetails, details.freeTextDetails) && Objects.equals(prospectorDetails, details.prospectorDetails) && - Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails); + Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails) && + Objects.equals(ryaStreamsDetails, details.ryaStreamsDetails); } return false; } @@ -239,6 +250,9 @@ public class RyaDetails implements Serializable { private ProspectorDetails prospectorDetails; private JoinSelectivityDetails joinSelectivityDetails; + // Rya Streams Details. + private RyaStreamsDetails ryaStreamsDetails; + /** * Construcst an empty instance of {@link Builder}. */ @@ -262,6 +276,7 @@ public class RyaDetails implements Serializable { freeTextDetails = details.freeTextDetails; prospectorDetails = details.prospectorDetails; joinSelectivityDetails = details.joinSelectivityDetails; + ryaStreamsDetails = details.ryaStreamsDetails.orNull(); } /** @@ -375,6 +390,15 @@ public class RyaDetails implements Serializable { } /** + * @param ryaStreamsDetails - Information about the instance's Rya Streams integration. + * @return This {@link Builder} so that method invocations may be chained. + */ + public Builder setRyaStreamsDetails(@Nullable final RyaStreamsDetails ryaStreamsDetails) { + this.ryaStreamsDetails = ryaStreamsDetails; + return this; + } + + /** * @return An instance of {@link RyaDetails} built using this * builder's values. */ @@ -389,7 +413,8 @@ public class RyaDetails implements Serializable { temporalDetails, freeTextDetails, prospectorDetails, - joinSelectivityDetails); + joinSelectivityDetails, + Optional.fromNullable(ryaStreamsDetails)); } } @@ -1071,4 +1096,57 @@ public class RyaDetails implements Serializable { return false; } } + + /** + * Details about the Rya instance's Rya Streams integration. + */ + public static class RyaStreamsDetails implements Serializable { + private static final long serialVersionUID = 1L; + + private final String hostname; + private final int port; + + /** + * Constructs an instance of {@link RyaStreamsDetails}. + * + * @param hostname - The hostname used to communicate with the Rya Streams subsystem. (not null) + * @param port - The port used to communicate with the Rya Streams subsystem. + */ + public RyaStreamsDetails(final String hostname, final int port) { + this.hostname = requireNonNull(hostname); + this.port = port; + } + + /** + * @return The hostname used to communicate with the Rya Streams subsystem. + */ + public String getHostname() { + return hostname; + } + + /** + * @return The port used to communicate with the Rya Streams subsystem. + */ + public int getPort() { + return port; + } + + @Override + public int hashCode() { + return Objects.hash(hostname, port); + } + + @Override + public boolean equals(final Object obj) { + if(this == obj) { + return true; + } + if(obj instanceof RyaStreamsDetails) { + final RyaStreamsDetails other = (RyaStreamsDetails) obj; + return Objects.equals(hostname, other.hostname) && + port == other.port; + } + return false; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java index a356877..b6e92e0 100644 --- a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java +++ b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java @@ -31,6 +31,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; import org.apache.rya.api.instance.RyaDetails.ProspectorDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails; import org.junit.Test; @@ -65,7 +66,8 @@ public class RyaDetailsTest { .setId("pcj 2") .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) - .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ); + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5)); final RyaDetails details1 = builder.build(); final RyaDetails details2 = builder.build(); @@ -96,7 +98,8 @@ public class RyaDetailsTest { .setId("pcj 2") .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) - .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ); + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5)); final RyaDetails details1 = builder.build(); final RyaDetails details2 = builder.build(); @@ -127,6 +130,7 @@ public class RyaDetailsTest { .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5)) .build(); // Create a new Builder using another RyaDetails object. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java index 8010808..f86c150 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java @@ -32,10 +32,10 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; import org.apache.rya.api.instance.RyaDetails.ProspectorDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; import com.mongodb.BasicDBObjectBuilder; @@ -48,7 +48,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Serializes configuration details for use in Mongo. * The {@link DBObject} will look like: * <pre> - * {@code * { * "instanceName": <string>, * "version": <string>?, @@ -68,6 +67,10 @@ import edu.umd.cs.findbugs.annotations.NonNull; * "freeTextDetails": <boolean>, * "prospectorDetails": <date>, * "joinSelectivityDetails": <date> + * "ryaStreamsDetails": { + * "hostname": <string> + * "port": <int> + * } * } * </pre> */ @@ -91,13 +94,19 @@ public class MongoDetailsAdapter { public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails"; public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails"; + public static final String RYA_STREAMS_DETAILS_KEY = "ryaStreamsDetails"; + public static final String RYA_STREAMS_HOSTNAME_KEY = "hostname"; + public static final String RYA_STREAMS_PORT_KEY = "port"; + /** - * Serializes {@link RyaDetails} to mongo {@link DBObject}. - * @param details - The details to be serialized. - * @return The mongo {@link DBObject}. + * Converts a {@link RyaDetails} object into its MongoDB {@link DBObject} equivalent. + * + * @param details - The details to convert. (not null) + * @return The MongoDB {@link DBObject} equivalent. */ public static BasicDBObject toDBObject(final RyaDetails details) { - Preconditions.checkNotNull(details); + requireNonNull(details); + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start() .add(INSTANCE_KEY, details.getRyaInstanceName()) .add(VERSION_KEY, details.getRyaVersion()) @@ -106,12 +115,29 @@ public class MongoDetailsAdapter { .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails())) .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled()) .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled()); + if(details.getProspectorDetails().getLastUpdated().isPresent()) { builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get()); } + if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) { builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get()); } + + // If the Rya Streams Details are present, then add them. + if(details.getRyaStreamsDetails().isPresent()) { + final RyaStreamsDetails ryaStreamsDetails = details.getRyaStreamsDetails().get(); + + // The embedded object that holds onto the fields. + final DBObject ryaStreamsFields = BasicDBObjectBuilder.start() + .add(RYA_STREAMS_HOSTNAME_KEY, ryaStreamsDetails.getHostname()) + .add(RYA_STREAMS_PORT_KEY, ryaStreamsDetails.getPort()) + .get(); + + // Add them to the main builder. + builder.add(RYA_STREAMS_DETAILS_KEY, ryaStreamsFields); + } + return (BasicDBObject) builder.get(); } @@ -154,20 +180,38 @@ public class MongoDetailsAdapter { return builder.get(); } + /** + * Converts a MongoDB {@link DBObject} into its {@link RyaDetails} equivalent. + * + * @param mongoObj - The MongoDB object to convert. (not null) + * @return The equivalent {@link RyaDetails} object. + * @throws MalformedRyaDetailsException The MongoDB object could not be converted. + */ public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException { + requireNonNull(mongoObj); final BasicDBObject basicObj = (BasicDBObject) mongoObj; try { - return RyaDetails.builder() - .setRyaInstanceName(basicObj.getString(INSTANCE_KEY)) - .setRyaVersion(basicObj.getString(VERSION_KEY)) - .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY))) - //RYA-215 .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY))) - .setPCJIndexDetails(getPCJIndexDetails(basicObj)) - .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY))) - .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY))) - .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY)))) - .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY)))) - .build(); + final RyaDetails.Builder builder = RyaDetails.builder() + .setRyaInstanceName(basicObj.getString(INSTANCE_KEY)) + .setRyaVersion(basicObj.getString(VERSION_KEY)) + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY))) + //RYA-215 .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY))) + .setPCJIndexDetails(getPCJIndexDetails(basicObj)) + .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY))) + .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY))) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY)))) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY)))); + + // If the Rya Streams Details are present, then add them. + if(basicObj.containsField(RYA_STREAMS_DETAILS_KEY)) { + final BasicDBObject streamsObject = (BasicDBObject) basicObj.get(RYA_STREAMS_DETAILS_KEY); + final String hostname = streamsObject.getString(RYA_STREAMS_HOSTNAME_KEY); + final int port = streamsObject.getInt(RYA_STREAMS_PORT_KEY); + builder.setRyaStreamsDetails(new RyaStreamsDetails(hostname, port)); + } + + return builder.build(); + } catch(final Exception e) { throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e); } @@ -213,14 +257,15 @@ public class MongoDetailsAdapter { } /** - * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin - * to adapt it into a {@link RyaDetails}. + * Indicates a MongoDB {@link DBObject} was malformed when attempting + * to convert it into a {@link RyaDetails} object. */ public static class MalformedRyaDetailsException extends Exception { private static final long serialVersionUID = 1L; /** - * Creates a new {@link MalformedRyaDetailsException} + * Creates a new {@link MalformedRyaDetailsException}. + * * @param message - The message to be displayed by the exception. * @param e - The source cause of the exception. */ @@ -228,4 +273,4 @@ public class MongoDetailsAdapter { super(message, e); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java index 0ea9456..f5845c2 100644 --- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java @@ -32,6 +32,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; import org.apache.rya.api.instance.RyaDetails.ProspectorDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails; import org.apache.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException; import org.junit.Test; @@ -72,6 +73,7 @@ public class MongoDetailsAdapterTest { .setFreeTextDetails(new FreeTextIndexDetails(true)) .setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L)))) .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L)))) + .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6)) .build(); final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details); @@ -100,7 +102,8 @@ public class MongoDetailsAdapterTest { + "temporalDetails : true," + "freeTextDetails : true," + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"}," - + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}" + + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}," + + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}" + "}" ); @@ -134,7 +137,8 @@ public class MongoDetailsAdapterTest { + "temporalDetails : true," + "freeTextDetails : true," + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"}," - + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}" + + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}," + + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}" + "}" ); @@ -163,6 +167,7 @@ public class MongoDetailsAdapterTest { .setFreeTextDetails(new FreeTextIndexDetails(true)) .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L)))) .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L)))) + .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6)) .build(); assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java new file mode 100644 index 0000000..92a4c44 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.instance.RyaDetailsUpdater; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor. + * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update + * any implementation of that repository. + */ +@DefaultAnnotation(NonNull.class) +public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration { + + private final InstanceExists instanceExists; + + /** + * Constructs an instance of {@link SetRyaStreamsConfigurationBase}. + * + * @param instanceExists - The interactor used to verify Rya instances exist. (not null) + */ + public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) { + this.instanceExists = requireNonNull(instanceExists); + } + + /** + * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya. + * + * @param ryaInstance - The Rya instance the repository must be connected to. (not null) + * @return A {@link RyaDetailsRepository} connected to the specified Rya instance. + */ + protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance); + + @Override + public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{ + requireNonNull(ryaInstance); + requireNonNull(streamsDetails); + + // Verify the Rya Instance exists. + if(!instanceExists.exists(ryaInstance)) { + throw new InstanceDoesNotExistException("There is no Rya instance named '" + ryaInstance + "' in this storage."); + } + + // Update the old details object using the provided Rya Streams details. + final RyaDetailsRepository repo = getRyaDetailsRepo(ryaInstance); + try { + new RyaDetailsUpdater(repo).update(oldDetails -> { + final RyaDetails.Builder builder = RyaDetails.builder(oldDetails); + builder.setRyaStreamsDetails(streamsDetails); + return builder.build(); + }); + } catch (CouldNotApplyMutationException | RyaDetailsRepositoryException e) { + throw new RyaClientException("Unable to update which Rya Streams subsystem is used by the '" + + ryaInstance + "' Rya instance.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java index 17fddaa..fdabea9 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java @@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull; import java.util.Optional; import org.apache.accumulo.core.client.Connector; +import org.apache.rya.api.client.InstanceExists; import org.apache.rya.api.client.RyaClient; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -50,6 +51,8 @@ public class AccumuloRyaClientFactory { requireNonNull(connector); // Build the RyaCommands option with the initialized commands. + final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, connector); + return new RyaClient( new AccumuloInstall(connectionDetails, connector), new AccumuloCreatePCJ(connectionDetails, connector), @@ -59,10 +62,11 @@ public class AccumuloRyaClientFactory { Optional.of(new AccumuloListIncrementalQueries(connectionDetails, connector)), new AccumuloBatchUpdatePCJ(connectionDetails, connector), new AccumuloGetInstanceDetails(connectionDetails, connector), - new AccumuloInstanceExists(connectionDetails, connector), + instanceExists, new AccumuloListInstances(connectionDetails, connector), Optional.of(new AccumuloAddUser(connectionDetails, connector)), Optional.of(new AccumuloRemoveUser(connectionDetails, connector)), + new AccumuloSetRyaStreamsConfiguration(instanceExists, connector), new AccumuloUninstall(connectionDetails, connector), new AccumuloLoadStatements(connectionDetails, connector), new AccumuloLoadStatementsFile(connectionDetails, connector), http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java new file mode 100644 index 0000000..f19d9fb --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.SetRyaStreamsConfigurationBase; +import org.apache.rya.api.instance.RyaDetailsRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An Accumulo implementation of {@link SetRyaStreamsConfiguration}. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase { + + private final Connector connector; + + /** + * Constructs an instance of {@link AccumuloSetRyaStreamsConfiguration}. + * + * @param instanceExists - The interactor used to verify Rya instances exist. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloSetRyaStreamsConfiguration( + final InstanceExists instanceExists, + final Connector connector) { + super(instanceExists); + this.connector = requireNonNull(connector); + } + + @Override + protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) { + requireNonNull(ryaInstance); + return new AccumuloRyaInstanceDetailsRepository(connector, ryaInstance); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java index fbbec2a..5fa4877 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java @@ -66,6 +66,7 @@ public class MongoRyaClientFactory { new MongoListInstances(adminClient), Optional.empty(), Optional.empty(), + new MongoSetRyaStreamsConfiguration(instanceExists, adminClient), new MongoUninstall(adminClient, instanceExists), new MongoLoadStatements(connectionDetails, instanceExists), new MongoLoadStatementsFile(connectionDetails, instanceExists), http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java new file mode 100644 index 0000000..592e663 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.SetRyaStreamsConfigurationBase; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; + +import com.mongodb.MongoClient; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A MongoDB implementation of {@link SetRyaStreamsConfiguration}. + */ +@DefaultAnnotation(NonNull.class) +public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase { + + private final MongoClient client; + + /** + * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}. + * + * @param instanceExists - The interactor used to verify Rya instances exist. (not null) + * @param client - The MongoDB client used to connect to the Rya storage. (not null) + */ + public MongoSetRyaStreamsConfiguration( + final InstanceExists instanceExists, + final MongoClient client) { + super(instanceExists); + this.client = requireNonNull(client); + } + + @Override + protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) { + requireNonNull(ryaInstance); + return new MongoRyaInstanceDetailsRepository(client, ryaInstance); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java new file mode 100644 index 0000000..928a29e --- /dev/null +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.rya.accumulo.AccumuloITBase; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.junit.Test; + +/** + * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}. + */ +public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase { + + @Test(expected = InstanceDoesNotExistException.class) + public void instanceDoesNotExist() throws Exception { + final String ryaInstance = getRyaInstanceName(); + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + + // Skip the install step to create error causing situation. + final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details); + } + + @Test + public void updatesRyaDetails() throws Exception { + final String ryaInstance = getRyaInstanceName(); + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + + // Install an instance of Rya. + final Install installRya = ryaClient.getInstall(); + final InstallConfiguration installConf = InstallConfiguration.builder() + .build(); + installRya.install(ryaInstance, installConf); + + // Fetch its details and show they do not have any RyaStreamsDetails. + com.google.common.base.Optional<RyaStreamsDetails> streamsDetails = + ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails(); + assertFalse(streamsDetails.isPresent()); + + // Set the details. + final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details); + + // Fetch its details again and show that they are now filled in. + streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails(); + assertEquals(details, streamsDetails.get()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java new file mode 100644 index 0000000..5fea578 --- /dev/null +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.mongo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.Optional; + +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.apache.rya.mongodb.MongoITBase; +import org.junit.Test; + +/** + * Integration tests the methods of {@link MongoSetRyaStreamsConfiguration}. + */ +public class MongoSetRyaStreamsConfigurationIT extends MongoITBase { + + @Test(expected = InstanceDoesNotExistException.class) + public void instanceDoesNotExist() throws Exception { + final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient()); + + // Skip the install step to create error causing situation. + final String ryaInstance = conf.getRyaInstanceName(); + final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details); + } + + @Test + public void updatesRyaDetails() throws Exception { + final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient()); + + // Install an instance of Rya. + final String ryaInstance = conf.getRyaInstanceName(); + final Install installRya = ryaClient.getInstall(); + final InstallConfiguration installConf = InstallConfiguration.builder() + .build(); + installRya.install(ryaInstance, installConf); + + // Fetch its details and show they do not have any RyaStreamsDetails. + com.google.common.base.Optional<RyaStreamsDetails> streamsDetails = + ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails(); + assertFalse(streamsDetails.isPresent()); + + // Set the details. + final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details); + + // Fetch its details again and show that they are now filled in. + streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails(); + assertEquals(details, streamsDetails.get()); + } + + private MongoConnectionDetails getConnectionDetails() { + final Optional<char[]> password = conf.getMongoPassword() != null ? + Optional.of(conf.getMongoPassword().toCharArray()) : + Optional.empty(); + + return new MongoConnectionDetails( + conf.getMongoHostname(), + Integer.parseInt(conf.getMongoPort()), + Optional.ofNullable(conf.getMongoUser()), + password); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java new file mode 100644 index 0000000..ee86e41 --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.api.interactor.AddQuery; +import org.apache.rya.streams.api.interactor.DeleteQuery; +import org.apache.rya.streams.api.interactor.GetQuery; +import org.apache.rya.streams.api.interactor.GetQueryResultStream; +import org.apache.rya.streams.api.interactor.ListQueries; +import org.apache.rya.streams.api.interactor.StartQuery; +import org.apache.rya.streams.api.interactor.StopQuery; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Provides access to a set of Rya Streams functions.Statement + */ +@DefaultAnnotation(NonNull.class) +public class RyaStreamsClient implements AutoCloseable { + + private final AddQuery addQuery; + private final GetQuery getQuery; + private final DeleteQuery deleteQuery; + private final GetQueryResultStream<VisibilityStatement> getStatementResultStream; + private final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream; + private final ListQueries listQueries; + private final StartQuery startQuery; + private final StopQuery stopQuery; + + /** + * Constructs an instance of {@link RyaStreamsClient}. + */ + public RyaStreamsClient( + final AddQuery addQuery, + final GetQuery getQuery, + final DeleteQuery deleteQuery, + final GetQueryResultStream<VisibilityStatement> getStatementResultStream, + final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream, + final ListQueries listQueries, + final StartQuery startQuery, + final StopQuery stopQuery) { + this.addQuery = requireNonNull(addQuery); + this.getQuery = requireNonNull(getQuery); + this.deleteQuery = requireNonNull(deleteQuery); + this.getStatementResultStream = requireNonNull(getStatementResultStream); + this.getBindingSetResultStream = requireNonNull(getBindingSetResultStream); + this.listQueries = requireNonNull(listQueries); + this.startQuery = requireNonNull(startQuery); + this.stopQuery = requireNonNull(stopQuery); + } + + /** + * @return The connected {@link AddQuery} interactor. + */ + public AddQuery getAddQuery() { + return addQuery; + } + + /** + * @return The connected {@link GetQuery} interactor. + */ + public GetQuery getGetQuery() { + return getQuery; + } + + /** + * @return The connected {@link DeleteQuery} interactor. + */ + public DeleteQuery getDeleteQuery() { + return deleteQuery; + } + + /** + * @return The connected {@link GetQueryResultStream} interactor for a query that produces + * {@link VisibilityStatement}s. + */ + public GetQueryResultStream<VisibilityStatement> getGetStatementResultStream() { + return getStatementResultStream; + } + + /** + * @return The connected {@link GetQueryResultStream} interactor for a query that produces + * {@link VisibilityBindingSet}s. + */ + public GetQueryResultStream<VisibilityBindingSet> getGetBindingSetResultStream() { + return getBindingSetResultStream; + } + + /** + * @return The connected {@link ListQueries} interactor. + */ + public ListQueries getListQueries() { + return listQueries; + } + + /** + * @return The connected {@link StartQuery} interactor. + */ + public StartQuery getStartQuery() { + return startQuery; + } + + /** + * @return The connected {@link StopQuery} interactor. + */ + public StopQuery getStopQuery() { + return stopQuery; + } + + /** + * By defualt, this client doesn't close anything. If an implementation of the client + * requires closing components, then override this method. + */ + @Override + public void close() throws Exception { } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java new file mode 100644 index 0000000..1293714 --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor; + +import java.util.Optional; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Get a {@link StreamsQuery} from Rya Streams. + */ +@DefaultAnnotation(NonNull.class) +public interface GetQuery { + + /** + * Get a {@link StreamsQuery} from Rya Streams. + * + * @param queryId - Identifies the query to fetch. (not null) + * @return The {@link StreamsQuery} for the {@code queryId}; if one is stored for the ID. + * @throws RyaStreamsException The query could not be fetched. + */ + public Optional<StreamsQuery> getQuery(UUID queryId) throws RyaStreamsException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java new file mode 100644 index 0000000..14b93ab --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor.defaults; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.GetQuery; +import org.apache.rya.streams.api.queries.QueryRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Get a {@link StreamsQuery} from Rya Streams. + */ +@DefaultAnnotation(NonNull.class) +public class DefaultGetQuery implements GetQuery { + private final QueryRepository repository; + + /** + * Constructs an instance of {@link DefaultGetQuery}. + * + * @param repository - The {@link QueryRepository} to get queries from. (not null) + */ + public DefaultGetQuery(final QueryRepository repository) { + this.repository = requireNonNull(repository); + } + + @Override + public Optional<StreamsQuery> getQuery(final UUID queryId) throws RyaStreamsException { + requireNonNull(queryId); + return repository.get(queryId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java new file mode 100644 index 0000000..9250d9d --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.RyaStreamsClient; +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries; +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster. + */ +@DefaultAnnotation(NonNull.class) +public final class KafkaRyaStreamsClientFactory { + private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class); + + /** + * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams + * that is backed by Kafka. + * + * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null) + * @param kafkaHostname - The hostname of the Kafka Broker. + * @param kafkaPort - The port of the Kafka Broker. + * @return The initialized commands. + */ + public static RyaStreamsClient make( + final String ryaInstance, + final String kafkaHostname, + final int kafkaPort) { + requireNonNull(ryaInstance); + requireNonNull(kafkaHostname); + + // Setup Query Repository used by the Kafka Rya Streams subsystem. + final Producer<?, QueryChange> queryProducer = + makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class); + final Consumer<?, QueryChange>queryConsumer = + fromStartConsumer(kafkaHostname, kafkaPort, StringDeserializer.class, QueryChangeDeserializer.class); + final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance); + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); + final QueryRepository queryRepo = new InMemoryQueryRepository(changeLog); + + // Create the Rya Streams client that is backed by a Kafka Query Change Log. + return new RyaStreamsClient( + new DefaultAddQuery(queryRepo), + new DefaultGetQuery(queryRepo), + new DefaultDeleteQuery(queryRepo), + new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityStatementDeserializer.class), + new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityBindingSetDeserializer.class), + new DefaultListQueries(queryRepo), + new DefaultStartQuery(queryRepo), + new DefaultStopQuery(queryRepo)) { + + /** + * Close the QueryRepository used by the returned client. + */ + @Override + public void close() { + try { + queryRepo.close(); + } catch (final Exception e) { + log.warn("Couldn't close a QueryRepository.", e); + } + } + }; + } + + /** + * Create a {@link Producer} that is able to write to a topic in Kafka. + * + * @param kafkaHostname - The Kafka broker hostname. (not null) + * @param kafkaPort - The Kafka broker port. + * @param keySerializerClass - Serializes the keys. (not null) + * @param valueSerializerClass - Serializes the values. (not null) + * @return A {@link Producer} that can be used to write records to a topic. + */ + private static <K, V> Producer<K, V> makeProducer( + final String kafkaHostname, + final int kakfaPort, + final Class<? extends Serializer<K>> keySerializerClass, + final Class<? extends Serializer<V>> valueSerializerClass) { + requireNonNull(kafkaHostname); + requireNonNull(keySerializerClass); + requireNonNull(valueSerializerClass); + + final Properties producerProps = new Properties(); + producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort); + producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName()); + producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName()); + return new KafkaProducer<>(producerProps); + } + + /** + * Create a {@link Consumer} that has a unique group ID and reads everything from a topic in Kafka + * starting at the earliest point by default. + * + * @param kafkaHostname - The Kafka broker hostname. (not null) + * @param kafkaPort - The Kafka broker port. + * @param keyDeserializerClass - Deserializes the keys. (not null) + * @param valueDeserializerClass - Deserializes the values. (not null) + * @return A {@link Consumer} that can be used to read records from a topic. + */ + private static <K, V> Consumer<K, V> fromStartConsumer( + final String kafkaHostname, + final int kakfaPort, + final Class<? extends Deserializer<K>> keyDeserializerClass, + final Class<? extends Deserializer<V>> valueDeserializerClass) { + requireNonNull(kafkaHostname); + requireNonNull(keyDeserializerClass); + requireNonNull(valueDeserializerClass); + + final Properties consumerProps = new Properties(); + consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName()); + return new KafkaConsumer<>(consumerProps); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/pom.xml ---------------------------------------------------------------------- diff --git a/extras/shell/pom.xml b/extras/shell/pom.xml index 1a07400..fcca909 100644 --- a/extras/shell/pom.xml +++ b/extras/shell/pom.xml @@ -59,6 +59,10 @@ <artifactId>rya.pcj.fluo.api</artifactId> </dependency> <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.kafka</artifactId> + </dependency> + <dependency> <groupId>org.apache.fluo</groupId> <artifactId>fluo-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java index b6fddb0..b4168af 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java @@ -34,10 +34,15 @@ import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.api.client.mongo.MongoConnectionDetails; import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; import org.apache.rya.shell.SharedShellState.ConnectionState; +import org.apache.rya.shell.SharedShellState.ShellState; import org.apache.rya.shell.SharedShellState.StorageType; import org.apache.rya.shell.util.ConnectorFactory; import org.apache.rya.shell.util.PasswordPrompt; +import org.apache.rya.streams.api.RyaStreamsClient; +import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliAvailabilityIndicator; @@ -222,31 +227,58 @@ public class RyaConnectionCommands implements CommandMarker { @CliCommand(value = CONNECT_INSTANCE_CMD, help = "Connect to a specific Rya instance") public void connectToInstance( @CliOption(key = {"instance"}, mandatory = true, help = "The name of the Rya instance the shell will interact with.") - final String instance) { + final String ryaInstance) { + final RyaClient ryaClient = sharedState.getShellState().getConnectedCommands().get(); try { - final InstanceExists instanceExists = sharedState.getShellState().getConnectedCommands().get().getInstanceExists(); + final InstanceExists instanceExists = ryaClient.getInstanceExists(); // Make sure the requested instance exists. - if(!instanceExists.exists(instance)) { - throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", instance)); + if(!instanceExists.exists(ryaInstance)) { + throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", ryaInstance)); + } + + // Store the instance name in the shared state. + sharedState.connectedToInstance(ryaInstance); + + // If the Rya instance is configured to interact with Rya Streams, then connect the + // Rya Streams client to the shared state. + final com.google.common.base.Optional<RyaDetails> ryaDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance); + if(ryaDetails.isPresent()) { + final com.google.common.base.Optional<RyaStreamsDetails> streamsDetails = ryaDetails.get().getRyaStreamsDetails(); + if(streamsDetails.isPresent()) { + final String kafkaHostname = streamsDetails.get().getHostname(); + final int kafkaPort = streamsDetails.get().getPort(); + final RyaStreamsClient streamsClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort); + sharedState.connectedToRyaStreams(streamsClient); + } } } catch(final RyaClientException e) { throw new RuntimeException("Could not connect to Rya instance. Reason: " + e.getMessage(), e); } - - // Store the instance name in the shared state. - sharedState.connectedToInstance(instance); } @CliCommand(value = DISCONNECT_COMMAND_NAME_CMD, help = "Disconnect the shell's Rya storage connection (Accumulo).") public void disconnect() { + final ShellState shellState = sharedState.getShellState(); + // If connected to Mongo, there is a client that needs to be closed. - final com.google.common.base.Optional<MongoClient> mongoAdminClient = sharedState.getShellState().getMongoAdminClient(); + final com.google.common.base.Optional<MongoClient> mongoAdminClient = shellState.getMongoAdminClient(); if(mongoAdminClient.isPresent()) { mongoAdminClient.get().close(); } + // If connected to Rya Streams, then close the associated resources. + final com.google.common.base.Optional<RyaStreamsClient> streamsClient = shellState.getRyaStreamsCommands(); + if(streamsClient.isPresent()) { + try { + streamsClient.get().close(); + } catch (final Exception e) { + System.err.print("Could not close the RyaStreamsClient."); + e.printStackTrace(); + } + } + // Update the shared state to disconnected. sharedState.disconnected(); } -} +} \ No newline at end of file
