Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package util; import java.util.HashMap; @@ -21,68 +21,57 @@ import java.util.Map; import org.apache.accumulo.core.data.Key; -public class BaseKeyParser -{ - public static final String ROW_FIELD = "row"; - public static final String COLUMN_FAMILY_FIELD = "columnFamily"; - public static final String COLUMN_QUALIFIER_FIELD = "columnQualifier"; - - protected Map <String, String> keyFields = new HashMap <String, String> (); - protected Key key = null; - - /** - * Parses a Key object into its constituent fields. This method - * clears any prior values, so the object can be reused - * without requiring a new instantiation. This default implementation - * makes the row, columnFamily, and columnQualifier available. - * - * @param key - */ - public void parse (Key key) - { - this.key = key; - - keyFields.clear(); - - keyFields.put (ROW_FIELD, key.getRow().toString()); - keyFields.put (COLUMN_FAMILY_FIELD, key.getColumnFamily().toString()); - keyFields.put (COLUMN_QUALIFIER_FIELD, key.getColumnQualifier().toString()); - } - - public String getFieldValue (String fieldName) - { - return keyFields.get(fieldName); - } - - public String[] getFieldNames() - { - String[] fieldNames = new String[keyFields.size()]; - return keyFields.keySet().toArray(fieldNames); - } - - public BaseKeyParser duplicate () - { - return new BaseKeyParser(); - } - - public String getRow() - { - return keyFields.get(ROW_FIELD); - } - - public String getColumnFamily () - { - return keyFields.get(COLUMN_FAMILY_FIELD); - } - - public String getColumnQualifier() - { - return keyFields.get(COLUMN_QUALIFIER_FIELD); - } - - public Key getKey() - { - return this.key; - } - +public class BaseKeyParser { + public static final String ROW_FIELD = "row"; + public static final String COLUMN_FAMILY_FIELD = "columnFamily"; + public static final String COLUMN_QUALIFIER_FIELD = "columnQualifier"; + + protected Map<String,String> keyFields = new HashMap<String,String>(); + protected Key key = null; + + /** + * Parses a Key object into its constituent fields. This method clears any prior values, so the object can be reused without requiring a new instantiation. + * This default implementation makes the row, columnFamily, and columnQualifier available. + * + * @param key + */ + public void parse(Key key) { + this.key = key; + + keyFields.clear(); + + keyFields.put(ROW_FIELD, key.getRow().toString()); + keyFields.put(COLUMN_FAMILY_FIELD, key.getColumnFamily().toString()); + keyFields.put(COLUMN_QUALIFIER_FIELD, key.getColumnQualifier().toString()); + } + + public String getFieldValue(String fieldName) { + return keyFields.get(fieldName); + } + + public String[] getFieldNames() { + String[] fieldNames = new String[keyFields.size()]; + return keyFields.keySet().toArray(fieldNames); + } + + public BaseKeyParser duplicate() { + return new BaseKeyParser(); + } + + public String getRow() { + return keyFields.get(ROW_FIELD); + } + + public String getColumnFamily() { + return keyFields.get(COLUMN_FAMILY_FIELD); + } + + public String getColumnQualifier() { + return keyFields.get(COLUMN_QUALIFIER_FIELD); + } + + public Key getKey() { + return this.key; + } + }
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java Mon Dec 5 20:05:49 2011 @@ -1,78 +1,71 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package util; import org.apache.accumulo.core.data.Key; public class FieldIndexKeyParser extends KeyParser { - - public static final String DELIMITER = "\0"; - @Override - public void parse(Key key) - { - super.parse (key); - - String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER); - this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : ""); - - String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER); - this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : ""); - this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : ""); - this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : ""); - } - - @Override - public BaseKeyParser duplicate () - { - return new FieldIndexKeyParser(); - } - - @Override - public String getSelector() - { - return keyFields.get(SELECTOR_FIELD); - } - - @Override - public String getDataType() - { - return keyFields.get(DATATYPE_FIELD); - } - - @Override - public String getFieldName () - { - return keyFields.get(FIELDNAME_FIELD); - } - - @Override - public String getUid() - { - return keyFields.get(UID_FIELD); - } - - public String getDataTypeUid() - { - return getDataType()+DELIMITER+getUid(); - } - - // An alias for getSelector - public String getFieldValue() - { - return getSelector(); - } + + public static final String DELIMITER = "\0"; + + @Override + public void parse(Key key) { + super.parse(key); + + String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER); + this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : ""); + + String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER); + this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : ""); + this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : ""); + this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : ""); + } + + @Override + public BaseKeyParser duplicate() { + return new FieldIndexKeyParser(); + } + + @Override + public String getSelector() { + return keyFields.get(SELECTOR_FIELD); + } + + @Override + public String getDataType() { + return keyFields.get(DATATYPE_FIELD); + } + + @Override + public String getFieldName() { + return keyFields.get(FIELDNAME_FIELD); + } + + @Override + public String getUid() { + return keyFields.get(UID_FIELD); + } + + public String getDataTypeUid() { + return getDataType() + DELIMITER + getUid(); + } + + // An alias for getSelector + public String getFieldValue() { + return getSelector(); + } } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java Mon Dec 5 20:05:49 2011 @@ -1,79 +1,70 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package util; import org.apache.accumulo.core.data.Key; -public class KeyParser extends BaseKeyParser -{ - public static final String SELECTOR_FIELD = "selector"; - public static final String DATATYPE_FIELD = "dataType"; - public static final String FIELDNAME_FIELD = "fieldName"; - public static final String UID_FIELD = "uid"; - public static final String DELIMITER = "\0"; +public class KeyParser extends BaseKeyParser { + public static final String SELECTOR_FIELD = "selector"; + public static final String DATATYPE_FIELD = "dataType"; + public static final String FIELDNAME_FIELD = "fieldName"; + public static final String UID_FIELD = "uid"; + public static final String DELIMITER = "\0"; + + @Override + public void parse(Key key) { + super.parse(key); - @Override - public void parse(Key key) - { - super.parse (key); - - String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER); - this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : ""); - - String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER); - this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : ""); - this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : ""); - this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : ""); - } - - @Override - public BaseKeyParser duplicate () - { - return new KeyParser(); - } - - public String getSelector() - { - return keyFields.get(SELECTOR_FIELD); - } - - public String getDataType() - { - return keyFields.get(DATATYPE_FIELD); - } - - public String getFieldName () - { - return keyFields.get(FIELDNAME_FIELD); - } + String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER); + this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : ""); - public String getUid() - { - return keyFields.get(UID_FIELD); - } - - public String getDataTypeUid() - { - return getDataType()+DELIMITER+getUid(); - } - - // An alias for getSelector - public String getFieldValue() - { - return getSelector(); - } + String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER); + this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : ""); + this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : ""); + this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : ""); + } + + @Override + public BaseKeyParser duplicate() { + return new KeyParser(); + } + + public String getSelector() { + return keyFields.get(SELECTOR_FIELD); + } + + public String getDataType() { + return keyFields.get(DATATYPE_FIELD); + } + + public String getFieldName() { + return keyFields.get(FIELDNAME_FIELD); + } + + public String getUid() { + return keyFields.get(UID_FIELD); + } + + public String getDataTypeUid() { + return getDataType() + DELIMITER + getUid(); + } + + // An alias for getSelector + public String getFieldValue() { + return getSelector(); + } } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package logic; import org.apache.hadoop.mapreduce.Counter; @@ -21,50 +21,50 @@ import org.apache.hadoop.mapreduce.Count import org.apache.hadoop.mapreduce.StatusReporter; public class StandaloneStatusReporter extends StatusReporter { - - private Counters c = new Counters(); - - private long filesProcessed = 0; - private long recordsProcessed = 0; - - public Counters getCounters() { - return c; - } - - @Override - public Counter getCounter(Enum<?> name) { - return c.findCounter(name); - } - - @Override - public Counter getCounter(String group, String name) { - return c.findCounter(group, name); - } - - @Override - public void progress() { - // do nothing - } - - @Override - public void setStatus(String status) { - // do nothing - } - - public long getFilesProcessed() { - return filesProcessed; - } - - public long getRecordsProcessed() { - return recordsProcessed; - } - - public void incrementFilesProcessed() { - filesProcessed++; - recordsProcessed = 0; - } - - public void incrementRecordsProcessed() { - recordsProcessed++; - } + + private Counters c = new Counters(); + + private long filesProcessed = 0; + private long recordsProcessed = 0; + + public Counters getCounters() { + return c; + } + + @Override + public Counter getCounter(Enum<?> name) { + return c.findCounter(name); + } + + @Override + public Counter getCounter(String group, String name) { + return c.findCounter(group, name); + } + + @Override + public void progress() { + // do nothing + } + + @Override + public void setStatus(String status) { + // do nothing + } + + public long getFilesProcessed() { + return filesProcessed; + } + + public long getRecordsProcessed() { + return recordsProcessed; + } + + public void incrementFilesProcessed() { + filesProcessed++; + recordsProcessed = 0; + } + + public void incrementRecordsProcessed() { + recordsProcessed++; + } } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package logic; import ingest.WikipediaConfiguration; @@ -64,130 +64,129 @@ import org.apache.accumulo.core.data.Val import org.apache.accumulo.core.security.Authorizations; public class TestQueryLogic { - - private static final String METADATA_TABLE_NAME = "wikiMetadata"; - - private static final String TABLE_NAME = "wiki"; - - private static final String INDEX_TABLE_NAME = "wikiIndex"; - - private static final String RINDEX_TABLE_NAME = "wikiReverseIndex"; - - private class MockAccumuloRecordWriter extends RecordWriter<Text, Mutation> { - @Override - public void write(Text key, Mutation value) throws IOException, InterruptedException { - try { - writerMap.get(key).addMutation(value); - } catch (MutationsRejectedException e) { - throw new IOException("Error adding mutation", e); - } - } - - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException { - try { - for (BatchWriter w : writerMap.values()) { - w.flush(); - w.close(); - } - } catch (MutationsRejectedException e) { - throw new IOException("Error closing Batch Writer", e); - } - } - - } - - private Connector c = null; - private Configuration conf = new Configuration(); - private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>(); - private QueryLogic table = null; - - @Before - public void setup() throws Exception { - - Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.DEBUG); - Logger.getLogger(QueryLogic.class).setLevel(Level.DEBUG); - Logger.getLogger(RangeCalculator.class).setLevel(Level.DEBUG); - - conf.set(AggregatingRecordReader.START_TOKEN, "<page>"); - conf.set(AggregatingRecordReader.END_TOKEN, "</page>"); - conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); - conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); - - - MockInstance i = new MockInstance(); - c = i.getConnector("root", "pass"); - c.tableOperations().delete(METADATA_TABLE_NAME); - c.tableOperations().delete(TABLE_NAME); - c.tableOperations().delete(INDEX_TABLE_NAME); - c.tableOperations().delete(RINDEX_TABLE_NAME); - c.tableOperations().create(METADATA_TABLE_NAME); - c.tableOperations().create(TABLE_NAME); - c.tableOperations().create(INDEX_TABLE_NAME); - c.tableOperations().create(RINDEX_TABLE_NAME); - - writerMap.put(new Text(METADATA_TABLE_NAME), c.createBatchWriter(METADATA_TABLE_NAME, 1000L, 1000L, 1)); - writerMap.put(new Text(TABLE_NAME), c.createBatchWriter(TABLE_NAME, 1000L, 1000L, 1)); - writerMap.put(new Text(INDEX_TABLE_NAME), c.createBatchWriter(INDEX_TABLE_NAME, 1000L, 1000L, 1)); - writerMap.put(new Text(RINDEX_TABLE_NAME), c.createBatchWriter(RINDEX_TABLE_NAME, 1000L, 1000L, 1)); - - TaskAttemptID id = new TaskAttemptID(); - TaskAttemptContext context = new TaskAttemptContext(conf, id); - - RawLocalFileSystem fs = new RawLocalFileSystem(); - fs.setConf(conf); - - URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml"); - Assert.assertNotNull(url); - File data = new File(url.toURI()); - Path tmpFile = new Path(data.getAbsolutePath()); - - //Setup the Mapper - InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null); - AggregatingRecordReader rr = new AggregatingRecordReader(); - Path ocPath = new Path(tmpFile, "oc"); - OutputCommitter oc = new FileOutputCommitter(ocPath, context); - fs.deleteOnExit(ocPath); - StandaloneStatusReporter sr = new StandaloneStatusReporter(); - rr.initialize(split, context); - MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter(); - WikipediaMapper mapper = new WikipediaMapper(); - - //Load data into Mock Accumulo - Mapper<LongWritable, Text, Text, Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split); - mapper.run(con); - - //Flush and close record writers. - rw.close(context); - - table = new QueryLogic(); - table.setMetadataTableName(METADATA_TABLE_NAME); - table.setTableName(TABLE_NAME); - table.setIndexTableName(INDEX_TABLE_NAME); - table.setReverseIndexTableName(RINDEX_TABLE_NAME); - table.setUseReadAheadIterator(false); - table.setNumPartitions(1); - - } - - private void debugQuery(String tableName) throws Exception { - Scanner s = c.createScanner(tableName, new Authorizations()); - Range r = new Range(); - s.setRange(r); - for (Entry<Key,Value> entry : s) - System.out.println(entry.getKey().toString() +" " + entry.getValue().toString()); - } - - @Test - public void testTitle() { - List<String> auths = new ArrayList<String>(); - auths.add("enwiki"); - Results results = table.runQuery(c, auths, "TITLE == 'afghanistanhistory'", null, null, null); - for (Document doc : results.getResults()) { - System.out.println("id: " + doc.getId()); - for (Field field : doc.getFields()) - System.out.println(field.getFieldName() + " -> " + field.getFieldValue()); - } - } - + + private static final String METADATA_TABLE_NAME = "wikiMetadata"; + + private static final String TABLE_NAME = "wiki"; + + private static final String INDEX_TABLE_NAME = "wikiIndex"; + + private static final String RINDEX_TABLE_NAME = "wikiReverseIndex"; + + private class MockAccumuloRecordWriter extends RecordWriter<Text,Mutation> { + @Override + public void write(Text key, Mutation value) throws IOException, InterruptedException { + try { + writerMap.get(key).addMutation(value); + } catch (MutationsRejectedException e) { + throw new IOException("Error adding mutation", e); + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + try { + for (BatchWriter w : writerMap.values()) { + w.flush(); + w.close(); + } + } catch (MutationsRejectedException e) { + throw new IOException("Error closing Batch Writer", e); + } + } + + } + + private Connector c = null; + private Configuration conf = new Configuration(); + private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>(); + private QueryLogic table = null; + + @Before + public void setup() throws Exception { + + Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.DEBUG); + Logger.getLogger(QueryLogic.class).setLevel(Level.DEBUG); + Logger.getLogger(RangeCalculator.class).setLevel(Level.DEBUG); + + conf.set(AggregatingRecordReader.START_TOKEN, "<page>"); + conf.set(AggregatingRecordReader.END_TOKEN, "</page>"); + conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); + conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); + + MockInstance i = new MockInstance(); + c = i.getConnector("root", "pass"); + c.tableOperations().delete(METADATA_TABLE_NAME); + c.tableOperations().delete(TABLE_NAME); + c.tableOperations().delete(INDEX_TABLE_NAME); + c.tableOperations().delete(RINDEX_TABLE_NAME); + c.tableOperations().create(METADATA_TABLE_NAME); + c.tableOperations().create(TABLE_NAME); + c.tableOperations().create(INDEX_TABLE_NAME); + c.tableOperations().create(RINDEX_TABLE_NAME); + + writerMap.put(new Text(METADATA_TABLE_NAME), c.createBatchWriter(METADATA_TABLE_NAME, 1000L, 1000L, 1)); + writerMap.put(new Text(TABLE_NAME), c.createBatchWriter(TABLE_NAME, 1000L, 1000L, 1)); + writerMap.put(new Text(INDEX_TABLE_NAME), c.createBatchWriter(INDEX_TABLE_NAME, 1000L, 1000L, 1)); + writerMap.put(new Text(RINDEX_TABLE_NAME), c.createBatchWriter(RINDEX_TABLE_NAME, 1000L, 1000L, 1)); + + TaskAttemptID id = new TaskAttemptID(); + TaskAttemptContext context = new TaskAttemptContext(conf, id); + + RawLocalFileSystem fs = new RawLocalFileSystem(); + fs.setConf(conf); + + URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml"); + Assert.assertNotNull(url); + File data = new File(url.toURI()); + Path tmpFile = new Path(data.getAbsolutePath()); + + // Setup the Mapper + InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null); + AggregatingRecordReader rr = new AggregatingRecordReader(); + Path ocPath = new Path(tmpFile, "oc"); + OutputCommitter oc = new FileOutputCommitter(ocPath, context); + fs.deleteOnExit(ocPath); + StandaloneStatusReporter sr = new StandaloneStatusReporter(); + rr.initialize(split, context); + MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter(); + WikipediaMapper mapper = new WikipediaMapper(); + + // Load data into Mock Accumulo + Mapper<LongWritable,Text,Text,Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split); + mapper.run(con); + + // Flush and close record writers. + rw.close(context); + + table = new QueryLogic(); + table.setMetadataTableName(METADATA_TABLE_NAME); + table.setTableName(TABLE_NAME); + table.setIndexTableName(INDEX_TABLE_NAME); + table.setReverseIndexTableName(RINDEX_TABLE_NAME); + table.setUseReadAheadIterator(false); + table.setNumPartitions(1); + + } + + private void debugQuery(String tableName) throws Exception { + Scanner s = c.createScanner(tableName, new Authorizations()); + Range r = new Range(); + s.setRange(r); + for (Entry<Key,Value> entry : s) + System.out.println(entry.getKey().toString() + " " + entry.getValue().toString()); + } + + @Test + public void testTitle() { + List<String> auths = new ArrayList<String>(); + auths.add("enwiki"); + Results results = table.runQuery(c, auths, "TITLE == 'afghanistanhistory'", null, null, null); + for (Document doc : results.getResults()) { + System.out.println("id: " + doc.getId()); + for (Field field : doc.getFields()) + System.out.println(field.getFieldName() + " -> " + field.getFieldValue()); + } + } + }
