Fixing an issue with the record reader where it needed all the files in the processing cluster's cache directory instead of the few actually required.

Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ab31c5fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ab31c5fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ab31c5fe

Branch: refs/heads/blur-0.2.4-parcel
Commit: ab31c5fea8c167841e641c7154a421ae6f4983d6
Parents: 4fd8687
Author: Aaron McCurry <amccu...@gmail.com>
Authored: Fri Jul 31 10:02:29 2015 -0400
Committer: Aaron McCurry <amccu...@gmail.com>
Committed: Fri Jul 31 10:02:29 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/GenericRecordReader.java | 65 +++++++++++++++-----
 1 file changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ab31c5fe/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
index af5ce9f..1335a73 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
@@ -30,17 +30,22 @@ import org.apache.blur.utils.RowDocumentUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.index.SegmentInfoPerCommit;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.IOUtils;
 
 public class GenericRecordReader {
 
@@ -71,9 +76,8 @@ public class GenericRecordReader {
     LOG.info("Local cache path [{0}]", localCachePath);
     _directory = BlurInputFormat.getDirectory(configuration, _table.toString(), blurInputSplit.getDir(), files);
 
-    SegmentInfos segmentInfos = new SegmentInfos();
-    segmentInfos.read(_directory, blurInputSplit.getSegmentsName());
-    SegmentInfoPerCommit commit = findSegmentInfoPerCommit(segmentInfos, blurInputSplit);
+    SegmentInfoPerCommit commit = segmentInfosRead(_directory, blurInputSplit.getSegmentsName(),
+        blurInputSplit.getSegmentInfoName());
     SegmentInfo segmentInfo = commit.info;
 
     if (localCachePath != null) {
@@ -98,6 +102,50 @@ public class GenericRecordReader {
     _maxDoc = commit.info.getDocCount();
   }
 
+  private SegmentInfoPerCommit segmentInfosRead(Directory directory, String segmentFileName, String segmentInfoName)
+      throws IOException {
+    boolean success = false;
+
+    ChecksumIndexInput input = new ChecksumIndexInput(directory.openInput(segmentFileName, IOContext.READ));
+    try {
+      final int format = input.readInt();
+      if (format == CodecUtil.CODEC_MAGIC) {
+        // 4.0+
+        CodecUtil.checkHeaderNoMagic(input, "segments", SegmentInfos.VERSION_40, SegmentInfos.VERSION_40);
+        input.readLong(); // read version
+        input.readInt(); // read counter
+        int numSegments = input.readInt();
+        if (numSegments < 0) {
+          throw new CorruptIndexException("invalid segment count: " + numSegments + " (resource: " + input + ")");
+        }
+        for (int seg = 0; seg < numSegments; seg++) {
+          String segName = input.readString();
+          Codec codec = Codec.forName(input.readString());
+          SegmentInfo info = codec.segmentInfoFormat().getSegmentInfoReader().read(directory, segName, IOContext.READ);
+          info.setCodec(codec);
+          long delGen = input.readLong();
+          int delCount = input.readInt();
+          if (delCount < 0 || delCount > info.getDocCount()) {
+            throw new CorruptIndexException("invalid deletion count: " + delCount + " (resource: " + input + ")");
+          }
+          if (segName.equals(segmentInfoName)) {
+            success = true;
+            return new SegmentInfoPerCommit(info, delCount, delGen);
+          }
+        }
+      } else {
+        throw new IOException("Legacy Infos not supported for dir [" + directory + "].");
+      }
+      throw new IOException("Segment [" + segmentInfoName + "] not found in dir [" + directory + "]");
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(input);
+      } else {
+        input.close();
+      }
+    }
+  }
+
   private static Directory copyFilesLocally(Configuration configuration, Directory dir, String table, Path shardDir,
       Path localCachePath, Collection<String> files) throws IOException {
     LOG.info("Copying files need to local cache for faster reads [{0}].", shardDir);
@@ -170,17 +218,6 @@
     return false;
   }
 
-  private SegmentInfoPerCommit findSegmentInfoPerCommit(SegmentInfos segmentInfos, BlurInputSplit blurInputSplit)
-      throws IOException {
-    String segmentInfoName = blurInputSplit.getSegmentInfoName();
-    for (SegmentInfoPerCommit commit : segmentInfos) {
-      if (commit.info.name.equals(segmentInfoName)) {
-        return commit;
-      }
-    }
-    throw new IOException("SegmentInfoPerCommit of [" + segmentInfoName + "] not found.");
-  }
-
   public boolean nextKeyValue() throws IOException {
     if (_docId >= _maxDoc) {
       return false;
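
For context: the old path called SegmentInfos.read(...), which materializes a SegmentInfoPerCommit for every segment listed in segments_N, so every segment's files had to be present in the task's cache directory. The new segmentInfosRead(...) walks the same segments_N stream but returns as soon as it reaches the split's segment. Below is a minimal standalone sketch of that walk, assuming the pre-4.8 Lucene 4.x line this branch builds against (concrete ChecksumIndexInput, public SegmentInfos.VERSION_40); the class name SegmentsFileScan and the index-path argument are hypothetical, and unlike the committed helper it only prints the per-commit fields stored inline in segments_N instead of loading each segment's .si file through its codec.

import java.io.File;
import java.io.IOException;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;

public class SegmentsFileScan {

  public static void main(String[] args) throws IOException {
    // Hypothetical usage: java SegmentsFileScan /path/to/shard/index
    Directory dir = FSDirectory.open(new File(args[0]));
    String segmentsName = SegmentInfos.getLastCommitSegmentsFileName(dir);
    ChecksumIndexInput input = new ChecksumIndexInput(dir.openInput(segmentsName, IOContext.READ));
    try {
      // 4.0+ segments files start with the codec magic; older formats are not handled here,
      // matching the patch's "Legacy Infos not supported" behavior.
      int format = input.readInt();
      if (format != CodecUtil.CODEC_MAGIC) {
        throw new IOException("Legacy (pre-4.0) segments file not supported by this sketch.");
      }
      CodecUtil.checkHeaderNoMagic(input, "segments", SegmentInfos.VERSION_40, SegmentInfos.VERSION_40);
      input.readLong(); // version
      input.readInt();  // counter
      int numSegments = input.readInt();
      // Each per-commit entry is stored inline in segments_N, so this scan touches no
      // per-segment files at all and could stop early once a target segment is found.
      for (int seg = 0; seg < numSegments; seg++) {
        String segName = input.readString();   // segment name, e.g. "_1f"
        String codecName = input.readString(); // codec the segment was written with
        long delGen = input.readLong();        // deletes generation
        int delCount = input.readInt();        // number of deleted docs
        System.out.println(segName + " codec=" + codecName + " delGen=" + delGen + " delCount=" + delCount);
      }
    } finally {
      input.close();
    }
  }
}

The committed helper runs the same loop but additionally loads each entry's SegmentInfo through its codec and returns as soon as the name matches the split's segment, so segments listed after the match are never opened and the task only has to cache the files it actually reads.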