Yes, I think you are correct. On Wed, Feb 11, 2015 at 9:43 AM, Tim Williams <[email protected]> wrote:
> On Wed, Feb 11, 2015 at 9:19 AM, <[email protected]> wrote: > > Repository: incubator-blur > > Updated Branches: > > refs/heads/master 1f61fff25 -> 45c9d4cdb > > > > > > Fixing issue with bulk mutate where writers may become idle and later > they fail because of lease exception issues. > > > > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo > > Commit: > http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9714a1de > > Tree: > http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9714a1de > > Diff: > http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9714a1de > > > > Branch: refs/heads/master > > Commit: 9714a1ded7788a008640a0ec6c0bfda585960c96 > > Parents: 1f61fff > > Author: Aaron McCurry <[email protected]> > > Authored: Wed Feb 11 08:50:49 2015 -0500 > > Committer: Aaron McCurry <[email protected]> > > Committed: Wed Feb 11 08:50:49 2015 -0500 > > > > ---------------------------------------------------------------------- > > .../indexserver/DistributedIndexServer.java | 8 +- > > .../manager/indexserver/LocalIndexServer.java | 4 +- > > .../apache/blur/manager/writer/BlurIndex.java | 2 +- > > .../blur/manager/writer/BlurIndexReadOnly.java | 2 +- > > .../manager/writer/BlurIndexSimpleWriter.java | 461 > ++++++++++++------- > > .../org/apache/blur/server/TableContext.java | 9 +- > > .../blur/thrift/ThriftBlurShardServer.java | 12 +- > > .../blur/command/ShardCommandManagerTest.java | 2 +- > > .../writer/BlurIndexSimpleWriterTest.java | 14 +- > > .../blur/manager/writer/IndexImporterTest.java | 2 +- > > .../apache/blur/thrift/BlurClusterTestBase.java | 2 +- > > 11 files changed, 326 insertions(+), 192 deletions(-) > > ---------------------------------------------------------------------- > > > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > > 
---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > > index 79a5b3c..324090f 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java > > @@ -116,15 +116,17 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > > private final int _minimumNumberOfNodes; > > private final Timer _hdfsKeyValueTimer; > > private final Timer _indexImporterTimer; > > + private final Timer _indexBulkTimer; > > > > public DistributedIndexServer(Configuration configuration, ZooKeeper > zookeeper, ClusterStatus clusterStatus, > > BlurFilterCache filterCache, BlockCacheDirectoryFactory > blockCacheDirectoryFactory, > > DistributedLayoutFactory distributedLayoutFactory, String > cluster, String nodeName, long safeModeDelay, > > int shardOpenerThreadCount, int maxMergeThreads, int > internalSearchThreads, > > - int minimumNumberOfNodesBeforeExitingSafeMode, Timer > hdfsKeyValueTimer, Timer indexImporterTimer, long smallMergeThreshold) > > - throws KeeperException, InterruptedException { > > + int minimumNumberOfNodesBeforeExitingSafeMode, Timer > hdfsKeyValueTimer, Timer indexImporterTimer, > > + long smallMergeThreshold, Timer indexBulkTimer) throws > KeeperException, InterruptedException { > > super(clusterStatus, configuration, nodeName, cluster); > > _indexImporterTimer = indexImporterTimer; > > + _indexBulkTimer = indexBulkTimer; > > _hdfsKeyValueTimer = hdfsKeyValueTimer; > > _minimumNumberOfNodes = minimumNumberOfNodesBeforeExitingSafeMode; > > _running.set(true); > > @@ -520,7 +522,7 @@ public class DistributedIndexServer extends > AbstractDistributedIndexServer { > > } > > > > BlurIndex index 
= tableContext.newInstanceBlurIndex(shardContext, > directory, _mergeScheduler, _searchExecutor, > > - _indexCloser, _indexImporterTimer); > > + _indexCloser, _indexImporterTimer, _indexBulkTimer); > > > > if (_clusterStatus.isReadOnly(true, _cluster, table)) { > > index = new BlurIndexReadOnly(index); > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > > index 54a06f6..a8cbb23 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java > > @@ -68,6 +68,7 @@ public class LocalIndexServer extends > AbstractIndexServer { > > private final boolean _ramDir; > > private final BlurIndexCloser _indexCloser; > > private final Timer _timer; > > + private final Timer _bulkTimer; > > > > public LocalIndexServer(TableDescriptor tableDescriptor) throws > IOException { > > this(tableDescriptor, false); > > @@ -75,6 +76,7 @@ public class LocalIndexServer extends > AbstractIndexServer { > > > > public LocalIndexServer(TableDescriptor tableDescriptor, boolean > ramDir) throws IOException { > > _timer = new Timer("Index Importer", true); > > + _bulkTimer = new Timer("Bulk Indexing", true); > > _closer = Closer.create(); > > _tableContext = TableContext.create(tableDescriptor); > > _mergeScheduler = _closer.register(new SharedMergeScheduler(3, 128 > * 1000 * 1000)); > > @@ -166,7 +168,7 @@ public class LocalIndexServer extends > AbstractIndexServer { > > private BlurIndex openIndex(String table, String shard, Directory > dir) throws CorruptIndexException, IOException { > > ShardContext 
shardContext = ShardContext.create(_tableContext, > shard); > > BlurIndexSimpleWriter index = new > BlurIndexSimpleWriter(shardContext, dir, _mergeScheduler, _searchExecutor, > > - _indexCloser, _timer); > > + _indexCloser, _timer, _bulkTimer); > > return index; > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > index 192b6fa..dd92fc7 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java > > @@ -41,7 +41,7 @@ public abstract class BlurIndex { > > protected ShardContext _shardContext; > > > > public BlurIndex(ShardContext shardContext, Directory directory, > SharedMergeScheduler mergeScheduler, > > - ExecutorService searchExecutor, BlurIndexCloser indexCloser, > Timer indexImporterTimer) throws IOException { > > + ExecutorService searchExecutor, BlurIndexCloser indexCloser, > Timer indexImporterTimer, Timer bulkIndexingTimer) throws IOException { > > _shardContext = shardContext; > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > index 9f1ce8a..4145a3d 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java > > @@ 
-28,7 +28,7 @@ public class BlurIndexReadOnly extends BlurIndex { > > private final BlurIndex _blurIndex; > > > > public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException { > > - super(null, null, null, null, null, null); > > + super(null, null, null, null, null, null, null); > > _blurIndex = blurIndex; > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > ---------------------------------------------------------------------- > > diff --git > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > index 0e8d6c5..608adba 100644 > > --- > a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > +++ > b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java > > @@ -21,6 +21,7 @@ import static > org.apache.blur.utils.BlurConstants.ACL_DISCOVER; > > import static org.apache.blur.utils.BlurConstants.ACL_READ; > > import static > org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH; > > > > +import java.io.Closeable; > > import java.io.IOException; > > import java.lang.reflect.Method; > > import java.util.ArrayList; > > @@ -31,6 +32,7 @@ import java.util.List; > > import java.util.Map; > > import java.util.Set; > > import java.util.Timer; > > +import java.util.TimerTask; > > import java.util.concurrent.ArrayBlockingQueue; > > import java.util.concurrent.BlockingQueue; > > import java.util.concurrent.ConcurrentHashMap; > > @@ -71,6 +73,7 @@ import org.apache.hadoop.conf.Configuration; > > import org.apache.hadoop.fs.FileStatus; > > import org.apache.hadoop.fs.FileSystem; > > import org.apache.hadoop.fs.Path; > > +import org.apache.hadoop.fs.PathFilter; > > import org.apache.hadoop.io.IOUtils; > > import org.apache.hadoop.io.SequenceFile; > > import 
org.apache.hadoop.io.SequenceFile.CompressionType; > > @@ -130,17 +133,21 @@ public class BlurIndexSimpleWriter extends > BlurIndex { > > private final AccessControlFactory _accessControlFactory; > > private final Set<String> _discoverableFields; > > private final Splitter _commaSplitter; > > + private final Timer _bulkIndexingTimer; > > + private final TimerTask _watchForIdleBulkWriters; > > > > private Thread _optimizeThread; > > private Thread _writerOpener; > > private IndexImporter _indexImporter; > > > > public BlurIndexSimpleWriter(ShardContext shardContext, Directory > directory, SharedMergeScheduler mergeScheduler, > > - final ExecutorService searchExecutor, BlurIndexCloser > indexCloser, Timer indexImporterTimer) throws IOException { > > - super(shardContext, directory, mergeScheduler, searchExecutor, > indexCloser, indexImporterTimer); > > + final ExecutorService searchExecutor, BlurIndexCloser > indexCloser, Timer indexImporterTimer, > > + Timer bulkIndexingTimer) throws IOException { > > + super(shardContext, directory, mergeScheduler, searchExecutor, > indexCloser, indexImporterTimer, bulkIndexingTimer); > > _commaSplitter = Splitter.on(','); > > _bulkWriters = new ConcurrentHashMap<String, > BlurIndexSimpleWriter.BulkEntry>(); > > _indexImporterTimer = indexImporterTimer; > > + _bulkIndexingTimer = bulkIndexingTimer; > > _searchThreadPool = searchExecutor; > > _shardContext = shardContext; > > _tableContext = _shardContext.getTableContext(); > > @@ -189,6 +196,28 @@ public class BlurIndexSimpleWriter extends > BlurIndex { > > _indexReader.set(wrap(DirectoryReader.open(_directory))); > > > > openWriter(); > > + _watchForIdleBulkWriters = new TimerTask() { > > + @Override > > + public void run() { > > + for (BulkEntry bulkEntry : _bulkWriters.values()) { > > + bulkEntry._lock.lock(); > > + try { > > + if (!bulkEntry.isClosed() && bulkEntry.isIdle()) { > > + LOG.info("Bulk Entry [{0}] has become idle and now > closing.", bulkEntry); > > + try { > > + 
bulkEntry.close(); > > + } catch (IOException e) { > > + LOG.error("Unkown error while trying to close bulk > writer when it became idle.", e); > > + } > > + } > > + } finally { > > + bulkEntry._lock.unlock(); > > + } > > + } > > + } > > + }; > > + long delay = TimeUnit.SECONDS.toMillis(30); > > + _bulkIndexingTimer.schedule(_watchForIdleBulkWriters, delay, delay); > > } > > > > private synchronized void openWriter() { > > @@ -355,7 +384,17 @@ public class BlurIndexSimpleWriter extends > BlurIndex { > > @Override > > public void close() throws IOException { > > _isClosed.set(true); > > - IOUtils.cleanup(LOG, _indexImporter, _mutationQueueProcessor, > _writer.get(), _indexReader.get()); > > + IOUtils.cleanup(LOG, makeCloseable(_watchForIdleBulkWriters), > _indexImporter, _mutationQueueProcessor, > > + _writer.get(), _indexReader.get()); > > + } > > + > > + private Closeable makeCloseable(final TimerTask timerTask) { > > + return new Closeable() { > > + @Override > > + public void close() throws IOException { > > + timerTask.cancel(); > > + } > > + }; > > } > > > > @Override > > @@ -507,25 +546,38 @@ public class BlurIndexSimpleWriter extends > BlurIndex { > > } > > > > static class BulkEntry { > > - final SequenceFile.Writer _writer; > > - final Path _path; > > > > - BulkEntry(Writer writer, Path path) { > > - _writer = writer; > > - _path = path; > > + private final long _idleTime = TimeUnit.SECONDS.toNanos(30); > > + private final Path _parentPath; > > + private final String _bulkId; > > + private final TableContext _tableContext; > > + private final ShardContext _shardContext; > > + private final Configuration _configuration; > > + private final FileSystem _fileSystem; > > + private final String _table; > > + private final String _shard; > > + private final Lock _lock = new ReentrantReadWriteLock().writeLock(); > > + > > + private volatile SequenceFile.Writer _writer; > > + private volatile long _lastWrite; > > + private volatile int _count = 0; > > + > > + 
public BulkEntry(String bulkId, Path parentPath, ShardContext > shardContext) throws IOException { > > + _bulkId = bulkId; > > + _parentPath = parentPath; > > + _shardContext = shardContext; > > + _tableContext = shardContext.getTableContext(); > > + _configuration = _tableContext.getConfiguration(); > > + _fileSystem = _parentPath.getFileSystem(_configuration); > > + _shard = _shardContext.getShard(); > > + _table = _tableContext.getTable(); > > } > > - } > > > > - public BulkEntry startBulkMutate(String bulkId) throws IOException { > > - BulkEntry bulkEntry = _bulkWriters.get(bulkId); > > - if (bulkEntry == null) { > > - Path tablePath = _tableContext.getTablePath(); > > - Path bulk = new Path(tablePath, "bulk"); > > - Path bulkInstance = new Path(bulk, bulkId); > > - Path path = new Path(bulkInstance, _shardContext.getShard() + > ".notsorted.seq"); > > - Configuration configuration = _tableContext.getConfiguration(); > > - FileSystem fileSystem = path.getFileSystem(configuration); > > + public boolean isClosed() { > > + return _writer == null; > > + } > > > > + private Writer openSeqWriter() throws IOException { > > Progressable progress = new Progressable() { > > @Override > > public void progress() { > > @@ -535,7 +587,7 @@ public class BlurIndexSimpleWriter extends BlurIndex > { > > final CompressionCodec codec; > > final CompressionType type; > > > > - if (isSnappyCodecLoaded(configuration)) { > > + if (isSnappyCodecLoaded(_configuration)) { > > codec = new SnappyCodec(); > > type = CompressionType.BLOCK; > > } else { > > @@ -543,180 +595,211 @@ public class BlurIndexSimpleWriter extends > BlurIndex { > > type = CompressionType.NONE; > > } > > > > - Writer writer = SequenceFile.createWriter(fileSystem, > configuration, path, Text.class, RowMutationWritable.class, > > - type, codec, progress); > > + Path path = new Path(_parentPath, _shard + "." 
+ _count + > ".unsorted.seq"); > > > > - bulkEntry = new BulkEntry(writer, path); > > - _bulkWriters.put(bulkId, bulkEntry); > > - } else { > > - LOG.info("Bulk [{0}] mutate already started on shard [{1}] in > table [{2}].", bulkId, _shardContext.getShard(), > > - _tableContext.getTable()); > > + _count++; > > + > > + return SequenceFile.createWriter(_fileSystem, _configuration, > path, Text.class, RowMutationWritable.class, type, > > + codec, progress); > > } > > - return bulkEntry; > > - } > > > > - private boolean isSnappyCodecLoaded(Configuration configuration) { > > - try { > > - Method methodHadoop1 = > SnappyCodec.class.getMethod("isNativeSnappyLoaded", new Class[] { > Configuration.class }); > > - Boolean loaded = (Boolean) methodHadoop1.invoke(null, new > Object[] { configuration }); > > - if (loaded != null && loaded) { > > - LOG.info("Using SnappyCodec"); > > - return true; > > - } else { > > - LOG.info("Not using SnappyCodec"); > > - return false; > > - } > > - } catch (NoSuchMethodException e) { > > - Method methodHadoop2; > > + public void close() throws IOException { > > + _lock.lock(); > > try { > > - methodHadoop2 = > SnappyCodec.class.getMethod("isNativeCodeLoaded", new Class[] {}); > > - } catch (NoSuchMethodException ex) { > > - LOG.info("Can not determine if SnappyCodec is loaded."); > > - return false; > > - } catch (SecurityException ex) { > > - LOG.error("Not allowed.", ex); > > - return false; > > + if (_writer != null) { > > + _writer.close(); > > + _writer = null; > > + } > > + } finally { > > + _lock.unlock(); > > } > > - Boolean loaded; > > + } > > + > > + public void append(Text key, RowMutationWritable > rowMutationWritable) throws IOException { > > + _lock.lock(); > > try { > > - loaded = (Boolean) methodHadoop2.invoke(null); > > - if (loaded != null && loaded) { > > - LOG.info("Using SnappyCodec"); > > - return true; > > - } else { > > - LOG.info("Not using SnappyCodec"); > > - return false; > > - } > > - } catch (Exception ex) { > 
> - LOG.info("Unknown error while trying to determine if > SnappyCodec is loaded.", ex); > > - return false; > > + getWriter().append(key, rowMutationWritable); > > + _lastWrite = System.nanoTime(); > > + } finally { > > + _lock.unlock(); > > + } > > + } > > + > > + private SequenceFile.Writer getWriter() throws IOException { > > + if (_writer == null) { > > + _writer = openSeqWriter(); > > + _lastWrite = System.nanoTime(); > > + } > > + return _writer; > > + } > > + > > + public boolean isIdle() { > > + if (_lastWrite + _idleTime < System.nanoTime()) { > > + return true; > > } > > - } catch (SecurityException e) { > > - LOG.error("Not allowed.", e); > > - return false; > > - } catch (Exception e) { > > - LOG.info("Unknown error while trying to determine if SnappyCodec > is loaded.", e); > > return false; > > } > > - } > > > > - @Override > > - public void finishBulkMutate(final String bulkId, boolean apply, > boolean blockUntilComplete) throws IOException { > > - final String table = _tableContext.getTable(); > > - final String shard = _shardContext.getShard(); > > + public List<Path> getUnsortedFiles() throws IOException { > > + FileStatus[] listStatus = _fileSystem.listStatus(_parentPath, new > PathFilter() { > > + @Override > > + public boolean accept(Path path) { > > + return path.getName().matches(_shard + > "\\.[0-9].*\\.unsorted\\.seq"); > > + } > > + }); > > > > - final BulkEntry bulkEntry = _bulkWriters.get(bulkId); > > - if (bulkEntry == null) { > > - LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId, > apply, table, shard); > > - return; > > + List<Path> unsortedPaths = new ArrayList<Path>(); > > + for (FileStatus fileStatus : listStatus) { > > + unsortedPaths.add(fileStatus.getPath()); > > + } > > + return unsortedPaths; > > } > > - LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply > [{1}]", bulkId, apply, table, shard); > > - bulkEntry._writer.close(); > > > > - Configuration configuration = _tableContext.getConfiguration(); > 
> - final Path path = bulkEntry._path; > > - final FileSystem fileSystem = path.getFileSystem(configuration); > > + public void cleanupFiles(List<Path> unsortedPaths, Path sorted) > throws IOException { > > + for (Path p : unsortedPaths) { > > + _fileSystem.delete(p, false); > > + } > > + if (sorted != null) { > > + _fileSystem.delete(sorted, false); > > + } > > + removeParentIfLastFile(_fileSystem, _parentPath); > > + } > > + > > + public IndexAction getIndexAction() throws IOException { > > + return new IndexAction() { > > + private Path _sorted; > > + private List<Path> _unsortedPaths; > > > > - if (!apply) { > > - fileSystem.delete(path, false); > > - Path parent = path.getParent(); > > - removeParentIfLastFile(fileSystem, parent); > > - } else { > > - Runnable runnable = new Runnable() { > > @Override > > - public void run() { > > - try { > > - process(new IndexAction() { > > - private Path _sorted; > > - > > - @Override > > - public void performMutate(IndexSearcherCloseable > searcher, IndexWriter writer) throws IOException { > > - Configuration configuration = > _tableContext.getConfiguration(); > > - > > - SequenceFile.Sorter sorter = new Sorter(fileSystem, > Text.class, RowMutationWritable.class, > > - configuration); > > - > > - _sorted = new Path(path.getParent(), shard + > ".sorted.seq"); > > - > > - LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path > [{0}] sorted path [{1}]", path, _sorted, table, > > - shard, bulkId); > > - sorter.sort(path, _sorted); > > - > > - LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates > sorted path [{0}]", _sorted, table, shard, bulkId); > > - Reader reader = new SequenceFile.Reader(fileSystem, > _sorted, configuration); > > - > > - Text key = new Text(); > > - RowMutationWritable value = new RowMutationWritable(); > > - > > - Text last = null; > > - List<RowMutation> list = new ArrayList<RowMutation>(); > > - while (reader.next(key, value)) { > > - if (!key.equals(last)) { > > - flushMutates(searcher, writer, 
list); > > - last = new Text(key); > > - list.clear(); > > - } > > - list.add(value.getRowMutation().deepCopy()); > > - } > > - flushMutates(searcher, writer, list); > > - reader.close(); > > - LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying > mutates starting commit.", table, shard, bulkId); > > - } > > + public void performMutate(IndexSearcherCloseable searcher, > IndexWriter writer) throws IOException { > > + Configuration configuration = > _tableContext.getConfiguration(); > > > > - private void flushMutates(IndexSearcherCloseable > searcher, IndexWriter writer, List<RowMutation> list) > > - throws IOException { > > - if (!list.isEmpty()) { > > - List<RowMutation> reduceMutates; > > - try { > > - reduceMutates = MutatableAction.reduceMutates(list); > > - } catch (BlurException e) { > > - throw new IOException(e); > > - } > > - for (RowMutation mutation : reduceMutates) { > > - MutatableAction mutatableAction = new > MutatableAction(_shardContext); > > - mutatableAction.mutate(mutation); > > - mutatableAction.performMutate(searcher, writer); > > - } > > - } > > - } > > + SequenceFile.Sorter sorter = new Sorter(_fileSystem, > Text.class, RowMutationWritable.class, configuration); > > > > - private void cleanupFiles() throws IOException { > > - fileSystem.delete(path, false); > > - fileSystem.delete(_sorted, false); > > - Path parent = path.getParent(); > > - removeParentIfLastFile(fileSystem, parent); > > - } > > + _unsortedPaths = getUnsortedFiles(); > > > > - @Override > > - public void doPreRollback(IndexWriter writer) throws > IOException { > > + _sorted = new Path(_parentPath, _shard + ".sorted.seq"); > > > > - } > > + LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates paths > [{0}] sorted path [{1}]", _unsortedPaths, _sorted, > > + _table, _shard, _bulkId); > > + sorter.sort(_unsortedPaths.toArray(new > Path[_unsortedPaths.size()]), _sorted, true); > > > > - @Override > > - public void doPreCommit(IndexSearcherCloseable > indexSearcher, IndexWriter 
writer) throws IOException { > > + LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted > path [{0}]", _sorted, _table, _shard, _bulkId); > > + Reader reader = new SequenceFile.Reader(_fileSystem, _sorted, > configuration); > > > > - } > > + Text key = new Text(); > > + RowMutationWritable value = new RowMutationWritable(); > > > > - @Override > > - public void doPostRollback(IndexWriter writer) throws > IOException { > > - cleanupFiles(); > > - } > > + Text last = null; > > + List<RowMutation> list = new ArrayList<RowMutation>(); > > + while (reader.next(key, value)) { > > + if (!key.equals(last)) { > > + flushMutates(searcher, writer, list); > > + last = new Text(key); > > + list.clear(); > > + } > > + list.add(value.getRowMutation().deepCopy()); > > + } > > + flushMutates(searcher, writer, list); > > + reader.close(); > > + LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates > starting commit.", _table, _shard, _bulkId); > > + } > > > > - @Override > > - public void doPostCommit(IndexWriter writer) throws > IOException { > > - cleanupFiles(); > > - } > > - }); > > - } catch (IOException e) { > > - LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while > trying to finish the bulk updates.", table, shard, > > - bulkId, e); > > + private void flushMutates(IndexSearcherCloseable searcher, > IndexWriter writer, List<RowMutation> list) > > + throws IOException { > > + if (!list.isEmpty()) { > > + List<RowMutation> reduceMutates; > > + try { > > + reduceMutates = MutatableAction.reduceMutates(list); > > + } catch (BlurException e) { > > + throw new IOException(e); > > + } > > + for (RowMutation mutation : reduceMutates) { > > + MutatableAction mutatableAction = new > MutatableAction(_shardContext); > > + mutatableAction.mutate(mutation); > > + mutatableAction.performMutate(searcher, writer); > > + } > > } > > } > > + > > + @Override > > + public void doPreRollback(IndexWriter writer) throws > IOException { > > + > > + } > > + > > + @Override > > + 
public void doPreCommit(IndexSearcherCloseable indexSearcher, > IndexWriter writer) throws IOException { > > + > > + } > > + > > + @Override > > + public void doPostRollback(IndexWriter writer) throws > IOException { > > + cleanupFiles(_unsortedPaths, _sorted); > > + } > > + > > + @Override > > + public void doPostCommit(IndexWriter writer) throws IOException > { > > + cleanupFiles(_unsortedPaths, _sorted); > > + } > > }; > > + } > > + > > + @Override > > + public String toString() { > > + return "BulkEntry [_bulkId=" + _bulkId + ", _table=" + _table + > ", _shard=" + _shard + ", _idleTime=" + _idleTime > > + + ", _lastWrite=" + _lastWrite + ", _count=" + _count + "]"; > > + } > > + > > + } > > + > > + public synchronized BulkEntry startBulkMutate(String bulkId) throws > IOException { > > + BulkEntry bulkEntry = _bulkWriters.get(bulkId); > > + if (bulkEntry == null) { > > + Path tablePath = _tableContext.getTablePath(); > > + Path bulk = new Path(tablePath, "bulk"); > > + Path bulkInstance = new Path(bulk, bulkId); > > + Path path = new Path(bulkInstance, _shardContext.getShard() + > ".notsorted.seq"); > > + > > + bulkEntry = new BulkEntry(bulkId, path, _shardContext); > > + _bulkWriters.put(bulkId, bulkEntry); > > + } else { > > + LOG.info("Bulk [{0}] mutate already started on shard [{1}] in > table [{2}].", bulkId, _shardContext.getShard(), > > + _tableContext.getTable()); > > + } > > + return bulkEntry; > > + } > > + > > + @Override > > + public void finishBulkMutate(final String bulkId, boolean apply, > boolean blockUntilComplete) throws IOException { > > + final String table = _tableContext.getTable(); > > + final String shard = _shardContext.getShard(); > > + > > + final BulkEntry bulkEntry = _bulkWriters.get(bulkId); > > + if (bulkEntry == null) { > > + LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId, > apply, table, shard); > > + return; > > + } > > + LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply > [{1}]", bulkId, apply, 
table, shard); > > + bulkEntry.close(); > > + > > + if (!apply) { > > + bulkEntry.cleanupFiles(bulkEntry.getUnsortedFiles(), null); > > + } else { > > + final IndexAction indexAction = bulkEntry.getIndexAction(); > > if (blockUntilComplete) { > > - runnable.run(); > > + process(indexAction); > > } else { > > - Thread thread = new Thread(runnable); > > + Thread thread = new Thread(new Runnable() { > > + @Override > > + public void run() { > > + try { > > + process(indexAction); > > + } catch (IOException e) { > > + LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while > trying to finish the bulk updates.", table, > > + shard, bulkId, e); > > I think e is misplaced here? > > Thanks, > --tim >
