Fourth round of updates.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/96a1821a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/96a1821a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/96a1821a
Branch: refs/heads/master
Commit: 96a1821a0a723fafefd79a8a6769cf1286e95eaa
Parents: ea50630
Author: Aaron McCurry <amccu...@gmail.com>
Authored: Sat May 7 13:12:48 2016 -0400
Committer: Aaron McCurry <amccu...@gmail.com>
Committed: Sat May 7 13:12:48 2016 -0400
----------------------------------------------------------------------
 .../mapreduce/lib/GenericBlurRecordWriter.java | 57 +++++-
 .../mapreduce/lib/PrimeDocOverFlowHelper.java | 31 ++-
 .../mapreduce/lib/update/UpdateReducer.java | 23 ++-
 .../blur/lucene/search/FacetExecutor.java | 89 ++++++---
 .../apache/blur/lucene/search/FacetQuery.java | 14 +-
 .../BaseReadMaskFieldTypeDefinitionTest.java | 13 +-
 blur-shell/pom.xml | 7 +
 .../ListRunningPlatformCommandsCommand.java | 43 ++--
 .../main/java/org/apache/blur/shell/Main.java | 3 +-
 .../org/apache/blur/shell/TableDisplay.java | 3 +-
 .../org/apache/blur/shell/WatchCommands.java | 149 ++++++++++++++
 .../lucene/codec/DiskDocValuesProducer.java | 195 ++++++++++++++-----
 .../store/blockcache_v2/CacheIndexInput.java | 4 +-
 .../blur/store/blockcache_v2/MeterWrapper.java | 13 +-
 .../cachevalue/DetachableCacheValue.java | 48 +++--
 .../blur/store/hdfs/SequentialReadControl.java | 3 +-
 .../packed/DirectPacked64SingleBlockReader.java | 65 +++++++
 .../lucene/util/packed/DirectPackedReader.java | 80 ++++++++
 .../org/apache/blur/utils/BlurConstants.java | 5 +-
 deploy.sh | 62 ++++++
 20 files changed, 754 insertions(+), 153 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
index a947980..8828f85 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -18,9 +18,12 @@ package org.apache.blur.mapreduce.lib; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.blur.analysis.FieldManager; import org.apache.blur.log.Log;
@@ -47,11 +50,11 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.StringField; import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.SlowCompositeReaderWrapper; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext;
@@ -95,6 +98,7 @@ public class GenericBlurRecordWriter { private ProgressableDirectory _localTmpDir; private
String _deletedRowId; private Configuration _configuration; + private String _currentRowId; public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException { _configuration = configuration; @@ -200,6 +204,7 @@ public class GenericBlurRecordWriter { private void add(BlurMutate value) throws IOException { BlurRecord blurRecord = value.getRecord(); + _currentRowId = blurRecord.getRowId(); Record record = getRecord(blurRecord); String recordId = record.getRecordId(); if (value.getMutateType() == MUTATE_TYPE.DELETE) { @@ -224,7 +229,7 @@ public class GenericBlurRecordWriter { private void flushToTmpIndexIfNeeded() throws IOException { if (_documentBufferStrategy.isFull()) { - LOG.info("Document Buffer is full overflow to disk."); + LOG.info("RowId [" + _currentRowId + "] - Document Buffer is full overflow to disk."); flushToTmpIndex(); } } @@ -273,15 +278,35 @@ public class GenericBlurRecordWriter { return record; } + private static ThreadLocal<AtomicBoolean> _existingRow = new ThreadLocal<AtomicBoolean>() { + @Override + protected AtomicBoolean initialValue() { + return new AtomicBoolean(); + } + }; + + public static boolean isCurrentRowExisting() { + return _existingRow.get().get(); + } + + public static void setCurrentRowExistingRowId(boolean existing) { + _existingRow.get().set(existing); + } + private void flush() throws CorruptIndexException, IOException { + boolean newRow = !isCurrentRowExisting(); if (_usingLocalTmpindex) { // since we have flushed to disk then we do not need to index the // delete. flushToTmpIndex(); - _localTmpWriter.close(false); + LOG.info("RowId [" + _currentRowId + "] - forceMerge"); + _localTmpWriter.forceMerge(1, true); + _localTmpWriter.close(true); + DirectoryReader reader = DirectoryReader.open(_localTmpDir); - AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(reader); - AtomicReader primeDocAtomicReader = PrimeDocOverFlowHelper.addPrimeDoc(atomicReader); + AtomicReader atomicReader = getAtomicReader(reader); + LOG.info("RowId [" + _currentRowId + "] - total documents [" + atomicReader.maxDoc() + "]"); + AtomicReader primeDocAtomicReader = PrimeDocOverFlowHelper.addPrimeDoc(atomicReader, newRow, _currentRowId); if (_countersSetup) { _recordRateCounter.mark(reader.numDocs()); } @@ -289,6 +314,7 @@ public class GenericBlurRecordWriter { primeDocAtomicReader.close(); resetLocalTmp(); _writer.maybeMerge(); + LOG.info("RowId [" + _currentRowId + "] - add complete"); if (_countersSetup) { _rowOverFlowCount.increment(1); } @@ -303,6 +329,11 @@ public class GenericBlurRecordWriter { } else { List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer(); docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO)); + if (newRow) { + docs.get(0).add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO)); + } else { + docs.get(0).add(new StringField(BlurConstants.UPDATE_ROW, _currentRowId, Store.NO)); + } _writer.addDocuments(docs); if (_countersSetup) { _recordRateCounter.mark(docs.size()); @@ -316,10 +347,19 @@ public class GenericBlurRecordWriter { } } + private AtomicReader getAtomicReader(DirectoryReader reader) throws IOException { + List<AtomicReaderContext> leaves = reader.leaves(); + if (leaves.size() == 1) { + return leaves.get(0).reader(); + } + throw new IOException("Reader [" + reader + "] has more than one segment after optimize."); + } + private Document getDeleteDoc() { Document document = new Document(); document.add(new 
StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO)); document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE, Store.NO)); + document.add(new StringField(BlurConstants.UPDATE_ROW, _deletedRowId, Store.NO)); return document; } @@ -348,10 +388,17 @@ public class GenericBlurRecordWriter { DirectoryReader reader = DirectoryReader.open(_localDir); IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone()); writer.addIndexes(reader); + writer.setCommitData(getInternalMarker()); writer.close(); rm(_localPath); } + private Map<String, String> getInternalMarker() { + Map<String, String> map = new HashMap<String, String>(); + map.put(BlurConstants.INTERNAL, BlurConstants.INTERNAL); + return map; + } + private void copyDir() throws IOException { CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter); String[] fileNames = _localDir.listAll(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java ---------------------------------------------------------------------- diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java index 672a1c1..73d9c78 100644 --- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java +++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java @@ -38,26 +38,47 @@ import org.apache.lucene.util.Version; public class PrimeDocOverFlowHelper { - private static Directory _directory; + private static Directory _directoryNewRow; static { try { - _directory = new RAMDirectory(); - IndexWriter writer = new IndexWriter(_directory, new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer())); + _directoryNewRow = new RAMDirectory(); + IndexWriter writer = new IndexWriter(_directoryNewRow, new IndexWriterConfig(Version.LUCENE_43, + new KeywordAnalyzer())); Document document = new Document(); document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO)); + document.add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO)); writer.addDocument(document); writer.close(); } catch (Exception e) { throw new RuntimeException(e); } + } - public static AtomicReader addPrimeDoc(AtomicReader atomicReader) throws IOException { - AtomicReaderContext context = DirectoryReader.open(_directory).leaves().get(0); + public static AtomicReader addPrimeDoc(AtomicReader atomicReader, boolean newRow, String currentRowId) + throws IOException { + AtomicReaderContext context = DirectoryReader.open(newRow ? 
_directoryNewRow : getDirectoryUpdateRow(currentRowId)) + .leaves().get(0); return new ParallelAtomicReader(true, setDocSize(context.reader(), atomicReader.maxDoc()), atomicReader); } + private static Directory getDirectoryUpdateRow(String currentRowId) { + try { + RAMDirectory directoryUpdateRow = new RAMDirectory(); + IndexWriter writer = new IndexWriter(directoryUpdateRow, new IndexWriterConfig(Version.LUCENE_43, + new KeywordAnalyzer())); + Document document = new Document(); + document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO)); + document.add(new StringField(BlurConstants.UPDATE_ROW, currentRowId, Store.NO)); + writer.addDocument(document); + writer.close(); + return directoryUpdateRow; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private static AtomicReader setDocSize(AtomicReader reader, final int count) { return new FilterAtomicReader(reader) { @Override http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java ---------------------------------------------------------------------- diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java index f8705aa..d62617b 100644 --- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java +++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java @@ -20,10 +20,11 @@ import java.io.IOException; import org.apache.blur.mapreduce.lib.BlurMutate; import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE; -import org.apache.blur.mapreduce.lib.update.IndexKey.TYPE; import org.apache.blur.mapreduce.lib.BlurOutputFormat; import org.apache.blur.mapreduce.lib.BlurRecord; +import org.apache.blur.mapreduce.lib.GenericBlurRecordWriter; import org.apache.blur.mapreduce.lib.GetCounter; +import org.apache.blur.mapreduce.lib.update.IndexKey.TYPE; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Reducer; @@ -37,9 +38,9 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat private static final String MARKER_RECORDS = "Marker Records"; private static final String SEP = " - "; private static final String BLUR_UPDATE = "Blur Update"; - private static final String EXISTING_RCORDS = "Existing Rcords"; - private static final String NEW_RCORDS = "New Rcords"; - private static final String NO_UPDATE = "NoUpdate"; + private static final String EXISTING_RECORDS = "Existing Records"; + private static final String NEW_RECORDS = "New Records"; + private static final String NO_UPDATE = "No Update"; private static final String UPDATE = "Update"; private static final String BLUR_UPDATE_DEBUG = BLUR_UPDATE + SEP + "DEBUG"; @@ -64,10 +65,10 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat } }); - _newRecordsUpdate = context.getCounter(BLUR_UPDATE, NEW_RCORDS + SEP + UPDATE); - _newRecordsNoUpdate = context.getCounter(BLUR_UPDATE, NEW_RCORDS + SEP + NO_UPDATE); - _existingRecordsUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RCORDS + SEP + UPDATE); - _existingRecordsNoUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RCORDS + SEP + NO_UPDATE); + _newRecordsUpdate = context.getCounter(BLUR_UPDATE, NEW_RECORDS + SEP + UPDATE); + _newRecordsNoUpdate = context.getCounter(BLUR_UPDATE, NEW_RECORDS + SEP + NO_UPDATE); + 
_existingRecordsUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RECORDS + SEP + UPDATE); + _existingRecordsNoUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RECORDS + SEP + NO_UPDATE); _ignoredExistingRows = context.getCounter(BLUR_UPDATE, IGNORED_EXISTING_ROWS); _debugRecordsWithSameRecordId = context.getCounter(BLUR_UPDATE_DEBUG, MULTIPLE_RECORD_W_SAME_RECORD_ID); @@ -76,7 +77,6 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat _debugMarkerRecordsUpdate = context.getCounter(BLUR_UPDATE_DEBUG, MARKER_RECORDS + SEP + UPDATE); _debugIndexValues = context.getCounter(BLUR_UPDATE_DEBUG, INDEX_VALUES); _debugNullBlurRecords = context.getCounter(BLUR_UPDATE_DEBUG, NULL_BLUR_RECORDS); - } @Override @@ -93,6 +93,7 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat InterruptedException { BlurRecord prevBlurRecord = null; String prevRecordId = null; + boolean existing = false; for (IndexValue value : values) { updateCounters(true, key); BlurRecord br = value.getBlurRecord(); @@ -103,6 +104,9 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat } else { // Safe Copy BlurRecord currentBlurRecord = new BlurRecord(br); + if (key.getType() == IndexKey.TYPE.OLD_DATA) { + existing = true; + } String currentRecordId = currentBlurRecord.getRecordId(); if (prevRecordId != null) { if (prevRecordId.equals(currentRecordId)) { @@ -120,6 +124,7 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat if (prevBlurRecord != null) { context.write(new Text(prevBlurRecord.getRowId()), toMutate(prevBlurRecord)); } + GenericBlurRecordWriter.setCurrentRowExistingRowId(existing); } private void updateCounters(boolean update, IndexKey key) { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java ---------------------------------------------------------------------- diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java index 683ba98..00383f7 100644 --- a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java +++ b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java @@ -18,11 +18,13 @@ package org.apache.blur.lucene.search; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -37,10 +39,11 @@ import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; import org.apache.blur.trace.Trace; import org.apache.blur.trace.Tracer; +import org.apache.blur.user.User; +import org.apache.blur.user.UserContext; import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.search.Collector; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Scorer; import org.apache.lucene.util.OpenBitSet; @@ -75,19 +78,21 @@ public class FacetExecutor { public void collect(int doc) throws IOException { if (_bitSet.fastGet(doc)) { _hits++; - } else { - int nextSetBit = _bitSet.nextSetBit(doc); - if (nextSetBit < 0) { - throw new Finished(); - } else { - int advance = 
_scorer.advance(nextSetBit); - if (advance == DocIdSetIterator.NO_MORE_DOCS) { - throw new Finished(); - } - if (_bitSet.fastGet(advance)) { - _hits++; - } - } + // } else { + // int nextSetBit = _bitSet.nextSetBit(doc); + // if (nextSetBit < 0) { + // LOG.debug("finished early, no more hits in query."); + // throw new Finished(); + // } else { + // int advance = _scorer.advance(nextSetBit); + // if (advance == DocIdSetIterator.NO_MORE_DOCS) { + // LOG.debug("finished early, no more hits in facet."); + // throw new Finished(); + // } + // if (_bitSet.fastGet(advance)) { + // _hits++; + // } + // } } } @@ -137,14 +142,17 @@ public class FacetExecutor { final String _readerStr; final int _maxDoc; final Lock[] _locks; + final String _instance; @Override public String toString() { - return "Info scorers length [" + _scorers.length + "] reader [" + _reader + "]"; + return "Info scorers length [" + _scorers.length + "] reader [" + _reader + "] scorers [" + + Arrays.asList(_scorers) + "]"; } - Info(AtomicReaderContext context, Scorer[] scorers, Lock[] locks) { + Info(AtomicReaderContext context, Scorer[] scorers, Lock[] locks, String instance) { AtomicReader reader = context.reader(); + _instance = instance; _bitSet = new OpenBitSet(reader.maxDoc()); _scorers = scorers; _reader = reader; @@ -155,13 +163,18 @@ public class FacetExecutor { void process(AtomicLongArray counts, long[] minimumsBeforeReturning, AtomicBoolean running) throws IOException { if (minimumsBeforeReturning == null) { + LOG.debug(getPrefix("no minimums before returning.")); Tracer trace = Trace.trace("processing facet - segment", Trace.param("reader", _readerStr), Trace.param("maxDoc", _maxDoc), Trace.param("minimums", "NONE"), Trace.param("scorers", _scorers.length)); try { for (int i = 0; i < _scorers.length && running.get(); i++) { + LOG.debug(getPrefix("running facet for scorer [{0}] [{1}]."), i, _scorers[i]); SimpleCollector col = new SimpleCollector(_bitSet); runFacet(counts, col, i); } + if (!running.get()) { + LOG.debug(getPrefix("running was stopped.")); + } } finally { trace.done(); } @@ -177,7 +190,7 @@ public class FacetExecutor { long min = minimumsBeforeReturning[id]; long currentCount = counts.get(id); if (currentCount < min) { - LOG.debug("Running facet, current count [{0}] min [{1}]", currentCount, min); + LOG.debug(getPrefix("Running facet, current count [{0}] min [{1}]"), currentCount, min); SimpleCollectorExitEarly col = new SimpleCollectorExitEarly(_bitSet, currentCount, min); runFacet(counts, col, id); } @@ -188,6 +201,9 @@ public class FacetExecutor { ids.put(id); } } + if (!running.get()) { + LOG.debug(getPrefix("running was stopped.")); + } } catch (Exception e) { throw new IOException(e); } @@ -206,19 +222,26 @@ public class FacetExecutor { Tracer traceInner = Trace.trace("processing facet - segment - scorer", Trace.param("scorer", scorer), Trace.param("scorer.cost", scorer.cost())); try { - // new ExitScorer(scorer).score(col); + LOG.debug(getPrefix("starting scorer [" + i + "].")); scorer.score(col); } catch (Finished e) { // Do nothing, exiting early. 
+ LOG.debug(getPrefix("finished early.")); } finally { traceInner.done(); } int hits = col._hits; - LOG.debug("Facet [{0}] result [{1}]", i, hits); + LOG.debug(getPrefix("Facet [{0}] result [{1}]"), i, hits); counts.addAndGet(i, hits); + } else { + LOG.debug(getPrefix("scorer [" + i + "] is null.")); } col._hits = 0; } + + private String getPrefix(String s) { + return _instance + " " + s; + } } private final Map<Object, Info> _infoMap = new ConcurrentHashMap<Object, FacetExecutor.Info>(); @@ -228,6 +251,7 @@ public class FacetExecutor { private final Lock[] _locks; private final AtomicBoolean _running; private boolean _processed; + private final String _instance = UUID.randomUUID().toString(); public FacetExecutor(int length) { this(length, null, new AtomicLongArray(length)); @@ -250,26 +274,34 @@ public class FacetExecutor { _locks[i] = new ReentrantReadWriteLock().writeLock(); } _running = running; + User user = UserContext.getUser(); + LOG.debug(getPrefix("User [{0}]"), user); } public void addScorers(AtomicReaderContext context, Scorer[] scorers) throws IOException { + LOG.debug(getPrefix("adding scorers context [{0}] [{1}]"), context, Arrays.asList(scorers)); if (scorers.length != _length) { throw new IOException("Scorer length is not correct expecting [" + _length + "] actual [" + scorers.length + "]"); } Object key = getKey(context); Info info = _infoMap.get(key); if (info == null) { - info = new Info(context, scorers, _locks); + info = new Info(context, scorers, _locks, _instance); _infoMap.put(key, info); } else { AtomicReader reader = context.reader(); - LOG.warn("Info about reader context [{0}] already created, existing Info [{1}] current reader [{2}].", context, - info, reader); + LOG.warn(getPrefix("Info about reader context [{0}] already created, existing Info [{1}] current reader [{2}]."), + context, info, reader); } } + public String getPrefix(String s) { + return _instance + " " + s; + } + public boolean scorersAlreadyAdded(AtomicReaderContext context) { Object key = getKey(context); + LOG.debug(getPrefix("scorersAlreadyAdded key [{0}]"), context); return _infoMap.containsKey(key); } @@ -297,7 +329,9 @@ public class FacetExecutor { } public void processFacets(ExecutorService executor) throws IOException { + LOG.debug(getPrefix("processFacets called")); if (!_processed) { + LOG.debug(getPrefix("processing Facets")); Tracer trace = Trace.trace("processing facets"); try { processInternal(executor); @@ -310,11 +344,16 @@ public class FacetExecutor { private void processInternal(ExecutorService executor) throws IOException { List<Entry<Object, Info>> entries = new ArrayList<Entry<Object, Info>>(_infoMap.entrySet()); + LOG.debug(getPrefix("entries count [{0}]"), entries.size()); Collections.sort(entries, COMPARATOR); if (executor == null) { + LOG.debug(getPrefix("no executor"), entries.size()); for (Entry<Object, Info> e : entries) { if (_running.get()) { + LOG.debug(getPrefix("processing [{0}] [{1}]"), e.getKey(), e.getValue()); e.getValue().process(_counts, _minimumsBeforeReturning, _running); + } else { + LOG.debug(getPrefix("No longer running.")); } } } else { @@ -326,14 +365,18 @@ public class FacetExecutor { @Override public void run() { try { + LOG.debug(getPrefix("processing [{0}] [{1}]"), entry.getKey(), entry.getValue()); entry.getValue().process(_counts, _minimumsBeforeReturning, _running); } catch (Throwable e) { - LOG.error("Unknown error", e); + LOG.error(getPrefix("Unknown error"), e); } finally { finished.incrementAndGet(); } } }); + } else { + 
LOG.debug(getPrefix("No longer running.")); + return; } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java ---------------------------------------------------------------------- diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java index fbb342c..74ee6e0 100644 --- a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java +++ b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetQuery.java @@ -17,6 +17,7 @@ package org.apache.blur.lucene.search; * limitations under the License. */ import java.io.IOException; +import java.util.Arrays; import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; @@ -32,6 +33,8 @@ import org.apache.lucene.util.OpenBitSet; public class FacetQuery extends AbstractWrapperQuery { + private static final Log LOG = LogFactory.getLog(FacetQuery.class); + private final Query[] _facets; private final FacetExecutor _executor; @@ -69,10 +72,11 @@ public class FacetQuery extends AbstractWrapperQuery { if (_rewritten) { return this; } + Query[] facets = new Query[_facets.length]; for (int i = 0; i < _facets.length; i++) { - _facets[i] = _facets[i].rewrite(reader); + facets[i] = _facets[i].rewrite(reader); } - return new FacetQuery(_query.rewrite(reader), _facets, _executor, true); + return new FacetQuery(_query.rewrite(reader), facets, _executor, true); } @Override @@ -125,7 +129,11 @@ public class FacetQuery extends AbstractWrapperQuery { } if (!_executor.scorersAlreadyAdded(context)) { Scorer[] scorers = getScorers(context, true, topScorer, acceptDocs); - _executor.addScorers(context, scorers); + LOG.debug(_executor.getPrefix("Adding scorers for context [{0}] scorers [{1}]"), context, + Arrays.asList(scorers)); + _executor.addScorers(context, scorers); + } else { + LOG.debug(_executor.getPrefix("Scorers already added for context [{0}]"), context); } return new FacetScorer(scorer, _executor, context); } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java ---------------------------------------------------------------------- diff --git a/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java b/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java index 8be969b..4f4f29c 100644 --- a/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java +++ b/blur-query/src/test/java/org/apache/blur/analysis/type/BaseReadMaskFieldTypeDefinitionTest.java @@ -49,7 +49,6 @@ import org.apache.lucene.index.Fields; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.Term; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; @@ -160,7 +159,7 @@ public abstract class BaseReadMaskFieldTypeDefinitionTest { } private void setupFieldManager(BaseFieldManager fieldManager) throws IOException { - fieldManager.addColumnDefinition(FAM, "string", null, false, "string", true, false, null); + fieldManager.addColumnDefinition(FAM, "string", null, false, "string", false, false, null); fieldManager.addColumnDefinition(FAM, "string2", null, false, "string", false, false, 
null); fieldManager.addColumnDefinition(FAM, "read", null, false, "acl-read", false, false, null); fieldManager.addColumnDefinition(FAM, "mask", null, false, "read-mask", false, false, null); @@ -222,16 +221,6 @@ public abstract class BaseReadMaskFieldTypeDefinitionTest { assertEquals(defaultReadMask, s); } } - - String s = document.get("fam.string"); - if (s == null || s.equals(getDefaultReadMask())) { - AtomicReader atomicReader = searcher.getIndexReader().leaves().get(0).reader(); - SortedDocValues sortedDocValues = atomicReader.getSortedDocValues("fam.string"); - BytesRef result = new BytesRef(); - sortedDocValues.get(doc, result); - assertEquals(0, result.length); - } - } reader.close(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/pom.xml ---------------------------------------------------------------------- diff --git a/blur-shell/pom.xml b/blur-shell/pom.xml index fd7e92d..8834a3c 100644 --- a/blur-shell/pom.xml +++ b/blur-shell/pom.xml @@ -61,6 +61,13 @@ <build> <finalName>blur-shell-${project.version}</finalName> <plugins> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java ---------------------------------------------------------------------- diff --git a/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java b/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java index 2b48374..d92ad80 100644 --- a/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java +++ b/blur-shell/src/main/java/org/apache/blur/shell/ListRunningPlatformCommandsCommand.java @@ -40,13 +40,24 @@ public class ListRunningPlatformCommandsCommand extends Command { if (args.length != 1) { throw new CommandException("Invalid args: " + help()); } - List<String> commandStatusList = client.commandStatusList(0, Short.MAX_VALUE); RunningSummary runningSummary = new RunningSummary(); for (String id : commandStatusList) { - CommandStatus commandStatus = client.commandStatus(id); + CommandStatus commandStatus; + try { + commandStatus = client.commandStatus(id); + } catch (BlurException e) { + String message = e.getMessage(); + if (message != null && message.startsWith("NOT_FOUND")) { + commandStatus = null; + } else { + throw e; + } + } + if (commandStatus == null) { + continue; + } Map<String, Map<CommandStatusState, Long>> serverStateMap = commandStatus.getServerStateMap(); - out.println(serverStateMap); Map<CommandStatusState, Long> summary = getSummary(serverStateMap); if (summary.containsKey(CommandStatusState.RUNNING)) { runningSummary.add(commandStatus, summary); @@ -56,7 +67,7 @@ public class ListRunningPlatformCommandsCommand extends Command { runningSummary.print(out); } - private Map<CommandStatusState, Long> getSummary(Map<String, Map<CommandStatusState, Long>> serverStateMap) { + public static Map<CommandStatusState, Long> getSummary(Map<String, Map<CommandStatusState, Long>> serverStateMap) { Map<CommandStatusState, Long> map = new HashMap<CommandStatusState, Long>(); for (Map<CommandStatusState, Long> m : serverStateMap.values()) { for (Entry<CommandStatusState, Long> e : m.entrySet()) { @@ -95,18 +106,7 @@ public class 
ListRunningPlatformCommandsCommand extends Command { String executionId = commandStatus.getExecutionId(); String commandName = commandStatus.getCommandName(); User user = commandStatus.getUser(); - _summary.add(Arrays.asList(executionId, commandName, user.getUsername(), toString(summary))); - } - - private String toString(Map<CommandStatusState, Long> summary) { - StringBuilder builder = new StringBuilder(); - for (Entry<CommandStatusState, Long> e : summary.entrySet()) { - if (builder.length() != 0) { - builder.append(','); - } - builder.append(e.getKey().name()).append(":").append(e.getValue()); - } - return builder.toString(); + _summary.add(Arrays.asList(executionId, commandName, user.getUsername(), toStringSummary(summary))); } public void print(PrintWriter out) { @@ -158,4 +158,15 @@ public class ListRunningPlatformCommandsCommand extends Command { return len; } } + + public static String toStringSummary(Map<CommandStatusState, Long> summary) { + StringBuilder builder = new StringBuilder(); + for (Entry<CommandStatusState, Long> e : summary.entrySet()) { + if (builder.length() != 0) { + builder.append(','); + } + builder.append(e.getKey().name()).append(":").append(e.getValue()); + } + return builder.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/Main.java ---------------------------------------------------------------------- diff --git a/blur-shell/src/main/java/org/apache/blur/shell/Main.java b/blur-shell/src/main/java/org/apache/blur/shell/Main.java index bf4b0b5..82d8a42 100644 --- a/blur-shell/src/main/java/org/apache/blur/shell/Main.java +++ b/blur-shell/src/main/java/org/apache/blur/shell/Main.java @@ -400,7 +400,7 @@ public class Main { public static String[] shellCommands = { "help", "debug", "timed", "quit", "reset", "user", "whoami", "trace", "trace-remove", "trace-list" }; public static String[] platformCommands = { "command-list", "command-exec", "command-desc", "command-running", - "command-cancel" }; + "command-cancel", "command-watch" }; public static String[] serverCommands = { "logger", "logger-reset", "remove-shard" }; private static class HelpCommand extends Command { @@ -704,6 +704,7 @@ public class Main { register(builder, new ImportDataCommand()); register(builder, new ListRunningPlatformCommandsCommand()); register(builder, new CancelPlatformCommandCommand()); + register(builder, new WatchCommands()); commands = builder.build(); } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java ---------------------------------------------------------------------- diff --git a/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java b/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java index 94783e8..31a08e8 100644 --- a/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java +++ b/blur-shell/src/main/java/org/apache/blur/shell/TableDisplay.java @@ -289,7 +289,7 @@ public class TableDisplay implements Closeable { width--; } } - + private void buffer(Canvas canvas, String value, int width) { canvas.append(value); width -= getVisibleLength(value); @@ -683,4 +683,5 @@ public class TableDisplay implements Closeable { public void setStopReadingInput(boolean b) { _stopReadingInput.set(true); } + } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java 
---------------------------------------------------------------------- diff --git a/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java b/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java new file mode 100644 index 0000000..4b284e9 --- /dev/null +++ b/blur-shell/src/main/java/org/apache/blur/shell/WatchCommands.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.shell; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import jline.console.ConsoleReader; + +import org.apache.blur.thirdparty.thrift_0_9_0.TException; +import org.apache.blur.thrift.generated.Blur; +import org.apache.blur.thrift.generated.Blur.Iface; +import org.apache.blur.thrift.generated.BlurException; +import org.apache.blur.thrift.generated.CommandStatus; +import org.apache.blur.thrift.generated.CommandStatusState; +import org.apache.blur.thrift.generated.User; + +public class WatchCommands extends Command { + + @Override + public void doit(PrintWriter out, Blur.Iface client, String[] args) throws CommandException, TException, + BlurException { + ConsoleReader reader = this.getConsoleReader(); + try { + doitInternal(client, reader); + } catch (IOException e) { + if (Main.debug) { + e.printStackTrace(); + } + throw new CommandException(e.getMessage()); + } finally { + if (reader != null) { + reader.setPrompt(Main.PROMPT); + } + } + } + + private static void doitInternal(Iface client, ConsoleReader reader) throws IOException, TException, CommandException { + TableDisplay tableDisplay = new TableDisplay(reader); + tableDisplay.setSeperator("|"); + tableDisplay.setHeader(0, "id"); + tableDisplay.setHeader(1, "command"); + tableDisplay.setHeader(2, "user"); + tableDisplay.setHeader(3, "summary"); + + final AtomicBoolean running = new AtomicBoolean(true); + tableDisplay.addKeyHook(new Runnable() { + @Override + public void run() { + synchronized (running) { + running.set(false); + running.notifyAll(); + } + } + }, 'q'); + + try { + int maxL = 0; + while (running.get()) { + + List<String> commandStatusList = client.commandStatusList(0, Short.MAX_VALUE); + List<String[]> display = new ArrayList<String[]>(); + for (String id : commandStatusList) { + CommandStatus commandStatus; + try { + commandStatus = client.commandStatus(id); + } catch (BlurException e) { + String message = e.getMessage(); + if (message != null && message.startsWith("NOT_FOUND")) { + commandStatus = null; + } else { + throw e; + } + } + if (commandStatus == null) { + continue; + } + Map<String, Map<CommandStatusState, Long>> serverStateMap = commandStatus.getServerStateMap(); + Map<CommandStatusState, 
Long> summary = ListRunningPlatformCommandsCommand.getSummary(serverStateMap); + String executionId = commandStatus.getExecutionId(); + String commandName = commandStatus.getCommandName(); + User user = commandStatus.getUser(); + if (summary.containsKey(CommandStatusState.RUNNING)) { + String stringSummary = ListRunningPlatformCommandsCommand.toStringSummary(summary); + display.add(new String[] { executionId, commandName, user.toString(), stringSummary }); + } else if (summary.containsKey(CommandStatusState.INTERRUPTED)) { + display + .add(new String[] { executionId, commandName, user.toString(), CommandStatusState.INTERRUPTED.name() }); + } else { + display.add(new String[] { executionId, commandName, user.toString(), CommandStatusState.COMPLETE.name() }); + } + } + + int l = 0; + for (String[] array : display) { + tableDisplay.set(0, l, array[0]); + tableDisplay.set(1, l, array[1]); + tableDisplay.set(2, l, array[2]); + tableDisplay.set(3, l, array[3]); + l++; + } + if (l > maxL) { + maxL = l; + } + Thread.sleep(3000); + } + } catch (InterruptedException e) { + if (Main.debug) { + e.printStackTrace(); + } + throw new CommandException(e.getMessage()); + } finally { + tableDisplay.close(); + } + } + + @Override + public String description() { + return "Watch commands execute."; + } + + @Override + public String usage() { + return ""; + } + + @Override + public String name() { + return "command-watch"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java b/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java index 3bc6737..fd617b6 100644 --- a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java +++ b/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java @@ -18,8 +18,8 @@ package org.apache.blur.lucene.codec; */ import java.io.IOException; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.blur.trace.Trace; import org.apache.blur.trace.Tracer; @@ -41,25 +41,33 @@ import org.apache.lucene.util.packed.BlockPackedReader; import org.apache.lucene.util.packed.MonotonicBlockPackedReader; class DiskDocValuesProducer extends DocValuesProducer { - private final Map<Integer,NumericEntry> numerics; - private final Map<Integer,BinaryEntry> binaries; - private final Map<Integer,NumericEntry> ords; - private final Map<Integer,NumericEntry> ordIndexes; + private final Map<Integer, NumericEntry> numerics; + private final Map<Integer, BinaryEntry> binaries; + private final Map<Integer, NumericEntry> ords; + private final Map<Integer, NumericEntry> ordIndexes; + private final Map<Integer, BinaryDocValues> _binaryDocValuesCache; + private final Map<Integer, NumericDocValues> _numericDocValuesCache; + private final Map<Integer, SortedDocValues> _sortedDocValuesCache; + private final Map<Integer, SortedSetDocValues> _sortedSetDocValuesCache; private final IndexInput data; + private final boolean _cache = true; - DiskDocValuesProducer(SegmentReadState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException { + DiskDocValuesProducer(SegmentReadState state, String dataCodec, String dataExtension, String metaCodec, + String metaExtension) throws IOException { 
String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension); // read in the entries from the metadata file. IndexInput in = state.directory.openInput(metaName, state.context); boolean success = false; try { - CodecUtil.checkHeader(in, metaCodec, - DiskDocValuesFormat.VERSION_START, - DiskDocValuesFormat.VERSION_START); - numerics = new HashMap<Integer,NumericEntry>(); - ords = new HashMap<Integer,NumericEntry>(); - ordIndexes = new HashMap<Integer,NumericEntry>(); - binaries = new HashMap<Integer,BinaryEntry>(); + CodecUtil.checkHeader(in, metaCodec, DiskDocValuesFormat.VERSION_START, DiskDocValuesFormat.VERSION_START); + numerics = new ConcurrentHashMap<Integer, NumericEntry>(); + ords = new ConcurrentHashMap<Integer, NumericEntry>(); + ordIndexes = new ConcurrentHashMap<Integer, NumericEntry>(); + binaries = new ConcurrentHashMap<Integer, BinaryEntry>(); + _binaryDocValuesCache = new ConcurrentHashMap<Integer, BinaryDocValues>(); + _numericDocValuesCache = new ConcurrentHashMap<Integer, NumericDocValues>(); + _sortedDocValuesCache = new ConcurrentHashMap<Integer, SortedDocValues>(); + _sortedSetDocValuesCache = new ConcurrentHashMap<Integer, SortedSetDocValues>(); readFields(in, state.fieldInfos); success = true; } finally { @@ -69,14 +77,12 @@ class DiskDocValuesProducer extends DocValuesProducer { IOUtils.closeWhileHandlingException(in); } } - + String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension); data = state.directory.openInput(dataName, state.context); - CodecUtil.checkHeader(data, dataCodec, - DiskDocValuesFormat.VERSION_START, - DiskDocValuesFormat.VERSION_START); + CodecUtil.checkHeader(data, dataCodec, DiskDocValuesFormat.VERSION_START, DiskDocValuesFormat.VERSION_START); } - + private void readFields(IndexInput meta, FieldInfos infos) throws IOException { int fieldNumber = meta.readVInt(); while (fieldNumber != -1) { @@ -96,7 +102,7 @@ class DiskDocValuesProducer extends DocValuesProducer { } BinaryEntry b = readBinaryEntry(meta); binaries.put(fieldNumber, b); - + if (meta.readVInt() != fieldNumber) { throw new CorruptIndexException("sorted entry for field: " + fieldNumber + " is corrupt"); } @@ -115,7 +121,7 @@ class DiskDocValuesProducer extends DocValuesProducer { } BinaryEntry b = readBinaryEntry(meta); binaries.put(fieldNumber, b); - + if (meta.readVInt() != fieldNumber) { throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt"); } @@ -124,7 +130,7 @@ class DiskDocValuesProducer extends DocValuesProducer { } NumericEntry n1 = readNumericEntry(meta); ords.put(fieldNumber, n1); - + if (meta.readVInt() != fieldNumber) { throw new CorruptIndexException("sortedset entry for field: " + fieldNumber + " is corrupt"); } @@ -139,7 +145,7 @@ class DiskDocValuesProducer extends DocValuesProducer { fieldNumber = meta.readVInt(); } } - + static NumericEntry readNumericEntry(IndexInput meta) throws IOException { NumericEntry entry = new NumericEntry(); entry.packedIntsVersion = meta.readVInt(); @@ -148,7 +154,7 @@ class DiskDocValuesProducer extends DocValuesProducer { entry.blockSize = meta.readVInt(); return entry; } - + static BinaryEntry readBinaryEntry(IndexInput meta) throws IOException { BinaryEntry entry = new BinaryEntry(); entry.minLength = meta.readVInt(); @@ -165,15 +171,30 @@ class DiskDocValuesProducer extends DocValuesProducer { @Override public NumericDocValues getNumeric(FieldInfo field) throws IOException { - NumericEntry entry = 
numerics.get(field.number); - return getNumeric(entry); + NumericDocValues numericDocValues = _numericDocValuesCache.get(field.number); + if (numericDocValues != null) { + return numericDocValues; + } + synchronized (_numericDocValuesCache) { + numericDocValues = _numericDocValuesCache.get(field.number); + if (numericDocValues != null) { + return numericDocValues; + } + NumericEntry entry = numerics.get(field.number); + numericDocValues = newNumeric(entry); + if (_cache && numericDocValues != null) { + _numericDocValuesCache.put(field.number, numericDocValues); + } + return numericDocValues; + } } - - LongNumericDocValues getNumeric(NumericEntry entry) throws IOException { + + LongNumericDocValues newNumeric(NumericEntry entry) throws IOException { final IndexInput data = this.data.clone(); data.seek(entry.offset); - final BlockPackedReader reader = new BlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count, true); + final BlockPackedReader reader = new BlockPackedReader(data, entry.packedIntsVersion, entry.blockSize, entry.count, + true); return new LongNumericDocValues() { @Override public long get(long id) { @@ -184,6 +205,24 @@ class DiskDocValuesProducer extends DocValuesProducer { @Override public BinaryDocValues getBinary(FieldInfo field) throws IOException { + BinaryDocValues binaryDocValues = _binaryDocValuesCache.get(field.number); + if (binaryDocValues != null) { + return binaryDocValues; + } + synchronized (_binaryDocValuesCache) { + binaryDocValues = _binaryDocValuesCache.get(field.number); + if (binaryDocValues != null) { + return binaryDocValues; + } + binaryDocValues = newBinary(field); + if (_cache && binaryDocValues != null) { + _binaryDocValuesCache.put(field.number, binaryDocValues); + } + return binaryDocValues; + } + } + + private BinaryDocValues newBinary(FieldInfo field) throws IOException { BinaryEntry bytes = binaries.get(field.number); if (bytes.minLength == bytes.maxLength) { return getFixedBinary(field, bytes); @@ -191,20 +230,30 @@ class DiskDocValuesProducer extends DocValuesProducer { return getVariableBinary(field, bytes); } } - + private BinaryDocValues getFixedBinary(FieldInfo field, final BinaryEntry bytes) { final IndexInput data = this.data.clone(); return new LongBinaryDocValues() { + + private final ThreadLocal<IndexInput> in = new ThreadLocal<IndexInput>() { + @Override + protected IndexInput initialValue() { + return data.clone(); + } + }; + @Override public void get(long id, BytesRef result) { long address = bytes.offset + id * bytes.maxLength; try { - data.seek(address); - // NOTE: we could have one buffer, but various consumers (e.g. FieldComparatorSource) + IndexInput indexInput = in.get(); + indexInput.seek(address); + // NOTE: we could have one buffer, but various consumers (e.g. + // FieldComparatorSource) // assume "they" own the bytes after calling this! 
final byte[] buffer = new byte[bytes.maxLength]; - data.readBytes(buffer, 0, buffer.length); + indexInput.readBytes(buffer, 0, buffer.length); result.bytes = buffer; result.offset = 0; result.length = buffer.length; @@ -214,10 +263,10 @@ class DiskDocValuesProducer extends DocValuesProducer { } }; } - + private BinaryDocValues getVariableBinary(FieldInfo field, final BinaryEntry bytes) throws IOException { final IndexInput data = this.data.clone(); - + Tracer trace = Trace.trace("getSorted - BlockPackedReader - create"); final MonotonicBlockPackedReader addresses; try { @@ -227,17 +276,27 @@ class DiskDocValuesProducer extends DocValuesProducer { trace.done(); } return new LongBinaryDocValues() { + + private final ThreadLocal<IndexInput> _input = new ThreadLocal<IndexInput>() { + @Override + protected IndexInput initialValue() { + return data.clone(); + } + }; + @Override public void get(long id, BytesRef result) { - long startAddress = bytes.offset + (id == 0 ? 0 : addresses.get(id-1)); + long startAddress = bytes.offset + (id == 0 ? 0 : addresses.get(id - 1)); long endAddress = bytes.offset + addresses.get(id); int length = (int) (endAddress - startAddress); try { - data.seek(startAddress); - // NOTE: we could have one buffer, but various consumers (e.g. FieldComparatorSource) + IndexInput indexInput = _input.get(); + indexInput.seek(startAddress); + // NOTE: we could have one buffer, but various consumers (e.g. + // FieldComparatorSource) // assume "they" own the bytes after calling this! final byte[] buffer = new byte[length]; - data.readBytes(buffer, 0, buffer.length); + indexInput.readBytes(buffer, 0, buffer.length); result.bytes = buffer; result.offset = 0; result.length = length; @@ -250,11 +309,29 @@ class DiskDocValuesProducer extends DocValuesProducer { @Override public SortedDocValues getSorted(FieldInfo field) throws IOException { + SortedDocValues sortedDocValues = _sortedDocValuesCache.get(field.number); + if (sortedDocValues != null) { + return sortedDocValues; + } + synchronized (_sortedDocValuesCache) { + sortedDocValues = _sortedDocValuesCache.get(field.number); + if (sortedDocValues != null) { + return sortedDocValues; + } + sortedDocValues = newSortedDocValues(field); + if (_cache && sortedDocValues != null) { + _sortedDocValuesCache.put(field.number, sortedDocValues); + } + return sortedDocValues; + } + } + + private SortedDocValues newSortedDocValues(FieldInfo field) throws IOException { final int valueCount = (int) binaries.get(field.number).count; final BinaryDocValues binary = getBinary(field); Tracer trace = Trace.trace("getSorted - BlockPackedReader - create"); final BlockPackedReader ordinals; - try{ + try { NumericEntry entry = ords.get(field.number); IndexInput data = this.data.clone(); data.seek(entry.offset); @@ -283,14 +360,32 @@ class DiskDocValuesProducer extends DocValuesProducer { @Override public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + SortedSetDocValues sortedSetDocValues = _sortedSetDocValuesCache.get(field.number); + if (sortedSetDocValues != null) { + return sortedSetDocValues; + } + synchronized (_sortedSetDocValuesCache) { + sortedSetDocValues = _sortedSetDocValuesCache.get(field.number); + if (sortedSetDocValues != null) { + return sortedSetDocValues; + } + sortedSetDocValues = newSortedSetDocValues(field); + if (_cache && sortedSetDocValues != null) { + _sortedSetDocValuesCache.put(field.number, sortedSetDocValues); + } + return sortedSetDocValues; + } + } + + private SortedSetDocValues 
newSortedSetDocValues(FieldInfo field) throws IOException { final long valueCount = binaries.get(field.number).count; // we keep the byte[]s and list of ords on disk, these could be large final LongBinaryDocValues binary = (LongBinaryDocValues) getBinary(field); - final LongNumericDocValues ordinals = getNumeric(ords.get(field.number)); + final LongNumericDocValues ordinals = newNumeric(ords.get(field.number)); Tracer trace = Trace.trace("getSortedSet - MonotonicBlockPackedReader - create"); final MonotonicBlockPackedReader ordIndex; - try{ + try { NumericEntry entry = ordIndexes.get(field.number); IndexInput data = this.data.clone(); data.seek(entry.offset); @@ -302,7 +397,7 @@ class DiskDocValuesProducer extends DocValuesProducer { return new SortedSetDocValues() { long offset; long endOffset; - + @Override public long nextOrd() { if (offset == endOffset) { @@ -316,7 +411,7 @@ class DiskDocValuesProducer extends DocValuesProducer { @Override public void setDocument(int docID) { - offset = (docID == 0 ? 0 : ordIndex.get(docID-1)); + offset = (docID == 0 ? 0 : ordIndex.get(docID - 1)); endOffset = ordIndex.get(docID); } @@ -336,7 +431,7 @@ class DiskDocValuesProducer extends DocValuesProducer { public void close() throws IOException { data.close(); } - + static class NumericEntry { long offset; @@ -344,7 +439,7 @@ class DiskDocValuesProducer extends DocValuesProducer { long count; int blockSize; } - + static class BinaryEntry { long offset; @@ -355,23 +450,23 @@ class DiskDocValuesProducer extends DocValuesProducer { int packedIntsVersion; int blockSize; } - + // internally we compose complex dv (sorted/sortedset) from other ones static abstract class LongNumericDocValues extends NumericDocValues { @Override public final long get(int docID) { return get((long) docID); } - + abstract long get(long id); } - + static abstract class LongBinaryDocValues extends BinaryDocValues { @Override public final void get(int docID, BytesRef result) { - get((long)docID, result); + get((long) docID, result); } - + abstract void get(long id, BytesRef Result); } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java index 18b9eda..3803cc5 100644 --- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java +++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java @@ -224,6 +224,7 @@ public class CacheIndexInput extends IndexInput { @Override public void readBytes(byte[] b, int offset, int len) throws IOException { ensureOpen(); + LOOP: while (len > 0) { tryToFill(); int remaining = remaining(); @@ -232,8 +233,7 @@ public class CacheIndexInput extends IndexInput { _cacheValue.read(_blockPosition, b, offset, length); } catch (EvictionException e) { releaseCache(); - readBytes(b, offset, len); - return; + continue LOOP; } offset += length; len -= length; http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java 
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java index 2e8d245..269245a 100644 --- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java +++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java @@ -121,11 +121,14 @@ public abstract class MeterWrapper implements Closeable { } private static void register(String id, SimpleMeter meter, AtomicLong counter) { - { - _counterMap.putIfAbsent(id, new MeterWrapperCounter(meter)); - } - { - _counterMap.get(id).add(counter); + MeterWrapperCounter meterWrapperCounter = new MeterWrapperCounter(meter); + while (true) { + _counterMap.putIfAbsent(id, meterWrapperCounter); + MeterWrapperCounter wrapperCounter = _counterMap.get(id); + if (wrapperCounter != null) { + wrapperCounter.add(counter); + return; + } } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java index e5ce99d..24eb7ca 100644 --- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java +++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java @@ -38,7 +38,6 @@ public class DetachableCacheValue implements CacheValue { } private volatile CacheValue _baseCacheValue; - private volatile boolean _evicted; public DetachableCacheValue(CacheValue cacheValue) { _baseCacheValue = cacheValue; @@ -46,7 +45,6 @@ public class DetachableCacheValue implements CacheValue { @Override public CacheValue detachFromCache() { - _evicted = true; if (_baseCacheValue instanceof ByteArrayCacheValue) { // already detached return null; @@ -64,8 +62,11 @@ public class DetachableCacheValue implements CacheValue { @Override public int length() throws EvictionException { - checkEviction(); - return _baseCacheValue.length(); + try { + return _baseCacheValue.length(); + } catch (NullPointerException npe) { + throw new EvictionException(); + } } @Override @@ -75,20 +76,20 @@ public class DetachableCacheValue implements CacheValue { @Override public void read(int position, byte[] buf, int offset, int length) throws EvictionException { - checkEviction(); - _baseCacheValue.read(position, buf, offset, length); - } - - private void checkEviction() throws EvictionException { - if (_evicted) { + try { + _baseCacheValue.read(position, buf, offset, length); + } catch (NullPointerException npe) { throw new EvictionException(); } } @Override public byte read(int position) throws EvictionException { - checkEviction(); - return _baseCacheValue.read(position); + try { + return _baseCacheValue.read(position); + } catch (NullPointerException npe) { + throw new EvictionException(); + } } @Override @@ -100,20 +101,29 @@ public class DetachableCacheValue implements CacheValue { @Override public short readShort(int position) throws EvictionException { - checkEviction(); - return _baseCacheValue.readShort(position); + try { + return _baseCacheValue.readShort(position); + } catch (NullPointerException npe) { + throw new EvictionException(); + } } @Override public int readInt(int position) throws EvictionException { - checkEviction(); - return _baseCacheValue.readInt(position); + try { + 
return _baseCacheValue.readInt(position); + } catch (NullPointerException npe) { + throw new EvictionException(); + } } @Override public long readLong(int position) throws EvictionException { - checkEviction(); - return _baseCacheValue.readLong(position); + try { + return _baseCacheValue.readLong(position); + } catch (NullPointerException npe) { + throw new EvictionException(); + } } @Override @@ -123,7 +133,7 @@ public class DetachableCacheValue implements CacheValue { @Override public boolean isEvicted() { - return _evicted; + return _baseCacheValue == null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java index ac72eb9..34371e5 100644 --- a/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java +++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java @@ -44,7 +44,8 @@ public class SequentialReadControl implements Cloneable { public SequentialReadControl clone() { try { SequentialReadControl control = (SequentialReadControl) super.clone(); - setup(_configuration, control); + // Setup too heavy for clones + // setup(_configuration, control); return control; } catch (CloneNotSupportedException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java new file mode 100644 index 0000000..e5d495b --- /dev/null +++ b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPacked64SingleBlockReader.java @@ -0,0 +1,65 @@ +package org.apache.lucene.util.packed; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
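----------------------------------------------------------------------

DetachableCacheValue above stops tracking eviction in a separate volatile _evicted flag and treats the base value reference itself as the source of truth: once that reference is cleared, any access throws a NullPointerException, which each accessor translates into the checked EvictionException, and isEvicted() becomes a null check. One reading of the motivation is that a flag written separately from the value can be observed out of sync with it, whereas catching the NPE makes detection atomic with the access. A simplified, runnable sketch in which an int[] stands in for the real base CacheValue:

    class EvictionException extends Exception {
    }

    class DetachableValue {
      private volatile int[] base;     // null once detached

      DetachableValue(int[] base) {
        this.base = base;
      }

      void evict() {
        base = null;                   // single write; no flag to keep in sync
      }

      int read(int position) throws EvictionException {
        try {
          return base[position];       // NPE here means evict() won the race
        } catch (NullPointerException npe) {
          throw new EvictionException();
        }
      }

      boolean isEvicted() {
        return base == null;
      }
    }

----------------------------------------------------------------------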
+ */ + +import java.io.IOException; + +import org.apache.lucene.store.IndexInput; + +final class DirectPacked64SingleBlockReader extends PackedInts.ReaderImpl { + + private final ThreadLocal<IndexInput> in; + private final long startPointer; + private final int valuesPerBlock; + private final long mask; + + DirectPacked64SingleBlockReader(int bitsPerValue, int valueCount, + IndexInput input) { + super(valueCount, bitsPerValue); + this.in = new ThreadLocal<IndexInput>() { + @Override + protected IndexInput initialValue() { + return input.clone(); + } + }; + startPointer = input.getFilePointer(); + valuesPerBlock = 64 / bitsPerValue; + mask = ~(~0L << bitsPerValue); + } + + @Override + public long get(int index) { + final int blockOffset = index / valuesPerBlock; + final long skip = ((long) blockOffset) << 3; + try { + IndexInput indexInput = in.get(); + indexInput.seek(startPointer + skip); + + long block = indexInput.readLong(); + final int offsetInBlock = index % valuesPerBlock; + return (block >>> (offsetInBlock * bitsPerValue)) & mask; + } catch (IOException e) { + throw new IllegalStateException("failed", e); + } + } + + @Override + public long ramBytesUsed() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java new file mode 100644 index 0000000..9483a10 --- /dev/null +++ b/blur-store/src/main/java/org/apache/lucene/util/packed/DirectPackedReader.java @@ -0,0 +1,80 @@ +package org.apache.lucene.util.packed; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
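----------------------------------------------------------------------

DirectPacked64SingleBlockReader above reads packed values straight from disk on every get(): each 64-bit block holds floor(64 / bitsPerValue) values, so the reader seeks 8 * (index / valuesPerBlock) bytes past the start pointer, reads one long, and shifts and masks the wanted slot out. The ThreadLocal<IndexInput> exists because an IndexInput carries a file position and is not safe to share, so each thread seeks on its own clone. The same bit manipulation, made runnable against an in-memory long[] (the real reader reads the block from the IndexInput instead):

    class SingleBlockDecode {
      static long get(long[] blocks, int bitsPerValue, int index) {
        int valuesPerBlock = 64 / bitsPerValue;
        long mask = ~(~0L << bitsPerValue);        // low bitsPerValue bits set
        long block = blocks[index / valuesPerBlock];
        int offsetInBlock = index % valuesPerBlock;
        return (block >>> (offsetInBlock * bitsPerValue)) & mask;
      }

      public static void main(String[] args) {
        // Pack 5 and 9 at 5 bits per value into one block: (9 << 5) | 5.
        long[] blocks = { (9L << 5) | 5L };
        System.out.println(get(blocks, 5, 0)); // 5
        System.out.println(get(blocks, 5, 1)); // 9
      }
    }

----------------------------------------------------------------------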
+ */ + +import java.io.IOException; + +import org.apache.lucene.store.IndexInput; + +/* Reads directly from disk on each get */ +class DirectPackedReader extends PackedInts.ReaderImpl { + private final long startPointer; + private final ThreadLocal<IndexInput> in; + + public DirectPackedReader(int bitsPerValue, int valueCount, IndexInput input) { + super(valueCount, bitsPerValue); + this.in = new ThreadLocal<IndexInput>() { + @Override + protected IndexInput initialValue() { + return input.clone(); + } + }; + startPointer = input.getFilePointer(); + } + + @Override + public long get(int index) { + final long majorBitPos = (long)index * bitsPerValue; + final long elementPos = majorBitPos >>> 3; + try { + IndexInput indexInput = in.get(); + indexInput .seek(startPointer + elementPos); + + final byte b0 = indexInput.readByte(); + final int bitPos = (int) (majorBitPos & 7); + if (bitPos + bitsPerValue <= 8) { + // special case: all bits are in the first byte + return (b0 & ((1L << (8 - bitPos)) - 1)) >>> (8 - bitPos - bitsPerValue); + } + + // take bits from the first byte + int remainingBits = bitsPerValue - 8 + bitPos; + long result = (b0 & ((1L << (8 - bitPos)) - 1)) << remainingBits; + + // add bits from inner bytes + while (remainingBits >= 8) { + remainingBits -= 8; + result |= (indexInput.readByte() & 0xFFL) << remainingBits; + } + + // take bits from the last byte + if (remainingBits > 0) { + result |= (indexInput.readByte() & 0xFFL) >>> (8 - remainingBits); + } + + return result; + } catch (IOException ioe) { + throw new IllegalStateException("failed", ioe); + } + } + + @Override + public long ramBytesUsed() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java ---------------------------------------------------------------------- diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java index 472481b..509c0f0 100644 --- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java +++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java @@ -202,8 +202,11 @@ public class BlurConstants { public static final String SHARED_MERGE_SCHEDULER_PREFIX = "shared-merge-scheduler"; public static final String BLUR_FILTER_ALIAS = "blur.filter.alias."; - + public static final String HADOOP_CONF = "hadoop_conf."; + public static final String UPDATE_ROW = "_update_row_"; + public static final String NEW_ROW = "_new_row_"; + public static final String INTERNAL = "blur.internal"; static { try { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/96a1821a/deploy.sh ---------------------------------------------------------------------- diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..2731a85 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
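----------------------------------------------------------------------

DirectPackedReader above handles the general case where a value starts at an arbitrary bit offset and may span several bytes: it takes the tail of the first byte, whole inner bytes while at least eight bits remain, and finally the head of the last byte. The same arithmetic against an in-memory byte[] so it can be run and checked standalone:

    class BitUnpack {
      static long get(byte[] data, int bitsPerValue, int index) {
        long majorBitPos = (long) index * bitsPerValue;
        int pos = (int) (majorBitPos >>> 3);     // first byte of the value
        int bitPos = (int) (majorBitPos & 7);    // bit offset in that byte

        long b0 = data[pos++] & 0xFFL;
        if (bitPos + bitsPerValue <= 8) {
          // Special case: the whole value fits in the first byte.
          return (b0 & ((1L << (8 - bitPos)) - 1)) >>> (8 - bitPos - bitsPerValue);
        }
        int remainingBits = bitsPerValue - 8 + bitPos;
        long result = (b0 & ((1L << (8 - bitPos)) - 1)) << remainingBits;
        while (remainingBits >= 8) {             // whole inner bytes
          remainingBits -= 8;
          result |= (data[pos++] & 0xFFL) << remainingBits;
        }
        if (remainingBits > 0) {                 // head of the final byte
          result |= (data[pos] & 0xFFL) >>> (8 - remainingBits);
        }
        return result;
      }

      public static void main(String[] args) {
        // Two 12-bit values 0xABC and 0x123 packed as bytes AB C1 23.
        byte[] data = { (byte) 0xAB, (byte) 0xC1, (byte) 0x23 };
        System.out.println(Long.toHexString(get(data, 12, 0))); // abc
        System.out.println(Long.toHexString(get(data, 12, 1))); // 123
      }
    }

----------------------------------------------------------------------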
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +URL="<URL HERE>" +REPO_ID="snapshots" + +#mvn install -D${PROFILE} -DskipTests +#[ $? -eq 0 ] || exit $?; + +CUR_DIR=`pwd` +for FILE in *; do + if [ -d $FILE ] + then + if [ -f $FILE/pom.xml ] + then + echo "#######################################" + echo "# Deploying $FILE" + echo "#######################################" + + cd $FILE + + VERSION=`mvn help:evaluate -Dexpression=project.version | grep -v "\[INFO\]" | grep -v "\[WARNING\]"` + ARTIFACT=`mvn help:evaluate -Dexpression=project.artifactId | grep -v "\[INFO\]" | grep -v "\[WARNING\]"` + + JAR="target/${ARTIFACT}-${VERSION}.jar" + JAR_SOURCES="target/${ARTIFACT}-${VERSION}-sources.jar" + TESTS_JAR="target/${ARTIFACT}-${VERSION}-tests.jar" + if [ -f $JAR ] + then + if [ -f target/effective-pom.xml ] + then + echo "Args PWD=$PWD REPO_ID=${REPO_ID} URL=${URL} ARTIFACT=${ARTIFACT} VERSION=${VERSION}" + if [ -f $TESTS_JAR ] + then + mvn deploy:deploy-file -DrepositoryId=${REPO_ID} -Durl=${URL} -Dfile=$JAR -DpomFile=target/effective-pom.xml -Dtypes=jar -Dclassifiers=tests -Dfiles=$TESTS_JAR -Dsources=$JAR_SOURCES + else + mvn deploy:deploy-file -DrepositoryId=${REPO_ID} -Durl=${URL} -Dfile=$JAR -DpomFile=target/effective-pom.xml + fi + [ $? -eq 0 ] || exit $?; + else + echo "No effective-pom.xml to deploy, SKIPPING." + fi + fi + cd $CUR_DIR + fi + fi +done +
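----------------------------------------------------------------------

The new deploy.sh walks every top-level module directory that contains a pom.xml, derives the artifactId and version with mvn help:evaluate, and pushes the module jar (plus the tests and sources jars when present) to the repository named by REPO_ID and URL using deploy:deploy-file against the module's target/effective-pom.xml. Two caveats for anyone running it: the "<URL HERE>" placeholder must be filled in first, and the grep -v filtering of the help:evaluate output is fragile, since any output line not tagged [INFO] or [WARNING] survives the filter and corrupts VERSION or ARTIFACT; running the commented-out mvn install beforehand keeps the later evaluations quiet and ensures the target jars exist. Note also that "[ $? -eq 0 ] || exit $?" exits with the status of the test command (always 1 when taken) rather than Maven's actual exit code, though it still aborts the loop on failure.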