On Friday, December 19, 2014, Tim Williams <[email protected]> wrote:
> Good stuff, makes me think the mutateBatch, with it's current weak > guarantee, should just go away or be implemented on top of this? Yes I was thinking the same thing. Though there isn't a total commit guarantee with the new solution. Yet. > > With the BlurIndexSimpleWriter, is it the case that this doesn't yet > handle shard failovers - I tried to figure out how _bulkWriters could > get reinflated on startup but didn't see it so not sure if I was > missing something or its not there yet? Also wondering if the sorted > path needs to be a temporary file and moved into place? You are correct it doesn't survive fail overs yet. We should discuss some strageties for that. > > --tim > > > On Thu, Dec 18, 2014 at 6:21 PM, <[email protected] <javascript:;>> > wrote: > > Adding bulk mutate implementation. > > > > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo > > Commit: > http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/89945db1 > > Tree: > http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/89945db1 > > Diff: > http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/89945db1 > > > > Branch: refs/heads/master > > Commit: 89945db12a98bca1290e6ccbedbbee7c526803ba > > Parents: a0a7d7d > > Author: Aaron McCurry <[email protected] <javascript:;>> > > Authored: Thu Dec 18 18:21:14 2014 -0500 > > Committer: Aaron McCurry <[email protected] <javascript:;>> > > Committed: Thu Dec 18 18:21:14 2014 -0500 > > > > ---------------------------------------------------------------------- > > .../org/apache/blur/manager/IndexManager.java | 30 ++- > > .../apache/blur/manager/writer/BlurIndex.java | 6 + > > .../blur/manager/writer/BlurIndexReadOnly.java | 15 ++ > > .../manager/writer/BlurIndexSimpleWriter.java | 201 > ++++++++++++++++++- > > .../apache/blur/server/FilteredBlurServer.java | 15 ++ > > .../blur/thrift/BlurControllerServer.java | 75 +++++++ > > .../org/apache/blur/thrift/BlurShardServer.java | 39 ++++ > > .../blur/command/ShardCommandManagerTest.java | 15 ++ > > .../blur/manager/writer/IndexImporterTest.java | 15 ++ > > 9 files changed, 407 insertions(+), 4 deletions(-) > > ---------------------------------------------------------------------- > > > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > > index 51c4561..0dbc634 100644 > > --- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > > +++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java > > @@ -1015,18 +1015,18 @@ public class IndexManager { > > List<String> terms = new ArrayList<String>(size); > > AtomicReader areader = BlurUtil.getAtomicReader(reader); > > Terms termsAll = areader.terms(term.field()); > > - > > + > > if (termsAll == null) { > > return terms; > > } > > > > TermsEnum termEnum = termsAll.iterator(null); > > SeekStatus status = termEnum.seekCeil(term.bytes()); > > - > > + > > if (status == SeekStatus.END) { > > return terms; > > } > > - > > + > > BytesRef currentTermText = termEnum.term(); > > do { > > terms.add(currentTermText.utf8ToString()); > > @@ -1289,4 +1289,28 @@ public class IndexManager { > > enqueue(Arrays.asList(mutation)); > > } > > > > + public void bulkMutateStart(String table, String bulkId) throws > BlurException, IOException { > > + Map<String, BlurIndex> indexes = _indexServer.getIndexes(table); > > + for (BlurIndex index : indexes.values()) { > > + index.startBulkMutate(bulkId); > > + } > > + } > > + > > + public void bulkMutateAdd(String table, String bulkId, RowMutation > mutation) throws BlurException, IOException { > > + String shard = MutationHelper.getShardName(table, mutation.rowId, > getNumberOfShards(table), _blurPartitioner); > > + Map<String, BlurIndex> indexes = _indexServer.getIndexes(table); > > + BlurIndex blurIndex = indexes.get(shard); > > + if (blurIndex == null) { > > + throw new BException("Shard [{0}] for table [{1}] not found on > this server.", shard, table); > > + } > > + blurIndex.addBulkMutate(bulkId, mutation); > > + } > > + > > + public void bulkMutateFinish(String table, String bulkId, boolean > apply, boolean blockUntilComplete) throws BlurException, IOException { > > + Map<String, BlurIndex> indexes = _indexServer.getIndexes(table); > > + for (BlurIndex index : indexes.values()) { > > + index.finishBulkMutate(bulkId, apply,blockUntilComplete); > > + } > > + } > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > index 1ca8f0c..eec3f65 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > @@ -136,4 +136,10 @@ public abstract class BlurIndex { > > > > public abstract void enqueue(List<RowMutation> mutations) throws > IOException; > > > > + public abstract void startBulkMutate(String bulkId) throws > IOException; > > + > > + public abstract void finishBulkMutate(String bulkId, boolean apply, > boolean blockUntilComplete) throws IOException; > > + > > + public abstract void addBulkMutate(String bulkId, RowMutation > mutation) throws IOException; > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > index c2aee75..a33dc62 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > @@ -74,4 +74,19 @@ public class BlurIndexReadOnly extends BlurIndex { > > throw new RuntimeException("Read-only shard"); > > } > > > > + @Override > > + public void startBulkMutate(String bulkId) { > > + throw new RuntimeException("Read-only shard"); > > + } > > + > > + @Override > > + public void finishBulkMutate(String bulkId, boolean apply, boolean > blockUntilComplete) { > > + throw new RuntimeException("Read-only shard"); > > + } > > + > > + @Override > > + public void addBulkMutate(String bulkId, RowMutation mutation) { > > + throw new RuntimeException("Read-only shard"); > > + } > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > index 792a7d8..a07cc34 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > @@ -22,9 +22,11 @@ import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_ > > import java.io.IOException; > > import java.util.ArrayList; > > import java.util.List; > > +import java.util.Map; > > import java.util.Timer; > > import java.util.concurrent.ArrayBlockingQueue; > > import java.util.concurrent.BlockingQueue; > > +import java.util.concurrent.ConcurrentHashMap; > > import java.util.concurrent.ExecutorService; > > import java.util.concurrent.TimeUnit; > > import java.util.concurrent.atomic.AtomicBoolean; > > @@ -45,15 +47,27 @@ import org.apache.blur.lucene.codec.Blur024Codec; > > import org.apache.blur.server.IndexSearcherClosable; > > import org.apache.blur.server.ShardContext; > > import org.apache.blur.server.TableContext; > > +import org.apache.blur.thrift.generated.BlurException; > > import org.apache.blur.thrift.generated.RowMutation; > > import org.apache.blur.trace.Trace; > > import org.apache.blur.trace.Tracer; > > +import org.apache.blur.utils.BlurUtil; > > +import org.apache.hadoop.conf.Configuration; > > +import org.apache.hadoop.fs.FileStatus; > > +import org.apache.hadoop.fs.FileSystem; > > import org.apache.hadoop.fs.Path; > > +import org.apache.hadoop.io.BytesWritable; > > import org.apache.hadoop.io.IOUtils; > > +import org.apache.hadoop.io.SequenceFile; > > +import org.apache.hadoop.io.SequenceFile.Reader; > > +import org.apache.hadoop.io.SequenceFile.Sorter; > > +import org.apache.hadoop.io.SequenceFile.Writer; > > +import org.apache.hadoop.io.Text; > > import org.apache.lucene.analysis.Analyzer; > > import org.apache.lucene.index.BlurIndexWriter; > > import org.apache.lucene.index.DirectoryReader; > > import org.apache.lucene.index.IndexReader; > > +import org.apache.lucene.index.IndexWriter; > > import org.apache.lucene.index.IndexWriterConfig; > > import org.apache.lucene.index.TieredMergePolicy; > > import org.apache.lucene.store.Directory; > > @@ -88,10 +102,12 @@ public class BlurIndexSimpleWriter extends > BlurIndex { > > private final BlockingQueue<RowMutation> _queue; > > private final MutationQueueProcessor _mutationQueueProcessor; > > private final Timer _indexImporterTimer; > > + private final Map<String, BulkEntry> _bulkWriters; > > > > public BlurIndexSimpleWriter(ShardContext shardContext, Directory > directory, SharedMergeScheduler mergeScheduler, > > final ExecutorService searchExecutor, BlurIndexCloser > indexCloser, Timer indexImporterTimer) throws IOException { > > super(shardContext, directory, mergeScheduler, searchExecutor, > indexCloser, indexImporterTimer); > > + _bulkWriters = new ConcurrentHashMap<String, > BlurIndexSimpleWriter.BulkEntry>(); > > _indexImporterTimer = indexImporterTimer; > > _searchThreadPool = searchExecutor; > > _shardContext = shardContext; > > @@ -103,7 +119,7 @@ public class BlurIndexSimpleWriter extends BlurIndex > { > > _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5)); > > _conf.setCodec(new > Blur024Codec(_tableContext.getBlurConfiguration())); > > _conf.setSimilarity(_tableContext.getSimilarity()); > > - _conf.setInfoStream(new > LoggingInfoStream(_tableContext.getTable(),_shardContext.getShard())); > > + _conf.setInfoStream(new LoggingInfoStream(_tableContext.getTable(), > _shardContext.getShard())); > > TieredMergePolicy mergePolicy = (TieredMergePolicy) > _conf.getMergePolicy(); > > mergePolicy.setUseCompoundFile(false); > > _conf.setMergeScheduler(mergeScheduler.getMergeScheduler()); > > @@ -381,4 +397,187 @@ public class BlurIndexSimpleWriter extends > BlurIndex { > > _mutationQueueProcessor.startIfNotRunning(); > > } > > > > + static class BulkEntry { > > + final SequenceFile.Writer _writer; > > + final Path _path; > > + > > + BulkEntry(Writer writer, Path path) { > > + _writer = writer; > > + _path = path; > > + } > > + } > > + > > + @Override > > + public void startBulkMutate(String bulkId) throws IOException { > > + BulkEntry bulkEntry = _bulkWriters.get(bulkId); > > + if (bulkEntry == null) { > > + Path tablePath = _tableContext.getTablePath(); > > + Path bulk = new Path(tablePath, "bulk"); > > + Path bulkInstance = new Path(bulk, bulkId); > > + Path path = new Path(bulkInstance, _shardContext.getShard() + > ".notsorted.seq"); > > + Configuration configuration = _tableContext.getConfiguration(); > > + FileSystem fileSystem = path.getFileSystem(configuration); > > + Writer writer = new SequenceFile.Writer(fileSystem, > configuration, path, Text.class, BytesWritable.class); > > + _bulkWriters.put(bulkId, new BulkEntry(writer, path)); > > + } else { > > + LOG.info("Bulk [{0}] mutate already started on shard [{1}] in > table [{2}].", bulkId, _shardContext.getShard(), > > + _tableContext.getTable()); > > + } > > + } > > + > > + @Override > > + public void finishBulkMutate(final String bulkId, boolean apply, > boolean blockUntilComplete) throws IOException { > > + final String table = _tableContext.getTable(); > > + final String shard = _shardContext.getShard(); > > + > > + LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply > [{1}]", bulkId, apply, table, shard); > > + final BulkEntry bulkEntry = _bulkWriters.get(bulkId); > > + bulkEntry._writer.close(); > > + > > + Configuration configuration = _tableContext.getConfiguration(); > > + final Path path = bulkEntry._path; > > + final FileSystem fileSystem = path.getFileSystem(configuration); > > + > > + if (!apply) { > > + fileSystem.delete(path, false); > > + Path parent = path.getParent(); > > + removeParentIfLastFile(fileSystem, parent); > > + } else { > > + Runnable runnable = new Runnable() { > > + @Override > > + public void run() { > > + try { > > + process(new IndexAction() { > > + private Path _sorted; > > + > > + @Override > > + public void performMutate(IndexSearcherClosable searcher, > IndexWriter writer) throws IOException { > > + Configuration configuration = > _tableContext.getConfiguration(); > > + > > + SequenceFile.Sorter sorter = new Sorter(fileSystem, > Text.class, BytesWritable.class, configuration); > > + > > + _sorted = new Path(path.getParent(), shard + > ".sorted.seq"); > > + > > + LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path > [{0}] sorted path [{1}]", path, _sorted, table, > > + shard, bulkId); > > + sorter.sort(path, _sorted); > > + > > + LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates > sorted path [{0}]", _sorted, table, shard, bulkId); > > + Reader reader = new SequenceFile.Reader(fileSystem, > _sorted, configuration); > > + > > + Text key = new Text(); > > + BytesWritable value = new BytesWritable(); > > + > > + Text last = null; > > + List<RowMutation> list = new ArrayList<RowMutation>(); > > + while (reader.next(key, value)) { > > + if (!key.equals(last)) { > > + flushMutates(searcher, writer, list); > > + last = new Text(key); > > + list.clear(); > > + } > > + list.add(fromBytesWritable(value)); > > + } > > + flushMutates(searcher, writer, list); > > + reader.close(); > > + LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying > mutates starting commit.", table, shard, bulkId); > > + } > > + > > + private void flushMutates(IndexSearcherClosable searcher, > IndexWriter writer, List<RowMutation> list) > > + throws IOException { > > + if (!list.isEmpty()) { > > + List<RowMutation> reduceMutates; > > + try { > > + reduceMutates = MutatableAction.reduceMutates(list); > > + } catch (BlurException e) { > > + throw new IOException(e); > > + } > > + for (RowMutation mutation : reduceMutates) { > > + MutatableAction mutatableAction = new > MutatableAction(_shardContext); > > + mutatableAction.mutate(mutation); > > + mutatableAction.performMutate(searcher, writer); > > + } > > + } > > + } > > + > > + private void cleanupFiles() throws IOException { > > + fileSystem.delete(path, false); > > + fileSystem.delete(_sorted, false); > > + Path parent = path.getParent(); > > + removeParentIfLastFile(fileSystem, parent); > > + } > > + > > + @Override > > + public void doPreRollback(IndexWriter writer) throws > IOException { > > + > > + } > > + > > + @Override > > + public void doPreCommit(IndexSearcherClosable > indexSearcher, IndexWriter writer) throws IOException { > > + > > + } > > + > > + @Override > > + public void doPostRollback(IndexWriter writer) throws > IOException { > > + cleanupFiles(); > > + } > > + > > + @Override > > + public void doPostCommit(IndexWriter writer) throws > IOException { > > + cleanupFiles(); > > + } > > + }); > > + } catch (IOException e) { > > + LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while > trying to finish the bulk updates.", table, shard, > > + bulkId); > > + } > > + } > > + }; > > + if (blockUntilComplete) { > > + runnable.run(); > > + } else { > > + Thread thread = new Thread(runnable); > > + thread.setName("Bulk Finishing Thread Table [" + table + "] > Shard [" + shard + "] BulkId [" + bulkId + "]"); > > + thread.start(); > > + } > > + } > > + } > > + > > + @Override > > + public void addBulkMutate(String bulkId, RowMutation mutation) throws > IOException { > > + BulkEntry bulkEntry = _bulkWriters.get(bulkId); > > + if (bulkEntry == null) { > > + throw new IOException("Bulk writer for [" + bulkId + "] not > found."); > > + } > > + bulkEntry._writer.append(getKey(mutation), > toBytesWritable(mutation)); > > + } > > + > > + private Text getKey(RowMutation mutation) { > > + return new Text(mutation.getRowId()); > > + } > > + > > + private BytesWritable toBytesWritable(RowMutation mutation) { > > + BytesWritable value = new BytesWritable(); > > + byte[] bytes = BlurUtil.toBytes(mutation); > > + value.set(bytes, 0, bytes.length); > > + return value; > > + } > > + > > + private RowMutation fromBytesWritable(BytesWritable value) { > > + return (RowMutation) BlurUtil.fromBytes(value.getBytes(), 0, > value.getLength()); > > + } > > + > > + private static void removeParentIfLastFile(final FileSystem > fileSystem, Path parent) throws IOException { > > + FileStatus[] listStatus = fileSystem.listStatus(parent); > > + if (listStatus != null) { > > + if (listStatus.length == 0) { > > + if (!fileSystem.delete(parent, false)) { > > + if (fileSystem.exists(parent)) { > > + LOG.error("Could not remove parent directory [{0}]", > parent); > > + } > > + } > > + } > > + } > > + } > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > > index d561560..47a9376 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > > +++ > b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java > > @@ -291,4 +291,19 @@ public class FilteredBlurServer implements Iface { > > _iface.loadData(table, location); > > } > > > > + @Override > > + public void bulkMutateStart(String table, String bulkId) throws > BlurException, TException { > > + _iface.bulkMutateStart(table, bulkId); > > + } > > + > > + @Override > > + public void bulkMutateAdd(String table, String bulkId, RowMutation > rowMutation) throws BlurException, TException { > > + _iface.bulkMutateAdd(table, bulkId, rowMutation); > > + } > > + > > + @Override > > + public void bulkMutateFinish(String table, String bulkId, boolean > apply, boolean blockUntilComplete) throws BlurException, TException { > > + _iface.bulkMutateFinish(table, bulkId, apply, blockUntilComplete); > > + } > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > > index 23f719c..4cd2327 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > > +++ > b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java > > @@ -1673,4 +1673,79 @@ public class BlurControllerServer extends > TableAdmin implements Iface { > > throw new BException("Not Implemented"); > > } > > > > + @Override > > + public void bulkMutateStart(final String table, final String bulkId) > throws BlurException, TException { > > + String cluster = getCluster(table); > > + try { > > + scatter(cluster, new BlurCommand<Void>() { > > + @Override > > + public Void call(Client client) throws BlurException, > TException { > > + client.bulkMutateStart(table, bulkId); > > + return null; > > + } > > + }); > > + } catch (Exception e) { > > + LOG.error("Unknown error while trying to get start a bulk mutate > [{0}] [{1}]", e, table, bulkId); > > + if (e instanceof BlurException) { > > + throw (BlurException) e; > > + } > > + throw new BException(e.getMessage(), e); > > + } > > + } > > + > > + @Override > > + public void bulkMutateAdd(final String table, final String bulkId, > final RowMutation mutation) throws BlurException, TException { > > + try { > > + checkTable(mutation.table); > > + checkForUpdates(mutation.table); > > + MutationHelper.validateMutation(mutation); > > + if (!table.equals(mutation.getTable())) { > > + throw new BException("RowMutation table [{0}] has to match > method table [{1}]", mutation.getTable(), table); > > + } > > + > > + int numberOfShards = getShardCount(table); > > + Map<String, String> tableLayout = getTableLayout(table); > > + if (tableLayout.size() != numberOfShards) { > > + throw new BException("Cannot update data while shard is > missing"); > > + } > > + > > + String shardName = MutationHelper.getShardName(table, > mutation.rowId, numberOfShards, _blurPartitioner); > > + String node = tableLayout.get(shardName); > > + _client.execute(node, new BlurCommand<Void>() { > > + @Override > > + public Void call(Client client) throws BlurException, > TException { > > + client.bulkMutateAdd(table, bulkId, mutation); > > + return null; > > + } > > + }, _maxMutateRetries, _mutateDelay, _maxMutateDelay); > > + } catch (Exception e) { > > + LOG.error("Unknown error during bulk mutation of [{0}]", e, > mutation); > > + if (e instanceof BlurException) { > > + throw (BlurException) e; > > + } > > + throw new BException("Unknown error during bulk mutation of > [{0}]", e, mutation); > > + } > > + } > > + > > + @Override > > + public void bulkMutateFinish(final String table, final String bulkId, > final boolean apply, final boolean blockUntilComplete) throws BlurException, > > + TException { > > + String cluster = getCluster(table); > > + try { > > + scatter(cluster, new BlurCommand<Void>() { > > + @Override > > + public Void call(Client client) throws BlurException, > TException { > > + client.bulkMutateFinish(table, bulkId, apply, > blockUntilComplete); > > + return null; > > + } > > + }); > > + } catch (Exception e) { > > + LOG.error("Unknown error while trying to get finish a bulk mutate > [{0}] [{1}]", e, table, bulkId); > > + if (e instanceof BlurException) { > > + throw (BlurException) e; > > + } > > + throw new BException(e.getMessage(), e); > > + } > > + } > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > > index 135feaf..976896c 100644 > > --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > > +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java > > @@ -695,4 +695,43 @@ public class BlurShardServer extends TableAdmin > implements Iface { > > return CommandStatusStateEnum.valueOf(state.name()); > > } > > > > + @Override > > + public void bulkMutateStart(String table, String bulkId) throws > BlurException, TException { > > + try { > > + _indexManager.bulkMutateStart(table, bulkId); > > + } catch (Exception e) { > > + LOG.error("Unknown error while trying to start a bulk mutate on > table [" + table + "]", e); > > + if (e instanceof BlurException) { > > + throw (BlurException) e; > > + } > > + throw new BException(e.getMessage(), e); > > + } > > + } > > + > > + @Override > > + public void bulkMutateAdd(String table, String bulkId, RowMutation > rowMutation) throws BlurException, TException { > > + try { > > + _indexManager.bulkMutateAdd(table, bulkId, rowMutation); > > + } catch (Exception e) { > > + LOG.error("Unknown error while trying to add to a bulk mutate on > table [" + table + "]", e); > > + if (e instanceof BlurException) { > > + throw (BlurException) e; > > + } > > + throw new BException(e.getMessage(), e); > > + } > > + } > > + > > + @Override > > + public void bulkMutateFinish(String table, String bulkId, boolean > apply, boolean blockUntilComplete) throws BlurException, TException { > > + try { > > + _indexManager.bulkMutateFinish(table, bulkId, > apply,blockUntilComplete); > > + } catch (Exception e) { > > + LOG.error("Unknown error while trying to finsh a bulk mutate on > table [" + table + "]", e); > > + if (e instanceof BlurException) { > > + throw (BlurException) e; > > + } > > + throw new BException(e.getMessage(), e); > > + } > > + } > > + > > } > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > > index d2b4eed..673838b 100644 > > --- > a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > > +++ > b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java > > @@ -430,6 +430,21 @@ public class ShardCommandManagerTest { > > public void close() throws IOException { > > throw new RuntimeException("Not implemented."); > > } > > + > > + @Override > > + public void startBulkMutate(String bulkId) throws IOException { > > + throw new RuntimeException("Not implemented."); > > + } > > + > > + @Override > > + public void finishBulkMutate(String bulkId, boolean apply, > boolean blockUntilComplete) throws IOException { > > + throw new RuntimeException("Not implemented."); > > + } > > + > > + @Override > > + public void addBulkMutate(String bulkId, RowMutation mutation) > throws IOException { > > + throw new RuntimeException("Not implemented."); > > + } > > }; > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > > index 28b0d21..9e83196 100644 > > --- > a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > > +++ > b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java > > @@ -197,6 +197,21 @@ public class IndexImporterTest { > > public void enqueue(List<RowMutation> mutations) { > > throw new RuntimeException("Not Implemented"); > > } > > + > > + @Override > > + public void startBulkMutate(String bulkId) throws IOException { > > + throw new RuntimeException("Not implemented."); > > + } > > + > > + @Override > > + public void finishBulkMutate(String bulkId, boolean apply, > boolean blockUntilComplete) throws IOException { > > + throw new RuntimeException("Not implemented."); > > + } > > + > > + @Override > > + public void addBulkMutate(String bulkId, RowMutation mutation) > throws IOException { > > + throw new RuntimeException("Not implemented."); > > + } > > }; > > } > > > > >
