Repository: incubator-rya Updated Branches: refs/heads/master b372ebcdb -> 8acd24b5e
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index 1cf2825..c132ad4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -54,6 +54,8 @@ import edu.umd.cs.findbugs.annotations.NonNull; /** * Reads and writes {@link FluoQuery} instances and their components to/from * a Fluo table. + * <p> + * Note, this class should be implemented in a thread-safe manner due to current usage. */ @DefaultAnnotation(NonNull.class) public class FluoQueryMetadataDAO { @@ -68,8 +70,8 @@ public class FluoQueryMetadataDAO { requireNonNull(tx); requireNonNull(metadata); - Joiner joiner = Joiner.on(IncrementalUpdateConstants.VAR_DELIM); - + final Joiner joiner = Joiner.on(IncrementalUpdateConstants.VAR_DELIM); + final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId); tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString()); @@ -111,9 +113,9 @@ public class FluoQueryMetadataDAO { final String childNodeId = values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID); final String queryType = values.get(FluoQueryColumns.QUERY_TYPE); final String[] exportStrategies = values.get(FluoQueryColumns.QUERY_EXPORT_STRATEGIES).split(IncrementalUpdateConstants.VAR_DELIM); - - Set<ExportStrategy> strategies = new HashSet<>(); - for (String strategy : exportStrategies) { + + final Set<ExportStrategy> strategies = new HashSet<>(); + for (final String strategy : exportStrategies) { if (!strategy.isEmpty()) { strategies.add(ExportStrategy.valueOf(strategy)); } @@ -126,8 +128,8 @@ public class FluoQueryMetadataDAO { .setQueryType(QueryType.valueOf(queryType)) .setChildNodeId( childNodeId ); } - - + + /** * Write an instance of {@link ProjectionMetadata} to the Fluo table. * @@ -177,15 +179,15 @@ public class FluoQueryMetadataDAO { final String childNodeId = values.get(FluoQueryColumns.PROJECTION_CHILD_NODE_ID); final String parentNodeId = values.get(FluoQueryColumns.PROJECTION_PARENT_NODE_ID); - + return ProjectionMetadata.builder(nodeId) .setVarOrder( varOrder ) .setProjectedVars(projectedVars) .setParentNodeId(parentNodeId) .setChildNodeId( childNodeId ); } - - + + /** * Write an instance of {@link ConstructQueryMetadata} to the Fluo table. * @@ -221,9 +223,9 @@ public class FluoQueryMetadataDAO { // Fetch the values from the Fluo table. final String rowId = nodeId; - final Map<Column, String> values = sx.gets(rowId, + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, - FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, + FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER); @@ -232,7 +234,7 @@ public class FluoQueryMetadataDAO { final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID); final String parentNodeId = values.get(FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID); final String varOrderString = values.get(FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER); - + return ConstructQueryMetadata.builder() .setNodeId(nodeId) @@ -241,8 +243,8 @@ public class FluoQueryMetadataDAO { .setVarOrder(new VariableOrder(varOrderString)) .setChildNodeId(childNodeId); } - - + + /** * Write an instance of {@link FilterMetadata} to the Fluo table. * @@ -368,8 +370,8 @@ public class FluoQueryMetadataDAO { .setUnit(TimeUnit.valueOf(timeUnit)); } - - + + /** * Write an instance of {@link JoinMetadata} to the Fluo table. @@ -586,23 +588,23 @@ public class FluoQueryMetadataDAO { public void write(final TransactionBase tx, final FluoQuery query) { requireNonNull(tx); requireNonNull(query); - + // The results of the query are eventually exported to an instance // of Rya, so store the Rya ID for the PCJ. write(tx, query.getQueryMetadata()); // Write the rest of the metadata objects. - + if (query.getQueryType() == QueryType.CONSTRUCT) { - ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get(); + final ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get(); write(tx, constructMetadata); } - + for(final ProjectionMetadata projection : query.getProjectionMetadata()) { write(tx, projection); } - - Optional<PeriodicQueryMetadata> periodicMetadata = query.getPeriodicQueryMetadata(); + + final Optional<PeriodicQueryMetadata> periodicMetadata = query.getPeriodicQueryMetadata(); if(periodicMetadata.isPresent()) { write(tx, periodicMetadata.get()); } @@ -630,7 +632,7 @@ public class FluoQueryMetadataDAO { * @param sx - The snapshot that will be used to read the metadata from the Fluo table. (not null) * @param queryId - The ID of the query whose nodes will be read. (not null) * @return The {@link FluoQuery} that was read from table. - * @throws UnsupportedQueryException + * @throws UnsupportedQueryException */ public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) throws UnsupportedQueryException { requireNonNull(sx); @@ -656,20 +658,20 @@ public class FluoQueryMetadataDAO { // Add it's child's metadata. addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId()); break; - + case PROJECTION: //Add this node's metadata final ProjectionMetadata.Builder projectionBuilder = readProjectionMetadataBuilder(sx, childNodeId); builder.addProjectionBuilder(projectionBuilder); - + //Add it's child's metadata addChildMetadata(sx, builder, projectionBuilder.build().getChildNodeId()); - break; - + break; + case CONSTRUCT: final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId); builder.setConstructQueryMetadata(constructBuilder); - + // Add it's child's metadata. addChildMetadata(sx, builder, constructBuilder.build().getChildNodeId()); break; @@ -682,16 +684,16 @@ public class FluoQueryMetadataDAO { // Add it's child's metadata. addChildMetadata(sx, builder, periodicQueryBuilder.build().getChildNodeId()); break; - + case AGGREGATION: // Add this node's metadata. final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId); builder.addAggregateMetadata(aggregationBuilder); - + // Add it's child's metadata. addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId()); break; - + case JOIN: // Add this node's metadata. final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId); @@ -719,7 +721,7 @@ public class FluoQueryMetadataDAO { break; default: break; - + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index dbedfb3..7dbd79a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -29,9 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.core.client.FluoClientImpl; -import org.apache.fluo.recipes.test.FluoITHelper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -92,8 +89,6 @@ public class KafkaExportIT extends KafkaExportITBase { // Create the PCJ in Fluo and load the statements into Rya. final String pcjId = loadDataAndCreateQuery(sparql, statements); - FluoITHelper.printFluoTable(super.getFluoConfiguration()); - // The expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expectedResult = new HashSet<>(); @@ -249,10 +244,6 @@ public class KafkaExportIT extends KafkaExportITBase { // Create the PCJ in Fluo and load the statements into Rya. final String pcjId = loadDataAndCreateQuery(sparql, statements); - - try(FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) { - FluoITHelper.printFluoTable(fluo); - } // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -433,7 +424,7 @@ public class KafkaExportIT extends KafkaExportITBase { assertEquals(expectedResults, results); } - + @Test public void nestedGroupByManyBindings_averages() throws Exception { // A query that groups what is aggregated by two of the keys. @@ -493,7 +484,7 @@ public class KafkaExportIT extends KafkaExportITBase { bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING)); bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL)); expectedResults.add( new VisibilityBindingSet(bs) ); - + bs = new MapBindingSet(); bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING)); bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING)); @@ -504,11 +495,11 @@ public class KafkaExportIT extends KafkaExportITBase { final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location")); assertEquals(expectedResults, results); } - - + + @Test public void nestedWithJoinGroupByManyBindings_averages() throws Exception { - + // A query that groups what is aggregated by two of the keys. final String sparql = "SELECT ?type ?location ?averagePrice ?milkType {" + @@ -524,7 +515,7 @@ public class KafkaExportIT extends KafkaExportITBase { // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); final Collection<Statement> statements = Sets.newHashSet( - + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createURI("urn:blue")), vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("France")), vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(8.5)), @@ -543,7 +534,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")), vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(6.5)), vf.createStatement(vf.createURI("urn:goat"), vf.createURI("urn:hasMilkType"), vf.createLiteral("goat", XMLSchema.STRING)), - + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createURI("urn:fontina")), vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("Italy")), vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)), @@ -572,7 +563,7 @@ public class KafkaExportIT extends KafkaExportITBase { bs.addBinding("averagePrice", vf.createLiteral("6.5", XMLSchema.DECIMAL)); bs.addBinding("milkType", vf.createLiteral("goat", XMLSchema.STRING)); expectedResults.add( new VisibilityBindingSet(bs) ); - + bs = new MapBindingSet(); bs.addBinding("type", vf.createURI("urn:fontina")); bs.addBinding("location", vf.createLiteral("Italy", XMLSchema.STRING)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java index db81096..3358806 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java @@ -99,7 +99,8 @@ public class RyaAdminCommands implements CommandMarker { */ @CliAvailabilityIndicator({ LIST_INSTANCES_CMD, - INSTALL_CMD }) + INSTALL_CMD, + INSTALL_PARAMETERS_CMD}) public boolean areStorageCommandsAvailable() { switch(state.getShellState().getConnectionState()) { case CONNECTED_TO_STORAGE: http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0b64157..ed8a2b6 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,7 @@ under the License. <gmaven.version>1.3</gmaven.version> <!-- Newest: 1.5 --> <guava.version>14.0.1</guava.version> <!-- Newest: 18.0 --> + <gson.version>2.8.1</gson.version> <httpcomponents.httpclient.version>4.5.2</httpcomponents.httpclient.version> <!-- Newest: 4.5.3 --> <httpcomponents.httpcore.version>4.4.4</httpcomponents.httpcore.version> <!-- Newest: 4.4.6 --> @@ -115,7 +116,7 @@ under the License. <junit.version>4.12</junit.version> <!-- Newest: 4.12 --> <mockito.version>1.10.19</mockito.version> <!-- Newest: 1.10.19 --> <mrunit.version>1.1.0</mrunit.version> <!-- Newest: 1.1.0 --> - <slf4j.version>1.6.6</slf4j.version> <!-- Newest: 1.7.13 --> + <slf4j.version>1.7.25</slf4j.version> <!-- Newest: 1.7.13 --> <powermock.version>1.6.1</powermock.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> @@ -129,7 +130,7 @@ under the License. <plexus.version>3.0.8</plexus.version> <thrift.version>0.9.1</thrift.version> <commons.cli.version>1.2</commons.cli.version> - <jcommander.version>1.48</jcommander.version> + <jcommander.version>1.60</jcommander.version> <!-- geowave declares a 1.48 dependency and does not support a version higher than 1.60 --> <twitter4jstream.version>4.0.1</twitter4jstream.version> <jmh.version>1.13</jmh.version> @@ -137,7 +138,7 @@ under the License. <jsr305.version>1.3.9-1</jsr305.version> <jcip.version>1.0-1</jcip.version> <kafka.version>0.10.0.1</kafka.version> - <jopt-simple.version>4.9</jopt-simple.version> + <kryo.version>3.0.3</kryo.version> <!-- set profile property defaults --> <skip.rya.it>true</skip.rya.it> <!-- modified by -P enable-it --> @@ -242,11 +243,6 @@ under the License. </dependency> <dependency> <groupId>org.apache.rya</groupId> - <artifactId>accumulo.utils</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> <artifactId>rya.mapreduce</artifactId> <version>${project.version}</version> </dependency> @@ -449,6 +445,11 @@ under the License. <version>${guava.version}</version> </dependency> <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${gson.version}</version> + </dependency> + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <version>${hamcrest.version}</version> @@ -475,7 +476,22 @@ under the License. <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> - + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> @@ -495,6 +511,26 @@ under the License. </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <version>${hadoop.version}</version> </dependency> @@ -816,6 +852,11 @@ under the License. <version>${kafka.version}</version> <classifier>test</classifier> </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo</artifactId> + <version>${kryo.version}</version> + </dependency> </dependencies> </dependencyManagement> @@ -943,14 +984,6 @@ under the License. <configuration> <shadedArtifactAttached>true</shadedArtifactAttached> </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - </execution> - </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> @@ -1152,6 +1185,13 @@ under the License. <property name="message" value="Please use Guava imports instead of com.beust.jcommander.internal.*" /> </module> + <module name="RegexpSinglelineJava"> + <property name="format" + value="FluoITHelper[.]printFluoTable" /> + <property name="message" + value="Please comment out stdout debugging utilities like FluoITHelper.printFluoTable()" /> + <property name="ignoreComments" value="true" /> + </module> <!-- <module name="RegexpSinglelineJava"> <property name="format"