Removing thread local variables.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/991fb043 Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/991fb043 Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/991fb043 Branch: refs/heads/master Commit: 991fb0435a373283b2fb05ef29c711c916f0aec9 Parents: 98359a4 Author: Aaron McCurry <amccu...@gmail.com> Authored: Mon Aug 29 21:49:31 2016 -0400 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Mon Aug 29 21:49:31 2016 -0400 ---------------------------------------------------------------------- .../apache/blur/command/BaseCommandManager.java | 15 +- .../MasterBasedDistributedLayoutFactory.java | 3 +- .../java/org/apache/blur/utils/BlurUtil.java | 2 +- .../blur/command/ShardCommandManagerTest.java | 2 +- .../org/apache/blur/hive/BlurSerializer.java | 7 +- .../blur/mapreduce/lib/BlurOutputFormat.java | 5 +- .../mapreduce/lib/GenericBlurRecordWriter.java | 3 +- .../analysis/type/DateFieldTypeDefinition.java | 5 +- .../java/org/apache/blur/index/ExitObject.java | 4 +- .../blur/lucene/codec/CachedDecompressor.java | 3 +- .../lucene/codec/DiskDocValuesProducer.java | 5 +- .../blur/store/blockcache/BlockCache.java | 4 +- .../blur/store/blockcache_v2/MeterWrapper.java | 3 +- .../blur/store/hdfs_v2/StoreDirection.java | 4 +- .../lucene/codecs/BlockTreeTermsReader.java | 2939 ++++++++++++++++++ .../apache/blur/thrift/BlurClientManager.java | 3 +- .../apache/blur/thrift/sasl/TSaslTransport.java | 3 +- .../main/java/org/apache/blur/trace/Trace.java | 5 +- .../java/org/apache/blur/user/UserContext.java | 4 +- .../java/org/apache/blur/utils/ThreadValue.java | 56 + .../apache/blur/utils/BlurConstantsTest.java | 2 +- 21 files changed, 3048 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java index be92e34..b91e378 100644 --- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java +++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java @@ -390,7 +390,9 @@ public abstract class BaseCommandManager implements Closeable { throw new IOException("Execution instance id [" + instanceExecutionId + "] did not find any executing commands."); } try { - return future.get(_connectionTimeout, TimeUnit.MILLISECONDS); + Response response = future.get(_connectionTimeout, TimeUnit.MILLISECONDS); + _driverRunningMap.remove(instanceExecutionId); + return response; } catch (CancellationException e) { throw new IOException(e); } catch (InterruptedException e) { @@ -411,7 +413,9 @@ public abstract class BaseCommandManager implements Closeable { _driverRunningMap.put(instanceExecutionId, new ResponseFuture<Response>(_runningCacheTombstoneTime, future, commandExecuting, originalCommandStatusObject, running)); try { - return future.get(_connectionTimeout, TimeUnit.MILLISECONDS); + Response response = future.get(_connectionTimeout, TimeUnit.MILLISECONDS); + _driverRunningMap.remove(instanceExecutionId); + return response; } catch (CancellationException e) { throw new IOException(e); } catch (InterruptedException e) { @@ -447,9 +451,10 @@ public abstract class BaseCommandManager implements Closeable { CommandStatus originalCommandStatusObject, AtomicBoolean running) { Future<T> future = _executorServiceWorker.submit(callable); Long instanceExecutionId = getInstanceExecutionId(); - _workerRunningMap.put(instanceExecutionId, new ResponseFuture<T>(_runningCacheTombstoneTime, future, - commandExecuting, originalCommandStatusObject, running)); - return future; + ResponseFuture<T> responseFuture = new ResponseFuture<T>(_runningCacheTombstoneTime, future, + commandExecuting, originalCommandStatusObject, running); + _workerRunningMap.put(instanceExecutionId, responseFuture); + return responseFuture; } @Override http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java index 6fff994..1a2e916 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java +++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; +import org.apache.blur.utils.ThreadValue; import org.apache.blur.zookeeper.ZkUtils; import org.apache.blur.zookeeper.ZooKeeperLockManager; import org.apache.blur.zookeeper.ZookeeperPathConstants; @@ -57,7 +58,7 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac private final String _storagePath; private final ZooKeeperLockManager _zooKeeperLockManager; private final String _locksStoragePath; - private final ThreadLocal<Random> _random = new ThreadLocal<Random>() { + private final ThreadValue<Random> _random = new ThreadValue<Random>() { @Override protected Random initialValue() { return new Random(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java index 9406e66..5c0022c 100644 --- a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java +++ b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java @@ -221,7 +221,7 @@ public class BlurUtil { final String prefix = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); InvocationHandler handler = new InvocationHandler() { private final AtomicLong _requestCounter = new AtomicLong(); - private ThreadLocal<LoggerArgsState> _loggerArgsState = new ThreadLocal<LoggerArgsState>() { + private ThreadValue<LoggerArgsState> _loggerArgsState = new ThreadValue<LoggerArgsState>() { @Override protected LoggerArgsState initialValue() { return new LoggerArgsState(1024); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java ---------------------------------------------------------------------- diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java index f5fe4ea..0fb966a 100644 --- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java +++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java @@ -96,7 +96,7 @@ public class ShardCommandManagerTest { @Test public void testGetCommands() { Map<String, BigInteger> commands = _manager.getCommands(); - assertEquals(4, commands.size()); + assertEquals(5, commands.size()); assertTrue(commands.containsKey("wait")); assertTrue(commands.containsKey("error")); assertTrue(commands.containsKey("RunSlowForTesting")); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-hive/src/main/java/org/apache/blur/hive/BlurSerializer.java ---------------------------------------------------------------------- diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurSerializer.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurSerializer.java index 460c7f9..bd5ae74 100644 --- a/blur-hive/src/main/java/org/apache/blur/hive/BlurSerializer.java +++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurSerializer.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.blur.mapreduce.lib.BlurRecord; import org.apache.blur.thrift.generated.ColumnDefinition; +import org.apache.blur.utils.ThreadValue; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -39,7 +40,7 @@ public class BlurSerializer { private static final String DATE_FORMAT = "dateFormat"; private static final String DATE = "date"; - private Map<String, ThreadLocal<SimpleDateFormat>> _dateFormat = new HashMap<String, ThreadLocal<SimpleDateFormat>>(); + private Map<String, ThreadValue<SimpleDateFormat>> _dateFormat = new HashMap<String, ThreadValue<SimpleDateFormat>>(); private BlurColumnNameResolver _columnNameResolver; public BlurSerializer(Map<String, ColumnDefinition> colDefs, BlurColumnNameResolver columnNameResolver) { @@ -52,7 +53,7 @@ public class BlurSerializer { if (fieldType.equals(DATE)) { Map<String, String> properties = columnDefinition.getProperties(); final String dateFormat = properties.get(DATE_FORMAT); - ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>() { + ThreadValue<SimpleDateFormat> threadLocal = new ThreadValue<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat(dateFormat); @@ -184,7 +185,7 @@ public class BlurSerializer { } private SimpleDateFormat getSimpleDateFormat(String columnName) throws SerDeException { - ThreadLocal<SimpleDateFormat> threadLocal = _dateFormat.get(columnName); + ThreadValue<SimpleDateFormat> threadLocal = _dateFormat.get(columnName); if (threadLocal == null) { throw new SerDeException("Date format missing for column [" + columnName + "]"); } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java ---------------------------------------------------------------------- diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java index 52900cf..3e72772 100644 --- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java +++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport; import org.apache.blur.thrift.BlurClient; import org.apache.blur.thrift.generated.Blur.Iface; import org.apache.blur.thrift.generated.TableDescriptor; +import org.apache.blur.utils.ThreadValue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -77,8 +78,8 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> { public static final String BLUR_OUTPUT_PATH = "blur.output.path"; private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class"; - private static ThreadLocal<Progressable> _progressable = new ThreadLocal<Progressable>(); - private static ThreadLocal<GetCounter> _getCounter = new ThreadLocal<GetCounter>(); + private static ThreadValue<Progressable> _progressable = new ThreadValue<Progressable>(); + private static ThreadValue<GetCounter> _getCounter = new ThreadValue<GetCounter>(); public static void setProgressable(Progressable progressable) { _progressable.set(progressable); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java ---------------------------------------------------------------------- diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java index 8828f85..fba819e 100644 --- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java +++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java @@ -39,6 +39,7 @@ import org.apache.blur.thrift.generated.TableDescriptor; import org.apache.blur.utils.BlurConstants; import org.apache.blur.utils.RowDocumentUtil; import org.apache.blur.utils.ShardUtil; +import org.apache.blur.utils.ThreadValue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -278,7 +279,7 @@ public class GenericBlurRecordWriter { return record; } - private static ThreadLocal<AtomicBoolean> _existingRow = new ThreadLocal<AtomicBoolean>() { + private static ThreadValue<AtomicBoolean> _existingRow = new ThreadValue<AtomicBoolean>() { @Override protected AtomicBoolean initialValue() { return new AtomicBoolean(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-query/src/main/java/org/apache/blur/analysis/type/DateFieldTypeDefinition.java ---------------------------------------------------------------------- diff --git a/blur-query/src/main/java/org/apache/blur/analysis/type/DateFieldTypeDefinition.java b/blur-query/src/main/java/org/apache/blur/analysis/type/DateFieldTypeDefinition.java index 695cdc3..0e35125 100644 --- a/blur-query/src/main/java/org/apache/blur/analysis/type/DateFieldTypeDefinition.java +++ b/blur-query/src/main/java/org/apache/blur/analysis/type/DateFieldTypeDefinition.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.blur.thrift.generated.Column; +import org.apache.blur.utils.ThreadValue; import org.apache.hadoop.conf.Configuration; import org.apache.lucene.document.Field; import org.apache.lucene.document.FieldType; @@ -44,7 +45,7 @@ public class DateFieldTypeDefinition extends NumericFieldTypeDefinition { public static final String DATE_FORMAT = "dateFormat"; public static final String NAME = "date"; private FieldType _typeNotStored; - private ThreadLocal<SimpleDateFormat> _simpleDateFormat; + private ThreadValue<SimpleDateFormat> _simpleDateFormat; private TimeUnit _timeUnit = TimeUnit.SECONDS; @Override @@ -62,7 +63,7 @@ public class DateFieldTypeDefinition extends NumericFieldTypeDefinition { if (timeUnitStr != null) { _timeUnit = TimeUnit.valueOf(timeUnitStr.trim().toUpperCase()); } - _simpleDateFormat = new ThreadLocal<SimpleDateFormat>() { + _simpleDateFormat = new ThreadValue<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat(dateFormat); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-store/src/main/java/org/apache/blur/index/ExitObject.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/index/ExitObject.java b/blur-store/src/main/java/org/apache/blur/index/ExitObject.java index 92cd219..d2ad352 100644 --- a/blur-store/src/main/java/org/apache/blur/index/ExitObject.java +++ b/blur-store/src/main/java/org/apache/blur/index/ExitObject.java @@ -18,9 +18,11 @@ package org.apache.blur.index; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.blur.utils.ThreadValue; + public class ExitObject { - private final ThreadLocal<AtomicBoolean> _running = new ThreadLocal<AtomicBoolean>() { + private final ThreadValue<AtomicBoolean> _running = new ThreadValue<AtomicBoolean>() { @Override protected AtomicBoolean initialValue() { return new AtomicBoolean(true); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java b/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java index 18c4cc1..1a438da 100644 --- a/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java +++ b/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java @@ -18,6 +18,7 @@ package org.apache.blur.lucene.codec; import java.io.IOException; +import org.apache.blur.utils.ThreadValue; import org.apache.lucene.codecs.compressing.Decompressor; import org.apache.lucene.store.DataInput; import org.apache.lucene.store.IndexInput; @@ -27,7 +28,7 @@ import org.apache.lucene.util.BytesRef; public class CachedDecompressor extends Decompressor { private final Decompressor _decompressor; - private final ThreadLocal<Entry> _entry = new ThreadLocal<Entry>() { + private final ThreadValue<Entry> _entry = new ThreadValue<Entry>() { @Override protected Entry initialValue() { return new Entry(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java b/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java index fd617b6..ff51ddd 100644 --- a/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java +++ b/blur-store/src/main/java/org/apache/blur/lucene/codec/DiskDocValuesProducer.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.blur.trace.Trace; import org.apache.blur.trace.Tracer; +import org.apache.blur.utils.ThreadValue; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.BinaryDocValues; @@ -236,7 +237,7 @@ class DiskDocValuesProducer extends DocValuesProducer { return new LongBinaryDocValues() { - private final ThreadLocal<IndexInput> in = new ThreadLocal<IndexInput>() { + private final ThreadValue<IndexInput> in = new ThreadValue<IndexInput>() { @Override protected IndexInput initialValue() { return data.clone(); @@ -277,7 +278,7 @@ class DiskDocValuesProducer extends DocValuesProducer { } return new LongBinaryDocValues() { - private final ThreadLocal<IndexInput> _input = new ThreadLocal<IndexInput>() { + private final ThreadValue<IndexInput> _input = new ThreadValue<IndexInput>() { @Override protected IndexInput initialValue() { return data.clone(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java index 4597ffa..1c6d9a1 100644 --- a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java +++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java @@ -33,6 +33,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.blur.utils.ThreadValue; + import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import com.googlecode.concurrentlinkedhashmap.EvictionListener; import com.yammer.metrics.Metrics; @@ -71,7 +73,7 @@ public class BlockCache implements Closeable { private final Meter evictions; private final int _numberOfSlabs; private final boolean _directAllocation; - private final ThreadLocal<ByteBuffer[]> _threadLocalSlabs = new ThreadLocal<ByteBuffer[]>() { + private final ThreadValue<ByteBuffer[]> _threadLocalSlabs = new ThreadValue<ByteBuffer[]>() { @Override protected ByteBuffer[] initialValue() { return new ByteBuffer[_numberOfSlabs]; http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java index 269245a..5a7102d 100644 --- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java +++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; +import org.apache.blur.utils.ThreadValue; import com.yammer.metrics.core.Meter; @@ -75,7 +76,7 @@ public abstract class MeterWrapper implements Closeable { public static MeterWrapper wrap(final SimpleMeter meter) { final String id = UUID.randomUUID().toString(); - final ThreadLocal<AtomicLong> countThreadLocal = new ThreadLocal<AtomicLong>() { + final ThreadValue<AtomicLong> countThreadLocal = new ThreadValue<AtomicLong>() { @Override protected AtomicLong initialValue() { AtomicLong counter = new AtomicLong(); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/991fb043/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/StoreDirection.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/StoreDirection.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/StoreDirection.java index 2dacc8a..2ee6e3c 100644 --- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/StoreDirection.java +++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/StoreDirection.java @@ -16,8 +16,10 @@ */ package org.apache.blur.store.hdfs_v2; +import org.apache.blur.utils.ThreadValue; + public class StoreDirection { - public static ThreadLocal<Boolean> LONG_TERM = new ThreadLocal<Boolean>() { + public static ThreadValue<Boolean> LONG_TERM = new ThreadValue<Boolean>() { @Override protected Boolean initialValue() { return false;