http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java index 97e3f22..516690e 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java @@ -26,9 +26,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.SnapshotBase; -import org.apache.fluo.api.data.Bytes; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.NodeBin; @@ -79,12 +78,12 @@ public class PeriodicQueryPruner implements BinPruner, Runnable { */ @Override public void pruneBindingSetBin(NodeBin nodeBin) { - String id = nodeBin.getNodeId(); + String pcjId = nodeBin.getNodeId(); long bin = nodeBin.getBin(); try(Snapshot sx = client.newSnapshot()) { - String queryId = sx.get(Bytes.of(id), FluoQueryColumns.PCJ_ID_QUERY_ID).toString(); + String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId); - accPruner.pruneBindingSetBin(new NodeBin(id, bin)); + accPruner.pruneBindingSetBin(nodeBin); for(String fluoId: fluoIds) { fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin)); }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java index 27e06f0..69bd39c 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java @@ -35,6 +35,7 @@ import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; import org.apache.rya.periodic.notification.notification.CommandNotification; @@ -120,7 +121,7 @@ public class PeriodicNotificationProvider { id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString()); break; case QUERY: - id = sx.get(Bytes.of(nodeId), FluoQueryColumns.RYA_PCJ_ID).toString(); + id = FluoQueryUtils.convertFluoQueryIdToPcjId(nodeId); break; case AGGREGATION: id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java index 9239dc7..8fd95d3 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java @@ -22,8 +22,11 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.GetInstanceDetails; import org.apache.rya.api.client.Install.DuplicateInstanceNameException; import org.apache.rya.api.client.Install.InstallConfiguration; @@ -280,7 +283,11 @@ public class RyaAdminCommands implements CommandMarker { } @CliCommand(value = CREATE_PCJ_CMD, help = "Creates and starts the maintenance of a new PCJ using a Fluo application.") - public String createPcj() { + public String createPcj( + @CliOption(key = {"exportToRya"}, mandatory = false, help = "Indicates that results for the query should be exported to a Rya PCJ table.") + boolean exportToRya, + @CliOption(key = {"exportToKafka"}, mandatory = false, help = "Indicates that results for the query should be exported to a Kafka Topic.") + boolean exportToKafka) { // Fetch the command that is connected to the store. final ShellState shellState = state.getShellState(); final RyaClient commands = shellState.getConnectedCommands().get(); @@ -290,8 +297,18 @@ public class RyaAdminCommands implements CommandMarker { // Prompt the user for the SPARQL. final Optional<String> sparql = sparqlPrompt.getSparql(); if (sparql.isPresent()) { + Set<ExportStrategy> strategies = new HashSet<>(); + if(exportToRya) { + strategies.add(ExportStrategy.RYA); + } + if(exportToKafka) { + strategies.add(ExportStrategy.KAFKA); + } + if(strategies.size() == 0) { + return "The user must specify at least one export strategy by setting either exportToRya or exportToKafka to true."; + } // Execute the command. - final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get()); + final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get(), strategies); // Return a message that indicates the ID of the newly created ID. return String.format("The PCJ has been created. Its ID is '%s'.", pcjId); } else { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java index e3e8d98..cab34e9 100644 --- a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java +++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Date; import java.util.List; +import java.util.Set; import java.util.TimeZone; import org.apache.rya.api.client.AddUser; @@ -43,6 +44,7 @@ import org.apache.rya.api.client.RemoveUser; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.RyaClientException; import org.apache.rya.api.client.Uninstall; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.instance.RyaDetails; import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails; @@ -61,6 +63,7 @@ import org.junit.Test; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * Unit tests the methods of {@link RyaAdminCommands}. @@ -74,7 +77,8 @@ public class RyaAdminCommandsTest { final String sparql = "SELECT * WHERE { ?person <http://isA> ?noun }"; final String pcjId = "123412342"; final CreatePCJ mockCreatePCJ = mock(CreatePCJ.class); - when(mockCreatePCJ.createPCJ( eq(instanceName), eq(sparql) ) ).thenReturn( pcjId ); + final Set<ExportStrategy> strategies = Sets.newHashSet(ExportStrategy.RYA); + when(mockCreatePCJ.createPCJ( eq(instanceName), eq(sparql), eq(strategies) ) ).thenReturn( pcjId ); final RyaClient mockCommands = mock(RyaClient.class); when(mockCommands.getCreatePCJ()).thenReturn( mockCreatePCJ ); @@ -88,10 +92,10 @@ public class RyaAdminCommandsTest { // Execute the command. final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mockSparqlPrompt, mock(UninstallPrompt.class)); - final String message = commands.createPcj(); + final String message = commands.createPcj(true, false); // Verify the values that were provided to the command were passed through to CreatePCJ. - verify(mockCreatePCJ).createPCJ(eq(instanceName), eq(sparql)); + verify(mockCreatePCJ).createPCJ(eq(instanceName), eq(sparql), eq(strategies)); // Verify a message is returned that explains what was created. final String expected = "The PCJ has been created. Its ID is '123412342'."; @@ -114,7 +118,7 @@ public class RyaAdminCommandsTest { // Execute the command. final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mockSparqlPrompt, mock(UninstallPrompt.class)); - final String message = commands.createPcj(); + final String message = commands.createPcj(true, false); // Verify a message is returned that explains what was created. final String expected = "";
