Good stuff, makes me think the mutateBatch, with it's current weak guarantee, should just go away or be implemented on top of this?
With the BlurIndexSimpleWriter, is it the case that this doesn't yet handle shard failovers - I tried to figure out how _bulkWriters could get reinflated on startup but didn't see it so not sure if I was missing something or its not there yet? Also wondering if the sorted path needs to be a temporary file and moved into place? --tim On Thu, Dec 18, 2014 at 6:21 PM, <[email protected]> wrote: > Adding bulk mutate implementation. > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo > Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/89945db1 > Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/89945db1 > Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/89945db1 > > Branch: refs/heads/master > Commit: 89945db12a98bca1290e6ccbedbbee7c526803ba > Parents: a0a7d7d > Author: Aaron McCurry <[email protected]> > Authored: Thu Dec 18 18:21:14 2014 -0500 > Committer: Aaron McCurry <[email protected]> > Committed: Thu Dec 18 18:21:14 2014 -0500 > > ---------------------------------------------------------------------- > .../org/apache/blur/manager/IndexManager.java | 30 ++- > .../apache/blur/manager/writer/BlurIndex.java | 6 + > .../blur/manager/writer/BlurIndexReadOnly.java | 15 ++ > .../manager/writer/BlurIndexSimpleWriter.java | 201 ++++++++++++++++++- > .../apache/blur/server/FilteredBlurServer.java | 15 ++ > .../blur/thrift/BlurControllerServer.java | 75 +++++++ > .../org/apache/blur/thrift/BlurShardServer.java | 39 ++++ > .../blur/command/ShardCommandManagerTest.java | 15 ++ > .../blur/manager/writer/IndexImporterTest.java | 15 ++ > 9 files changed, 407 insertions(+), 4 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > index 51c4561..0dbc634 100644 > --- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > +++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > @@ -1015,18 +1015,18 @@ public class IndexManager { > List<String> terms = new ArrayList<String>(size); > AtomicReader areader = BlurUtil.getAtomicReader(reader); > Terms termsAll = areader.terms(term.field()); > - > + > if (termsAll == null) { > return terms; > } > > TermsEnum termEnum = termsAll.iterator(null); > SeekStatus status = termEnum.seekCeil(term.bytes()); > - > + > if (status == SeekStatus.END) { > return terms; > } > - > + > BytesRef currentTermText = termEnum.term(); > do { > terms.add(currentTermText.utf8ToString()); > @@ -1289,4 +1289,28 @@ public class IndexManager { > enqueue(Arrays.asList(mutation)); > } > > + public void bulkMutateStart(String table, String bulkId) throws > BlurException, IOException { > + Map<String, BlurIndex> indexes = _indexServer.getIndexes(table); > + for (BlurIndex index : indexes.values()) { > + index.startBulkMutate(bulkId); > + } > + } > + > + public void bulkMutateAdd(String table, String bulkId, RowMutation > mutation) throws BlurException, IOException { > + String shard = MutationHelper.getShardName(table, mutation.rowId, > getNumberOfShards(table), _blurPartitioner); > + Map<String, BlurIndex> indexes = _indexServer.getIndexes(table); > + BlurIndex blurIndex = indexes.get(shard); > + if (blurIndex == null) { > + throw new BException("Shard [{0}] for table [{1}] not found on this > server.", shard, table); > + } > + blurIndex.addBulkMutate(bulkId, mutation); > + } > + > + public void bulkMutateFinish(String table, String bulkId, boolean apply, > boolean blockUntilComplete) throws BlurException, IOException { > + Map<String, BlurIndex> indexes = _indexServer.getIndexes(table); > + for (BlurIndex index : indexes.values()) { > + index.finishBulkMutate(bulkId, apply,blockUntilComplete); > + } > + } > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > index 1ca8f0c..eec3f65 100644 > --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > @@ -136,4 +136,10 @@ public abstract class BlurIndex { > > public abstract void enqueue(List<RowMutation> mutations) throws > IOException; > > + public abstract void startBulkMutate(String bulkId) throws IOException; > + > + public abstract void finishBulkMutate(String bulkId, boolean apply, > boolean blockUntilComplete) throws IOException; > + > + public abstract void addBulkMutate(String bulkId, RowMutation mutation) > throws IOException; > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > index c2aee75..a33dc62 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > @@ -74,4 +74,19 @@ public class BlurIndexReadOnly extends BlurIndex { > throw new RuntimeException("Read-only shard"); > } > > + @Override > + public void startBulkMutate(String bulkId) { > + throw new RuntimeException("Read-only shard"); > + } > + > + @Override > + public void finishBulkMutate(String bulkId, boolean apply, boolean > blockUntilComplete) { > + throw new RuntimeException("Read-only shard"); > + } > + > + @Override > + public void addBulkMutate(String bulkId, RowMutation mutation) { > + throw new RuntimeException("Read-only shard"); > + } > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > index 792a7d8..a07cc34 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > @@ -22,9 +22,11 @@ import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_ > import java.io.IOException; > import java.util.ArrayList; > import java.util.List; > +import java.util.Map; > import java.util.Timer; > import java.util.concurrent.ArrayBlockingQueue; > import java.util.concurrent.BlockingQueue; > +import java.util.concurrent.ConcurrentHashMap; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicBoolean; > @@ -45,15 +47,27 @@ import org.apache.blur.lucene.codec.Blur024Codec; > import org.apache.blur.server.IndexSearcherClosable; > import org.apache.blur.server.ShardContext; > import org.apache.blur.server.TableContext; > +import org.apache.blur.thrift.generated.BlurException; > import org.apache.blur.thrift.generated.RowMutation; > import org.apache.blur.trace.Trace; > import org.apache.blur.trace.Tracer; > +import org.apache.blur.utils.BlurUtil; > +import org.apache.hadoop.conf.Configuration; > +import org.apache.hadoop.fs.FileStatus; > +import org.apache.hadoop.fs.FileSystem; > import org.apache.hadoop.fs.Path; > +import org.apache.hadoop.io.BytesWritable; > import org.apache.hadoop.io.IOUtils; > +import org.apache.hadoop.io.SequenceFile; > +import org.apache.hadoop.io.SequenceFile.Reader; > +import org.apache.hadoop.io.SequenceFile.Sorter; > +import org.apache.hadoop.io.SequenceFile.Writer; > +import org.apache.hadoop.io.Text; > import org.apache.lucene.analysis.Analyzer; > import org.apache.lucene.index.BlurIndexWriter; > import org.apache.lucene.index.DirectoryReader; > import org.apache.lucene.index.IndexReader; > +import org.apache.lucene.index.IndexWriter; > import org.apache.lucene.index.IndexWriterConfig; > import org.apache.lucene.index.TieredMergePolicy; > import org.apache.lucene.store.Directory; > @@ -88,10 +102,12 @@ public class BlurIndexSimpleWriter extends BlurIndex { > private final BlockingQueue<RowMutation> _queue; > private final MutationQueueProcessor _mutationQueueProcessor; > private final Timer _indexImporterTimer; > + private final Map<String, BulkEntry> _bulkWriters; > > public BlurIndexSimpleWriter(ShardContext shardContext, Directory > directory, SharedMergeScheduler mergeScheduler, > final ExecutorService searchExecutor, BlurIndexCloser indexCloser, > Timer indexImporterTimer) throws IOException { > super(shardContext, directory, mergeScheduler, searchExecutor, > indexCloser, indexImporterTimer); > + _bulkWriters = new ConcurrentHashMap<String, > BlurIndexSimpleWriter.BulkEntry>(); > _indexImporterTimer = indexImporterTimer; > _searchThreadPool = searchExecutor; > _shardContext = shardContext; > @@ -103,7 +119,7 @@ public class BlurIndexSimpleWriter extends BlurIndex { > _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5)); > _conf.setCodec(new Blur024Codec(_tableContext.getBlurConfiguration())); > _conf.setSimilarity(_tableContext.getSimilarity()); > - _conf.setInfoStream(new > LoggingInfoStream(_tableContext.getTable(),_shardContext.getShard())); > + _conf.setInfoStream(new LoggingInfoStream(_tableContext.getTable(), > _shardContext.getShard())); > TieredMergePolicy mergePolicy = (TieredMergePolicy) > _conf.getMergePolicy(); > mergePolicy.setUseCompoundFile(false); > _conf.setMergeScheduler(mergeScheduler.getMergeScheduler()); > @@ -381,4 +397,187 @@ public class BlurIndexSimpleWriter extends BlurIndex { > _mutationQueueProcessor.startIfNotRunning(); > } > > + static class BulkEntry { > + final SequenceFile.Writer _writer; > + final Path _path; > + > + BulkEntry(Writer writer, Path path) { > + _writer = writer; > + _path = path; > + } > + } > + > + @Override > + public void startBulkMutate(String bulkId) throws IOException { > + BulkEntry bulkEntry = _bulkWriters.get(bulkId); > + if (bulkEntry == null) { > + Path tablePath = _tableContext.getTablePath(); > + Path bulk = new Path(tablePath, "bulk"); > + Path bulkInstance = new Path(bulk, bulkId); > + Path path = new Path(bulkInstance, _shardContext.getShard() + > ".notsorted.seq"); > + Configuration configuration = _tableContext.getConfiguration(); > + FileSystem fileSystem = path.getFileSystem(configuration); > + Writer writer = new SequenceFile.Writer(fileSystem, configuration, > path, Text.class, BytesWritable.class); > + _bulkWriters.put(bulkId, new BulkEntry(writer, path)); > + } else { > + LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table > [{2}].", bulkId, _shardContext.getShard(), > + _tableContext.getTable()); > + } > + } > + > + @Override > + public void finishBulkMutate(final String bulkId, boolean apply, boolean > blockUntilComplete) throws IOException { > + final String table = _tableContext.getTable(); > + final String shard = _shardContext.getShard(); > + > + LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply [{1}]", > bulkId, apply, table, shard); > + final BulkEntry bulkEntry = _bulkWriters.get(bulkId); > + bulkEntry._writer.close(); > + > + Configuration configuration = _tableContext.getConfiguration(); > + final Path path = bulkEntry._path; > + final FileSystem fileSystem = path.getFileSystem(configuration); > + > + if (!apply) { > + fileSystem.delete(path, false); > + Path parent = path.getParent(); > + removeParentIfLastFile(fileSystem, parent); > + } else { > + Runnable runnable = new Runnable() { > + @Override > + public void run() { > + try { > + process(new IndexAction() { > + private Path _sorted; > + > + @Override > + public void performMutate(IndexSearcherClosable searcher, > IndexWriter writer) throws IOException { > + Configuration configuration = > _tableContext.getConfiguration(); > + > + SequenceFile.Sorter sorter = new Sorter(fileSystem, > Text.class, BytesWritable.class, configuration); > + > + _sorted = new Path(path.getParent(), shard + ".sorted.seq"); > + > + LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path > [{0}] sorted path [{1}]", path, _sorted, table, > + shard, bulkId); > + sorter.sort(path, _sorted); > + > + LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted > path [{0}]", _sorted, table, shard, bulkId); > + Reader reader = new SequenceFile.Reader(fileSystem, _sorted, > configuration); > + > + Text key = new Text(); > + BytesWritable value = new BytesWritable(); > + > + Text last = null; > + List<RowMutation> list = new ArrayList<RowMutation>(); > + while (reader.next(key, value)) { > + if (!key.equals(last)) { > + flushMutates(searcher, writer, list); > + last = new Text(key); > + list.clear(); > + } > + list.add(fromBytesWritable(value)); > + } > + flushMutates(searcher, writer, list); > + reader.close(); > + LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates > starting commit.", table, shard, bulkId); > + } > + > + private void flushMutates(IndexSearcherClosable searcher, > IndexWriter writer, List<RowMutation> list) > + throws IOException { > + if (!list.isEmpty()) { > + List<RowMutation> reduceMutates; > + try { > + reduceMutates = MutatableAction.reduceMutates(list); > + } catch (BlurException e) { > + throw new IOException(e); > + } > + for (RowMutation mutation : reduceMutates) { > + MutatableAction mutatableAction = new > MutatableAction(_shardContext); > + mutatableAction.mutate(mutation); > + mutatableAction.performMutate(searcher, writer); > + } > + } > + } > + > + private void cleanupFiles() throws IOException { > + fileSystem.delete(path, false); > + fileSystem.delete(_sorted, false); > + Path parent = path.getParent(); > + removeParentIfLastFile(fileSystem, parent); > + } > + > + @Override > + public void doPreRollback(IndexWriter writer) throws > IOException { > + > + } > + > + @Override > + public void doPreCommit(IndexSearcherClosable indexSearcher, > IndexWriter writer) throws IOException { > + > + } > + > + @Override > + public void doPostRollback(IndexWriter writer) throws > IOException { > + cleanupFiles(); > + } > + > + @Override > + public void doPostCommit(IndexWriter writer) throws > IOException { > + cleanupFiles(); > + } > + }); > + } catch (IOException e) { > + LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying > to finish the bulk updates.", table, shard, > + bulkId); > + } > + } > + }; > + if (blockUntilComplete) { > + runnable.run(); > + } else { > + Thread thread = new Thread(runnable); > + thread.setName("Bulk Finishing Thread Table [" + table + "] Shard [" > + shard + "] BulkId [" + bulkId + "]"); > + thread.start(); > + } > + } > + } > + > + @Override > + public void addBulkMutate(String bulkId, RowMutation mutation) throws > IOException { > + BulkEntry bulkEntry = _bulkWriters.get(bulkId); > + if (bulkEntry == null) { > + throw new IOException("Bulk writer for [" + bulkId + "] not found."); > + } > + bulkEntry._writer.append(getKey(mutation), toBytesWritable(mutation)); > + } > + > + private Text getKey(RowMutation mutation) { > + return new Text(mutation.getRowId()); > + } > + > + private BytesWritable toBytesWritable(RowMutation mutation) { > + BytesWritable value = new BytesWritable(); > + byte[] bytes = BlurUtil.toBytes(mutation); > + value.set(bytes, 0, bytes.length); > + return value; > + } > + > + private RowMutation fromBytesWritable(BytesWritable value) { > + return (RowMutation) BlurUtil.fromBytes(value.getBytes(), 0, > value.getLength()); > + } > + > + private static void removeParentIfLastFile(final FileSystem fileSystem, > Path parent) throws IOException { > + FileStatus[] listStatus = fileSystem.listStatus(parent); > + if (listStatus != null) { > + if (listStatus.length == 0) { > + if (!fileSystem.delete(parent, false)) { > + if (fileSystem.exists(parent)) { > + LOG.error("Could not remove parent directory [{0}]", parent); > + } > + } > + } > + } > + } > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > index d561560..47a9376 100644 > --- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > +++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > @@ -291,4 +291,19 @@ public class FilteredBlurServer implements Iface { > _iface.loadData(table, location); > } > > + @Override > + public void bulkMutateStart(String table, String bulkId) throws > BlurException, TException { > + _iface.bulkMutateStart(table, bulkId); > + } > + > + @Override > + public void bulkMutateAdd(String table, String bulkId, RowMutation > rowMutation) throws BlurException, TException { > + _iface.bulkMutateAdd(table, bulkId, rowMutation); > + } > + > + @Override > + public void bulkMutateFinish(String table, String bulkId, boolean apply, > boolean blockUntilComplete) throws BlurException, TException { > + _iface.bulkMutateFinish(table, bulkId, apply, blockUntilComplete); > + } > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > index 23f719c..4cd2327 100644 > --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > @@ -1673,4 +1673,79 @@ public class BlurControllerServer extends TableAdmin > implements Iface { > throw new BException("Not Implemented"); > } > > + @Override > + public void bulkMutateStart(final String table, final String bulkId) > throws BlurException, TException { > + String cluster = getCluster(table); > + try { > + scatter(cluster, new BlurCommand<Void>() { > + @Override > + public Void call(Client client) throws BlurException, TException { > + client.bulkMutateStart(table, bulkId); > + return null; > + } > + }); > + } catch (Exception e) { > + LOG.error("Unknown error while trying to get start a bulk mutate [{0}] > [{1}]", e, table, bulkId); > + if (e instanceof BlurException) { > + throw (BlurException) e; > + } > + throw new BException(e.getMessage(), e); > + } > + } > + > + @Override > + public void bulkMutateAdd(final String table, final String bulkId, final > RowMutation mutation) throws BlurException, TException { > + try { > + checkTable(mutation.table); > + checkForUpdates(mutation.table); > + MutationHelper.validateMutation(mutation); > + if (!table.equals(mutation.getTable())) { > + throw new BException("RowMutation table [{0}] has to match method > table [{1}]", mutation.getTable(), table); > + } > + > + int numberOfShards = getShardCount(table); > + Map<String, String> tableLayout = getTableLayout(table); > + if (tableLayout.size() != numberOfShards) { > + throw new BException("Cannot update data while shard is missing"); > + } > + > + String shardName = MutationHelper.getShardName(table, mutation.rowId, > numberOfShards, _blurPartitioner); > + String node = tableLayout.get(shardName); > + _client.execute(node, new BlurCommand<Void>() { > + @Override > + public Void call(Client client) throws BlurException, TException { > + client.bulkMutateAdd(table, bulkId, mutation); > + return null; > + } > + }, _maxMutateRetries, _mutateDelay, _maxMutateDelay); > + } catch (Exception e) { > + LOG.error("Unknown error during bulk mutation of [{0}]", e, mutation); > + if (e instanceof BlurException) { > + throw (BlurException) e; > + } > + throw new BException("Unknown error during bulk mutation of [{0}]", e, > mutation); > + } > + } > + > + @Override > + public void bulkMutateFinish(final String table, final String bulkId, > final boolean apply, final boolean blockUntilComplete) throws BlurException, > + TException { > + String cluster = getCluster(table); > + try { > + scatter(cluster, new BlurCommand<Void>() { > + @Override > + public Void call(Client client) throws BlurException, TException { > + client.bulkMutateFinish(table, bulkId, apply, blockUntilComplete); > + return null; > + } > + }); > + } catch (Exception e) { > + LOG.error("Unknown error while trying to get finish a bulk mutate > [{0}] [{1}]", e, table, bulkId); > + if (e instanceof BlurException) { > + throw (BlurException) e; > + } > + throw new BException(e.getMessage(), e); > + } > + } > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > index 135feaf..976896c 100644 > --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > @@ -695,4 +695,43 @@ public class BlurShardServer extends TableAdmin > implements Iface { > return CommandStatusStateEnum.valueOf(state.name()); > } > > + @Override > + public void bulkMutateStart(String table, String bulkId) throws > BlurException, TException { > + try { > + _indexManager.bulkMutateStart(table, bulkId); > + } catch (Exception e) { > + LOG.error("Unknown error while trying to start a bulk mutate on table > [" + table + "]", e); > + if (e instanceof BlurException) { > + throw (BlurException) e; > + } > + throw new BException(e.getMessage(), e); > + } > + } > + > + @Override > + public void bulkMutateAdd(String table, String bulkId, RowMutation > rowMutation) throws BlurException, TException { > + try { > + _indexManager.bulkMutateAdd(table, bulkId, rowMutation); > + } catch (Exception e) { > + LOG.error("Unknown error while trying to add to a bulk mutate on table > [" + table + "]", e); > + if (e instanceof BlurException) { > + throw (BlurException) e; > + } > + throw new BException(e.getMessage(), e); > + } > + } > + > + @Override > + public void bulkMutateFinish(String table, String bulkId, boolean apply, > boolean blockUntilComplete) throws BlurException, TException { > + try { > + _indexManager.bulkMutateFinish(table, bulkId, > apply,blockUntilComplete); > + } catch (Exception e) { > + LOG.error("Unknown error while trying to finsh a bulk mutate on table > [" + table + "]", e); > + if (e instanceof BlurException) { > + throw (BlurException) e; > + } > + throw new BException(e.getMessage(), e); > + } > + } > + > } > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > > b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > index d2b4eed..673838b 100644 > --- > a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > +++ > b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > @@ -430,6 +430,21 @@ public class ShardCommandManagerTest { > public void close() throws IOException { > throw new RuntimeException("Not implemented."); > } > + > + @Override > + public void startBulkMutate(String bulkId) throws IOException { > + throw new RuntimeException("Not implemented."); > + } > + > + @Override > + public void finishBulkMutate(String bulkId, boolean apply, boolean > blockUntilComplete) throws IOException { > + throw new RuntimeException("Not implemented."); > + } > + > + @Override > + public void addBulkMutate(String bulkId, RowMutation mutation) throws > IOException { > + throw new RuntimeException("Not implemented."); > + } > }; > } > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > > b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > index 28b0d21..9e83196 100644 > --- > a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > +++ > b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > @@ -197,6 +197,21 @@ public class IndexImporterTest { > public void enqueue(List<RowMutation> mutations) { > throw new RuntimeException("Not Implemented"); > } > + > + @Override > + public void startBulkMutate(String bulkId) throws IOException { > + throw new RuntimeException("Not implemented."); > + } > + > + @Override > + public void finishBulkMutate(String bulkId, boolean apply, boolean > blockUntilComplete) throws IOException { > + throw new RuntimeException("Not implemented."); > + } > + > + @Override > + public void addBulkMutate(String bulkId, RowMutation mutation) throws > IOException { > + throw new RuntimeException("Not implemented."); > + } > }; > } > >
