On Mon, May 6, 2013 at 8:48 PM, <[email protected]> wrote: > Updated Branches: > refs/heads/0.1.5 827ba0c48 -> 1798eafe5 > > > Removed some dep warnings. > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo > Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/1798eafe > Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/1798eafe > Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/1798eafe > > Branch: refs/heads/0.1.5 > Commit: 1798eafe5b047da3b82ecd66b2e3e682b707f2ce > Parents: 827ba0c > Author: Aaron McCurry <[email protected]> > Authored: Mon May 6 20:47:55 2013 -0400 > Committer: Aaron McCurry <[email protected]> > Committed: Mon May 6 20:47:55 2013 -0400 > > ---------------------------------------------------------------------- > .../org/apache/blur/mapreduce/BlurReducer.java | 75 +++++++-------- > 1 files changed, 34 insertions(+), 41 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1798eafe/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java > ---------------------------------------------------------------------- > diff --git > a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java > b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java > index 17b5cc6..cb6beee 100644 > --- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java > +++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java > @@ -18,7 +18,6 @@ package org.apache.blur.mapreduce; > */ > import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION; > import static org.apache.blur.utils.BlurConstants.RECORD_ID; > -import static org.apache.blur.utils.BlurConstants.ROW_ID; > > import java.io.File; > import java.io.IOException; > @@ -37,6 +36,7 @@ import org.apache.blur.analysis.BlurAnalyzer; > import 
org.apache.blur.log.Log; > import org.apache.blur.log.LogFactory; > import org.apache.blur.lucene.search.FairSimilarity; > +import org.apache.blur.manager.writer.TransactionRecorder; > import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE; > import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE; > import org.apache.blur.store.hdfs.HdfsDirectory; > @@ -58,8 +58,8 @@ import org.apache.hadoop.mapreduce.Counter; > import org.apache.hadoop.mapreduce.Reducer; > import org.apache.lucene.document.Document; > import org.apache.lucene.document.Field; > -import org.apache.lucene.document.Field.Index; > import org.apache.lucene.document.Field.Store; > +import org.apache.lucene.document.StringField; > import org.apache.lucene.index.DirectoryReader; > import org.apache.lucene.index.IndexReader; > import org.apache.lucene.index.IndexWriter; > @@ -134,7 +134,7 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > setupDirectory(context); > setupWriter(context); > if (_blurTask.getIndexingType() == INDEXING_TYPE.UPDATE) { > - _reader = IndexReader.open(_directory); > + _reader = DirectoryReader.open(_directory); > } > } > > @@ -149,7 +149,8 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > } > > @Override > - protected void reduce(BytesWritable key, Iterable<BlurMutate> values, > Context context) throws IOException, InterruptedException { > + protected void reduce(BytesWritable key, Iterable<BlurMutate> values, > Context context) throws IOException, > + InterruptedException { > if (!index(key, values, context)) { > _rowFailures.increment(1); > } > @@ -165,7 +166,7 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > BlurRecord record = mutate.getRecord(); > if (!rowIdSet) { > String rowId = record.getRowId(); > - _rowIdTerm = new Term(BlurConstants.ROW_ID,rowId); > + _rowIdTerm = new Term(BlurConstants.ROW_ID, rowId); > rowIdSet = true; > } > if (mutate.getMutateType() == 
MUTATE_TYPE.DELETE) { > @@ -187,7 +188,7 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > > List<Document> docs = documentsToIndex(new > ArrayList<Document>(_newDocs.values())); > if (docs.size() > 0) { > - docs.get(0).add(new Field(BlurConstants.PRIME_DOC, > BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS)); > + docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, > BlurConstants.PRIME_DOC_VALUE, Store.NO)); > } > > switch (_blurTask.getIndexingType()) { > @@ -216,8 +217,9 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > double recordRate = (records - _previousRecord) / seconds; > double rowsRate = (rows - _previousRow) / seconds; > > - String status = String.format("Totals [%d Row, %d Records], Avg Rates > [%.1f Row/s, %.1f Records/s] Rates [%.1f Row/s, %.1f Records/s]", rows, > records, overAllRowsRate, > - overAllRecordRate, rowsRate, recordRate); > + String status = String.format( > + "Totals [%d Row, %d Records], Avg Rates [%.1f Row/s, %.1f > Records/s] Rates [%.1f Row/s, %.1f Records/s]", > + rows, records, overAllRowsRate, overAllRecordRate, rowsRate, > recordRate); > > LOG.info(status); > context.setStatus(status); > @@ -306,7 +308,8 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > long totalBytesCopied = 0; > long startTime = System.currentTimeMillis(); > for (String file : files) { > - totalBytesCopied += copy(_directory, destDirectory, file, file, > context, totalBytesCopied, totalBytesToCopy, startTime); > + totalBytesCopied += copy(_directory, destDirectory, file, file, > context, totalBytesCopied, totalBytesToCopy, > + startTime); > } > long e = System.currentTimeMillis(); > context.setStatus("Copying phase took [" + (e - s) + " ms]"); > @@ -318,15 +321,8 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > return new BufferedDirectory(destDirectory, 32768); > } > > - 
protected Directory getDestDirectory(Configuration configuration, > TableDescriptor descriptor, Path directoryPath) throws IOException { > - String compressionClass = descriptor.compressionClass; > - int compressionBlockSize = descriptor.getCompressionBlockSize(); > - if (compressionClass == null) { > - compressionClass = "org.apache.hadoop.io.compress.DefaultCodec"; > - } > - // if (compressionBlockSize == 0) { > - compressionBlockSize = 32768; > - // } > + protected Directory getDestDirectory(Configuration configuration, > TableDescriptor descriptor, Path directoryPath) > + throws IOException { > return new HdfsDirectory(configuration, directoryPath); > } > > @@ -356,7 +352,8 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > return total; > } > > - protected long copy(Directory from, Directory to, String src, String dest, > Context context, long totalBytesCopied, long totalBytesToCopy, long > startTime) throws IOException { > + protected long copy(Directory from, Directory to, String src, String dest, > Context context, long totalBytesCopied, > + long totalBytesToCopy, long startTime) throws IOException { > IndexOutput os = to.createOutput(dest, new IOContext()); > IndexInput is = from.openInput(src, new IOContext()); > IOException priorException = null; > @@ -370,8 +367,8 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > return 0;// this should never be called > } > > - protected long copyBytes(IndexInput in, IndexOutput out, long numBytes, > Context context, long totalBytesCopied, long totalBytesToCopy, long > startTime, String src) > - throws IOException { > + protected long copyBytes(IndexInput in, IndexOutput out, long numBytes, > Context context, long totalBytesCopied, > + long totalBytesToCopy, long startTime, String src) throws IOException { > if (_copyBuf == null) { > _copyBuf = new byte[BufferedIndexInput.BUFFER_SIZE]; > } > @@ -413,17 +410,8 @@ public class BlurReducer extends 
Reducer<BytesWritable, > BlurMutate, BytesWritabl > File path = new File(dir, "index"); > rm(path); > LOG.info("Using local path [" + path + "] for indexing."); > - String compressionClass = descriptor.compressionClass; > - int compressionBlockSize = descriptor.getCompressionBlockSize(); > - if (compressionClass == null) { > - compressionClass = "org.apache.hadoop.io.compress.DefaultCodec"; > - } > > Directory localDirectory = FSDirectory.open(path); > - // if (compressionBlockSize == 0) { > - compressionBlockSize = 32768; > - // } > -// CompressedFieldDataDirectory compressedFieldDataDirectory = new > CompressedFieldDataDirectory(localDirectory, getInstance(compressionClass), > compressionBlockSize); > _directory = new ProgressableDirectory(localDirectory, context); > return; > default: > @@ -471,16 +459,18 @@ public class BlurReducer extends Reducer<BytesWritable, > BlurMutate, BytesWritabl > > protected Document toDocument(BlurRecord record, StringBuilder builder) { > Document document = new Document(); > - document.add(new Field(ROW_ID, record.getRowId(), Store.YES, > Index.NOT_ANALYZED_NO_NORMS)); > - document.add(new Field(RECORD_ID, record.getRecordId(), Store.YES, > Index.NOT_ANALYZED_NO_NORMS)); > + document.add(new Field(BlurConstants.ROW_ID, record.getRowId(), > TransactionRecorder.ID_TYPE)); > + document.add(new Field(BlurConstants.RECORD_ID, record.getRecordId(), > TransactionRecorder.ID_TYPE));
Did you miss a scope (visibility) change on TransactionRecorder? BlurReducer now references TransactionRecorder.ID_TYPE directly in this commit. --tim
