Second patch of updates.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/01416562 Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/01416562 Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/01416562 Branch: refs/heads/master Commit: 014165621b11c6c1814d5233baae9f85f99c6ae6 Parents: 0e8d0e8 Author: Aaron McCurry <amccu...@gmail.com> Authored: Sat May 7 13:10:19 2016 -0400 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Sat May 7 13:10:19 2016 -0400 ---------------------------------------------------------------------- .../apache/blur/command/BaseCommandManager.java | 23 ++- .../java/org/apache/blur/command/Command.java | 2 +- .../blur/command/ControllerClusterContext.java | 6 +- .../org/apache/blur/command/ResponseFuture.java | 4 + .../blur/manager/stats/MergerTableStats.java | 2 +- .../manager/writer/BlurIndexSimpleWriter.java | 72 ++++---- .../blur/manager/writer/IndexImporter.java | 130 +++++++++++--- .../writer/SnapshotIndexDeletionPolicy.java | 81 +++++---- .../blur/thrift/BlurControllerServer.java | 80 +++------ .../org/apache/blur/thrift/BlurShardServer.java | 6 +- .../blur/thrift/ThriftBlurShardServer.java | 2 +- .../java/org/apache/blur/utils/GCWatcher.java | 28 ++- .../org/apache/blur/utils/GCWatcherJdk6.java | 2 +- .../blur/manager/writer/IndexImporterTest.java | 17 ++ .../index/FilterAccessControlFactory.java | 1 + ...etDocumentVisibilityFilterCacheStrategy.java | 172 ++++++++++++++++++- 16 files changed, 448 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java index ad542ef..be92e34 100644 --- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java +++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java @@ -134,19 +134,20 @@ public abstract class BaseCommandManager implements Closeable { } public CommandStatus getCommandStatus(String commandExecutionId) { - CommandStatus cso = findCommandStatusObject(commandExecutionId, _workerRunningMap.values()); - if (cso != null) { - return cso; - } - return findCommandStatusObject(commandExecutionId, _driverRunningMap.values()); + CommandStatus cso1 = findCommandStatusObject(commandExecutionId, _workerRunningMap.values()); + CommandStatus cso2 = findCommandStatusObject(commandExecutionId, _driverRunningMap.values()); + return CommandStatusUtil.mergeCommandStatus(cso1, cso2); } private CommandStatus findCommandStatusObject(String commandExecutionId, Collection<ResponseFuture<?>> values) { Map<String, Map<CommandStatusState, Long>> serverStateMap = new HashMap<String, Map<CommandStatusState, Long>>(); CommandStatus commandStatus = null; for (ResponseFuture<?> responseFuture : values) { + if (responseFuture == null) { + continue; + } Command<?> commandExecuting = responseFuture.getCommandExecuting(); - if (commandExecuting.getCommandExecutionId().equals(commandExecutionId)) { + if (commandExecutionId.equals(commandExecuting.getCommandExecutionId())) { if (commandStatus == null) { CommandStatus originalCommandStatusObject = responseFuture.getOriginalCommandStatusObject(); String commandName = responseFuture.getCommandExecuting().getName(); @@ -182,7 +183,10 @@ public abstract class BaseCommandManager implements Closeable { List<String> result = new ArrayList<String>(); for (ResponseFuture<?> responseFuture : values) { Command<?> commandExecuting = responseFuture.getCommandExecuting(); - result.add(commandExecuting.getCommandExecutionId()); + String commandExecutionId = commandExecuting.getCommandExecutionId(); + if (commandExecutionId != null) { + result.add(commandExecutionId); + } } return result; } @@ -400,11 +404,12 @@ public abstract class BaseCommandManager implements Closeable { } protected Response submitDriverCallable(Callable<Response> callable, Command<?> commandExecuting, - CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException, ExceptionCollector { + CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException, + ExceptionCollector { Future<Response> future = _executorServiceDriver.submit(callable); Long instanceExecutionId = getInstanceExecutionId(); _driverRunningMap.put(instanceExecutionId, new ResponseFuture<Response>(_runningCacheTombstoneTime, future, - commandExecuting, originalCommandStatusObject,running)); + commandExecuting, originalCommandStatusObject, running)); try { return future.get(_connectionTimeout, TimeUnit.MILLISECONDS); } catch (CancellationException e) { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/Command.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/Command.java b/blur-core/src/main/java/org/apache/blur/command/Command.java index 9cf2719..ff6f559 100644 --- a/blur-core/src/main/java/org/apache/blur/command/Command.java +++ b/blur-core/src/main/java/org/apache/blur/command/Command.java @@ -30,7 +30,7 @@ import org.apache.blur.thrift.generated.Blur.Iface; public abstract class Command<R> implements Cloneable { - @OptionalArgument("The ") + @OptionalArgument private String commandExecutionId; public abstract String getName(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java index b7a1a63..59f5b7c 100644 --- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java +++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java @@ -23,6 +23,7 @@ import org.apache.blur.thirdparty.thrift_0_9_0.TException; import org.apache.blur.thrift.BlurClientManager; import org.apache.blur.thrift.ClientPool; import org.apache.blur.thrift.Connection; +import org.apache.blur.thrift.UserConverter; import org.apache.blur.thrift.generated.Arguments; import org.apache.blur.thrift.generated.Blur.Client; import org.apache.blur.thrift.generated.BlurException; @@ -31,6 +32,7 @@ import org.apache.blur.thrift.generated.Response; import org.apache.blur.thrift.generated.TimeoutException; import org.apache.blur.thrift.generated.ValueObject; import org.apache.blur.trace.Tracer; +import org.apache.blur.user.UserContext; /** * Licensed to the Apache Software Foundation (ASF) under one or more @@ -134,7 +136,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl final Arguments arguments = _manager.toArguments(command); - CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null); + CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser())); for (Entry<Server, Client> e : clientMap.entrySet()) { Server server = e.getKey(); final Client client = e.getValue(); @@ -226,7 +228,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl Set<Shard> shards = command.routeShards(this, tables); Map<Server, Client> clientMap = getClientMap(command, tables, shards); final Arguments arguments = _manager.toArguments(command); - CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null); + CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser())); for (Entry<Server, Client> e : clientMap.entrySet()) { Server server = e.getKey(); final Client client = e.getValue(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java index a5a629e..ef4a046 100644 --- a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java +++ b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java @@ -16,6 +16,7 @@ */ package org.apache.blur.command; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -39,6 +40,9 @@ public class ResponseFuture<T> implements Future<T> { _tombstone = tombstone; _future = future; _commandExecuting = commandExecuting; + if (_commandExecuting.getCommandExecutionId() == null) { + _commandExecuting.setCommandExecutionId(UUID.randomUUID().toString()); + } _originalCommandStatusObject = originalCommandStatusObject; _running = running; } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java index 2f89c9e..4ac5631 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java +++ b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java @@ -46,7 +46,7 @@ public class MergerTableStats implements Merger<TableStats> { private TableStats merge(TableStats s1, TableStats s2) { s1.tableName = s2.tableName; - s1.bytes = Math.max(s1.bytes, s2.bytes); + s1.bytes = s1.bytes + s2.bytes; s1.recordCount = s1.recordCount + s2.recordCount; s1.rowCount = s1.rowCount + s2.rowCount; s1.segmentImportInProgressCount = s1.segmentImportInProgressCount + s2.segmentImportInProgressCount; http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java index e21a952..ff17e27 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java @@ -25,7 +25,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WRITER_SORT_M import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH; import java.io.Closeable; -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; @@ -77,6 +76,7 @@ import org.apache.blur.user.User; import org.apache.blur.user.UserContext; import org.apache.blur.utils.BlurConstants; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -428,31 +428,31 @@ public class BlurIndexSimpleWriter extends BlurIndex { } private void closeWriter() { - if (_lastWrite.get() + _maxWriterIdle < System.currentTimeMillis()) { - synchronized (_writer) { - _writeLock.lock(); - try { - BlurIndexWriter writer = _writer.getAndSet(null); - if (writer != null) { - LOG.info("Closing idle writer for table [{0}] shard [{1}]", _tableContext.getTable(), - _shardContext.getShard()); - IOUtils.cleanup(LOG, writer); - } - } finally { - _writeLock.unlock(); + _writeLock.lock(); + try { + if (_lastWrite.get() + _maxWriterIdle < System.currentTimeMillis()) { + BlurIndexWriter writer = _writer.getAndSet(null); + if (writer != null) { + LOG.info("Closing idle writer for table [{0}] shard [{1}]", _tableContext.getTable(), + _shardContext.getShard()); + IOUtils.cleanup(LOG, writer); } } + } finally { + _writeLock.unlock(); } } + /** + * Testing only. + */ protected boolean isWriterClosed() { - synchronized (_writer) { - return _writer.get() == null; - } + return _writer.get() == null; } private BlurIndexWriter getBlurIndexWriter() throws IOException { - synchronized (_writer) { + _writeLock.lock(); + try { BlurIndexWriter blurIndexWriter = _writer.get(); if (blurIndexWriter == null) { blurIndexWriter = new BlurIndexWriter(_directory, _conf.clone()); @@ -460,12 +460,17 @@ public class BlurIndexSimpleWriter extends BlurIndex { _lastWrite.set(System.currentTimeMillis()); } return blurIndexWriter; + } finally { + _writeLock.unlock(); } } private void resetBlurIndexWriter() { - synchronized (_writer) { + _writeLock.lock(); + try { _writer.set(null); + } finally { + _writeLock.unlock(); } } @@ -501,22 +506,12 @@ public class BlurIndexSimpleWriter extends BlurIndex { @Override public void createSnapshot(String name) throws IOException { - _writeLock.lock(); - try { - _snapshotIndexDeletionPolicy.createSnapshot(name, _indexReader.get(), _context); - } finally { - _writeLock.unlock(); - } + _snapshotIndexDeletionPolicy.createSnapshot(name, _indexReader.get(), _context); } @Override public void removeSnapshot(String name) throws IOException { - _writeLock.lock(); - try { - _snapshotIndexDeletionPolicy.removeSnapshot(name, _context); - } finally { - _writeLock.unlock(); - } + _snapshotIndexDeletionPolicy.removeSnapshot(name, _context); } @Override @@ -1024,17 +1019,10 @@ public class BlurIndexSimpleWriter extends BlurIndex { @Override public long getOnDiskSize() throws IOException { - long total = 0; - String[] listAll = _directory.listAll(); - for (String name : listAll) { - try { - total += _directory.fileLength(name); - } catch (FileNotFoundException e) { - // If file is not found that means that is was removed between the time - // we started iterating over the file names and when we asked for it's - // size. - } - } - return total; + Path hdfsDirPath = _shardContext.getHdfsDirPath(); + Configuration configuration = _tableContext.getConfiguration(); + FileSystem fileSystem = hdfsDirPath.getFileSystem(configuration); + ContentSummary contentSummary = fileSystem.getContentSummary(hdfsDirPath); + return contentSummary.getLength(); } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java index 42171e8..33db0ae 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java @@ -36,6 +36,7 @@ import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; import org.apache.blur.lucene.search.IndexSearcherCloseable; import org.apache.blur.manager.BlurPartitioner; +import org.apache.blur.manager.writer.MergeSortRowIdLookup.Action; import org.apache.blur.server.ShardContext; import org.apache.blur.server.TableContext; import org.apache.blur.server.cache.ThriftCache; @@ -54,12 +55,16 @@ import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.CompositeReaderContext; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocsEnum; import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; public class IndexImporter extends TimerTask implements Closeable { @@ -292,7 +297,9 @@ public class IndexImporter extends TimerTask implements Closeable { public void performMutate(IndexSearcherCloseable searcher, IndexWriter writer) throws IOException { LOG.info("About to import [{0}] into [{1}/{2}]", directory, _shard, _table); boolean emitDeletes = searcher.getIndexReader().numDocs() != 0; - applyDeletes(directory, writer, _shard, emitDeletes); + Configuration configuration = _shardContext.getTableContext().getConfiguration(); + + applyDeletes(directory, writer, searcher, _shard, emitDeletes, configuration); LOG.info("Add index [{0}] [{1}/{2}]", directory, _shard, _table); writer.addIndexes(directory); LOG.info("Removing delete markers [{0}] on [{1}/{2}]", directory, _shard, _table); @@ -336,40 +343,113 @@ public class IndexImporter extends TimerTask implements Closeable { return result; } - private void applyDeletes(Directory directory, IndexWriter indexWriter, String shard, boolean emitDeletes) - throws IOException { - DirectoryReader reader = DirectoryReader.open(directory); + private void applyDeletes(Directory directory, IndexWriter indexWriter, IndexSearcherCloseable searcher, + String shard, boolean emitDeletes, Configuration configuration) throws IOException { + DirectoryReader newReader = DirectoryReader.open(directory); try { - LOG.info("Applying deletes in reader [{0}]", reader); - CompositeReaderContext compositeReaderContext = reader.getContext(); - List<AtomicReaderContext> leaves = compositeReaderContext.leaves(); + List<AtomicReaderContext> newLeaves = newReader.getContext().leaves(); BlurPartitioner blurPartitioner = new BlurPartitioner(); Text key = new Text(); int numberOfShards = _shardContext.getTableContext().getDescriptor().getShardCount(); int shardId = ShardUtil.getShardIndex(shard); - for (AtomicReaderContext context : leaves) { - AtomicReader atomicReader = context.reader(); - Fields fields = atomicReader.fields(); - Terms terms = fields.terms(BlurConstants.ROW_ID); - if (terms != null) { - TermsEnum termsEnum = terms.iterator(null); - BytesRef ref = null; - while ((ref = termsEnum.next()) != null) { - key.set(ref.bytes, ref.offset, ref.length); - int partition = blurPartitioner.getPartition(key, null, numberOfShards); - if (shardId != partition) { - throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition - + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly."); - } - if (emitDeletes) { - indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref))); - } + + Action action = new Action() { + @Override + public void found(AtomicReader reader, Bits liveDocs, TermsEnum termsEnum) throws IOException { + DocsEnum docsEnum = termsEnum.docs(liveDocs, null); + if (docsEnum.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(termsEnum.term()))); } } + }; + + LOG.info("Applying deletes for table [{0}] shard [{1}] new reader [{2}]", _table, shard, newReader); + boolean skipCheckRowIds = isInternal(newReader); + LOG.info("Skip rowid check [{0}] for table [{1}] shard [{2}] new reader [{3}]", skipCheckRowIds, _table, shard, + newReader); + for (AtomicReaderContext context : newLeaves) { + AtomicReader newAtomicReader = context.reader(); + if (isFastRowIdDeleteSupported(newAtomicReader)) { + runNewRowIdCheckAndDelete(indexWriter, emitDeletes, blurPartitioner, key, numberOfShards, shardId, + newAtomicReader, skipCheckRowIds); + } else { + runOldMergeSortRowIdCheckAndDelete(emitDeletes, searcher.getIndexReader(), blurPartitioner, key, + numberOfShards, shardId, action, newAtomicReader); + } } } finally { - reader.close(); + newReader.close(); + } + } + + private boolean isInternal(DirectoryReader reader) throws IOException { + Map<String, String> map = reader.getIndexCommit().getUserData(); + return BlurConstants.INTERNAL.equals(map.get(BlurConstants.INTERNAL)); + } + + private void runNewRowIdCheckAndDelete(IndexWriter indexWriter, boolean emitDeletes, BlurPartitioner blurPartitioner, + Text key, int numberOfShards, int shardId, AtomicReader atomicReader, boolean skipCheckRowIds) throws IOException { + Fields fields = atomicReader.fields(); + if (skipCheckRowIds) { + Terms rowIdTerms = fields.terms(BlurConstants.ROW_ID); + if (rowIdTerms != null) { + LOG.info("Checking rowIds for import on table [{0}] shard [{1}]", _table, _shard); + TermsEnum rowIdTermsEnum = rowIdTerms.iterator(null); + BytesRef ref = null; + while ((ref = rowIdTermsEnum.next()) != null) { + key.set(ref.bytes, ref.offset, ref.length); + int partition = blurPartitioner.getPartition(key, null, numberOfShards); + if (shardId != partition) { + throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition + + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly."); + } + } + } + } + if (emitDeletes) { + Terms rowIdsToDeleteTerms = fields.terms(BlurConstants.UPDATE_ROW); + if (rowIdsToDeleteTerms != null) { + LOG.info("Performing deletes on rowIds for import on table [{0}] shard [{1}]", _table, _shard); + TermsEnum rowIdsToDeleteTermsEnum = rowIdsToDeleteTerms.iterator(null); + BytesRef ref = null; + while ((ref = rowIdsToDeleteTermsEnum.next()) != null) { + indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref))); + } + } + } + } + + private void runOldMergeSortRowIdCheckAndDelete(boolean emitDeletes, IndexReader currentIndexReader, + BlurPartitioner blurPartitioner, Text key, int numberOfShards, int shardId, Action action, + AtomicReader atomicReader) throws IOException { + MergeSortRowIdLookup lookup = new MergeSortRowIdLookup(currentIndexReader); + Fields fields = atomicReader.fields(); + Terms terms = fields.terms(BlurConstants.ROW_ID); + if (terms != null) { + TermsEnum termsEnum = terms.iterator(null); + BytesRef ref = null; + while ((ref = termsEnum.next()) != null) { + key.set(ref.bytes, ref.offset, ref.length); + int partition = blurPartitioner.getPartition(key, null, numberOfShards); + if (shardId != partition) { + throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition + + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly."); + } + if (emitDeletes) { + lookup.lookup(ref, action); + } + } + } + } + + private boolean isFastRowIdDeleteSupported(AtomicReader atomicReader) throws IOException { + if (atomicReader.fields().terms(BlurConstants.NEW_ROW) != null) { + return true; + } + if (atomicReader.fields().terms(BlurConstants.UPDATE_ROW) != null) { + return true; } + return false; } public void cleanupOldDirs() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java index 30690e5..15d9272 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java @@ -28,6 +28,8 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; @@ -54,6 +56,7 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy { private final Path _path; private final Map<String, Long> _namesToGenerations = new ConcurrentHashMap<String, Long>(); private final Map<Long, Set<String>> _generationsToNames = new ConcurrentHashMap<Long, Set<String>>(); + private final WriteLock _writeLock = new ReentrantReadWriteLock().writeLock(); public SnapshotIndexDeletionPolicy(Configuration configuration, Path path) throws IOException { _configuration = configuration; @@ -70,13 +73,18 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy { @Override public void onCommit(List<? extends IndexCommit> commits) throws IOException { - int size = commits.size(); - for (int i = 0; i < size - 1; i++) { - IndexCommit indexCommit = commits.get(i); - long generation = indexCommit.getGeneration(); - if (!_generationsToNames.containsKey(generation)) { - indexCommit.delete(); + _writeLock.lock(); + try { + int size = commits.size(); + for (int i = 0; i < size - 1; i++) { + IndexCommit indexCommit = commits.get(i); + long generation = indexCommit.getGeneration(); + if (!_generationsToNames.containsKey(generation)) { + indexCommit.delete(); + } } + } finally { + _writeLock.unlock(); } } @@ -147,36 +155,46 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy { } public void createSnapshot(String name, DirectoryReader reader, String context) throws IOException { - if (_namesToGenerations.containsKey(name)) { - throw new IOException("Snapshot [" + name + "] already exists."); - } - LOG.info("Creating snapshot [{0}] in [{1}].", name, context); - IndexCommit indexCommit = reader.getIndexCommit(); - long generation = indexCommit.getGeneration(); - _namesToGenerations.put(name, generation); - Set<String> names = _generationsToNames.get(generation); - if (names == null) { - names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); - _generationsToNames.put(generation, names); + _writeLock.lock(); + try { + if (_namesToGenerations.containsKey(name)) { + throw new IOException("Snapshot [" + name + "] already exists."); + } + LOG.info("Creating snapshot [{0}] in [{1}].", name, context); + IndexCommit indexCommit = reader.getIndexCommit(); + long generation = indexCommit.getGeneration(); + _namesToGenerations.put(name, generation); + Set<String> names = _generationsToNames.get(generation); + if (names == null) { + names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + _generationsToNames.put(generation, names); + } + names.add(name); + storeGenerations(); + } finally { + _writeLock.unlock(); } - names.add(name); - storeGenerations(); } public void removeSnapshot(String name, String context) throws IOException { - Long gen = _namesToGenerations.get(name); - if (gen == null) { - LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context); - return; - } - LOG.info("Removing snapshot [{0}] from [{1}].", name, context); - _namesToGenerations.remove(name); - Set<String> names = _generationsToNames.get(gen); - names.remove(name); - if (names.isEmpty()) { - _generationsToNames.remove(gen); + _writeLock.lock(); + try { + Long gen = _namesToGenerations.get(name); + if (gen == null) { + LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context); + return; + } + LOG.info("Removing snapshot [{0}] from [{1}].", name, context); + _namesToGenerations.remove(name); + Set<String> names = _generationsToNames.get(gen); + names.remove(name); + if (names.isEmpty()) { + _generationsToNames.remove(gen); + } + storeGenerations(); + } finally { + _writeLock.unlock(); } - storeGenerations(); } public Collection<String> getSnapshots() { @@ -194,5 +212,4 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy { public static Path getGenerationsPath(Path shardDir) { return new Path(shardDir, "generations"); } - } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java index 477c923..e4d29e0 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.blur.command.ArgumentOverlay; import org.apache.blur.command.BlurObject; import org.apache.blur.command.BlurObjectSerDe; +import org.apache.blur.command.CommandStatusUtil; import org.apache.blur.command.CommandUtil; import org.apache.blur.command.ControllerCommandManager; import org.apache.blur.command.Response; @@ -92,7 +93,6 @@ import org.apache.blur.thrift.generated.BlurResults; import org.apache.blur.thrift.generated.ColumnDefinition; import org.apache.blur.thrift.generated.CommandDescriptor; import org.apache.blur.thrift.generated.CommandStatus; -import org.apache.blur.thrift.generated.CommandStatusState; import org.apache.blur.thrift.generated.ErrorType; import org.apache.blur.thrift.generated.FetchResult; import org.apache.blur.thrift.generated.HighlightOptions; @@ -108,6 +108,7 @@ import org.apache.blur.thrift.generated.User; import org.apache.blur.trace.Trace; import org.apache.blur.trace.Trace.TraceId; import org.apache.blur.trace.Tracer; +import org.apache.blur.user.UserContext; import org.apache.blur.utils.BlurExecutorCompletionService; import org.apache.blur.utils.BlurIterator; import org.apache.blur.utils.BlurUtil; @@ -1514,7 +1515,8 @@ public class BlurControllerServer extends TableAdmin implements Iface { throws BlurException, TException { try { BlurObject args = CommandUtil.toBlurObject(arguments); - CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, null); + CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, + UserConverter.toThriftUser(UserContext.getUser())); Response response = _commandManager.execute(getTableContextFactory(), getLayoutFactory(), commandName, new ArgumentOverlay(args, _serDe), originalCommandStatusObject); return CommandUtil.fromObjectToThrift(response, _serDe); @@ -1861,7 +1863,7 @@ public class BlurControllerServer extends TableAdmin implements Iface { } })); } - return new ArrayList<String>(result).subList(startingAt, Math.min(fetch, result.size())); + return new ArrayList<String>(result).subList(startingAt, startingAt + Math.min(fetch, result.size())); } catch (Exception e) { throw new BException(e.getMessage(), e); } @@ -1876,7 +1878,15 @@ public class BlurControllerServer extends TableAdmin implements Iface { CommandStatus cs = scatterGather(cluster, new BlurCommand<CommandStatus>() { @Override public CommandStatus call(Client client) throws BlurException, TException { - return client.commandStatus(commandExecutionId); + try { + return client.commandStatus(commandExecutionId); + } catch (BlurException e) { + String message = e.getMessage(); + if (message.startsWith("NOT_FOUND")) { + return null; + } + throw e; + } } }, new Merger<CommandStatus>() { @Override @@ -1884,12 +1894,16 @@ public class BlurControllerServer extends TableAdmin implements Iface { CommandStatus commandStatus = null; while (service.getRemainingCount() > 0) { Future<CommandStatus> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true); - commandStatus = mergeCommandStatus(commandStatus, service.getResultThrowException(future)); + commandStatus = CommandStatusUtil.mergeCommandStatus(commandStatus, + service.getResultThrowException(future)); } return commandStatus; } }); - commandStatus = mergeCommandStatus(commandStatus, cs); + commandStatus = CommandStatusUtil.mergeCommandStatus(commandStatus, cs); + } + if (commandStatus == null) { + throw new BException("NOT_FOUND {0}", commandExecutionId); } return commandStatus; } catch (Exception e) { @@ -1897,60 +1911,6 @@ public class BlurControllerServer extends TableAdmin implements Iface { } } - private static CommandStatus mergeCommandStatus(CommandStatus cs1, CommandStatus cs2) { - if (cs1 == null && cs2 == null) { - return null; - } else if (cs1 == null) { - return cs2; - } else if (cs2 == null) { - return cs1; - } else { - Map<String, Map<CommandStatusState, Long>> serverStateMap1 = cs1.getServerStateMap(); - Map<String, Map<CommandStatusState, Long>> serverStateMap2 = cs2.getServerStateMap(); - Map<String, Map<CommandStatusState, Long>> merge = mergeServerStateMap(serverStateMap1, serverStateMap2); - return new CommandStatus(cs1.getExecutionId(), cs1.getCommandName(), cs1.getArguments(), merge, cs1.getUser()); - } - } - - private static Map<String, Map<CommandStatusState, Long>> mergeServerStateMap( - Map<String, Map<CommandStatusState, Long>> serverStateMap1, - Map<String, Map<CommandStatusState, Long>> serverStateMap2) { - Map<String, Map<CommandStatusState, Long>> result = new HashMap<String, Map<CommandStatusState, Long>>(); - Set<String> keys = new HashSet<String>(); - keys.addAll(serverStateMap1.keySet()); - keys.addAll(serverStateMap2.keySet()); - for (String key : keys) { - Map<CommandStatusState, Long> css1 = serverStateMap2.get(key); - Map<CommandStatusState, Long> css2 = serverStateMap2.get(key); - result.put(key, mergeCommandStatusState(css1, css2)); - } - return result; - } - - private static Map<CommandStatusState, Long> mergeCommandStatusState(Map<CommandStatusState, Long> css1, - Map<CommandStatusState, Long> css2) { - if (css1 == null && css2 == null) { - return new HashMap<CommandStatusState, Long>(); - } else if (css1 == null) { - return css2; - } else if (css2 == null) { - return css1; - } else { - Map<CommandStatusState, Long> result = new HashMap<CommandStatusState, Long>(css1); - for (Entry<CommandStatusState, Long> e : css2.entrySet()) { - CommandStatusState key = e.getKey(); - Long l = result.get(key); - Long value = e.getValue(); - if (l == null) { - result.put(key, value); - } else { - result.put(key, l + value); - } - } - return result; - } - } - @Override public void commandCancel(String commandExecutionId) throws BlurException, TException { try { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java index 08b4400..ff03210 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java @@ -684,7 +684,11 @@ public class BlurShardServer extends TableAdmin implements Iface { @Override public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException { try { - return _commandManager.getCommandStatus(commandExecutionId); + CommandStatus commandStatus = _commandManager.getCommandStatus(commandExecutionId); + if (commandStatus == null) { + throw new BException("NOT_FOUND {0}", commandExecutionId); + } + return commandStatus; } catch (Exception e) { throw new BException(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java index 0b4e290..46bd8b0 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java @@ -316,7 +316,7 @@ public class ThriftBlurShardServer extends ThriftServer { StreamServer streamServer; int streamThreadCount = configuration.getInt(BLUR_STREAM_SERVER_THREADS, 100); if (streamThreadCount > 0) { - StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath); + StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath, config); streamServer = new StreamServer(0, streamThreadCount, streamProcessor); streamServer.start(); configuration.setInt(BLUR_STREAM_SERVER_RUNNING_PORT, streamServer.getPort()); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java index c9c3774..3242931 100644 --- a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java +++ b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java @@ -16,24 +16,50 @@ */ package org.apache.blur.utils; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; public class GCWatcher { private static final String JAVA_VERSION = "java.version"; private static final String _1_7 = "1.7"; + private static final String _1_8 = "1.8"; private static final boolean JDK7; static { Properties properties = System.getProperties(); String javaVersion = properties.getProperty(JAVA_VERSION); - if (javaVersion.startsWith(_1_7)) { + if (javaVersion.startsWith(_1_7) || javaVersion.startsWith(_1_8)) { JDK7 = true; } else { JDK7 = false; } } + public static void main(String[] args) { + GCWatcher.init(0.50); + + GCWatcher.registerAction(new GCAction() { + @Override + public void takeAction() throws Exception { + System.out.println("OOM"); + System.exit(0); + } + }); + + List<byte[]> lst = new ArrayList<byte[]>(); + + while (true) { + lst.add(new byte[1_000_000]); + MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + System.out.println(heapMemoryUsage.getMax() + " " + heapMemoryUsage.getUsed()); + } + + } + /** * Initializes the GCWatcher to watch for any garbage collection that leaves * more then the given ratio free. If more remains then all the given http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java b/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java index 03bf6bb..2eb9f56 100644 --- a/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java +++ b/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java @@ -151,7 +151,7 @@ public class GCWatcherJdk6 extends TimerTask { } _lastIndex = _gcInfo.getIndex(); } - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java ---------------------------------------------------------------------- diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java index ae635c3..f46b184 100644 --- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java +++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java @@ -41,12 +41,15 @@ import org.apache.blur.thrift.generated.Column; import org.apache.blur.thrift.generated.Record; import org.apache.blur.thrift.generated.RowMutation; import org.apache.blur.thrift.generated.TableDescriptor; +import org.apache.blur.utils.BlurConstants; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.Field.Store; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -292,6 +295,20 @@ public class IndexImporterTest { } @Test + public void testIndexImporterWithCorrectRowIdShardCombinationWithFastImport() throws IOException { + List<Field> document = _fieldManager.getFields("1", genRecord("1")); + document.add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO)); + _commitWriter.addDocument(document); + _commitWriter.commit(); + _commitWriter.close(); + _indexImporter.run(); + assertFalse(_fileSystem.exists(_path)); + assertFalse(_fileSystem.exists(_badRowIdsPath)); + assertTrue(_fileSystem.exists(_inUsePath)); + validateIndex(); + } + + @Test public void testIndexImporterWithWrongRowIdShardCombination() throws IOException { List<Field> document = _fieldManager.getFields("2", genRecord("1")); _commitWriter.addDocument(document); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java ---------------------------------------------------------------------- diff --git a/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java b/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java index 059ad05..ea7e5ad 100644 --- a/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java +++ b/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java @@ -338,6 +338,7 @@ public class FilterAccessControlFactory extends AccessControlFactory { } List<IndexableField> result = new ArrayList<IndexableField>(); for (IndexableField field : fields) { + // If field is to be indexed and is to be read masked. if (fieldsToMask.contains(field.name())) { // If field is a doc value, then don't bother indexing. if (!isDocValue(field)) { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java ---------------------------------------------------------------------- diff --git a/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java b/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java index 8a142cf..37e17a6 100644 --- a/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java +++ b/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java @@ -31,6 +31,7 @@ import org.apache.lucene.index.IndexReader.ReaderClosedListener; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.search.DocIdSet; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.OpenBitSet; @@ -62,12 +63,16 @@ public class BitSetDocumentVisibilityFilterCacheStrategy extends DocumentVisibil @Override public Builder createBuilder(String fieldName, BytesRef term, final AtomicReader reader) { - final OpenBitSet bitSet = new OpenBitSet(reader.maxDoc()); + int maxDoc = reader.maxDoc(); final Key key = new Key(fieldName, term, reader.getCoreCacheKey()); LOG.debug("Creating new bitset for key [" + key + "] on index [" + reader + "]"); return new Builder() { + + private OpenBitSet bitSet = new OpenBitSet(maxDoc); + @Override public void or(DocIdSetIterator it) throws IOException { + LOG.debug("Building bitset for key [" + key + "]"); int doc; while ((doc = it.nextDoc()) != DocsEnum.NO_MORE_DOCS) { bitSet.set(doc); @@ -76,7 +81,6 @@ public class BitSetDocumentVisibilityFilterCacheStrategy extends DocumentVisibil @Override public DocIdSet getDocIdSet() throws IOException { - LOG.debug("Building bitset for key [" + key + "]"); SegmentReader segmentReader = getSegmentReader(reader); segmentReader.addReaderClosedListener(new ReaderClosedListener() { @Override @@ -88,12 +92,172 @@ public class BitSetDocumentVisibilityFilterCacheStrategy extends DocumentVisibil } } }); - _cache.put(key, bitSet); - return bitSet; + long cardinality = bitSet.cardinality(); + DocIdSet cacheDocIdSet; + if (isFullySet(maxDoc, bitSet, cardinality)) { + cacheDocIdSet = getFullySetDocIdSet(maxDoc); + } else if (isFullyEmpty(bitSet, cardinality)) { + cacheDocIdSet = getFullyEmptyDocIdSet(maxDoc); + } else { + cacheDocIdSet = bitSet; + } + _cache.put(key, cacheDocIdSet); + return cacheDocIdSet; + } + }; + } + + public static DocIdSet getFullyEmptyDocIdSet(int maxDoc) { + Bits bits = getFullyEmptyBits(maxDoc); + return new DocIdSet() { + @Override + public DocIdSetIterator iterator() throws IOException { + return getFullyEmptyDocIdSetIterator(maxDoc); + } + + @Override + public Bits bits() throws IOException { + return bits; + } + + @Override + public boolean isCacheable() { + return true; + } + }; + } + + public static DocIdSetIterator getFullyEmptyDocIdSetIterator(int maxDoc) { + return new DocIdSetIterator() { + + private int _docId = -1; + + @Override + public int docID() { + return _docId; + } + + @Override + public int nextDoc() throws IOException { + return _docId = DocIdSetIterator.NO_MORE_DOCS; + } + + @Override + public int advance(int target) throws IOException { + return _docId = DocIdSetIterator.NO_MORE_DOCS; + } + + @Override + public long cost() { + return 0; + } + }; + } + + public static Bits getFullyEmptyBits(int maxDoc) { + return new Bits() { + @Override + public boolean get(int index) { + return false; + } + + @Override + public int length() { + return maxDoc; + } + }; + } + + public static DocIdSet getFullySetDocIdSet(int maxDoc) { + Bits bits = getFullySetBits(maxDoc); + return new DocIdSet() { + @Override + public DocIdSetIterator iterator() throws IOException { + return getFullySetDocIdSetIterator(maxDoc); + } + + @Override + public Bits bits() throws IOException { + return bits; + } + + @Override + public boolean isCacheable() { + return true; + } + }; + } + + public static DocIdSetIterator getFullySetDocIdSetIterator(int maxDoc) { + return new DocIdSetIterator() { + + private int _docId = -1; + + @Override + public int advance(int target) throws IOException { + if (_docId == DocIdSetIterator.NO_MORE_DOCS) { + return DocIdSetIterator.NO_MORE_DOCS; + } + _docId = target; + if (_docId >= maxDoc) { + return _docId = DocIdSetIterator.NO_MORE_DOCS; + } + return _docId; + } + + @Override + public int nextDoc() throws IOException { + if (_docId == DocIdSetIterator.NO_MORE_DOCS) { + return DocIdSetIterator.NO_MORE_DOCS; + } + _docId++; + if (_docId >= maxDoc) { + return _docId = DocIdSetIterator.NO_MORE_DOCS; + } + return _docId; + } + + @Override + public int docID() { + return _docId; + } + + @Override + public long cost() { + return 0l; } + }; } + public static Bits getFullySetBits(int maxDoc) { + return new Bits() { + @Override + public boolean get(int index) { + return true; + } + + @Override + public int length() { + return maxDoc; + } + }; + } + + public static boolean isFullyEmpty(OpenBitSet bitSet, long cardinality) { + if (cardinality == 0) { + return true; + } + return false; + } + + public static boolean isFullySet(int maxDoc, OpenBitSet bitSet, long cardinality) { + if (cardinality >= maxDoc) { + return true; + } + return false; + } + public static SegmentReader getSegmentReader(IndexReader indexReader) throws IOException { if (indexReader instanceof SegmentReader) { return (SegmentReader) indexReader;