Interesting — do you have a specific use case for this that you can share? Thanks, --tim
On Thu, Dec 12, 2013 at 10:25 AM, <[email protected]> wrote: > Updated Branches: > refs/heads/apache-blur-0.2 a4169a999 -> 7ff903332 > > > Adding a way to change the BlurIndex implementation for a cluster or a table. > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo > Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/3dc5b842 > Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/3dc5b842 > Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/3dc5b842 > > Branch: refs/heads/apache-blur-0.2 > Commit: 3dc5b842ba4a3b836adf035206c7b157cf73e0e9 > Parents: a4169a9 > Author: Aaron McCurry <[email protected]> > Authored: Thu Dec 12 10:24:55 2013 -0500 > Committer: Aaron McCurry <[email protected]> > Committed: Thu Dec 12 10:24:55 2013 -0500 > > ---------------------------------------------------------------------- > .../indexserver/DistributedIndexServer.java | 23 +- > .../manager/indexserver/LocalIndexServer.java | 2 +- > .../apache/blur/manager/writer/BlurIndex.java | 63 +++-- > .../blur/manager/writer/BlurIndexNRTSimple.java | 237 ++++++++++++++++++ > .../blur/manager/writer/BlurIndexReadOnly.java | 3 +- > .../blur/manager/writer/BlurIndexReader.java | 12 +- > .../blur/manager/writer/BlurNRTIndex.java | 6 +- > .../org/apache/blur/server/TableContext.java | 49 ++++ > .../manager/writer/BlurIndexNRTSimpleTest.java | 242 +++++++++++++++++++ > .../manager/writer/BlurIndexReaderTest.java | 2 +- > .../blur/manager/writer/BlurNRTIndexTest.java | 33 ++- > .../src/main/resources/blur-default.properties | 3 + > 12 files changed, 610 insertions(+), 65 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > ---------------------------------------------------------------------- > diff --git > 
a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > index f4a69ac..de45d29 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > @@ -48,7 +48,6 @@ import org.apache.blur.manager.writer.BlurIndex; > import org.apache.blur.manager.writer.BlurIndexCloser; > import org.apache.blur.manager.writer.BlurIndexReadOnly; > import org.apache.blur.manager.writer.BlurIndexRefresher; > -import org.apache.blur.manager.writer.BlurNRTIndex; > import org.apache.blur.manager.writer.SharedMergeScheduler; > import org.apache.blur.server.IndexSearcherClosable; > import org.apache.blur.server.ShardContext; > @@ -193,7 +192,7 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > > @Override > public DistributedLayout createDistributedLayout(String table, > List<String> shardList, > - List<String> shardServerList, List<String> offlineShardServers, > boolean readOnly) { > + List<String> shardServerList, List<String> offlineShardServers) { > DistributedLayoutManager layoutManager = new > DistributedLayoutManager(); > layoutManager.setNodes(shardServerList); > layoutManager.setNodesOffline(offlineShardServers); > @@ -201,6 +200,11 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > layoutManager.init(); > return layoutManager; > } > + > + @Override > + public DistributedLayout readCurrentLayout(String table) { > + throw new RuntimeException("Not implemented"); > + } > }; > } > > @@ -474,18 +478,11 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > dir = directory; > } > > - BlurIndex index; > - > - BlurNRTIndex writer = new BlurNRTIndex(shardContext, _mergeScheduler, > dir, _gc, _searchExecutor); > - > - // 
BlurIndexNRTSimple writer = new BlurIndexNRTSimple(shardContext, > - // _mergeScheduler, dir, _gc, _searchExecutor, > - // _indexCloser, _refresher); > + BlurIndex index = tableContext.newInstanceBlurIndex(shardContext, dir, > _mergeScheduler, _gc, _searchExecutor, > + _indexCloser, _refresher); > > if (_clusterStatus.isReadOnly(true, _cluster, table)) { > - index = new BlurIndexReadOnly(writer); > - } else { > - index = writer; > + index = new BlurIndexReadOnly(index); > } > _filterCache.opening(table, shard, index); > TableDescriptor tableDescriptor = > _clusterStatus.getTableDescriptor(true, _cluster, table); > @@ -598,7 +595,7 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > } > > DistributedLayout layoutManager = > _distributedLayoutFactory.createDistributedLayout(table, shardList, > - shardServerList, offlineShardServers, false); > + shardServerList, offlineShardServers); > > Map<String, String> layout = layoutManager.getLayout(); > String nodeName = getNodeName(); > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > index 51a7468..27960cd 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > @@ -156,7 +156,7 @@ public class LocalIndexServer extends AbstractIndexServer > { > > private BlurIndex openIndex(String table, String shard, Directory dir) > throws CorruptIndexException, IOException { > ShardContext shardContext = ShardContext.create(_tableContext, shard); > - BlurNRTIndex index = new BlurNRTIndex(shardContext, 
_mergeScheduler, > dir, _gc, _searchExecutor); > + BlurNRTIndex index = new BlurNRTIndex(shardContext, dir, > _mergeScheduler, _gc, _searchExecutor, null, null); > return index; > } > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > index 8100a58..49fe965 100644 > --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > @@ -18,10 +18,13 @@ package org.apache.blur.manager.writer; > */ > import java.io.IOException; > import java.util.List; > +import java.util.concurrent.ExecutorService; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicBoolean; > > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC; > import org.apache.blur.server.IndexSearcherClosable; > +import org.apache.blur.server.ShardContext; > import org.apache.blur.thrift.generated.Row; > import org.apache.blur.utils.BlurUtil; > import org.apache.lucene.index.IndexReader; > @@ -29,6 +32,7 @@ import org.apache.lucene.index.IndexReaderContext; > import org.apache.lucene.search.IndexSearcher; > import org.apache.lucene.search.TermQuery; > import org.apache.lucene.search.TopDocs; > +import org.apache.lucene.store.Directory; > > public abstract class BlurIndex { > > @@ -37,6 +41,12 @@ public abstract class BlurIndex { > private long _lastMemoryCheck = 0; > private long _memoryUsage = 0; > > + public BlurIndex(ShardContext shardContext, Directory directory, > SharedMergeScheduler mergeScheduler, > + DirectoryReferenceFileGC gc, ExecutorService searchExecutor, > BlurIndexCloser indexCloser, > + BlurIndexRefresher refresher) throws 
IOException { > + > + } > + > public abstract void replaceRow(boolean waitToBeVisible, boolean wal, Row > row) throws IOException; > > public abstract void deleteRow(boolean waitToBeVisible, boolean wal, > String rowId) throws IOException; > @@ -50,11 +60,11 @@ public abstract class BlurIndex { > public abstract AtomicBoolean isClosed(); > > public abstract void optimize(int numberOfSegmentsPerShard) throws > IOException; > - > + > public abstract void createSnapshot(String name) throws IOException; > - > + > public abstract void removeSnapshot(String name) throws IOException; > - > + > public abstract List<String> getSnapshots() throws IOException; > > public long getRecordCount() throws IOException { > @@ -86,29 +96,30 @@ public abstract class BlurIndex { > > public long getIndexMemoryUsage() throws IOException { > return 0; > -// long now = System.currentTimeMillis(); > -// if (_lastMemoryCheck + ONE_MINUTE > now) { > -// return _memoryUsage; > -// } > -// IndexSearcherClosable searcher = getIndexReader(); > -// try { > -// IndexReaderContext topReaderContext = searcher.getTopReaderContext(); > -// return _memoryUsage = RamUsageEstimator.sizeOf(topReaderContext, new > ClassNameFilter() { > -// @Override > -// public boolean include(String className) { > -// if > (className.startsWith("org.apache.blur.index.ExitableReader")) { > -// return true; > -// } else if (className.startsWith("org.apache.blur.")) { > -// // System.out.println("className [" + className + "]"); > -// return false; > -// } > -// return true; > -// } > -// }); > -// } finally { > -// searcher.close(); > -// _lastMemoryCheck = System.currentTimeMillis(); > -// } > + // long now = System.currentTimeMillis(); > + // if (_lastMemoryCheck + ONE_MINUTE > now) { > + // return _memoryUsage; > + // } > + // IndexSearcherClosable searcher = getIndexReader(); > + // try { > + // IndexReaderContext topReaderContext = searcher.getTopReaderContext(); > + // return _memoryUsage = 
RamUsageEstimator.sizeOf(topReaderContext, new > + // ClassNameFilter() { > + // @Override > + // public boolean include(String className) { > + // if (className.startsWith("org.apache.blur.index.ExitableReader")) { > + // return true; > + // } else if (className.startsWith("org.apache.blur.")) { > + // // System.out.println("className [" + className + "]"); > + // return false; > + // } > + // return true; > + // } > + // }); > + // } finally { > + // searcher.close(); > + // _lastMemoryCheck = System.currentTimeMillis(); > + // } > } > > public long getSegmentCount() throws IOException { > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java > > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java > new file mode 100644 > index 0000000..f35560a > --- /dev/null > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java > @@ -0,0 +1,237 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
> + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.blur.manager.writer; > + > +import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION; > + > +import java.io.IOException; > +import java.util.List; > +import java.util.concurrent.ExecutorService; > +import java.util.concurrent.TimeUnit; > +import java.util.concurrent.atomic.AtomicBoolean; > +import java.util.concurrent.atomic.AtomicReference; > + > +import org.apache.blur.analysis.FieldManager; > +import org.apache.blur.index.ExitableReader; > +import org.apache.blur.log.Log; > +import org.apache.blur.log.LogFactory; > +import org.apache.blur.lucene.codec.Blur022Codec; > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter; > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC; > +import org.apache.blur.lucene.warmup.TraceableDirectory; > +import org.apache.blur.server.IndexSearcherClosable; > +import org.apache.blur.server.ShardContext; > +import org.apache.blur.server.TableContext; > +import org.apache.blur.thrift.generated.Row; > +import org.apache.hadoop.io.IOUtils; > +import org.apache.lucene.analysis.Analyzer; > +import org.apache.lucene.document.Field; > +import org.apache.lucene.index.BlurIndexWriter; > +import org.apache.lucene.index.DirectoryReader; > +import org.apache.lucene.index.IndexReader; > +import org.apache.lucene.index.IndexWriterConfig; > +import org.apache.lucene.index.TieredMergePolicy; > +import org.apache.lucene.store.Directory; > + > +public class BlurIndexNRTSimple extends BlurIndex { > + > + private static final Log LOG = LogFactory.getLog(BlurIndexNRTSimple.class); > + > + private final AtomicBoolean _isClosed = new AtomicBoolean(); > + private final BlurIndexCloser _indexCloser; > + private final AtomicReference<DirectoryReader> _indexReader = new > AtomicReference<DirectoryReader>(); > + private final ExecutorService _searchThreadPool; > + 
private final Directory _directory; > + private final Thread _writerOpener; > + private final IndexWriterConfig _conf; > + private final TableContext _tableContext; > + private final FieldManager _fieldManager; > + private final BlurIndexRefresher _refresher; > + private final ShardContext _shardContext; > + private final AtomicReference<BlurIndexWriter> _writer = new > AtomicReference<BlurIndexWriter>(); > + private final boolean _makeReaderExitable = true; > + > + public BlurIndexNRTSimple(ShardContext shardContext, Directory directory, > SharedMergeScheduler mergeScheduler, > + DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, > BlurIndexCloser indexCloser, > + BlurIndexRefresher refresher) throws IOException { > + super(shardContext, directory, mergeScheduler, gc, searchExecutor, > indexCloser, refresher); > + _searchThreadPool = searchExecutor; > + _shardContext = shardContext; > + _tableContext = _shardContext.getTableContext(); > + _fieldManager = _tableContext.getFieldManager(); > + Analyzer analyzer = _fieldManager.getAnalyzerForIndex(); > + _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer); > + _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5)); > + _conf.setCodec(new Blur022Codec(_tableContext.getBlurConfiguration())); > + _conf.setSimilarity(_tableContext.getSimilarity()); > + AtomicBoolean stop = new AtomicBoolean(); > + _conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, stop, > _isClosed)); > + TieredMergePolicy mergePolicy = (TieredMergePolicy) > _conf.getMergePolicy(); > + mergePolicy.setUseCompoundFile(false); > + _conf.setMergeScheduler(mergeScheduler.getMergeScheduler()); > + > + if (!DirectoryReader.indexExists(directory)) { > + new BlurIndexWriter(directory, _conf).close(); > + } > + DirectoryReferenceCounter referenceCounter = new > DirectoryReferenceCounter(directory, gc); > + // This directory allows for warm up by adding tracing ability. 
> + TraceableDirectory dir = new TraceableDirectory(referenceCounter); > + _directory = dir; > + > + // _directory = directory; > + > + _indexCloser = indexCloser; > + _indexReader.set(wrap(DirectoryReader.open(_directory))); > + _refresher = refresher; > + > + _writerOpener = getWriterOpener(shardContext); > + _writerOpener.start(); > + _refresher.register(this); > + } > + > + private DirectoryReader wrap(DirectoryReader reader) { > + if (_makeReaderExitable) { > + reader = new ExitableReader(reader); > + } > + return reader; > + } > + > + private Thread getWriterOpener(ShardContext shardContext) { > + Thread thread = new Thread(new Runnable() { > + @Override > + public void run() { > + try { > + _writer.set(new BlurIndexWriter(_directory, _conf.clone())); > + synchronized (_writer) { > + _writer.notify(); > + } > + } catch (IOException e) { > + LOG.error("Unknown error on index writer open.", e); > + } > + } > + }); > + thread.setName("Writer Opener for Table [" + > shardContext.getTableContext().getTable() + "] Shard [" > + + shardContext.getShard() + "]"); > + thread.setDaemon(true); > + return thread; > + } > + > + @Override > + public IndexSearcherClosable getIndexSearcher() throws IOException { > + final IndexReader indexReader = _indexReader.get(); > + while (!indexReader.tryIncRef()) { > + // keep trying to increment the ref > + } > + return new IndexSearcherClosable(indexReader, _searchThreadPool) { > + @Override > + public Directory getDirectory() { > + return _directory; > + } > + > + @Override > + public void close() throws IOException { > + indexReader.decRef(); > + } > + }; > + } > + > + @Override > + public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) > throws IOException { > + waitUntilNotNull(_writer); > + BlurIndexWriter writer = _writer.get(); > + List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager); > + writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), > docs); > + 
waitToBeVisible(waitToBeVisible); > + } > + > + @Override > + public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) > throws IOException { > + waitUntilNotNull(_writer); > + BlurIndexWriter writer = _writer.get(); > + writer.deleteDocuments(TransactionRecorder.createRowId(rowId)); > + waitToBeVisible(waitToBeVisible); > + } > + > + private void waitUntilNotNull(AtomicReference<?> ref) { > + while (true) { > + Object object = ref.get(); > + if (object != null) { > + return; > + } > + synchronized (ref) { > + try { > + ref.wait(TimeUnit.SECONDS.toMillis(1)); > + } catch (InterruptedException e) { > + return; > + } > + } > + } > + } > + > + @Override > + public void close() throws IOException { > + _isClosed.set(true); > + IOUtils.cleanup(LOG, _writer.get()); > + IOUtils.cleanup(LOG, _indexReader.get()); > + } > + > + @Override > + public void refresh() throws IOException { > + DirectoryReader currentReader = _indexReader.get(); > + DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader); > + if (newReader != null) { > + LOG.info("Refreshing index for table [{0}] shard [{1}].", > _tableContext.getTable(), _shardContext.getShard()); > + _indexReader.set(wrap(newReader)); > + _indexCloser.close(currentReader); > + } > + } > + > + @Override > + public AtomicBoolean isClosed() { > + return _isClosed; > + } > + > + @Override > + public void optimize(int numberOfSegmentsPerShard) throws IOException { > + throw new RuntimeException("not impl"); > + } > + > + @Override > + public void createSnapshot(String name) throws IOException { > + throw new RuntimeException("not impl"); > + } > + > + @Override > + public void removeSnapshot(String name) throws IOException { > + throw new RuntimeException("not impl"); > + } > + > + @Override > + public List<String> getSnapshots() throws IOException { > + throw new RuntimeException("not impl"); > + } > + > + private void waitToBeVisible(boolean waitToBeVisible) throws IOException { > + if 
(waitToBeVisible) { > + waitUntilNotNull(_writer); > + BlurIndexWriter writer = _writer.get(); > + writer.commit(); > + refresh(); > + } > + } > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > index ddd82e1..e8a3c32 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > @@ -27,7 +27,8 @@ public class BlurIndexReadOnly extends BlurIndex { > > private final BlurIndex _blurIndex; > > - public BlurIndexReadOnly(BlurIndex blurIndex) { > + public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException { > + super(null, null, null, null, null, null, null); > _blurIndex = blurIndex; > } > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java > index 13effa3..db5301a 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java > @@ -20,6 +20,7 @@ import static > org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION; > > import java.io.IOException; > import java.util.List; > +import java.util.concurrent.ExecutorService; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicBoolean; > import 
java.util.concurrent.atomic.AtomicReference; > @@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; > import org.apache.blur.log.Log; > import org.apache.blur.log.LogFactory; > import org.apache.blur.lucene.codec.Blur022Codec; > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC; > import org.apache.blur.lucene.warmup.TraceableDirectory; > import org.apache.blur.server.IndexSearcherClosable; > import org.apache.blur.server.ShardContext; > @@ -50,15 +52,17 @@ public class BlurIndexReader extends BlurIndex { > private BlurIndexRefresher _refresher; > private final TableContext _tableContext; > private final ShardContext _shardContext; > - > - public BlurIndexReader(ShardContext shardContext, Directory directory, > BlurIndexRefresher refresher, > - BlurIndexCloser closer) throws IOException { > + > + public BlurIndexReader(ShardContext shardContext, Directory directory, > SharedMergeScheduler mergeScheduler, > + DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, > BlurIndexCloser indexCloser, > + BlurIndexRefresher refresher) throws IOException { > + super(shardContext, directory, mergeScheduler, gc, searchExecutor, > indexCloser, refresher); > _tableContext = shardContext.getTableContext(); > // This directory allows for warm up by adding tracing ability. 
> _directory = new TraceableDirectory(directory); > _shardContext = shardContext; > _refresher = refresher; > - _closer = closer; > + _closer = indexCloser; > > _open.set(true); > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java > index a74678b..2b9d38e 100644 > --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java > +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java > @@ -95,8 +95,10 @@ public class BlurNRTIndex extends BlurIndex { > private final ReadWriteLock _lock = new ReentrantReadWriteLock(); > private long _lastRefresh = 0; > > - public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler > mergeScheduler, Directory directory, > - DirectoryReferenceFileGC gc, final ExecutorService searchExecutor) > throws IOException { > + public BlurNRTIndex(ShardContext shardContext, Directory directory, > SharedMergeScheduler mergeScheduler, > + DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, > BlurIndexCloser indexCloser, > + BlurIndexRefresher refresher) throws IOException { > + super(shardContext, directory, mergeScheduler, gc, searchExecutor, > indexCloser, refresher); > _tableContext = shardContext.getTableContext(); > _directory = directory; > _shardContext = shardContext; > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/server/TableContext.java > ---------------------------------------------------------------------- > diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java > b/blur-core/src/main/java/org/apache/blur/server/TableContext.java > index 3256404..0102a70 100644 
> --- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java > +++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java > @@ -17,6 +17,7 @@ package org.apache.blur.server; > * limitations under the License. > */ > import static org.apache.blur.utils.BlurConstants.BLUR_FIELDTYPE; > +import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLURINDEX_CLASS; > import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE; > import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_SIMILARITY; > import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS; > @@ -24,11 +25,14 @@ import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRES > import static org.apache.blur.utils.BlurConstants.SUPER; > > import java.io.IOException; > +import java.lang.reflect.Constructor; > +import java.lang.reflect.InvocationTargetException; > import java.util.HashMap; > import java.util.Map; > import java.util.Map.Entry; > import java.util.Set; > import java.util.concurrent.ConcurrentHashMap; > +import java.util.concurrent.ExecutorService; > import java.util.concurrent.TimeUnit; > > import org.apache.blur.BlurConfiguration; > @@ -39,6 +43,12 @@ import org.apache.blur.analysis.NoStopWordStandardAnalyzer; > import org.apache.blur.log.Log; > import org.apache.blur.log.LogFactory; > import org.apache.blur.lucene.search.FairSimilarity; > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC; > +import org.apache.blur.manager.writer.BlurIndex; > +import org.apache.blur.manager.writer.BlurIndexCloser; > +import org.apache.blur.manager.writer.BlurIndexRefresher; > +import org.apache.blur.manager.writer.BlurNRTIndex; > +import org.apache.blur.manager.writer.SharedMergeScheduler; > import org.apache.blur.thrift.generated.ScoreType; > import org.apache.blur.thrift.generated.TableDescriptor; > import org.apache.blur.utils.BlurConstants; > @@ -49,6 +59,7 @@ import 
org.apache.lucene.index.IndexDeletionPolicy; > import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; > import org.apache.lucene.index.Term; > import org.apache.lucene.search.similarities.Similarity; > +import org.apache.lucene.store.Directory; > > public class TableContext { > > @@ -279,4 +290,42 @@ public class TableContext { > public static void setSystemBlurConfiguration(BlurConfiguration > systemBlurConfiguration) { > TableContext.systemBlurConfiguration = systemBlurConfiguration; > } > + > + @SuppressWarnings("unchecked") > + public BlurIndex newInstanceBlurIndex(ShardContext shardContext, Directory > dir, SharedMergeScheduler mergeScheduler, > + DirectoryReferenceFileGC gc, ExecutorService searchExecutor, > BlurIndexCloser indexCloser, > + BlurIndexRefresher refresher) throws IOException { > + > + String className = blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, > BlurNRTIndex.class.getName()); > + > + Class<? extends BlurIndex> clazz; > + try { > + clazz = (Class<? extends BlurIndex>) Class.forName(className); > + } catch (ClassNotFoundException e) { > + throw new IOException(e); > + } > + Constructor<? extends BlurIndex> constructor = findConstructor(clazz); > + try { > + return constructor.newInstance(shardContext, dir, mergeScheduler, gc, > searchExecutor, indexCloser, refresher); > + } catch (InstantiationException e) { > + throw new IOException(e); > + } catch (IllegalAccessException e) { > + throw new IOException(e); > + } catch (IllegalArgumentException e) { > + throw new IOException(e); > + } catch (InvocationTargetException e) { > + throw new IOException(e); > + } > + } > + > + private Constructor<? extends BlurIndex> findConstructor(Class<? 
extends > BlurIndex> clazz) throws IOException { > + try { > + return clazz.getConstructor(new Class[] { ShardContext.class, > Directory.class, SharedMergeScheduler.class, > + DirectoryReferenceFileGC.class, ExecutorService.class, > BlurIndexCloser.class, BlurIndexRefresher.class }); > + } catch (NoSuchMethodException e) { > + throw new IOException(e); > + } catch (SecurityException e) { > + throw new IOException(e); > + } > + } > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java > > b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java > new file mode 100644 > index 0000000..cd528cb > --- /dev/null > +++ > b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java > @@ -0,0 +1,242 @@ > +package org.apache.blur.manager.writer; > + > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +import static org.junit.Assert.assertEquals; > + > +import java.io.File; > +import java.io.IOException; > +import java.util.Random; > +import java.util.UUID; > +import java.util.concurrent.ExecutorService; > + > +import org.apache.blur.concurrent.Executors; > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC; > +import org.apache.blur.server.IndexSearcherClosable; > +import org.apache.blur.server.ShardContext; > +import org.apache.blur.server.TableContext; > +import org.apache.blur.thrift.generated.Column; > +import org.apache.blur.thrift.generated.Record; > +import org.apache.blur.thrift.generated.Row; > +import org.apache.blur.thrift.generated.TableDescriptor; > +import org.apache.hadoop.conf.Configuration; > +import org.apache.lucene.index.IndexReader; > +import org.apache.lucene.store.FSDirectory; > +import org.junit.After; > +import org.junit.Before; > +import org.junit.Test; > + > +public class BlurIndexNRTSimpleTest { > + > + private static final int TEST_NUMBER_WAIT_VISIBLE = 500; > + private static final int TEST_NUMBER = 50000; > + > + private static final File TMPDIR = new File("./target/tmp"); > + > + private BlurIndexNRTSimple writer; > + private Random random = new Random(); > + private ExecutorService service; > + private File base; > + private Configuration configuration; > + > + private DirectoryReferenceFileGC gc; > + private SharedMergeScheduler mergeScheduler; > + private String uuid; > + private BlurIndexRefresher _refresher; > + private BlurIndexCloser _closer; > + > + @Before > + public void setup() throws IOException { > + TableContext.clear(); > + base = new File(TMPDIR, "blur-index-writer-test"); > + rm(base); > + base.mkdirs(); > + > + mergeScheduler = new SharedMergeScheduler(1); > + gc = new DirectoryReferenceFileGC(); > + > + configuration = new Configuration(); > + service = Executors.newThreadPool("test", 10); > + _refresher = new BlurIndexRefresher(); > + _closer = new BlurIndexCloser(); > + } > 
+ > + private void setupWriter(Configuration configuration, long refresh, > boolean reload) throws IOException { > + TableDescriptor tableDescriptor = new TableDescriptor(); > + tableDescriptor.setName("test-table"); > + /* > + * if reload is set to true...we create a new writer instance pointing > + * to the same location as the old one..... > + * so previous writer instances should be closed > + */ > + > + if (!reload && uuid == null) { > + uuid = UUID.randomUUID().toString(); > + } > + > + tableDescriptor.setTableUri(new File(base, "table-store-" + > uuid).toURI().toString()); > + tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", > Long.toString(refresh)); > + > + TableContext tableContext = TableContext.create(tableDescriptor); > + File path = new File(base, "index_" + uuid); > + path.mkdirs(); > + FSDirectory directory = FSDirectory.open(path); > + ShardContext shardContext = ShardContext.create(tableContext, > "test-shard-" + uuid); > + writer = new BlurIndexNRTSimple(shardContext, directory, mergeScheduler, > gc, service, _closer, _refresher); > + } > + > + @After > + public void tearDown() throws IOException { > + _refresher.close(); > + writer.close(); > + mergeScheduler.close(); > + gc.close(); > + service.shutdownNow(); > + rm(base); > + } > + > + private void rm(File file) { > + if (!file.exists()) { > + return; > + } > + if (file.isDirectory()) { > + for (File f : file.listFiles()) { > + rm(f); > + } > + } > + file.delete(); > + } > + > + @Test > + public void testBlurIndexWriter() throws IOException { > + setupWriter(configuration, 5, false); > + long s = System.nanoTime(); > + int total = 0; > + for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) { > + writer.replaceRow(true, true, genRow()); > + IndexSearcherClosable searcher = writer.getIndexSearcher(); > + IndexReader reader = searcher.getIndexReader(); > + assertEquals(i + 1, reader.numDocs()); > + searcher.close(); > + total++; > + } > + long e = System.nanoTime(); > + 
double seconds = (e - s) / 1000000000.0; > + double rate = total / seconds; > + System.out.println("Rate " + rate); > + IndexSearcherClosable searcher = writer.getIndexSearcher(); > + IndexReader reader = searcher.getIndexReader(); > + assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs()); > + searcher.close(); > + } > + > + @Test > + public void testBlurIndexWriterFaster() throws IOException, > InterruptedException { > + setupWriter(configuration, 100, false); > + IndexSearcherClosable searcher1 = writer.getIndexSearcher(); > + IndexReader reader1 = searcher1.getIndexReader(); > + assertEquals(0, reader1.numDocs()); > + searcher1.close(); > + long s = System.nanoTime(); > + int total = 0; > + for (int i = 0; i < TEST_NUMBER; i++) { > + if (i == TEST_NUMBER - 1) { > + writer.replaceRow(true, true, genRow()); > + } else { > + writer.replaceRow(false, true, genRow()); > + } > + total++; > + } > + long e = System.nanoTime(); > + double seconds = (e - s) / 1000000000.0; > + double rate = total / seconds; > + System.out.println("Rate " + rate); > + // //wait one second for the data to become visible the test is set to > + // refresh once every 25 ms > + // Thread.sleep(1000); > + writer.refresh(); > + IndexSearcherClosable searcher2 = writer.getIndexSearcher(); > + IndexReader reader2 = searcher2.getIndexReader(); > + assertEquals(TEST_NUMBER, reader2.numDocs()); > + searcher2.close(); > + } > + > + private Row genRow() { > + Row row = new Row(); > + row.setId(Long.toString(random.nextLong())); > + Record record = new Record(); > + record.setFamily("testing"); > + record.setRecordId(Long.toString(random.nextLong())); > + for (int i = 0; i < 10; i++) { > + record.addToColumns(new Column("col" + i, > Long.toString(random.nextLong()))); > + } > + row.addToRecords(record); > + return row; > + } > + > +// @Test > +// public void testCreateSnapshot() throws IOException { > +// setupWriter(configuration, 5, false); > +// writer.createSnapshot("test_snapshot"); > +// 
assertTrue(writer.getSnapshots().contains("test_snapshot")); > +// > +// // check that the file is persisted > +// Path snapshotsDirPath = writer.getSnapshotsDirectoryPath(); > +// FileSystem fileSystem = snapshotsDirPath.getFileSystem(new > Configuration()); > +// Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot"); > +// assertTrue(fileSystem.exists(snapshotFilePath)); > +// > +// // create a new writer instance and test whether the snapshots are > loaded properly > +// writer.close(); > +// setupWriter(configuration, 5, true); > +// assertTrue(writer.getSnapshots().contains("test_snapshot")); > +// } > +// > +// > +// @Test > +// public void testRemoveSnapshots() throws IOException { > +// setupWriter(configuration, 5, false); > +// Path snapshotsDirPath = writer.getSnapshotsDirectoryPath(); > +// FileSystem fileSystem = snapshotsDirPath.getFileSystem(new > Configuration()); > +// fileSystem.mkdirs(snapshotsDirPath); > +// > +// // create 2 files in snapshots sub-dir > +// Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1"); > +// Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2"); > +// > +// BufferedWriter br1 = new BufferedWriter(new > OutputStreamWriter(fileSystem.create(snapshotFile1, true))); > +// br1.write("segments_1"); > +// br1.close(); > +// > +// BufferedWriter br2 = new BufferedWriter(new > OutputStreamWriter(fileSystem.create(snapshotFile2, true))); > +// br2.write("segments_1"); > +// br2.close(); > +// > +// // re-load the writer to load the snpshots > +// writer.close(); > +// setupWriter(configuration, 5, true); > +// assertEquals(writer.getSnapshots().size(), 2); > +// > +// > +// writer.removeSnapshot("test_snapshot2"); > +// assertEquals(writer.getSnapshots().size(), 1); > +// assertTrue(!writer.getSnapshots().contains("test_snapshot2")); > +// assertTrue(!fileSystem.exists(snapshotFile2)); > +// > +// } > +} > > 
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java > > b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java > index 01762b1..cb7d649 100644 > --- > a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java > +++ > b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java > @@ -90,7 +90,7 @@ public class BlurIndexReaderTest { > ShardContext shardContext = ShardContext.create(tableContext, > "test-shard"); > refresher = new BlurIndexRefresher(); > indexCloser = new BlurIndexCloser(); > - reader = new BlurIndexReader(shardContext, directory, refresher, > indexCloser); > + reader = new BlurIndexReader(shardContext, directory, null, null, null, > indexCloser, refresher); > } > > @After > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java > > b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java > index 5f40fac..9f2ffc3 100644 > --- > a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java > +++ > b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java > @@ -81,15 +81,15 @@ public class BlurNRTIndexTest { > TableDescriptor tableDescriptor = new TableDescriptor(); > tableDescriptor.setName("test-table"); > /* > - * if reload is set to true...we create a new writer instance pointing > - * to the same location as the old one..... 
> - * so previous writer instances should be closed > + * if reload is set to true...we create a new writer instance pointing to > + * the same location as the old one..... so previous writer instances > should > + * be closed > */ > - > + > if (!reload && uuid == null) { > uuid = UUID.randomUUID().toString(); > } > - > + > tableDescriptor.setTableUri(new File(base, "table-store-" + > uuid).toURI().toString()); > tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", > Long.toString(refresh)); > > @@ -98,7 +98,7 @@ public class BlurNRTIndexTest { > path.mkdirs(); > FSDirectory directory = FSDirectory.open(path); > ShardContext shardContext = ShardContext.create(tableContext, > "test-shard-" + uuid); > - writer = new BlurNRTIndex(shardContext, mergeScheduler, directory, gc, > service); > + writer = new BlurNRTIndex(shardContext, directory, mergeScheduler, gc, > service, null, null); > } > > @After > @@ -194,45 +194,44 @@ public class BlurNRTIndexTest { > setupWriter(configuration, 5, false); > writer.createSnapshot("test_snapshot"); > assertTrue(writer.getSnapshots().contains("test_snapshot")); > - > + > // check that the file is persisted > Path snapshotsDirPath = writer.getSnapshotsDirectoryPath(); > FileSystem fileSystem = snapshotsDirPath.getFileSystem(new > Configuration()); > Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot"); > assertTrue(fileSystem.exists(snapshotFilePath)); > - > - // create a new writer instance and test whether the snapshots are > loaded properly > + > + // create a new writer instance and test whether the snapshots are loaded > + // properly > writer.close(); > setupWriter(configuration, 5, true); > assertTrue(writer.getSnapshots().contains("test_snapshot")); > } > - > - > + > @Test > public void testRemoveSnapshots() throws IOException { > setupWriter(configuration, 5, false); > Path snapshotsDirPath = writer.getSnapshotsDirectoryPath(); > FileSystem fileSystem = snapshotsDirPath.getFileSystem(new > 
Configuration()); > fileSystem.mkdirs(snapshotsDirPath); > - > + > // create 2 files in snapshots sub-dir > Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1"); > Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2"); > - > + > BufferedWriter br1 = new BufferedWriter(new > OutputStreamWriter(fileSystem.create(snapshotFile1, true))); > br1.write("segments_1"); > br1.close(); > - > + > BufferedWriter br2 = new BufferedWriter(new > OutputStreamWriter(fileSystem.create(snapshotFile2, true))); > br2.write("segments_1"); > br2.close(); > - > + > // re-load the writer to load the snpshots > writer.close(); > setupWriter(configuration, 5, true); > assertEquals(writer.getSnapshots().size(), 2); > - > - > + > writer.removeSnapshot("test_snapshot2"); > assertEquals(writer.getSnapshots().size(), 1); > assertTrue(!writer.getSnapshots().contains("test_snapshot2")); > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-util/src/main/resources/blur-default.properties > ---------------------------------------------------------------------- > diff --git a/blur-util/src/main/resources/blur-default.properties > b/blur-util/src/main/resources/blur-default.properties > index 72bf66d..1876519 100644 > --- a/blur-util/src/main/resources/blur-default.properties > +++ b/blur-util/src/main/resources/blur-default.properties > @@ -181,6 +181,9 @@ blur.gui.shard.port=40090 > # To intercept the calls made to the shard server and perform server side > changes to the calls extend org.apache.blur.server.FilteredBlurServer. > blur.shard.filtered.server.class= > > +# Defines the blur index class to be used to handle index requests. This > class has to extend org.apache.blur.manager.writer.BlurIndex. This can be > defined globally as well as per table. > +blur.shard.blurindex.class= > + > > ### Controller Server Configuration > >
