http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java index 5633326..c96e2db 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo.entity; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,9 +27,9 @@ import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -39,13 +39,13 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import org.openrdf.model.URI; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -73,21 +73,19 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { public static final String CONF_TABLE_SUFFIX = "ac.indexer.eci.tablename"; - private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, TableExistsException { ConfigUtils.createTableIfNotExists(conf, ConfigUtils.getEntityTableName(conf)); } - @Override public Configuration getConf() { - return this.conf; + return conf; } //initialization occurs in setConf because index is created using reflection @Override - public void setConf(Configuration conf) { + public void setConf(final Configuration conf) { if (conf instanceof AccumuloRdfConfiguration) { this.conf = (AccumuloRdfConfiguration) conf; } else { @@ -97,149 +95,141 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { try { initInternal(); isInit = true; - } catch (AccumuloException e) { + } catch (final AccumuloException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); - } catch (AccumuloSecurityException e) { + } catch (final AccumuloSecurityException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); - } catch (TableNotFoundException e) { + } catch (final TableNotFoundException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); - } catch (TableExistsException e) { + } catch (final TableExistsException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); - } catch (IOException e) { + } catch (final IOException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); } } } - @Override public String getTableName() { return ConfigUtils.getEntityTableName(conf); } @Override - public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { + public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException { try { this.writer = writer.getBatchWriter(getTableName()); - } catch (AccumuloException e) { + } catch (final AccumuloException e) { throw new IOException(e); - } catch (AccumuloSecurityException e) { + } catch (final AccumuloSecurityException e) { throw new IOException(e); - } catch (TableNotFoundException e) { + } catch (final TableNotFoundException e) { throw new IOException(e); } - } - @Override - public void storeStatement(RyaStatement stmt) throws IOException { + public void storeStatement(final RyaStatement stmt) throws IOException { Preconditions.checkNotNull(writer, "BatchWriter not Set"); try { - for (TripleRow row : serializeStatement(stmt)) { + for (final TripleRow row : serializeStatement(stmt)) { writer.addMutation(createMutation(row)); } - } catch (MutationsRejectedException e) { + } catch (final MutationsRejectedException e) { throw new IOException(e); - } catch (RyaTypeResolverException e) { + } catch (final RyaTypeResolverException e) { throw new IOException(e); } } - @Override - public void deleteStatement(RyaStatement stmt) throws IOException { + public void deleteStatement(final RyaStatement stmt) throws IOException { Preconditions.checkNotNull(writer, "BatchWriter not Set"); try { - for (TripleRow row : serializeStatement(stmt)) { + for (final TripleRow row : serializeStatement(stmt)) { writer.addMutation(deleteMutation(row)); } - } catch (MutationsRejectedException e) { + } catch (final MutationsRejectedException e) { throw new IOException(e); - } catch (RyaTypeResolverException e) { + } catch (final RyaTypeResolverException e) { throw new IOException(e); } } + protected Mutation deleteMutation(final TripleRow tripleRow) { + final Mutation m = new Mutation(new Text(tripleRow.getRow())); - protected Mutation deleteMutation(TripleRow tripleRow) { - Mutation m = new Mutation(new Text(tripleRow.getRow())); + final byte[] columnFamily = tripleRow.getColumnFamily(); + final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + final byte[] columnQualifier = tripleRow.getColumnQualifier(); + final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - - byte[] columnVisibility = tripleRow.getColumnVisibility(); - ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + final byte[] columnVisibility = tripleRow.getColumnVisibility(); + final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp()); return m; } - public static Collection<Mutation> createMutations(RyaStatement stmt) throws RyaTypeResolverException{ - Collection<Mutation> m = Lists.newArrayList(); - for (TripleRow tr : serializeStatement(stmt)){ + public static Collection<Mutation> createMutations(final RyaStatement stmt) throws RyaTypeResolverException{ + final Collection<Mutation> m = Lists.newArrayList(); + for (final TripleRow tr : serializeStatement(stmt)){ m.add(createMutation(tr)); } return m; } - private static Mutation createMutation(TripleRow tripleRow) { - Mutation mutation = new Mutation(new Text(tripleRow.getRow())); - byte[] columnVisibility = tripleRow.getColumnVisibility(); - ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); - Long timestamp = tripleRow.getTimestamp(); - byte[] value = tripleRow.getValue(); - Value v = value == null ? EMPTY_VALUE : new Value(value); - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + private static Mutation createMutation(final TripleRow tripleRow) { + final Mutation mutation = new Mutation(new Text(tripleRow.getRow())); + final byte[] columnVisibility = tripleRow.getColumnVisibility(); + final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + final Long timestamp = tripleRow.getTimestamp(); + final byte[] value = tripleRow.getValue(); + final Value v = value == null ? EMPTY_VALUE : new Value(value); + final byte[] columnQualifier = tripleRow.getColumnQualifier(); + final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); + final byte[] columnFamily = tripleRow.getColumnFamily(); + final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); mutation.put(cfText, cqText, cv, timestamp, v); return mutation; } - private static List<TripleRow> serializeStatement(RyaStatement stmt) throws RyaTypeResolverException { - RyaURI subject = stmt.getSubject(); - RyaURI predicate = stmt.getPredicate(); - RyaType object = stmt.getObject(); - RyaURI context = stmt.getContext(); - Long timestamp = stmt.getTimestamp(); - byte[] columnVisibility = stmt.getColumnVisibility(); - byte[] value = stmt.getValue(); + private static List<TripleRow> serializeStatement(final RyaStatement stmt) throws RyaTypeResolverException { + final RyaURI subject = stmt.getSubject(); + final RyaURI predicate = stmt.getPredicate(); + final RyaType object = stmt.getObject(); + final RyaURI context = stmt.getContext(); + final Long timestamp = stmt.getTimestamp(); + final byte[] columnVisibility = stmt.getColumnVisibility(); + final byte[] value = stmt.getValue(); assert subject != null && predicate != null && object != null; - byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes(); - byte[] subjBytes = subject.getData().getBytes(); - byte[] predBytes = predicate.getData().getBytes(); - byte[][] objBytes = RyaContext.getInstance().serializeType(object); - - return Lists.newArrayList(new TripleRow(subjBytes, // - predBytes, // - Bytes.concat(cf, DELIM_BYTES, // - "object".getBytes(), DELIM_BYTES, // - objBytes[0], objBytes[1]), // - timestamp, // - columnVisibility, // - value// - ), - - new TripleRow(objBytes[0], // - predBytes, // - Bytes.concat(cf, DELIM_BYTES, // - "subject".getBytes(), DELIM_BYTES, // - subjBytes, objBytes[1]), // - timestamp, // - columnVisibility, // - value// - )); + final byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes(); + final byte[] subjBytes = subject.getData().getBytes(); + final byte[] predBytes = predicate.getData().getBytes(); + final byte[][] objBytes = RyaContext.getInstance().serializeType(object); + + return Lists.newArrayList(new TripleRow(subjBytes, + predBytes, + Bytes.concat(cf, DELIM_BYTES, + "object".getBytes(), DELIM_BYTES, + objBytes[0], objBytes[1]), + timestamp, + columnVisibility, + value), + new TripleRow(objBytes[0], + predBytes, + Bytes.concat(cf, DELIM_BYTES, + "subject".getBytes(), DELIM_BYTES, + subjBytes, objBytes[1]), + timestamp, + columnVisibility, + value)); } /** @@ -297,39 +287,28 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { null, columnVisibility, valueBytes, timestamp); } - @Override - public void init() { - // TODO Auto-generated method stub - - } - - - @Override - public void setConnector(Connector connector) { - // TODO Auto-generated method stub - - } - - - @Override - public void destroy() { - // TODO Auto-generated method stub - - } - + @Override + public void init() { + } - @Override - public void purge(RdfCloudTripleStoreConfiguration configuration) { - // TODO Auto-generated method stub - - } + @Override + public void setConnector(final Connector connector) { + } + @Override + public void destroy() { + } - @Override - public void dropAndDestroy() { - // TODO Auto-generated method stub - - } + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + } + @Override + public void dropAndDestroy() { + } + @Override + public Set<URI> getIndexablePredicates() { + return null; + } }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java index 2429e79..fe70d82 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java @@ -69,10 +69,10 @@ import mvm.rya.api.RdfCloudTripleStoreConfiguration; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.indexing.FreeTextIndexer; -import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.Md5Hash; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.StatementSerializer; import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.Md5Hash; -import mvm.rya.indexing.accumulo.StatementSerializer; import mvm.rya.indexing.accumulo.freetext.iterators.BooleanTreeIterator; import mvm.rya.indexing.accumulo.freetext.query.ASTExpression; import mvm.rya.indexing.accumulo.freetext.query.ASTNodeUtils; @@ -480,7 +480,7 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements /** {@inheritDoc} */ @Override - public CloseableIteration<Statement, QueryEvaluationException> queryText(String query, StatementContraints contraints) + public CloseableIteration<Statement, QueryEvaluationException> queryText(String query, StatementConstraints contraints) throws IOException { Scanner docTableScan = getScanner(ConfigUtils.getFreeTextDocTablename(conf)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/ColumnPrefixes.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/ColumnPrefixes.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/ColumnPrefixes.java index 31666c9..b33206b 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/ColumnPrefixes.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/ColumnPrefixes.java @@ -24,12 +24,12 @@ package mvm.rya.indexing.accumulo.freetext; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; -import mvm.rya.indexing.accumulo.StatementSerializer; - import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; import org.openrdf.model.Statement; +import mvm.rya.indexing.StatementSerializer; + /** * Row ID: shardId * <p> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/FreeTextTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/FreeTextTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/FreeTextTupleSet.java index 471870b..cfb0f38 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/FreeTextTupleSet.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/FreeTextTupleSet.java @@ -29,7 +29,7 @@ import mvm.rya.indexing.FreeTextIndexer; import mvm.rya.indexing.IndexingExpr; import mvm.rya.indexing.IteratorFactory; import mvm.rya.indexing.SearchFunction; -import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.StatementConstraints; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; import org.apache.hadoop.conf.Configuration; @@ -132,7 +132,7 @@ public class FreeTextTupleSet extends ExternalTupleSet { @Override public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + StatementConstraints contraints) throws QueryEvaluationException { try { CloseableIteration<Statement, QueryEvaluationException> statements = freeTextIndexer.queryText( queryText, contraints); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java index f7ad6b3..becd893 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo.geo; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -76,10 +76,10 @@ import mvm.rya.api.RdfCloudTripleStoreConfiguration; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.indexing.GeoIndexer; -import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.Md5Hash; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.StatementSerializer; import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.Md5Hash; -import mvm.rya.indexing.accumulo.StatementSerializer; /** * A {@link GeoIndexer} wrapper around a GeoMesa {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the @@ -142,13 +142,13 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd //initialization occurs in setConf because index is created using reflection @Override - public void setConf(Configuration conf) { + public void setConf(final Configuration conf) { this.conf = conf; if (!isInit) { try { initInternal(); isInit = true; - } catch (IOException e) { + } catch (final IOException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); } @@ -157,20 +157,20 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd @Override public Configuration getConf() { - return this.conf; + return conf; } private void initInternal() throws IOException { validPredicates = ConfigUtils.getGeoPredicates(conf); - DataStore dataStore = createDataStore(conf); + final DataStore dataStore = createDataStore(conf); try { featureType = getStatementFeatureType(dataStore); - } catch (IOException e) { + } catch (final IOException e) { throw new IOException(e); - } catch (SchemaException e) { + } catch (final SchemaException e) { throw new IOException(e); } @@ -181,22 +181,22 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource; } - private static DataStore createDataStore(Configuration conf) throws IOException { + private static DataStore createDataStore(final Configuration conf) throws IOException { // get the configuration parameters - Instance instance = ConfigUtils.getInstance(conf); - boolean useMock = instance instanceof MockInstance; - String instanceId = instance.getInstanceName(); - String zookeepers = instance.getZooKeepers(); - String user = ConfigUtils.getUsername(conf); - String password = ConfigUtils.getPassword(conf); - String auths = ConfigUtils.getAuthorizations(conf).toString(); - String tableName = ConfigUtils.getGeoTablename(conf); - int numParitions = ConfigUtils.getGeoNumPartitions(conf); - - String featureSchemaFormat = "%~#s%" + numParitions + "#r%" + FEATURE_NAME + final Instance instance = ConfigUtils.getInstance(conf); + final boolean useMock = instance instanceof MockInstance; + final String instanceId = instance.getInstanceName(); + final String zookeepers = instance.getZooKeepers(); + final String user = ConfigUtils.getUsername(conf); + final String password = ConfigUtils.getPassword(conf); + final String auths = ConfigUtils.getAuthorizations(conf).toString(); + final String tableName = ConfigUtils.getGeoTablename(conf); + final int numParitions = ConfigUtils.getGeoNumPartitions(conf); + + final String featureSchemaFormat = "%~#s%" + numParitions + "#r%" + FEATURE_NAME + "#cstr%0,3#gh%yyyyMMdd#d::%~#s%3,2#gh::%~#s%#id"; // build the map of parameters - Map<String, Serializable> params = new HashMap<String, Serializable>(); + final Map<String, Serializable> params = new HashMap<String, Serializable>(); params.put("instanceId", instanceId); params.put("zookeepers", zookeepers); params.put("user", user); @@ -210,14 +210,14 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd return DataStoreFinder.getDataStore(params); } - private static SimpleFeatureType getStatementFeatureType(DataStore dataStore) throws IOException, SchemaException { + private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException { SimpleFeatureType featureType; - String[] datastoreFeatures = dataStore.getTypeNames(); + final String[] datastoreFeatures = dataStore.getTypeNames(); if (Arrays.asList(datastoreFeatures).contains(FEATURE_NAME)) { featureType = dataStore.getSchema(FEATURE_NAME); } else { - String featureSchema = SUBJECT_ATTRIBUTE + ":String," // + final String featureSchema = SUBJECT_ATTRIBUTE + ":String," // + PREDICATE_ATTRIBUTE + ":String," // + OBJECT_ATTRIBUTE + ":String," // + CONTEXT_ATTRIBUTE + ":String," // @@ -229,23 +229,23 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd } @Override - public void storeStatements(Collection<RyaStatement> ryaStatements) throws IOException { + public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException { // create a feature collection - DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); + final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); - for (RyaStatement ryaStatement : ryaStatements) { + for (final RyaStatement ryaStatement : ryaStatements) { - Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list - boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); if (isValidPredicate && (statement.getObject() instanceof Literal)) { try { - SimpleFeature feature = createFeature(featureType, statement); + final SimpleFeature feature = createFeature(featureType, statement); featureCollection.add(feature); - } catch (ParseException e) { + } catch (final ParseException e) { logger.warn("Error getting geo from statement: " + statement.toString(), e); } } @@ -259,25 +259,25 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd @Override - public void storeStatement(RyaStatement statement) throws IOException { + public void storeStatement(final RyaStatement statement) throws IOException { storeStatements(Collections.singleton(statement)); } - private static SimpleFeature createFeature(SimpleFeatureType featureType, Statement statement) throws ParseException { - String subject = StatementSerializer.writeSubject(statement); - String predicate = StatementSerializer.writePredicate(statement); - String object = StatementSerializer.writeObject(statement); - String context = StatementSerializer.writeContext(statement); + private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException { + final String subject = StatementSerializer.writeSubject(statement); + final String predicate = StatementSerializer.writePredicate(statement); + final String object = StatementSerializer.writeObject(statement); + final String context = StatementSerializer.writeContext(statement); // create the feature - Object[] noValues = {}; + final Object[] noValues = {}; // create the hash - String statementId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement)); - SimpleFeature newFeature = SimpleFeatureBuilder.build(featureType, noValues, statementId); + final String statementId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement)); + final SimpleFeature newFeature = SimpleFeatureBuilder.build(featureType, noValues, statementId); // write the statement data to the fields - Geometry geom = (new WKTReader()).read(GeoParseUtils.getWellKnownText(statement)); + final Geometry geom = (new WKTReader()).read(GeoParseUtils.getWellKnownText(statement)); if(geom == null || geom.isEmpty() || !geom.isValid()) { throw new ParseException("Could not create geometry for statement " + statement); } @@ -295,9 +295,9 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd return newFeature; } - private CloseableIteration<Statement, QueryEvaluationException> performQuery(String type, Geometry geometry, - StatementContraints contraints) { - List<String> filterParms = new ArrayList<String>(); + private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry, + final StatementConstraints contraints) { + final List<String> filterParms = new ArrayList<String>(); filterParms.add(type + "(" + Constants.SF_PROPERTY_GEOMETRY + ", " + geometry + " )"); @@ -308,14 +308,14 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd filterParms.add("( " + CONTEXT_ATTRIBUTE + "= '" + contraints.getContext() + "') "); } if (contraints.hasPredicates()) { - List<String> predicates = new ArrayList<String>(); - for (URI u : contraints.getPredicates()) { + final List<String> predicates = new ArrayList<String>(); + for (final URI u : contraints.getPredicates()) { predicates.add("( " + PREDICATE_ATTRIBUTE + "= '" + u.stringValue() + "') "); } filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")"); } - String filterString = StringUtils.join(filterParms, " AND "); + final String filterString = StringUtils.join(filterParms, " AND "); logger.info("Performing geomesa query : " + filterString); return getIteratorWrapper(filterString); @@ -332,15 +332,15 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd Filter cqlFilter; try { cqlFilter = ECQL.toFilter(filterString); - } catch (CQLException e) { + } catch (final CQLException e) { logger.error("Error parsing query: " + filterString, e); throw new QueryEvaluationException(e); } - Query query = new Query(featureType.getTypeName(), cqlFilter); + final Query query = new Query(featureType.getTypeName(), cqlFilter); try { featureIterator = featureSource.getFeatures(query).features(); - } catch (IOException e) { + } catch (final IOException e) { logger.error("Error performing query: " + filterString, e); throw new QueryEvaluationException(e); } @@ -356,12 +356,12 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd @Override public Statement next() throws QueryEvaluationException { - SimpleFeature feature = getIterator().next(); - String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString(); - String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString(); - String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString(); - String contextString = feature.getAttribute(CONTEXT_ATTRIBUTE).toString(); - Statement statement = StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString); + final SimpleFeature feature = getIterator().next(); + final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString(); + final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString(); + final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString(); + final String contextString = feature.getAttribute(CONTEXT_ATTRIBUTE).toString(); + final Statement statement = StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString); return statement; } @@ -378,42 +378,42 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryEquals(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) { return performQuery("EQUALS", query, contraints); } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) { return performQuery("DISJOINT", query, contraints); } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) { return performQuery("INTERSECTS", query, contraints); } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryTouches(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) { return performQuery("TOUCHES", query, contraints); } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) { return performQuery("CROSSES", query, contraints); } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryWithin(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) { return performQuery("WITHIN", query, contraints); } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryContains(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) { return performQuery("CONTAINS", query, contraints); } @Override - public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(Geometry query, StatementContraints contraints) { + public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) { return performQuery("OVERLAPS", query, contraints); } @@ -438,21 +438,21 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd return ConfigUtils.getGeoTablename(conf); } - private void deleteStatements(Collection<RyaStatement> ryaStatements) throws IOException { + private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException { // create a feature collection - DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); + final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); - for (RyaStatement ryaStatement : ryaStatements) { - Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + for (final RyaStatement ryaStatement : ryaStatements) { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list - boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); if (isValidPredicate && (statement.getObject() instanceof Literal)) { try { - SimpleFeature feature = createFeature(featureType, statement); + final SimpleFeature feature = createFeature(featureType, statement); featureCollection.add(feature); - } catch (ParseException e) { + } catch (final ParseException e) { logger.warn("Error getting geo from statement: " + statement.toString(), e); } } @@ -460,50 +460,50 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd // remove this feature collection from the store if (!featureCollection.isEmpty()) { - Set<Identifier> featureIds = new HashSet<Identifier>(); - FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null); - Set<String> stringIds = DataUtilities.fidSet(featureCollection); - for (String id : stringIds) { + final Set<Identifier> featureIds = new HashSet<Identifier>(); + final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null); + final Set<String> stringIds = DataUtilities.fidSet(featureCollection); + for (final String id : stringIds) { featureIds.add(filterFactory.featureId(id)); } - Filter filter = filterFactory.id(featureIds); + final Filter filter = filterFactory.id(featureIds); featureStore.removeFeatures(filter); } } @Override - public void deleteStatement(RyaStatement statement) throws IOException { + public void deleteStatement(final RyaStatement statement) throws IOException { deleteStatements(Collections.singleton(statement)); } @Override public void init() { // TODO Auto-generated method stub - + } @Override - public void setConnector(Connector connector) { + public void setConnector(final Connector connector) { // TODO Auto-generated method stub - + } @Override public void destroy() { // TODO Auto-generated method stub - + } @Override - public void purge(RdfCloudTripleStoreConfiguration configuration) { + public void purge(final RdfCloudTripleStoreConfiguration configuration) { // TODO Auto-generated method stub - + } @Override public void dropAndDestroy() { // TODO Auto-generated method stub - + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoTupleSet.java index 2bc1bb0..d1468b8 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoTupleSet.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoTupleSet.java @@ -1,5 +1,20 @@ package mvm.rya.indexing.accumulo.geo; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; +import com.vividsolutions.jts.io.WKTReader; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,42 +36,26 @@ package mvm.rya.indexing.accumulo.geo; import info.aduna.iteration.CloseableIteration; - -import java.util.Map; -import java.util.Set; - import mvm.rya.indexing.GeoIndexer; import mvm.rya.indexing.IndexingExpr; import mvm.rya.indexing.IteratorFactory; import mvm.rya.indexing.SearchFunction; -import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.StatementConstraints; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; - -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.ParseException; -import com.vividsolutions.jts.io.WKTReader; - //Indexing Node for geo expressions to be inserted into execution plan //to delegate geo portion of query to geo index public class GeoTupleSet extends ExternalTupleSet { - private Configuration conf; - private GeoIndexer geoIndexer; - private IndexingExpr filterInfo; + private final Configuration conf; + private final GeoIndexer geoIndexer; + private final IndexingExpr filterInfo; - public GeoTupleSet(IndexingExpr filterInfo, GeoIndexer geoIndexer) { + public GeoTupleSet(final IndexingExpr filterInfo, final GeoIndexer geoIndexer) { this.filterInfo = filterInfo; this.geoIndexer = geoIndexer; - this.conf = geoIndexer.getConf(); + conf = geoIndexer.getConf(); } @Override @@ -77,21 +76,21 @@ public class GeoTupleSet extends ExternalTupleSet { @Override public String getSignature() { - return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); + return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(getBindingNames()).replaceAll("\\s+", " "); } @Override - public boolean equals(Object other) { + public boolean equals(final Object other) { if (other == this) { return true; } if (!(other instanceof GeoTupleSet)) { return false; } - GeoTupleSet arg = (GeoTupleSet) other; - return this.filterInfo.equals(arg.filterInfo); + final GeoTupleSet arg = (GeoTupleSet) other; + return filterInfo.equals(arg.filterInfo); } @Override @@ -111,17 +110,17 @@ public class GeoTupleSet extends ExternalTupleSet { * method can be expected with some query evaluators. */ @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) throws QueryEvaluationException { - URI funcURI = filterInfo.getFunction(); - SearchFunction searchFunction = new GeoSearchFunctionFactory(conf).getSearchFunction(funcURI); + final URI funcURI = filterInfo.getFunction(); + final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf).getSearchFunction(funcURI); if(filterInfo.getArguments().length > 1) { throw new IllegalArgumentException("Index functions do not support more than two arguments."); } - String queryText = filterInfo.getArguments()[0].stringValue(); + final String queryText = filterInfo.getArguments()[0].stringValue(); return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); } @@ -136,7 +135,7 @@ public class GeoTupleSet extends ExternalTupleSet { private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); - public GeoSearchFunctionFactory(Configuration conf) { + public GeoSearchFunctionFactory(final Configuration conf) { this.conf = conf; } @@ -153,7 +152,7 @@ public class GeoTupleSet extends ExternalTupleSet { try { geoFunc = getSearchFunctionInternal(searchFunction); - } catch (QueryEvaluationException e) { + } catch (final QueryEvaluationException e) { e.printStackTrace(); } @@ -161,7 +160,7 @@ public class GeoTupleSet extends ExternalTupleSet { } private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { - SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); + final SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); if (sf != null) { return sf; @@ -173,15 +172,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_EQUALS = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } } @@ -195,15 +194,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_DISJOINT = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } } @@ -217,15 +216,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_INTERSECTS = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } } @@ -239,15 +238,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_TOUCHES = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } } @@ -261,15 +260,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_CONTAINS = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } } @@ -283,15 +282,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_OVERLAPS = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } } @@ -305,15 +304,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_CROSSES = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } } @@ -327,15 +326,15 @@ public class GeoTupleSet extends ExternalTupleSet { private final SearchFunction GEO_WITHIN = new SearchFunction() { @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, + final StatementConstraints contraints) throws QueryEvaluationException { try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + final WKTReader reader = new WKTReader(); + final Geometry geometry = reader.read(queryText); + final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( geometry, contraints); return statements; - } catch (ParseException e) { + } catch (final ParseException e) { throw new QueryEvaluationException(e); } }
