On Wed, Feb 11, 2015 at 9:19 AM, <[email protected]> wrote: > Repository: incubator-blur > Updated Branches: > refs/heads/master 1f61fff25 -> 45c9d4cdb > > > Fixing issue with bulk mutate where writers may become idle and later they > fail because of lease exception issues. > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo > Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9714a1de > Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9714a1de > Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9714a1de > > Branch: refs/heads/master > Commit: 9714a1ded7788a008640a0ec6c0bfda585960c96 > Parents: 1f61fff > Author: Aaron McCurry <[email protected]> > Authored: Wed Feb 11 08:50:49 2015 -0500 > Committer: Aaron McCurry <[email protected]> > Committed: Wed Feb 11 08:50:49 2015 -0500 > > ---------------------------------------------------------------------- > .../indexserver/DistributedIndexServer.java | 8 +- > .../manager/indexserver/LocalIndexServer.java | 4 +- > .../apache/blur/manager/writer/BlurIndex.java | 2 +- > .../blur/manager/writer/BlurIndexReadOnly.java | 2 +- > .../manager/writer/BlurIndexSimpleWriter.java | 461 ++++++++++++------- > .../org/apache/blur/server/TableContext.java | 9 +- > .../blur/thrift/ThriftBlurShardServer.java | 12 +- > .../blur/command/ShardCommandManagerTest.java | 2 +- > .../writer/BlurIndexSimpleWriterTest.java | 14 +- > .../blur/manager/writer/IndexImporterTest.java | 2 +- > .../apache/blur/thrift/BlurClusterTestBase.java | 2 +- > 11 files changed, 326 insertions(+), 192 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > ---------------------------------------------------------------------- > diff --git > 
a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > index 79a5b3c..324090f 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > @@ -116,15 +116,17 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > private final int _minimumNumberOfNodes; > private final Timer _hdfsKeyValueTimer; > private final Timer _indexImporterTimer; > + private final Timer _indexBulkTimer; > > public DistributedIndexServer(Configuration configuration, ZooKeeper > zookeeper, ClusterStatus clusterStatus, > BlurFilterCache filterCache, BlockCacheDirectoryFactory > blockCacheDirectoryFactory, > DistributedLayoutFactory distributedLayoutFactory, String cluster, > String nodeName, long safeModeDelay, > int shardOpenerThreadCount, int maxMergeThreads, int > internalSearchThreads, > - int minimumNumberOfNodesBeforeExitingSafeMode, Timer > hdfsKeyValueTimer, Timer indexImporterTimer, long smallMergeThreshold) > - throws KeeperException, InterruptedException { > + int minimumNumberOfNodesBeforeExitingSafeMode, Timer > hdfsKeyValueTimer, Timer indexImporterTimer, > + long smallMergeThreshold, Timer indexBulkTimer) throws > KeeperException, InterruptedException { > super(clusterStatus, configuration, nodeName, cluster); > _indexImporterTimer = indexImporterTimer; > + _indexBulkTimer = indexBulkTimer; > _hdfsKeyValueTimer = hdfsKeyValueTimer; > _minimumNumberOfNodes = minimumNumberOfNodesBeforeExitingSafeMode; > _running.set(true); > @@ -520,7 +522,7 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > } > > BlurIndex index = tableContext.newInstanceBlurIndex(shardContext, > directory, _mergeScheduler, _searchExecutor, > - _indexCloser, _indexImporterTimer); > + 
_indexCloser, _indexImporterTimer, _indexBulkTimer); > > if (_clusterStatus.isReadOnly(true, _cluster, table)) { > index = new BlurIndexReadOnly(index); > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > index 54a06f6..a8cbb23 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > @@ -68,6 +68,7 @@ public class LocalIndexServer extends AbstractIndexServer { > private final boolean _ramDir; > private final BlurIndexCloser _indexCloser; > private final Timer _timer; > + private final Timer _bulkTimer; > > public LocalIndexServer(TableDescriptor tableDescriptor) throws > IOException { > this(tableDescriptor, false); > @@ -75,6 +76,7 @@ public class LocalIndexServer extends AbstractIndexServer { > > public LocalIndexServer(TableDescriptor tableDescriptor, boolean ramDir) > throws IOException { > _timer = new Timer("Index Importer", true); > + _bulkTimer = new Timer("Bulk Indexing", true); > _closer = Closer.create(); > _tableContext = TableContext.create(tableDescriptor); > _mergeScheduler = _closer.register(new SharedMergeScheduler(3, 128 * > 1000 * 1000)); > @@ -166,7 +168,7 @@ public class LocalIndexServer extends AbstractIndexServer > { > private BlurIndex openIndex(String table, String shard, Directory dir) > throws CorruptIndexException, IOException { > ShardContext shardContext = ShardContext.create(_tableContext, shard); > BlurIndexSimpleWriter index = new BlurIndexSimpleWriter(shardContext, > dir, _mergeScheduler, _searchExecutor, > - _indexCloser, _timer); > + 
_indexCloser, _timer, _bulkTimer); > return index; > } > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > index 192b6fa..dd92fc7 100644 > --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > @@ -41,7 +41,7 @@ public abstract class BlurIndex { > protected ShardContext _shardContext; > > public BlurIndex(ShardContext shardContext, Directory directory, > SharedMergeScheduler mergeScheduler, > - ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer > indexImporterTimer) throws IOException { > + ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer > indexImporterTimer, Timer bulkIndexingTimer) throws IOException { > _shardContext = shardContext; > } > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > index 9f1ce8a..4145a3d 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > @@ -28,7 +28,7 @@ public class BlurIndexReadOnly extends BlurIndex { > private final BlurIndex _blurIndex; > > public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException { > - super(null, null, null, null, null, null); > + super(null, null, null, null, null, null, 
null); > _blurIndex = blurIndex; > } > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > ---------------------------------------------------------------------- > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > index 0e8d6c5..608adba 100644 > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > @@ -21,6 +21,7 @@ import static > org.apache.blur.utils.BlurConstants.ACL_DISCOVER; > import static org.apache.blur.utils.BlurConstants.ACL_READ; > import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH; > > +import java.io.Closeable; > import java.io.IOException; > import java.lang.reflect.Method; > import java.util.ArrayList; > @@ -31,6 +32,7 @@ import java.util.List; > import java.util.Map; > import java.util.Set; > import java.util.Timer; > +import java.util.TimerTask; > import java.util.concurrent.ArrayBlockingQueue; > import java.util.concurrent.BlockingQueue; > import java.util.concurrent.ConcurrentHashMap; > @@ -71,6 +73,7 @@ import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.FileStatus; > import org.apache.hadoop.fs.FileSystem; > import org.apache.hadoop.fs.Path; > +import org.apache.hadoop.fs.PathFilter; > import org.apache.hadoop.io.IOUtils; > import org.apache.hadoop.io.SequenceFile; > import org.apache.hadoop.io.SequenceFile.CompressionType; > @@ -130,17 +133,21 @@ public class BlurIndexSimpleWriter extends BlurIndex { > private final AccessControlFactory _accessControlFactory; > private final Set<String> _discoverableFields; > private final Splitter _commaSplitter; > + private final Timer _bulkIndexingTimer; > + private final TimerTask 
_watchForIdleBulkWriters; > > private Thread _optimizeThread; > private Thread _writerOpener; > private IndexImporter _indexImporter; > > public BlurIndexSimpleWriter(ShardContext shardContext, Directory > directory, SharedMergeScheduler mergeScheduler, > - final ExecutorService searchExecutor, BlurIndexCloser indexCloser, > Timer indexImporterTimer) throws IOException { > - super(shardContext, directory, mergeScheduler, searchExecutor, > indexCloser, indexImporterTimer); > + final ExecutorService searchExecutor, BlurIndexCloser indexCloser, > Timer indexImporterTimer, > + Timer bulkIndexingTimer) throws IOException { > + super(shardContext, directory, mergeScheduler, searchExecutor, > indexCloser, indexImporterTimer, bulkIndexingTimer); > _commaSplitter = Splitter.on(','); > _bulkWriters = new ConcurrentHashMap<String, > BlurIndexSimpleWriter.BulkEntry>(); > _indexImporterTimer = indexImporterTimer; > + _bulkIndexingTimer = bulkIndexingTimer; > _searchThreadPool = searchExecutor; > _shardContext = shardContext; > _tableContext = _shardContext.getTableContext(); > @@ -189,6 +196,28 @@ public class BlurIndexSimpleWriter extends BlurIndex { > _indexReader.set(wrap(DirectoryReader.open(_directory))); > > openWriter(); > + _watchForIdleBulkWriters = new TimerTask() { > + @Override > + public void run() { > + for (BulkEntry bulkEntry : _bulkWriters.values()) { > + bulkEntry._lock.lock(); > + try { > + if (!bulkEntry.isClosed() && bulkEntry.isIdle()) { > + LOG.info("Bulk Entry [{0}] has become idle and now closing.", > bulkEntry); > + try { > + bulkEntry.close(); > + } catch (IOException e) { > + LOG.error("Unkown error while trying to close bulk writer > when it became idle.", e); > + } > + } > + } finally { > + bulkEntry._lock.unlock(); > + } > + } > + } > + }; > + long delay = TimeUnit.SECONDS.toMillis(30); > + _bulkIndexingTimer.schedule(_watchForIdleBulkWriters, delay, delay); > } > > private synchronized void openWriter() { > @@ -355,7 +384,17 @@ public class 
BlurIndexSimpleWriter extends BlurIndex { > @Override > public void close() throws IOException { > _isClosed.set(true); > - IOUtils.cleanup(LOG, _indexImporter, _mutationQueueProcessor, > _writer.get(), _indexReader.get()); > + IOUtils.cleanup(LOG, makeCloseable(_watchForIdleBulkWriters), > _indexImporter, _mutationQueueProcessor, > + _writer.get(), _indexReader.get()); > + } > + > + private Closeable makeCloseable(final TimerTask timerTask) { > + return new Closeable() { > + @Override > + public void close() throws IOException { > + timerTask.cancel(); > + } > + }; > } > > @Override > @@ -507,25 +546,38 @@ public class BlurIndexSimpleWriter extends BlurIndex { > } > > static class BulkEntry { > - final SequenceFile.Writer _writer; > - final Path _path; > > - BulkEntry(Writer writer, Path path) { > - _writer = writer; > - _path = path; > + private final long _idleTime = TimeUnit.SECONDS.toNanos(30); > + private final Path _parentPath; > + private final String _bulkId; > + private final TableContext _tableContext; > + private final ShardContext _shardContext; > + private final Configuration _configuration; > + private final FileSystem _fileSystem; > + private final String _table; > + private final String _shard; > + private final Lock _lock = new ReentrantReadWriteLock().writeLock(); > + > + private volatile SequenceFile.Writer _writer; > + private volatile long _lastWrite; > + private volatile int _count = 0; > + > + public BulkEntry(String bulkId, Path parentPath, ShardContext > shardContext) throws IOException { > + _bulkId = bulkId; > + _parentPath = parentPath; > + _shardContext = shardContext; > + _tableContext = shardContext.getTableContext(); > + _configuration = _tableContext.getConfiguration(); > + _fileSystem = _parentPath.getFileSystem(_configuration); > + _shard = _shardContext.getShard(); > + _table = _tableContext.getTable(); > } > - } > > - public BulkEntry startBulkMutate(String bulkId) throws IOException { > - BulkEntry bulkEntry = 
_bulkWriters.get(bulkId); > - if (bulkEntry == null) { > - Path tablePath = _tableContext.getTablePath(); > - Path bulk = new Path(tablePath, "bulk"); > - Path bulkInstance = new Path(bulk, bulkId); > - Path path = new Path(bulkInstance, _shardContext.getShard() + > ".notsorted.seq"); > - Configuration configuration = _tableContext.getConfiguration(); > - FileSystem fileSystem = path.getFileSystem(configuration); > + public boolean isClosed() { > + return _writer == null; > + } > > + private Writer openSeqWriter() throws IOException { > Progressable progress = new Progressable() { > @Override > public void progress() { > @@ -535,7 +587,7 @@ public class BlurIndexSimpleWriter extends BlurIndex { > final CompressionCodec codec; > final CompressionType type; > > - if (isSnappyCodecLoaded(configuration)) { > + if (isSnappyCodecLoaded(_configuration)) { > codec = new SnappyCodec(); > type = CompressionType.BLOCK; > } else { > @@ -543,180 +595,211 @@ public class BlurIndexSimpleWriter extends BlurIndex { > type = CompressionType.NONE; > } > > - Writer writer = SequenceFile.createWriter(fileSystem, configuration, > path, Text.class, RowMutationWritable.class, > - type, codec, progress); > + Path path = new Path(_parentPath, _shard + "." 
+ _count + > ".unsorted.seq"); > > - bulkEntry = new BulkEntry(writer, path); > - _bulkWriters.put(bulkId, bulkEntry); > - } else { > - LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table > [{2}].", bulkId, _shardContext.getShard(), > - _tableContext.getTable()); > + _count++; > + > + return SequenceFile.createWriter(_fileSystem, _configuration, path, > Text.class, RowMutationWritable.class, type, > + codec, progress); > } > - return bulkEntry; > - } > > - private boolean isSnappyCodecLoaded(Configuration configuration) { > - try { > - Method methodHadoop1 = > SnappyCodec.class.getMethod("isNativeSnappyLoaded", new Class[] { > Configuration.class }); > - Boolean loaded = (Boolean) methodHadoop1.invoke(null, new Object[] { > configuration }); > - if (loaded != null && loaded) { > - LOG.info("Using SnappyCodec"); > - return true; > - } else { > - LOG.info("Not using SnappyCodec"); > - return false; > - } > - } catch (NoSuchMethodException e) { > - Method methodHadoop2; > + public void close() throws IOException { > + _lock.lock(); > try { > - methodHadoop2 = SnappyCodec.class.getMethod("isNativeCodeLoaded", > new Class[] {}); > - } catch (NoSuchMethodException ex) { > - LOG.info("Can not determine if SnappyCodec is loaded."); > - return false; > - } catch (SecurityException ex) { > - LOG.error("Not allowed.", ex); > - return false; > + if (_writer != null) { > + _writer.close(); > + _writer = null; > + } > + } finally { > + _lock.unlock(); > } > - Boolean loaded; > + } > + > + public void append(Text key, RowMutationWritable rowMutationWritable) > throws IOException { > + _lock.lock(); > try { > - loaded = (Boolean) methodHadoop2.invoke(null); > - if (loaded != null && loaded) { > - LOG.info("Using SnappyCodec"); > - return true; > - } else { > - LOG.info("Not using SnappyCodec"); > - return false; > - } > - } catch (Exception ex) { > - LOG.info("Unknown error while trying to determine if SnappyCodec is > loaded.", ex); > - return false; > + 
getWriter().append(key, rowMutationWritable); > + _lastWrite = System.nanoTime(); > + } finally { > + _lock.unlock(); > + } > + } > + > + private SequenceFile.Writer getWriter() throws IOException { > + if (_writer == null) { > + _writer = openSeqWriter(); > + _lastWrite = System.nanoTime(); > + } > + return _writer; > + } > + > + public boolean isIdle() { > + if (_lastWrite + _idleTime < System.nanoTime()) { > + return true; > } > - } catch (SecurityException e) { > - LOG.error("Not allowed.", e); > - return false; > - } catch (Exception e) { > - LOG.info("Unknown error while trying to determine if SnappyCodec is > loaded.", e); > return false; > } > - } > > - @Override > - public void finishBulkMutate(final String bulkId, boolean apply, boolean > blockUntilComplete) throws IOException { > - final String table = _tableContext.getTable(); > - final String shard = _shardContext.getShard(); > + public List<Path> getUnsortedFiles() throws IOException { > + FileStatus[] listStatus = _fileSystem.listStatus(_parentPath, new > PathFilter() { > + @Override > + public boolean accept(Path path) { > + return path.getName().matches(_shard + > "\\.[0-9].*\\.unsorted\\.seq"); > + } > + }); > > - final BulkEntry bulkEntry = _bulkWriters.get(bulkId); > - if (bulkEntry == null) { > - LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId, apply, > table, shard); > - return; > + List<Path> unsortedPaths = new ArrayList<Path>(); > + for (FileStatus fileStatus : listStatus) { > + unsortedPaths.add(fileStatus.getPath()); > + } > + return unsortedPaths; > } > - LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply [{1}]", > bulkId, apply, table, shard); > - bulkEntry._writer.close(); > > - Configuration configuration = _tableContext.getConfiguration(); > - final Path path = bulkEntry._path; > - final FileSystem fileSystem = path.getFileSystem(configuration); > + public void cleanupFiles(List<Path> unsortedPaths, Path sorted) throws > IOException { > + for (Path p : 
unsortedPaths) { > + _fileSystem.delete(p, false); > + } > + if (sorted != null) { > + _fileSystem.delete(sorted, false); > + } > + removeParentIfLastFile(_fileSystem, _parentPath); > + } > + > + public IndexAction getIndexAction() throws IOException { > + return new IndexAction() { > + private Path _sorted; > + private List<Path> _unsortedPaths; > > - if (!apply) { > - fileSystem.delete(path, false); > - Path parent = path.getParent(); > - removeParentIfLastFile(fileSystem, parent); > - } else { > - Runnable runnable = new Runnable() { > @Override > - public void run() { > - try { > - process(new IndexAction() { > - private Path _sorted; > - > - @Override > - public void performMutate(IndexSearcherCloseable searcher, > IndexWriter writer) throws IOException { > - Configuration configuration = > _tableContext.getConfiguration(); > - > - SequenceFile.Sorter sorter = new Sorter(fileSystem, > Text.class, RowMutationWritable.class, > - configuration); > - > - _sorted = new Path(path.getParent(), shard + ".sorted.seq"); > - > - LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path > [{0}] sorted path [{1}]", path, _sorted, table, > - shard, bulkId); > - sorter.sort(path, _sorted); > - > - LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted > path [{0}]", _sorted, table, shard, bulkId); > - Reader reader = new SequenceFile.Reader(fileSystem, _sorted, > configuration); > - > - Text key = new Text(); > - RowMutationWritable value = new RowMutationWritable(); > - > - Text last = null; > - List<RowMutation> list = new ArrayList<RowMutation>(); > - while (reader.next(key, value)) { > - if (!key.equals(last)) { > - flushMutates(searcher, writer, list); > - last = new Text(key); > - list.clear(); > - } > - list.add(value.getRowMutation().deepCopy()); > - } > - flushMutates(searcher, writer, list); > - reader.close(); > - LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates > starting commit.", table, shard, bulkId); > - } > + public void 
performMutate(IndexSearcherCloseable searcher, > IndexWriter writer) throws IOException { > + Configuration configuration = _tableContext.getConfiguration(); > > - private void flushMutates(IndexSearcherCloseable searcher, > IndexWriter writer, List<RowMutation> list) > - throws IOException { > - if (!list.isEmpty()) { > - List<RowMutation> reduceMutates; > - try { > - reduceMutates = MutatableAction.reduceMutates(list); > - } catch (BlurException e) { > - throw new IOException(e); > - } > - for (RowMutation mutation : reduceMutates) { > - MutatableAction mutatableAction = new > MutatableAction(_shardContext); > - mutatableAction.mutate(mutation); > - mutatableAction.performMutate(searcher, writer); > - } > - } > - } > + SequenceFile.Sorter sorter = new Sorter(_fileSystem, Text.class, > RowMutationWritable.class, configuration); > > - private void cleanupFiles() throws IOException { > - fileSystem.delete(path, false); > - fileSystem.delete(_sorted, false); > - Path parent = path.getParent(); > - removeParentIfLastFile(fileSystem, parent); > - } > + _unsortedPaths = getUnsortedFiles(); > > - @Override > - public void doPreRollback(IndexWriter writer) throws > IOException { > + _sorted = new Path(_parentPath, _shard + ".sorted.seq"); > > - } > + LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates paths [{0}] > sorted path [{1}]", _unsortedPaths, _sorted, > + _table, _shard, _bulkId); > + sorter.sort(_unsortedPaths.toArray(new > Path[_unsortedPaths.size()]), _sorted, true); > > - @Override > - public void doPreCommit(IndexSearcherCloseable indexSearcher, > IndexWriter writer) throws IOException { > + LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted path > [{0}]", _sorted, _table, _shard, _bulkId); > + Reader reader = new SequenceFile.Reader(_fileSystem, _sorted, > configuration); > > - } > + Text key = new Text(); > + RowMutationWritable value = new RowMutationWritable(); > > - @Override > - public void doPostRollback(IndexWriter writer) throws > IOException 
{ > - cleanupFiles(); > - } > + Text last = null; > + List<RowMutation> list = new ArrayList<RowMutation>(); > + while (reader.next(key, value)) { > + if (!key.equals(last)) { > + flushMutates(searcher, writer, list); > + last = new Text(key); > + list.clear(); > + } > + list.add(value.getRowMutation().deepCopy()); > + } > + flushMutates(searcher, writer, list); > + reader.close(); > + LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates > starting commit.", _table, _shard, _bulkId); > + } > > - @Override > - public void doPostCommit(IndexWriter writer) throws > IOException { > - cleanupFiles(); > - } > - }); > - } catch (IOException e) { > - LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying > to finish the bulk updates.", table, shard, > - bulkId, e); > + private void flushMutates(IndexSearcherCloseable searcher, > IndexWriter writer, List<RowMutation> list) > + throws IOException { > + if (!list.isEmpty()) { > + List<RowMutation> reduceMutates; > + try { > + reduceMutates = MutatableAction.reduceMutates(list); > + } catch (BlurException e) { > + throw new IOException(e); > + } > + for (RowMutation mutation : reduceMutates) { > + MutatableAction mutatableAction = new > MutatableAction(_shardContext); > + mutatableAction.mutate(mutation); > + mutatableAction.performMutate(searcher, writer); > + } > } > } > + > + @Override > + public void doPreRollback(IndexWriter writer) throws IOException { > + > + } > + > + @Override > + public void doPreCommit(IndexSearcherCloseable indexSearcher, > IndexWriter writer) throws IOException { > + > + } > + > + @Override > + public void doPostRollback(IndexWriter writer) throws IOException { > + cleanupFiles(_unsortedPaths, _sorted); > + } > + > + @Override > + public void doPostCommit(IndexWriter writer) throws IOException { > + cleanupFiles(_unsortedPaths, _sorted); > + } > }; > + } > + > + @Override > + public String toString() { > + return "BulkEntry [_bulkId=" + _bulkId + ", _table=" + _table + ", > 
_shard=" + _shard + ", _idleTime=" + _idleTime > + + ", _lastWrite=" + _lastWrite + ", _count=" + _count + "]"; > + } > + > + } > + > + public synchronized BulkEntry startBulkMutate(String bulkId) throws > IOException { > + BulkEntry bulkEntry = _bulkWriters.get(bulkId); > + if (bulkEntry == null) { > + Path tablePath = _tableContext.getTablePath(); > + Path bulk = new Path(tablePath, "bulk"); > + Path bulkInstance = new Path(bulk, bulkId); > + Path path = new Path(bulkInstance, _shardContext.getShard() + > ".notsorted.seq"); > + > + bulkEntry = new BulkEntry(bulkId, path, _shardContext); > + _bulkWriters.put(bulkId, bulkEntry); > + } else { > + LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table > [{2}].", bulkId, _shardContext.getShard(), > + _tableContext.getTable()); > + } > + return bulkEntry; > + } > + > + @Override > + public void finishBulkMutate(final String bulkId, boolean apply, boolean > blockUntilComplete) throws IOException { > + final String table = _tableContext.getTable(); > + final String shard = _shardContext.getShard(); > + > + final BulkEntry bulkEntry = _bulkWriters.get(bulkId); > + if (bulkEntry == null) { > + LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId, apply, > table, shard); > + return; > + } > + LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply [{1}]", > bulkId, apply, table, shard); > + bulkEntry.close(); > + > + if (!apply) { > + bulkEntry.cleanupFiles(bulkEntry.getUnsortedFiles(), null); > + } else { > + final IndexAction indexAction = bulkEntry.getIndexAction(); > if (blockUntilComplete) { > - runnable.run(); > + process(indexAction); > } else { > - Thread thread = new Thread(runnable); > + Thread thread = new Thread(new Runnable() { > + @Override > + public void run() { > + try { > + process(indexAction); > + } catch (IOException e) { > + LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying > to finish the bulk updates.", table, > + shard, bulkId, e);
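I think `e` is misplaced here: in `LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying to finish the bulk updates.", table, shard, bulkId, e)`, shouldn't the exception be passed immediately after the message, before the format arguments (`table, shard, bulkId`)? Otherwise it may be treated as just another format argument, and the stack trace would not be logged. Thanks, --tim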
