http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.cloudbase/pom.xml ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.cloudbase/pom.xml b/extras/generic.mr/generic.mr.cloudbase/pom.xml new file mode 100644 index 0000000..bae1b9f --- /dev/null +++ b/extras/generic.mr/generic.mr.cloudbase/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>generic.mr</artifactId> + <version>3.2.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>generic.mr.cloudbase</artifactId> + <name>${project.groupId}.${project.artifactId}</name> + <dependencies> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>generic.mr.api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>cloudbase</groupId> + <artifactId>cloudbase-core</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.gmaven</groupId> + <artifactId>gmaven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy b/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy new file mode 100644 index 0000000..7608fb7 --- /dev/null +++ b/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy @@ -0,0 +1,146 @@ +package mvm.rya.generic.mr.cloudbase + +import cloudbase.core.client.mapreduce.CloudbaseInputFormat +import cloudbase.core.client.mapreduce.CloudbaseOutputFormat +import cloudbase.core.security.Authorizations +import mvm.rya.generic.mr.api.MRInfo +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapreduce.Job +import cloudbase.core.data.Mutation +import cloudbase.core.data.Key +import cloudbase.core.data.Value +import cloudbase.core.security.ColumnVisibility +import cloudbase.core.client.mock.MockInstance +import cloudbase.core.client.ZooKeeperInstance + +/** + * Date: 12/3/12 + * Time: 9:00 AM + */ +class CloudbaseMRInfo implements MRInfo { + + def Configuration conf + def connector; + + @Override + void initMRJob(Job job, String table, String outtable, String[] auths) { + Configuration conf = job.configuration + String username = conf.get(USERNAME) + String password = conf.get(PASSWORD) + String instance = conf.get(INSTANCE) + String zookeepers = conf.get(ZOOKEEPERS) + String mock = conf.get(MOCK) + + //input + if (Boolean.parseBoolean(mock)) { + CloudbaseInputFormat.setMockInstance(job, instance) +// CloudbaseOutputFormat.setMockInstance(conf, instance) + } else if (zookeepers != null) { + CloudbaseInputFormat.setZooKeeperInstance(job, instance, zookeepers) 
+ CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zookeepers) + } else { + throw new IllegalArgumentException("Must specify either mock or zookeepers"); + } + + CloudbaseInputFormat.setInputInfo(job, username, password.getBytes(), table, new Authorizations(auths)) + job.setInputFormatClass(CloudbaseInputFormat.class); + + // OUTPUT + job.setOutputFormatClass(CloudbaseOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Mutation.class); + CloudbaseOutputFormat.setOutputInfo(job, username, password.getBytes(), true, outtable); + } + + @Override + def key(byte[] data) { + Key key = new Key(); + key.readFields(new DataInputStream(new ByteArrayInputStream(data))) + return key + } + + @Override + def key(String row, String cf, String cq, String cv, long timestamp) { + return new Key(row, cf, cq, cv, timestamp) + } + + @Override + def value(byte[] data) { + return new Value(data) + } + + @Override + def columnVisibility(String cv) { + return new ColumnVisibility(cv) + } + + @Override + def mutation(String row, String cf, String cq, String cv, long timestamp, byte[] val) { + Mutation mutation = new Mutation(row); + mutation.put(cf, cq, columnVisibility(cv), timestamp, value(val)) + return mutation + } + + @Override + def instance() { + assert conf != null + + String instance_str = conf.get(INSTANCE) + String zookeepers = conf.get(ZOOKEEPERS) + String mock = conf.get(MOCK) + if (Boolean.parseBoolean(mock)) { + return new MockInstance(instance_str) + } else if (zookeepers != null) { + return new ZooKeeperInstance(instance_str, zookeepers) + } else { + throw new IllegalArgumentException("Must specify either mock or zookeepers"); + } + } + + @Override + def connector(def instance) { + if (connector != null) return connector + + String username = conf.get(USERNAME) + String password = conf.get(PASSWORD) + if (instance == null) + instance = instance() + connector = instance.getConnector(username, password) + return connector + } + + 
@Override + def void writeMutations(def connector, String tableName, Iterator mutations) { + def bw = connector.createBatchWriter(tableName, 10000l, 10000l, 4); + mutations.each { m -> + bw.addMutation(m) + } + bw.flush() + bw.close() + } + + @Override + def scanner(def connector, String tableName, String[] auths) { + return connector.createScanner(tableName, new Authorizations(auths)) + } + + @Override + def batchScanner(def connector, String tableName, String[] auths, int numThreads) { + return connector.createBatchScanner(tableName, new Authorizations(auths), numThreads) + } + + @Override + def range(def startKey, def endKey) { + assert startKey != null + + if (endKey != null) + return new cloudbase.core.data.Range(startKey, endKey) + return new cloudbase.core.data.Range(startKey) + } + + @Override + def authorizations(String[] auths) { + return new Authorizations(auths) + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo ---------------------------------------------------------------------- diff --git a/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo b/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo new file mode 100644 index 0000000..728f9dd --- /dev/null +++ b/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo @@ -0,0 +1 @@ +mvm.rya.generic.mr.cloudbase.CloudbaseMRInfo \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/pom.xml ---------------------------------------------------------------------- diff --git a/extras/generic.mr/pom.xml b/extras/generic.mr/pom.xml new file mode 100644 index 0000000..2a277d2 --- /dev/null +++ b/extras/generic.mr/pom.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" 
encoding="utf-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>mvm.rya</groupId> + <artifactId>rya.extras</artifactId> + <version>3.2.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>generic.mr</artifactId> + <name>${project.groupId}.${project.artifactId}</name> + <packaging>pom</packaging> + <modules> + <module>generic.mr.api</module> + <module>generic.mr.accumulo</module> + </modules> + + <profiles> + <profile> + <id>cloudbase</id> + <modules> + <module>generic.mr.cloudbase</module> + </modules> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml new file mode 100644 index 0000000..7977b20 --- /dev/null +++ b/extras/indexing/pom.xml @@ -0,0 +1,201 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>rya.extras</artifactId> + <version>3.2.9</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <name>${project.groupId}.${project.artifactId}</name> + <artifactId>rya.indexing</artifactId> + + <dependencies> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>rya.sail.impl</artifactId> + <exclusions> + <exclusion> + <artifactId>hsqldb</artifactId> + <groupId>hsqldb</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>accumulo.rya</artifactId> + 
</dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>mongodb.rya</artifactId> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-core</artifactId> + <version>3.6.2</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-analyzers</artifactId> + <version>3.6.2</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-analyzers</artifactId> + <version>3.6.2</version> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.4</version> + </dependency> + + <!-- I was having issues with hadoop conf, but adding xerces and xalan fixed it --> + + <dependency> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + <version>2.9.1</version> + </dependency> + <dependency> + <groupId>xalan</groupId> + <artifactId>xalan</artifactId> + <version>2.7.1</version> + </dependency> + + <!-- Geo Indexing --> + <dependency> + <groupId>org.locationtech.geomesa</groupId> + <artifactId>geomesa-accumulo-datastore</artifactId> + <version>${geomesa.version}</version> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>rya.prospector</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + </dependency> + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>2.13.0-rc0</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>map-reduce</shadedClassifierName> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + </transformers> + 
</configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>accumulo-server</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>accumulo-server</shadedClassifierName> + <artifactSet> + <excludes> + <exclude>org.locationtech.geomesa:*</exclude> + <exclude>scala:*</exclude> + <exclude>org.apache.accumulo:*</exclude> + <exclude>org.apache.thrift:*</exclude> + <exclude>org.apache.hadoop:*</exclude> + <exclude>org.apache.zookeeper:*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>accumulo</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>accumulo.iterators</artifactId> + <optional>true</optional> + </dependency> + </dependencies> + </profile> + <profile> + <id>cloudbase</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>com.texeltek</groupId> + <artifactId>accumulo-cloudbase-shim</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>cloudbase.iterators</artifactId> + <optional>true</optional> + </dependency> + </dependencies> + </profile> + <profile> + <id>mr</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + </profile> + </profiles> + + + <repositories> + <repository> + 
<id>cloudera</id> + <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> + </repository> + <repository> + <id>public.opensahara.com</id> + <url>http://dev.opensahara.com/nexus/content/groups/public/</url> + </repository> + <repository> + <id>geotools</id> + <url>http://download.osgeo.org/webdav/geotools/</url> + </repository> + </repositories> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java new file mode 100644 index 0000000..b6063ca --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java @@ -0,0 +1,11 @@ +package mvm.rya.accumulo.documentIndex; + +public class DocIndexIteratorUtil { + + + + public static final String DOC_ID_INDEX_DELIM = "\u001D" + "\u001E"; + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java new file mode 100644 index 0000000..ad38b2b --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java @@ -0,0 +1,850 @@ +package mvm.rya.accumulo.documentIndex; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +/** + * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of + * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index. 
+ * + * The table structure should have the following form: + * + * row: shardID, colfam: term, colqual: docID + * + * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The + * result will have an empty column family, as follows: + * + * row: shardID, colfam: (empty), colqual: docID + * + * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs. + * + * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes + * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method. + * + * README.shard in docs/examples shows an example of using the IntersectingIterator. + */ +public class DocumentIndexIntersectingIterator implements SortedKeyValueIterator<Key,Value> { + + + + + protected Text nullText = new Text(); + + protected Text getRow(Key key) { + return key.getRow(); + } + + protected Text getTerm(Key key) { + return key.getColumnFamily(); + } + + protected Text getTermCond(Key key) { + return key.getColumnQualifier(); + } + + protected Key buildKey(Text row, TextColumn column) { + return new Key(row, (column.getColumnFamily() == null) ? nullText: column.getColumnFamily(), column.getColumnQualifier()); + } + + protected Key buildKey(Text row, Text term) { + return new Key(row, (term == null) ? nullText : term); + } + + protected Key buildKey(Text row, Text term, Text termCond) { + return new Key(row, (term == null) ? nullText : term, termCond); + } + + protected Key buildFollowRowKey(Key key, Text term, Text termCond) { + return new Key(getRow(key.followingKey(PartialKey.ROW)),(term == null) ? 
nullText : term, termCond); + } + + protected static final Logger log = Logger.getLogger(DocumentIndexIntersectingIterator.class); + + public static class TermSource { + public SortedKeyValueIterator<Key, Value> iter; + public Text term; + public Text termCond; + public Collection<ByteSequence> seekColfams; + public TextColumn column; + public boolean isPrefix; + public Key top ; + public Key next ; + public Text currentCQ; + private boolean seeked = false; + + public TermSource(TermSource other) { + + this.iter = other.iter; + this.term = other.term; + this.termCond = other.termCond; + this.seekColfams = other.seekColfams; + this.column = other.column; + this.top = other.top; + this.next = other.next; + this.currentCQ = other.currentCQ; + this.isPrefix = other.isPrefix; + } + + + public TermSource(SortedKeyValueIterator<Key, Value> iter, TextColumn column) { + + this.iter = iter; + this.column = column; + this.term = column.getColumnFamily(); + this.termCond = column.getColumnQualifier(); + this.currentCQ = new Text(emptyByteArray); + this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term + .getBytes(), 0, term.getLength())); + + } + + + + public void seek(Range r) throws IOException { + + if (seeked) { + + if (next != null && !r.beforeStartKey(next)) { + if (next.getColumnFamily().equals(term)) { + this.updateTop(); + } + } else if (iter.hasTop()) { + iter.seek(r, seekColfams, true); + this.updateTopNext(); + } else { + top = null; + next = null; + + } + } else { + + iter.seek(r, seekColfams, true); + this.updateTopNext(); + seeked = true; + } + + } + + + public void next() throws IOException { + + this.updateTop(); + } + + public void updateTop() throws IOException { + + top = next; + if (next != null) { + iter.next(); + if (iter.hasTop()) { + next = iter.getTopKey(); + } else { + next = null; + } + } + + } + + public void updateTopNext() throws IOException { + + if (iter.hasTop()) { + top = iter.getTopKey(); + } else { + top = 
null; + next = null; + return; + } + + iter.next(); + + if(iter.hasTop()) { + next = iter.getTopKey(); + } else { + next = null; + } + } + + public boolean hasTop() { + return top != null; + } + + + public String getTermString() { + return (this.term == null) ? new String("Iterator") : this.term.toString(); + } + } + + TermSource[] sources; + int sourcesCount = 0; + Range overallRange; + + // query-time settings + protected Text currentRow = null; + protected Text currentTermCond = new Text(emptyByteArray); + static final byte[] emptyByteArray = new byte[0]; + + protected Key topKey = null; + protected Value value = new Value(emptyByteArray); + protected String ctxt = null; + protected boolean hasContext = false; + protected boolean termCondSet = false; + + public DocumentIndexIntersectingIterator() {} + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + //log.info("Calling deep copy on " + this); + return new DocumentIndexIntersectingIterator(this, env); + } + + private DocumentIndexIntersectingIterator(DocumentIndexIntersectingIterator other, IteratorEnvironment env) { + if (other.sources != null) { + sourcesCount = other.sourcesCount; + sources = new TermSource[sourcesCount]; + for (int i = 0; i < sourcesCount; i++) { + sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].column); + } + } + } + + @Override + public Key getTopKey() { + + return topKey; + } + + @Override + public Value getTopValue() { + // we don't really care about values + return value; + } + + @Override + public boolean hasTop() { + return currentRow != null; + } + + // precondition: currentRow is not null + private boolean seekOneSource(int sourceID) throws IOException { + // find the next key in the appropriate column family that is at or + // beyond the cursor (currentRow, currentCQ) + // advance the cursor if this source goes beyond it + // return whether we advanced the cursor + + // within this loop progress must be 
made in one of the following forms: + // - currentRow or currentCQ must be increased + // - the given source must advance its iterator + // this loop will end when any of the following criteria are met + // - the iterator for the given source is pointing to the key + // (currentRow, columnFamilies[sourceID], currentCQ) + // - the given source is out of data and currentRow is set to null + // - the given source has advanced beyond the endRow and currentRow is + // set to null + boolean advancedCursor = false; + + + + + + while (true) { + +// if(currentRow.toString().equals(s)) { +// log.info("Source id is " + sourceID); +// if (sources[sourceID].top != null) { +// log.info("Top row is " + getRow(sources[sourceID].top)); +// log.info("Top cq is " + getTermCond(sources[sourceID].top)); +// } +// if (sources[sourceID].next != null) { +// log.info("Next row is " + getRow(sources[sourceID].next)); +// log.info("Next termCond is " + getTermCond(sources[sourceID].next)); +// } +// } + + if (sources[sourceID].hasTop() == false) { + currentRow = null; + // setting currentRow to null counts as advancing the cursor + return true; + } + // check if we're past the end key + int endCompare = -1; + // we should compare the row to the end of the range + + if (overallRange.getEndKey() != null) { + endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].top.getRow()); + if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { + currentRow = null; + // setting currentRow to null counts as advancing the cursor + return true; + } + } + + + + int rowCompare = currentRow.compareTo(getRow(sources[sourceID].top)); + // check if this source is already at or beyond currentRow + // if not, then seek to at least the current row + + + + if (rowCompare > 0) { + // seek to at least the currentRow + Key seekKey = buildKey(currentRow, sources[sourceID].term); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + + continue; + } + // check if 
this source has gone beyond currentRow + // if so, advance currentRow + if (rowCompare < 0) { + currentRow.set(getRow(sources[sourceID].top)); + //log.info("Current row is " + currentRow); + advancedCursor = true; + continue; + } + // we have verified that the current source is positioned in + // currentRow + // now we must make sure we're in the right columnFamily in the + // current row + // Note: Iterators are auto-magically set to the correct + // columnFamily + + if (sources[sourceID].column.isValid()) { + + boolean isPrefix = false; + boolean contextEqual = false; + String tempContext = ""; + + int termCompare; + + String[] cQ = getTermCond(sources[sourceID].top).toString().split("\u0000"); + tempContext = cQ[0]; + + if (!hasContext && ctxt == null) { + ctxt = cQ[0]; + } + + contextEqual = ctxt.equals(cQ[0]); + + String s1 = sources[sourceID].termCond.toString(); + String s2 = cQ[1] + "\u0000" + cQ[2]; + + if (sources[sourceID].isPrefix) { + isPrefix = s2.startsWith(s1 + "\u0000"); + } else { + isPrefix = s2.startsWith(s1); + } + + termCompare = (contextEqual && isPrefix) ? 
0 : (ctxt + "\u0000" + s1).compareTo(cQ[0] + "\u0000" + s2); + + // if(currentRow.toString().equals(s)) { + // log.info("Term compare is " + termCompare); + // } + + // check if this source is already on the right columnFamily + // if not, then seek forwards to the right columnFamily + if (termCompare > 0) { + Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + + "\u0000" + sources[sourceID].termCond.toString())); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + + continue; + } + // check if this source is beyond the right columnFamily + // if so, then seek to the next row + if (termCompare < 0) { + // we're out of entries in the current row, so seek to the + // next one + + if (endCompare == 0) { + // we're done + currentRow = null; + // setting currentRow to null counts as advancing the + // cursor + return true; + } + + + + //advance to next row if context set - all entries in given row exhausted + if (hasContext || tempContext.length() == 0) { + Key seekKey = buildFollowRowKey(sources[sourceID].top, sources[sourceID].term, + new Text(ctxt + "\u0000" + sources[sourceID].termCond.toString())); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + } else { + + if(contextEqual && !isPrefix) { + Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + "\u0001")); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + if(sources[sourceID].top != null) { + ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; + } + } else { + Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(tempContext + + "\u0000" + sources[sourceID].termCond.toString())); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + if(sources[sourceID].top != null) { + ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; + } + } + + } + + +// if(currentRow.toString().equals(s)) { +// log.info("current term cond is " + currentTermCond); +// +// } + + + 
continue; + } + } + + + + + + + + + + + //set currentTermCond -- gets appended to end of currentKey column qualifier + //used to determine which term iterator to advance when a new iterator is created + + sources[sourceID].currentCQ.set(getTermCond(sources[sourceID].top)); + + if (sources[sourceID].next != null) { + + //is hasContext, only consider sourceID with next having designated context + //otherwise don't set currentTermCond + if (!termCondSet && hasContext) { + if (sources[sourceID].next.getRow().equals(currentRow) + && sources[sourceID].next.getColumnQualifier().toString() + .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { + currentTermCond.set(new Text(Integer.toString(sourceID))); + termCondSet = true; + } + } else if(!termCondSet){ + String[] cq = getTermCond(sources[sourceID].next).toString().split("\u0000"); + + //set currentTermCond with preference given to sourceID having next with same context + //otherwise set currentTermCond sourceID with next having termCond as prefix + /* NOTE(review): the else-if below reads cq[1] and cq[2], i.e. it assumes the term condition of next has at least three NUL-separated parts -- confirm. */ if (sources[sourceID].next.getRow().equals(currentRow)) { + if (sources[sourceID].next.getColumnQualifier().toString() + .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { + currentTermCond.set(new Text(Integer.toString(sourceID))); + termCondSet = true; + } else if ((cq[1] + "\u0000" + cq[2]).startsWith(sources[sourceID].termCond.toString())) { + currentTermCond.set(new Text(Integer.toString(sourceID))); + } + } + } + } + + + break; + } + + return advancedCursor; + } + + /* Advances to the next intersecting document. If currentTermCond holds a source index (stored as a decimal string via Integer.toString(sourceID)), only that source is advanced and currentTermCond/termCondSet are cleared; otherwise source 0 is advanced. Without a configured context, ctxt is refreshed from the advanced source's new top. advanceToIntersection() then re-aligns all sources. Does nothing once currentRow is null. */ @Override + public void next() throws IOException { + if (currentRow == null) { + return; + } + + + + if(currentTermCond.getLength() != 0) { + + int id = Integer.parseInt(currentTermCond.toString()); + + sources[id].next(); + currentTermCond.set(emptyByteArray); + termCondSet = false; + if(sources[id].top != null && !hasContext) { + ctxt = getTermCond(sources[id].top).toString().split("\u0000")[0]; + } + advanceToIntersection(); + return; + } + + sources[0].next(); +
if(sources[0].top != null && !hasContext) { + ctxt = getTermCond(sources[0].top).toString().split("\u0000")[0]; + } + advanceToIntersection(); + } + + /* Re-aligns the sources: calls seekOneSource(i) for each source and restarts the pass (clearing currentTermCond/termCondSet) whenever one reports that it moved, until a full pass moves nothing. Then builds topKey in currentRow with a column qualifier made of every source's currentCQ, each followed by DOC_ID_INDEX_DELIM, ending in currentTermCond (or -1 when none is set). Sets topKey to null and stops as soon as currentRow is null. */ protected void advanceToIntersection() throws IOException { + boolean cursorChanged = true; + while (cursorChanged) { + // seek all of the sources to at least the highest seen column qualifier in the current row + cursorChanged = false; + for (int i = 0; i < sourcesCount; i++) { +// log.info("New sourceID is " + i); + if (currentRow == null) { + topKey = null; + return; + } + if (seekOneSource(i)) { + currentTermCond.set(emptyByteArray); + termCondSet = false; + cursorChanged = true; + break; + } + } + } + String cq = ""; + for(int i = 0; i < sourcesCount; i++) { + cq = cq + sources[i].currentCQ.toString() + DocIndexIteratorUtil.DOC_ID_INDEX_DELIM; + } + + if (currentTermCond.getLength() == 0) { + topKey = buildKey(currentRow, nullText, new Text(cq + -1)); + } else { + topKey = buildKey(currentRow, nullText, new Text(cq + currentTermCond.toString())); + } + } + + /* Returns the iterator's top key as a string, or the empty string when it has no top. */ public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) { + if (iter.hasTop()) + return iter.getTopKey().toString(); + return ""; + } + + /* Iterator option keys written by setColumnFamilies/setContext and read back in init(). */ private static final String columnOptionName = "columns"; + private static final String columnPrefix = "prefixes"; + private static final String context = "context"; + + + + /* Serializes the terms as Base64(family), a newline, then Base64(qualifier) per column, each entry terminated by the U+0001 character. The isPrefix flags are not included; encodeBooleans encodes them separately. */ protected static String encodeColumns(TextColumn[] columns) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < columns.length; i++) { + sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnFamily())))); + sb.append('\n'); + sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnQualifier())))); + sb.append('\u0001'); + } + return sb.toString(); + } + + + + /* Inverse of encodeColumns; each decoded TextColumn is built with the two-argument constructor, so its isPrefix flag keeps its default value. */ protected static TextColumn[] decodeColumns(String columns) { + String[] columnStrings = columns.split("\u0001"); + TextColumn[] columnTexts = new TextColumn[columnStrings.length]; + for (int i = 0; i <
columnStrings.length; i++) { + String[] columnComponents = columnStrings[i].split("\n"); + columnTexts[i] = new TextColumn(new Text(Base64.decodeBase64(columnComponents[0].getBytes())), + new Text(Base64.decodeBase64(columnComponents[1].getBytes()))); + } + return columnTexts; + } + + + + + + /** Base64-encodes the context string; String.getBytes() uses the platform default charset and a null context throws. + * @param context + * @return encoded context + */ + protected static String encodeContext(String context) { + + return new String(Base64.encodeBase64(context.getBytes())); + } + + + + /** Inverse of encodeContext; a null input (option absent) is passed through as null. + * @param context + * @return decoded context + */ + protected static String decodeContext(String context) { + + if (context == null) { + return null; + } else { + return new String(Base64.decodeBase64(context.getBytes())); + } + } + + + + + + /* Encodes each flag as one byte (1 for true, 0 for false) and Base64-encodes the bytes. */ protected static String encodeBooleans(boolean[] prefixes) { + byte[] bytes = new byte[prefixes.length]; + for (int i = 0; i < prefixes.length; i++) { + if (prefixes[i]) + bytes[i] = 1; + else + bytes[i] = 0; + } + return new String(Base64.encodeBase64(bytes)); + } + + /** Inverse of encodeBooleans; any byte other than 1 decodes as false. + * @param prefixes Base64 string produced by encodeBooleans, or null + * @return decoded flags, or null when prefixes is null + */ + protected static boolean[] decodeBooleans(String prefixes) { + // return null if there were no flags + if (prefixes == null) + return null; + + byte[] bytes = Base64.decodeBase64(prefixes.getBytes()); + boolean[] bFlags = new boolean[bytes.length]; + for (int i = 0; i < bytes.length; i++) { + if (bytes[i] == 1) + bFlags[i] = true; + else + bFlags[i] = false; + } + return bFlags; + } + + + + + + + + + /* Decodes the options written by setColumnFamilies/setContext and builds one TermSource per column: sources[0] wraps source itself, the others wrap source.deepCopy(env). A non-null context option sets hasContext. Throws IllegalArgumentException for fewer than two columns. NOTE(review): isPrefix is copied from prefixes[i] only for i >= 1 (sources[0] never receives prefixes[0]), and prefixes is null when the prefixes option is absent, which would throw in that loop -- confirm both are intended. */ @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + TextColumn[] terms = decodeColumns(options.get(columnOptionName)); + boolean[] prefixes = decodeBooleans(options.get(columnPrefix)); + ctxt = decodeContext(options.get(context)); + + if(ctxt != null) { + hasContext = true; + } + + + + if (terms.length < 2) { + throw new IllegalArgumentException("IntersectionIterator requires two or more columns families"); + } + + sources = new
TermSource[terms.length]; + sources[0] = new TermSource(source, terms[0]); + for (int i = 1; i < terms.length; i++) { + //log.info("For decoded column " + i + " column family is " + terms[i].getColumnFamily() + " and qualifier is " + terms[i].getColumnQualifier()); + sources[i] = new TermSource(source.deepCopy(env), terms[i]); + sources[i].isPrefix = prefixes[i]; + } + sourcesCount = terms.length; + } + + /* Resets the intersection state and positions every source for range. When the start key has a column qualifier, cqParser splits it (presumably a qualifier built by advanceToIntersection) into a qualifier and a source index: for id >= 0 that source is seeked to the recorded qualifier and advanced once with next() while the others are seeked to their term in the start row; for id < 0 every source is seeked to its term with the start key's full qualifier. Otherwise each source is seeked to its term in the start row, or to the raw range when there is no start key. seekColumnFamilies and inclusive are not used. */ @Override + public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { + overallRange = new Range(range); + currentRow = new Text(); + currentTermCond.set(emptyByteArray); + termCondSet = false; + + + +// log.info("Calling seek with range " + range); + + // seek each of the sources to the right column family within the row + // given by key + + Key sourceKey; + + if (rangeCqValid(range)) { + + String[] cqInfo = cqParser(range.getStartKey().getColumnQualifier()); + int id = Integer.parseInt(cqInfo[1]); + + + + if (id >= 0) { + for (int i = 0; i < sourcesCount; i++) { + + if (i == id) { + sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, new Text(cqInfo[0])); + sources[i].seek(new Range(sourceKey, true, null, false)); + sources[i].next(); + if(!hasContext && sources[i].hasTop()) { + ctxt = getTermCond(sources[i].top).toString().split("\u0000")[0]; + } + } else { + sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term); + sources[i].seek(new Range(sourceKey, true, null, false)); + } + } + } else { + + + for (int i = 0; i < sourcesCount; i++) { + sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, range.getStartKey() + .getColumnQualifier()); + sources[i].seek(new Range(sourceKey, true, null, false)); + } + } + + + } else { + +// log.info("Range is invalid."); + for (int i = 0; i < sourcesCount; i++) { + + if (range.getStartKey() != null) { + + sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term); + + // Seek only to the term for this source as a column
family + sources[i].seek(new Range(sourceKey, true, null, false)); + } else { + // Seek only to the term for this source as a column family + + sources[i].seek(range); + } + } + } + + advanceToIntersection(); + + } + + + /* Splits cq on DOC_ID_INDEX_DELIM. With more than one component, the last is parsed as a source index id and the result is {component[id], id} when id >= 0, or {component[0], id} otherwise; with a single component the result is {cq, "-1"}. NOTE(review): id is not bounds-checked against the number of components. */ private String[] cqParser(Text cq) { + + String cQ = cq.toString(); + String[] cqComponents = cQ.split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); + int id = -1; + String[] valPos = new String[2]; + + + + + if(cqComponents.length > 1) { + id = Integer.parseInt(cqComponents[cqComponents.length-1]); + if (id >= 0) { + valPos[0] = cqComponents[id].toString(); + valPos[1] = "" + id; + } else { + valPos[0] = cqComponents[0].toString(); + valPos[1] = "" + id; + } + } else { + valPos[0] = cq.toString(); + valPos[1] = "" + -1; + } + + return valPos; + + } + + + /* True when range has a start key whose column qualifier is non-null. */ private boolean rangeCqValid(Range range) { + return (range.getStartKey() != null) && (range.getStartKey().getColumnQualifier() != null); + } + + + + /* Appends a TermSource for column over source.deepCopy(env), growing sources by one and copy-constructing the existing entries into the new array. */ public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, TextColumn column) { + // Check if we have space for the added Source + if (sources == null) { + sources = new TermSource[1]; + } else { + // allocate space for node, and copy current tree. + // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309 + TermSource[] localSources = new TermSource[sources.length + 1]; + int currSource = 0; + for (TermSource myTerm : sources) { + // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309 + localSources[currSource] = new TermSource(myTerm); + currSource++; + } + sources = localSources; + } + sources[sourcesCount] = new TermSource(source.deepCopy(env), column); + sourcesCount++; + } + + /** + * Encode the columns to be used when iterating.
+ * + * @param cfg the iterator setting that receives the encoded options + * @param columns the terms to intersect; at least two are required, and each column's isPrefix() flag is encoded alongside it + * @throws IllegalArgumentException if fewer than two columns are supplied + */ + public static void setColumnFamilies(IteratorSetting cfg, TextColumn[] columns) { + if (columns.length < 2) + throw new IllegalArgumentException("Must supply at least two terms to intersect"); + + boolean[] prefix = new boolean[columns.length]; + + for(int i = 0; i < columns.length; i++) { + prefix[i] = columns[i].isPrefix(); + } + + cfg.addOption(DocumentIndexIntersectingIterator.columnPrefix, DocumentIndexIntersectingIterator.encodeBooleans(prefix)); + cfg.addOption(DocumentIndexIntersectingIterator.columnOptionName, DocumentIndexIntersectingIterator.encodeColumns(columns)); + } + + /** + * Stores the Base64-encoded context as the "context" iterator option; init() decodes it and restricts matching to that context. + * + * @param cfg the iterator setting that receives the option + * @param context the context to match; must not be null (encoding calls context.getBytes()) + */ + public static void setContext(IteratorSetting cfg, String context) { + + cfg.addOption(DocumentIndexIntersectingIterator.context, DocumentIndexIntersectingIterator.encodeContext(context)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java new file mode 100644 index 0000000..9747f60 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java @@ -0,0 +1,88 @@ +package mvm.rya.accumulo.documentIndex; + +import org.apache.hadoop.io.Text; + +/** + * A column family / column qualifier pair used as one term of a document-index intersection, plus an + * isPrefix flag that DocumentIndexIntersectingIterator.setColumnFamilies encodes alongside the column. + * + * equals() and hashCode() are value-based over all three fields. The fields (and the Text objects they hold) + * are mutable, so do not modify an instance while it is a key in a hash-based collection. + */ +public class TextColumn { + + + private Text columnFamily; + private Text columnQualifier; + private boolean isPrefix = false; + + + + public TextColumn(Text columnFamily, Text columnQualifier) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + } + + + /** + * Copy constructor: deep-copies both Text fields and copies the prefix flag. + */ + public TextColumn(TextColumn other) { + + this.columnFamily = new Text(other.columnFamily); + this.columnQualifier = new Text(other.columnQualifier); + this.isPrefix = other.isPrefix; + + } + + + public Text
getColumnFamily() { + return columnFamily; + } + + + public boolean isPrefix() { + return isPrefix; + } + + + public void setIsPrefix(boolean isPrefix) { + this.isPrefix = isPrefix; + } + + + public boolean isValid() { + return (columnFamily != null && columnQualifier != null); + } + + + + public Text getColumnQualifier() { + return columnQualifier; + } + + + public void setColumnFamily(Text cf) { + this.columnFamily = cf; + } + + public void setColumnQualifier(Text cq) { + this.columnQualifier = cq; + } + + /** + * Renders as "family, qualifier, prefix:flag"; a missing family or qualifier prints as "null" instead of throwing. + */ + @Override + public String toString() { + + return columnFamily + ", " + columnQualifier + ", prefix:" + isPrefix; + } + + /** + * Two columns are equal when their families and qualifiers are equal (null-safe) and their prefix flags match. + */ + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + // instanceof is also false for null + if (!(other instanceof TextColumn)) { + return false; + } + + TextColumn tc = (TextColumn) other; + + return sameText(this.columnFamily, tc.columnFamily) && sameText(this.columnQualifier, tc.columnQualifier) && this.isPrefix == tc.isPrefix; + } + + /** + * Hash code consistent with equals(Object), so that equal columns hash alike in HashMap/HashSet. + */ + @Override + public int hashCode() { + int result = (columnFamily == null) ? 0 : columnFamily.hashCode(); + result = 31 * result + ((columnQualifier == null) ? 0 : columnQualifier.hashCode()); + result = 31 * result + (isPrefix ? 1 : 0); + return result; + } + + /** Null-safe Text equality. */ + private static boolean sameText(Text a, Text b) { + return (a == null) ? (b == null) : a.equals(b); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java new file mode 100644 index 0000000..11d58c5 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java @@ -0,0 +1,69 @@ +package mvm.rya.accumulo.mr; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import info.aduna.iteration.CloseableIteration; + +import java.io.IOException; +import java.util.Set; + +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.StatementContraints; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +/** + * A {@link FreeTextIndexer} that does nothing, for use when free-text indexing is disabled. + * Writes are discarded and every getter or query returns {@code null}. + */ +public class NullFreeTextIndexer extends AbstractAccumuloIndexer implements FreeTextIndexer { + + /** Holds no configuration; always {@code null}. */ + @Override + public Configuration getConf() { + return null; + } + + /** Ignores the supplied configuration. */ + @Override + public void setConf(Configuration arg0) { + // nothing to configure + } + + /** Backed by no table; always {@code null}. */ + @Override + public String getTableName() { + return null; + } + + /** Discards the statement. */ + @Override + public void storeStatement(RyaStatement statement) throws IOException { + // nothing to store + } + + /** Reports no predicates; always {@code null} (not an empty set). */ + @Override + public Set<URI> getIndexablePredicates() { + return null; + } + + /** Performs no search; always {@code null}. */ + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryText(String query, StatementContraints contraints) throws IOException { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java new file mode 100644 index 0000000..0629adb --- /dev/null +++
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java @@ -0,0 +1,120 @@ +package mvm.rya.accumulo.mr; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import info.aduna.iteration.CloseableIteration; + +import java.io.IOException; +import java.util.Set; + +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.StatementContraints; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +import com.vividsolutions.jts.geom.Geometry; + +/** + * A {@link GeoIndexer} that does nothing, for use when geo indexing is disabled. + * Writes are discarded and every getter or spatial query returns {@code null}. + */ +public class NullGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer { + + // ---- configuration and storage: all no-ops ---- + + @Override + public Configuration getConf() { + return null; + } + + @Override + public void setConf(Configuration arg0) { + // nothing to configure + } + + @Override + public String getTableName() { + return null; + } + + @Override + public void storeStatement(RyaStatement statement) throws IOException { + // nothing to store + } + + @Override + public Set<URI> getIndexablePredicates() { + return null; + } + + // ---- spatial queries: none of them search anything ---- + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryEquals(Geometry query, StatementContraints contraints) { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(Geometry query, StatementContraints contraints) { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(Geometry query, StatementContraints contraints) { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryTouches(Geometry query, StatementContraints contraints) { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(Geometry query, StatementContraints contraints) { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryWithin(Geometry query, StatementContraints contraints) { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryContains(Geometry query, StatementContraints contraints) { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(Geometry query, StatementContraints contraints) { + return null; + } + +}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java new file mode 100644 index 0000000..cf83178 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java @@ -0,0 +1,134 @@ +package mvm.rya.accumulo.mr; + +import info.aduna.iteration.CloseableIteration; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.indexing.StatementContraints; +import
mvm.rya.indexing.TemporalIndexer; +import mvm.rya.indexing.TemporalInstant; +import mvm.rya.indexing.TemporalInterval; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +/** + * A {@link TemporalIndexer} that does nothing, for use when temporal indexing is disabled. + * Writes are discarded and every getter or temporal query returns {@code null}. + */ +public class NullTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer { + + // ---- configuration and storage: all no-ops ---- + + @Override + public Configuration getConf() { + return null; + } + + @Override + public void setConf(Configuration arg0) { + // nothing to configure + } + + @Override + public String getTableName() { + return null; + } + + @Override + public void storeStatement(RyaStatement statement) throws IOException { + // nothing to store + } + + @Override + public Set<URI> getIndexablePredicates() { + return null; + } + + // ---- instant queries: none of them search anything ---- + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant(TemporalInstant queryInstant, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant(TemporalInstant queryInstant, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant(TemporalInstant queryInstant, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval(TemporalInterval givenInterval, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval(TemporalInterval givenInterval, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval(TemporalInterval givenInterval, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval(TemporalInterval queryInterval, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval(TemporalInterval queryInterval, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + // ---- interval queries: none of them search anything ---- + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals(TemporalInterval query, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore(TemporalInterval query, StatementContraints contraints) throws QueryEvaluationException { + return null; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter(TemporalInterval query, StatementContraints contraints) throws QueryEvaluationException { + return null; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java new file mode 100644 index 0000000..1e26626 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java @@ -0,0 +1,328 @@ +package mvm.rya.accumulo.mr; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License,
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.TemporalIndexer; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.StatementSerializer; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import 
org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.log4j.Logger; +import org.geotools.feature.SchemaException; +import org.openrdf.model.Statement; + +/** + * Hadoop Map/Reduce class to use Rya, the {@link GeoIndexer}, the {@link FreeTextIndexer}, and the {@link TemporalIndexer} as the sink of {@link Statement} data. + * wrapped in an {@link StatementWritable} objects. This {@link OutputFormat} ignores the Keys and only writes the Values to Rya. + * + * The user must specify connection parameters for Rya, {@link GeoIndexer}, {@link FreeTextIndexer}, and {@link TemporalIndexer}. + */ +public class RyaOutputFormat extends OutputFormat<Writable, StatementWritable> { + private static final Logger logger = Logger.getLogger(RyaOutputFormat.class); + + private static final String PREFIX = RyaOutputFormat.class.getSimpleName(); + private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory"; + private static final String ENABLE_FREETEXT = PREFIX + ".freetext.enable"; + private static final String ENABLE_GEO = PREFIX + ".geo.enable"; + private static final String ENABLE_TEMPORAL = PREFIX + ".temporal.enable";; + + + @Override + public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { + Configuration conf = jobContext.getConfiguration(); + + // make sure that all of the indexers can connect + getGeoIndexer(conf); + getFreeTextIndexer(conf); + getTemporalIndexer(conf); + getRyaIndexer(conf); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + // copied from AccumuloOutputFormat + return new NullOutputFormat<Text, Mutation>().getOutputCommitter(context); + } + + @Override + public RecordWriter<Writable, StatementWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + 
return new RyaRecordWriter(context); + } + + private static GeoIndexer getGeoIndexer(Configuration conf) throws IOException { + if (!conf.getBoolean(ENABLE_GEO, true)) { + return new NullGeoIndexer(); + } + + GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer(); + geo.setConf(conf); + return geo; + + } + + private static FreeTextIndexer getFreeTextIndexer(Configuration conf) throws IOException { + if (!conf.getBoolean(ENABLE_FREETEXT, true)) { + return new NullFreeTextIndexer(); + } + + AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer(); + freeText.setConf(conf); + return freeText; + + } + + private static TemporalIndexer getTemporalIndexer(Configuration conf) throws IOException { + if (!conf.getBoolean(ENABLE_TEMPORAL, true)) { + return new NullTemporalIndexer(); + } + AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); + temporal.setConf(conf); + return temporal; + } + + private static AccumuloRyaDAO getRyaIndexer(Configuration conf) throws IOException { + try { + AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO(); + Connector conn = ConfigUtils.getConnector(conf); + ryaIndexer.setConnector(conn); + + AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + + String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); + if (tablePrefix != null) { + ryaConf.setTablePrefix(tablePrefix); + } + ryaConf.setDisplayQueryPlan(false); + ryaIndexer.setConf(ryaConf); + ryaIndexer.init(); + return ryaIndexer; + } catch (AccumuloException e) { + logger.error("Cannot create RyaIndexer", e); + throw new IOException(e); + } catch (AccumuloSecurityException e) { + logger.error("Cannot create RyaIndexer", e); + throw new IOException(e); + } catch (RyaDAOException e) { + logger.error("Cannot create RyaIndexer", e); + throw new IOException(e); + } + } + + public static class RyaRecordWriter extends RecordWriter<Writable, StatementWritable> implements Closeable, Flushable { + private static final Logger logger = 
Logger.getLogger(RyaRecordWriter.class); + + private FreeTextIndexer freeTextIndexer; + private GeoIndexer geoIndexer; + private TemporalIndexer temporalIndexer; + private AccumuloRyaDAO ryaIndexer; + + private static final long ONE_MEGABYTE = 1024L * 1024L; + private static final long AVE_STATEMENT_SIZE = 100L; + + private long bufferSizeLimit; + private long bufferCurrentSize = 0; + + private ArrayList<RyaStatement> buffer; + + public RyaRecordWriter(TaskAttemptContext context) throws IOException { + this(context.getConfiguration()); + } + + public RyaRecordWriter(Configuration conf) throws IOException { + // set up the buffer + bufferSizeLimit = conf.getLong(MAX_MUTATION_BUFFER_SIZE, ONE_MEGABYTE); + int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE); + buffer = new ArrayList<RyaStatement>(bufferCapacity); + + // set up the indexers + freeTextIndexer = getFreeTextIndexer(conf); + geoIndexer = getGeoIndexer(conf); + temporalIndexer = getTemporalIndexer(conf); + ryaIndexer = getRyaIndexer(conf); + + // update fields used for metrics + startTime = System.currentTimeMillis(); + lastCommitFinishTime = startTime; + } + + @Override + public void flush() throws IOException { + flushBuffer(); + } + + @Override + public void close() throws IOException { + close(null); + } + + @Override + public void close(TaskAttemptContext paramTaskAttemptContext) throws IOException { + // close everything. 
log errors + try { + flush(); + } catch (IOException e) { + logger.error("Error flushing the buffer on RyaOutputFormat Close", e); + } + try { + if (geoIndexer != null) + geoIndexer.close(); + } catch (IOException e) { + logger.error("Error closing the geoIndexer on RyaOutputFormat Close", e); + } + try { + if (freeTextIndexer != null) + freeTextIndexer.close(); + } catch (IOException e) { + logger.error("Error closing the freetextIndexer on RyaOutputFormat Close", e); + } + try { + if (temporalIndexer != null) + temporalIndexer.close(); + } catch (IOException e) { + logger.error("Error closing the temporalIndexer on RyaOutputFormat Close", e); + } + try { + ryaIndexer.destroy(); + } catch (RyaDAOException e) { + logger.error("Error closing RyaDAO on RyaOutputFormat Close", e); + } + } + + public void write(Statement statement) throws IOException, InterruptedException { + write(null, new StatementWritable(statement)); + } + + @Override + public void write(Writable key, StatementWritable value) throws IOException, InterruptedException { + buffer.add(RdfToRyaConversions.convertStatement(value)); + + bufferCurrentSize += StatementSerializer.writeStatement(value).length(); + + if (bufferCurrentSize >= bufferSizeLimit) { + flushBuffer(); + } + } + + // fields for storing metrics + private long startTime = 0; + private long lastCommitFinishTime = 0; + private long totalCommitRecords = 0; + + private double totalReadDuration = 0; + private double totalWriteDuration = 0; + + private long commitCount = 0; + + private void flushBuffer() throws IOException { + totalCommitRecords += buffer.size(); + commitCount++; + + long startCommitTime = System.currentTimeMillis(); + + logger.info(String.format("(C-%d) Flushing buffer with %,d objects and %,d bytes", commitCount, buffer.size(), + bufferCurrentSize)); + + double readingDuration = (startCommitTime - lastCommitFinishTime) / 1000.; + totalReadDuration += readingDuration; + double currentReadRate = buffer.size() / 
readingDuration; + double totalReadRate = totalCommitRecords / totalReadDuration; + + // Print "reading" metrics + logger.info(String.format("(C-%d) (Reading) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, readingDuration, + currentReadRate, totalReadRate)); + + // write to geo + geoIndexer.storeStatements(buffer); + geoIndexer.flush(); + + // write to free text + freeTextIndexer.storeStatements(buffer); + freeTextIndexer.flush(); + + // write to temporal + temporalIndexer.storeStatements(buffer); + temporalIndexer.flush(); + + // write to rya + try { + ryaIndexer.add(buffer.iterator()); + } catch (RyaDAOException e) { + logger.error("Cannot writing statement to Rya", e); + throw new IOException(e); + } + + lastCommitFinishTime = System.currentTimeMillis(); + + double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.; + totalWriteDuration += writingDuration; + double currentWriteRate = buffer.size() / writingDuration; + double totalWriteRate = totalCommitRecords / totalWriteDuration; + + // Print "writing" stats + logger.info(String.format("(C-%d) (Writing) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, writingDuration, + currentWriteRate, totalWriteRate)); + + double processDuration = writingDuration + readingDuration; + double totalProcessDuration = totalWriteDuration + totalReadDuration; + double currentProcessRate = buffer.size() / processDuration; + double totalProcessRate = totalCommitRecords / (totalProcessDuration); + + // Print "total" stats + logger.info(String.format("(C-%d) (Total) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, processDuration, + currentProcessRate, totalProcessRate)); + + // clear the buffer + buffer.clear(); + bufferCurrentSize = 0L; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java ---------------------------------------------------------------------- diff 
--git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java new file mode 100644 index 0000000..5c43687 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java @@ -0,0 +1,85 @@ +package mvm.rya.accumulo.mr; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import mvm.rya.indexing.accumulo.StatementSerializer; + +import org.apache.hadoop.io.Writable; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; + +/** + * A {@link Writable} wrapper for {@link Statement} objects. 
+ */ +@SuppressWarnings("serial") +public class StatementWritable implements Statement, Writable { + + private Statement statement; + + public StatementWritable(Statement statement) { + setStatement(statement); + } + + public void setStatement(Statement statement) { + this.statement = statement; + } + + public Statement getStatement() { + return statement; + } + + @Override + public void readFields(DataInput paramDataInput) throws IOException { + statement = StatementSerializer.readStatement(paramDataInput.readUTF()); + } + + @Override + public void write(DataOutput paramDataOutput) throws IOException { + paramDataOutput.writeUTF(StatementSerializer.writeStatement(statement)); + } + + @Override + public Resource getSubject() { + return statement.getSubject(); + } + + @Override + public URI getPredicate() { + return statement.getPredicate(); + } + + @Override + public Value getObject() { + return statement.getObject(); + } + + @Override + public Resource getContext() { + return statement.getContext(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java new file mode 100644 index 0000000..1b469cd --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java @@ -0,0 +1,226 @@ +package mvm.rya.accumulo.mr.fileinput; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.io.StringReader; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; +import org.geotools.feature.SchemaException; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ContextStatementImpl; 
+import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.rio.ParserConfig; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFParseException; +import org.openrdf.rio.RDFParser; +import org.openrdf.rio.Rio; +import org.openrdf.rio.helpers.RDFHandlerBase; + +import com.google.common.base.Preconditions; + +/** + * Take large ntrips files and use MapReduce to ingest into other indexing + */ +public class BulkNtripsInputToolIndexing extends Configured implements Tool { + + private String userName = null; + private String pwd = null; + private String instance = null; + private String zk = null; + + private String format = RDFFormat.NTRIPLES.getName(); + + @Override + public int run(final String[] args) throws Exception { + final Configuration conf = getConf(); + // conf + zk = conf.get(MRUtils.AC_ZK_PROP, zk); + instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance); + userName = conf.get(MRUtils.AC_USERNAME_PROP, userName); + pwd = conf.get(MRUtils.AC_PWD_PROP, pwd); + format = conf.get(MRUtils.FORMAT_PROP, format); + + String auths = conf.get(MRUtils.AC_CV_PROP, ""); + + conf.set(MRUtils.FORMAT_PROP, format); + Preconditions.checkNotNull(zk, MRUtils.AC_ZK_PROP + " not set"); + Preconditions.checkNotNull(instance, MRUtils.AC_INSTANCE_PROP + " not set"); + Preconditions.checkNotNull(userName, MRUtils.AC_USERNAME_PROP + " not set"); + Preconditions.checkNotNull(pwd, MRUtils.AC_PWD_PROP + " not set"); + + // map the config values to free text configu values + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); + conf.set(ConfigUtils.CLOUDBASE_USER, userName); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, auths); + + final String inputDir = args[0]; + + String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); + Preconditions.checkNotNull(tablePrefix, MRUtils.TABLE_PREFIX_PROPERTY + " not set"); + + 
String docTextTable = tablePrefix + "text"; + conf.set(ConfigUtils.FREE_TEXT_DOC_TABLENAME, docTextTable); + + String docTermTable = tablePrefix + "terms"; + conf.set(ConfigUtils.FREE_TEXT_TERM_TABLENAME, docTermTable); + + String geoTable = tablePrefix + "geo"; + conf.set(ConfigUtils.GEO_TABLENAME, geoTable); + + System.out.println("Loading data into tables[freetext, geo]"); + System.out.println("Loading data into tables[" + docTermTable + " " + docTextTable + " " + geoTable + "]"); + + Job job = new Job(new Configuration(conf), "Bulk Ingest load data into Indexing Tables"); + job.setJarByClass(this.getClass()); + + // setting long job + Configuration jobConf = job.getConfiguration(); + jobConf.setBoolean("mapred.map.tasks.speculative.execution", false); + jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256")); + jobConf.setBoolean("mapred.compress.map.output", true); + + job.setInputFormatClass(TextInputFormat.class); + + job.setMapperClass(ParseNtripsMapper.class); + + // I'm not actually going to write output. 
+ job.setOutputFormatClass(NullOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + + TextInputFormat.setInputPaths(job, new Path(inputDir)); + + job.setNumReduceTasks(0); + + job.waitForCompletion(true); + + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new BulkNtripsInputToolIndexing(), args); + } + + public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Text, Text> { + private static final Logger logger = Logger.getLogger(ParseNtripsMapper.class); + + public static final String TABLE_PROPERTY = "parsentripsmapper.table"; + + private RDFParser parser; + private FreeTextIndexer freeTextIndexer; + private GeoIndexer geoIndexer; + private String rdfFormat; + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + + freeTextIndexer = new AccumuloFreeTextIndexer(); + freeTextIndexer.setConf(conf); + geoIndexer = new GeoMesaGeoIndexer(); + geoIndexer.setConf(conf); + final ValueFactory vf = new ValueFactoryImpl(); + + rdfFormat = conf.get(MRUtils.FORMAT_PROP); + checkNotNull(rdfFormat, "Rdf format cannot be null"); + + String namedGraphString = conf.get(MRUtils.NAMED_GRAPH_PROP); + checkNotNull(namedGraphString, MRUtils.NAMED_GRAPH_PROP + " cannot be null"); + + final Resource namedGraph = vf.createURI(namedGraphString); + + parser = Rio.createParser(RDFFormat.valueOf(rdfFormat)); + parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY)); + parser.setRDFHandler(new RDFHandlerBase() { + + @Override + public void handleStatement(Statement statement) throws RDFHandlerException { + Statement contextStatement = new ContextStatementImpl(statement.getSubject(), statement + .getPredicate(), 
statement.getObject(), namedGraph); + try { + freeTextIndexer.storeStatement(RdfToRyaConversions.convertStatement(contextStatement)); + geoIndexer.storeStatement(RdfToRyaConversions.convertStatement(contextStatement)); + } catch (IOException e) { + logger.error("Error creating indexers", e); + } + } + }); + } + + @Override + public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException { + String rdf = value.toString(); + try { + parser.parse(new StringReader(rdf), ""); + } catch (RDFParseException e) { + System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + + "]"); + } catch (Exception e) { + logger.error("error during map", e); + throw new IOException("Exception occurred parsing triple[" + rdf + "]"); + } + } + + @Override + public void cleanup(Context context) { + IOUtils.closeStream(freeTextIndexer); + IOUtils.closeStream(geoIndexer); + } + } + +}
