This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3647d4c Make time-related variables more readable (#6158)
3647d4c is described below
commit 3647d4c94ad52ba16ae9ab9ee583dad355f21fd2
Author: Benedict Jin <[email protected]>
AuthorDate: Wed Aug 22 06:29:40 2018 +0800
Make time-related variables more readable (#6158)
* Make time-related variables more readable
* Patch some improvements from the code reviewer
* Remove unnecessary boxing of Long type variables
---
.../java/io/druid/common/config/Log4jShutdown.java | 5 +++--
.../ambari/metrics/AmbariMetricsEmitterConfig.java | 8 +++----
.../io/druid/emitter/graphite/GraphiteEmitter.java | 4 ++--
.../emitter/graphite/GraphiteEmitterConfig.java | 10 ++++-----
.../KafkaEightSimpleConsumerFirehoseFactory.java | 5 +++--
.../druid/firehose/kafka/KafkaSimpleConsumer.java | 13 ++++++-----
.../datasketches/theta/SketchHolder.java | 2 +-
.../query/lookup/TestKafkaExtractionCluster.java | 3 ++-
.../IncrementalPublishingKafkaIndexTaskRunner.java | 2 +-
.../io/druid/indexing/kafka/KafkaIndexTask.java | 3 ++-
.../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 2 +-
.../indexing/common/task/HadoopIndexTask.java | 2 +-
.../indexing/common/task/RealtimeIndexTask.java | 2 +-
.../java/io/druid/indexing/common/task/Tasks.java | 3 ++-
.../java/io/druid/indexing/overlord/TaskQueue.java | 5 ++++-
.../http/client/pool/ChannelResourceFactory.java | 5 +++--
.../main/java/io/druid/query/QueryContexts.java | 4 +++-
.../java/io/druid/query/AsyncQueryRunnerTest.java | 6 ++---
.../io/druid/guice/http/JettyHttpClientModule.java | 5 +++--
.../druid/segment/loading/SegmentLoaderConfig.java | 3 ++-
.../druid/server/initialization/ServerConfig.java | 3 ++-
.../StreamAppenderatorDriverFailTest.java | 10 ++++-----
.../appenderator/StreamAppenderatorDriverTest.java | 26 +++++++++++-----------
.../firehose/IngestSegmentFirehoseTest.java | 4 ++--
24 files changed, 75 insertions(+), 60 deletions(-)
diff --git a/common/src/main/java/io/druid/common/config/Log4jShutdown.java
b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
index d910b18..7b81ada 100644
--- a/common/src/main/java/io/druid/common/config/Log4jShutdown.java
+++ b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
@@ -28,11 +28,12 @@ import
org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
import javax.annotation.concurrent.GuardedBy;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
{
- private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;
+ private static final long SHUTDOWN_WAIT_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(1);
private final SynchronizedStateHolder state = new
SynchronizedStateHolder(State.INITIALIZED);
private final Queue<Runnable> shutdownCallbacks = new
ConcurrentLinkedQueue<>();
@@ -100,7 +101,7 @@ public class Log4jShutdown implements
ShutdownCallbackRegistry, LifeCycle
public void stop()
{
if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
- State current = state.waitForTransition(State.STOPPING, State.STOPPED,
SHUTDOWN_WAIT_TIMEOUT);
+ State current = state.waitForTransition(State.STOPPING, State.STOPPED,
SHUTDOWN_WAIT_TIMEOUT_MILLIS);
if (current != State.STOPPED) {
throw new ISE("Expected state [%s] found [%s]", State.STARTED,
current);
}
diff --git
a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
index a7df750..fdd3595 100644
---
a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
+++
b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
@@ -25,13 +25,13 @@ import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
-
+import java.util.concurrent.TimeUnit;
public class AmbariMetricsEmitterConfig
{
private static final int DEFAULT_BATCH_SIZE = 100;
- private static final Long DEFAULT_FLUSH_PERIOD_MILLIS = (long) (60 * 1000);
// flush every one minute
- private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for
get operations on the queue 1 sec
+ private static final long DEFAULT_FLUSH_PERIOD_MILLIS =
TimeUnit.MINUTES.toMillis(1); // flush every one minute
+ private static final long DEFAULT_GET_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1
sec
private static final String DEFAULT_PROTOCOL = "http";
@JsonProperty
@@ -106,7 +106,7 @@ public class AmbariMetricsEmitterConfig
);
this.alertEmitters = alertEmitters == null ? Collections.emptyList() :
alertEmitters;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
- this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT :
waitForEventTime;
+ this.waitForEventTime = waitForEventTime == null ?
DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
}
@JsonProperty
diff --git
a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
index 531e90e..e29b0cb 100644
---
a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
+++
b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
@@ -56,7 +56,7 @@ public class GraphiteEmitter implements Emitter
private final List<Emitter> requestLogEmitters;
private final AtomicBoolean started = new AtomicBoolean(false);
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
- private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
+ private static final long FLUSH_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(1); // default flush wait 1 min
private final ScheduledExecutorService exec =
Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("GraphiteEmitter-%s")
@@ -222,7 +222,7 @@ public class GraphiteEmitter implements Emitter
if (started.get()) {
Future future = exec.schedule(new ConsumerRunnable(), 0,
TimeUnit.MILLISECONDS);
try {
- future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+ future.get(FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof InterruptedException) {
diff --git
a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
index 8013027..ee426a0 100644
---
a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
+++
b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
@@ -25,15 +25,15 @@ import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
-
+import java.util.concurrent.TimeUnit;
public class GraphiteEmitterConfig
{
public static final String PLAINTEXT_PROTOCOL = "plaintext";
public static final String PICKLE_PROTOCOL = "pickle";
private static final int DEFAULT_BATCH_SIZE = 100;
- private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); //
flush every one minute
- private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for
get operations on the queue 1 sec
+ private static final long DEFAULT_FLUSH_PERIOD_MILLIS =
TimeUnit.MINUTES.toMillis(1); // flush every one minute
+ private static final long DEFAULT_GET_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1
sec
@JsonProperty
private final String hostname;
@@ -142,7 +142,7 @@ public class GraphiteEmitterConfig
@JsonProperty("waitForEventTime") Long waitForEventTime
)
{
- this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT :
waitForEventTime;
+ this.waitForEventTime = waitForEventTime == null ?
DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.emptyList() :
alertEmitters;
this.requestLogEmitters = requestLogEmitters == null ?
Collections.emptyList() : requestLogEmitters;
@@ -150,7 +150,7 @@ public class GraphiteEmitterConfig
druidToGraphiteEventConverter,
"Event converter can not ne null dude"
);
- this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD :
flushPeriod;
+ this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD_MILLIS :
flushPeriod;
this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE :
maxQueueSize;
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be
null");
this.port = Preconditions.checkNotNull(port, "port can not be null");
diff --git
a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
index 8461c9b..4109a43 100644
---
a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
+++
b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaEightSimpleConsumerFirehoseFactory implements
@@ -72,7 +73,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory
implements
private final List<PartitionConsumerWorker> consumerWorkers = new
CopyOnWriteArrayList<>();
private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
- private static final int CONSUMER_FETCH_TIMEOUT = 10000;
+ private static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int)
TimeUnit.SECONDS.toMillis(10);
@JsonCreator
public KafkaEightSimpleConsumerFirehoseFactory(
@@ -307,7 +308,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory
implements
try {
while (!stopped.get()) {
try {
- Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset,
CONSUMER_FETCH_TIMEOUT);
+ Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset,
CONSUMER_FETCH_TIMEOUT_MILLIS);
int count = 0;
for (BytesMessageWithOffset msgWithOffset : msgs) {
offset = msgWithOffset.offset();
diff --git
a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
index aabef05..21a3664 100644
---
a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
+++
b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
@@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* refer @{link
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
@@ -74,10 +75,10 @@ public class KafkaSimpleConsumer
private List<HostAndPort> replicaBrokers;
private SimpleConsumer consumer = null;
- private static final int SO_TIMEOUT = 30000;
+ private static final int SO_TIMEOUT_MILLIS = (int)
TimeUnit.SECONDS.toMillis(30);
private static final int BUFFER_SIZE = 65536;
- private static final long RETRY_INTERVAL = 1000L;
- private static final int FETCH_SIZE = 100000000;
+ private static final long RETRY_INTERVAL_MILLIS =
TimeUnit.MINUTES.toMillis(1);
+ private static final int FETCH_SIZE = 100_000_000;
public KafkaSimpleConsumer(String topic, int partitionId, String clientId,
List<String> brokers, boolean earliest)
{
@@ -121,7 +122,7 @@ public class KafkaSimpleConsumer
);
consumer = new SimpleConsumer(
- leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE,
clientId
+ leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT_MILLIS,
BUFFER_SIZE, clientId
);
}
}
@@ -306,7 +307,7 @@ public class KafkaSimpleConsumer
SimpleConsumer consumer = null;
try {
log.info("Finding new leader from Kafka brokers, try broker [%s]",
broker.toString());
- consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(),
SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
+ consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(),
SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId);
TopicMetadataResponse resp = consumer.send(new
TopicMetadataRequest(Collections.singletonList(topic)));
List<TopicMetadata> metaData = resp.topicsMetadata();
@@ -361,7 +362,7 @@ public class KafkaSimpleConsumer
}
}
- Thread.sleep(RETRY_INTERVAL);
+ Thread.sleep(RETRY_INTERVAL_MILLIS);
retryCnt++;
// if could not find the leader for current replicaBrokers, let's try to
// find one via allBrokers
diff --git
a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
index a2f14aa..e72cbfe 100644
---
a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
+++
b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
@@ -243,7 +243,7 @@ public class SketchHolder
{
UNION,
INTERSECT,
- NOT;
+ NOT
}
public static SketchHolder sketchSetOperation(Func func, int sketchSize,
Object... holders)
diff --git
a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
index a000679..4fcf917 100644
---
a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
+++
b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -61,6 +61,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
/**
*
@@ -149,7 +150,7 @@ public class TestKafkaExtractionCluster
@Override
public long nanoseconds()
{
- return milliseconds() * 1_000_000;
+ return TimeUnit.MILLISECONDS.toNanos(milliseconds());
}
@Override
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 6d42416..434e16e 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -448,7 +448,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner
implements KafkaIndexTask
// that has not been written yet (which is totally legitimate). So
let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
- records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+ records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]",
e.getMessage());
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 8cb6323..0e7ebcb 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -65,6 +65,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class KafkaIndexTask extends AbstractTask implements ChatHandler
@@ -83,7 +84,7 @@ public class KafkaIndexTask extends AbstractTask implements
ChatHandler
private static final EmittingLogger log = new
EmittingLogger(KafkaIndexTask.class);
private static final String TYPE = "index_kafka";
private static final Random RANDOM = new Random();
- static final long POLL_TIMEOUT = 100;
+ static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private final DataSchema dataSchema;
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index a9dff63..a8016a1 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -371,7 +371,7 @@ public class LegacyKafkaIndexTaskRunner implements
KafkaIndexTaskRunner
// that has not been written yet (which is totally legitimate). So
let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
- records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+ records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]",
e.getMessage());
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 4a72114..3982806 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -327,7 +327,7 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
- final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY,
Tasks.DEFAULT_LOCK_TIMEOUT);
+ final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY,
Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the
below line can incur http timeout error.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index a496182..c374604 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -220,7 +220,7 @@ public class RealtimeIndexTask extends AbstractTask
// which will typically be either the main data processing loop or the
persist thread.
// Wrap default DataSegmentAnnouncer such that we unlock intervals as we
unannounce segments
- final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY,
Tasks.DEFAULT_LOCK_TIMEOUT);
+ final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY,
Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http
timeout error can occur while waiting for a
// lock to be acquired.
final DataSegmentAnnouncer lockingSegmentAnnouncer = new
DataSegmentAnnouncer()
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
index e15d6e5..c9bf0aa 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
public class Tasks
{
@@ -42,7 +43,7 @@ public class Tasks
public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
public static final int DEFAULT_TASK_PRIORITY = 0;
- public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
+ public static final long DEFAULT_LOCK_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(5);
public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
diff --git
a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
index 3946248..a14b7ad 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
@@ -54,6 +54,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -68,6 +69,8 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class TaskQueue
{
+ private final long MANAGEMENT_WAIT_TIMEOUT_NANOS =
TimeUnit.SECONDS.toNanos(60);
+
private final List<Task> tasks = Lists.newArrayList();
private final Map<String, ListenableFuture<TaskStatus>> taskFutures =
Maps.newHashMap();
@@ -290,7 +293,7 @@ public class TaskQueue
}
// awaitNanos because management may become necessary without this
condition signalling,
// due to e.g. tasks becoming ready when other folks mess with the
TaskLockbox.
- managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
+ managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
}
finally {
giant.unlock();
diff --git
a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
index 04124b2..be04e72 100644
---
a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
+++
b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
@@ -41,6 +41,7 @@ import javax.net.ssl.SSLParameters;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -48,7 +49,7 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
{
private static final Logger log = new Logger(ChannelResourceFactory.class);
- private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT = 10000L; /* 10
seconds */
+ private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(10);
private final ClientBootstrap bootstrap;
private final SSLContext sslContext;
@@ -65,7 +66,7 @@ public class ChannelResourceFactory implements
ResourceFactory<String, ChannelFu
this.bootstrap = Preconditions.checkNotNull(bootstrap, "bootstrap");
this.sslContext = sslContext;
this.timer = timer;
- this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout
: DEFAULT_SSL_HANDSHAKE_TIMEOUT;
+ this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout
: DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS;
if (sslContext != null) {
Preconditions.checkNotNull(timer, "timer is required when sslContext is
present");
diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java
b/processing/src/main/java/io/druid/query/QueryContexts.java
index 4252ad4..fe5c762 100644
--- a/processing/src/main/java/io/druid/query/QueryContexts.java
+++ b/processing/src/main/java/io/druid/query/QueryContexts.java
@@ -25,6 +25,8 @@ import io.druid.guice.annotations.PublicApi;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Numbers;
+import java.util.concurrent.TimeUnit;
+
@PublicApi
public class QueryContexts
{
@@ -41,7 +43,7 @@ public class QueryContexts
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
- public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
+ public static final long DEFAULT_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(5);
public static final long NO_TIMEOUT = 0;
public static <T> boolean isBySegment(Query<T> query)
diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
index b9f18a6..0ea8fc8 100644
--- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeoutException;
public class AsyncQueryRunnerTest
{
- private static final long TEST_TIMEOUT = 60000;
+ private static final long TEST_TIMEOUT_MILLIS = 60_000;
private final ExecutorService executor;
private final Query query;
@@ -53,7 +53,7 @@ public class AsyncQueryRunnerTest
.build();
}
- @Test(timeout = TEST_TIMEOUT)
+ @Test(timeout = TEST_TIMEOUT_MILLIS)
public void testAsyncNature()
{
final CountDownLatch latch = new CountDownLatch(1);
@@ -83,7 +83,7 @@ public class AsyncQueryRunnerTest
Assert.assertEquals(Collections.singletonList(1), lazy.toList());
}
- @Test(timeout = TEST_TIMEOUT)
+ @Test(timeout = TEST_TIMEOUT_MILLIS)
public void testQueryTimeoutHonored()
{
QueryRunner baseRunner = new QueryRunner()
diff --git
a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
index 8309848..80f167e 100644
--- a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
+++ b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
@@ -31,12 +31,13 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import java.lang.annotation.Annotation;
+import java.util.concurrent.TimeUnit;
/**
*/
public class JettyHttpClientModule implements Module
{
- private static final long CLIENT_CONNECT_TIMEOUT = 500;
+ private static final long CLIENT_CONNECT_TIMEOUT_MILLIS =
TimeUnit.MILLISECONDS.toMillis(500);
public static JettyHttpClientModule global()
{
@@ -120,7 +121,7 @@ public class JettyHttpClientModule implements Module
httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
httpClient.setMaxRequestsQueuedPerDestination(config.getNumRequestsQueued());
- httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT);
+ httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT_MILLIS);
httpClient.setRequestBufferSize(config.getRequestBuffersize());
final QueuedThreadPool pool = new
QueuedThreadPool(config.getNumMaxThreads());
pool.setName(JettyHttpClientModule.class.getSimpleName() +
"-threadPool-" + pool.hashCode());
diff --git
a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index 4b2eddc..306d6c8 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -26,6 +26,7 @@ import org.hibernate.validator.constraints.NotEmpty;
import java.io.File;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -39,7 +40,7 @@ public class SegmentLoaderConfig
private boolean deleteOnRemove = true;
@JsonProperty("dropSegmentDelayMillis")
- private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
+ private int dropSegmentDelayMillis = (int) TimeUnit.SECONDS.toMillis(30);
@JsonProperty("announceIntervalMillis")
private int announceIntervalMillis = 0; // do not background announce
diff --git
a/server/src/main/java/io/druid/server/initialization/ServerConfig.java
b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
index 3827df9..0ef91cc 100644
--- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java
+++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
@@ -26,6 +26,7 @@ import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
/**
@@ -52,7 +53,7 @@ public class ServerConfig
@JsonProperty
@Min(0)
- private long defaultQueryTimeout = 300_000; // 5 minutes
+ private long defaultQueryTimeout = TimeUnit.MINUTES.toMillis(5);
@JsonProperty
@Min(1)
diff --git
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 1c7abb1..6eb29d8 100644
---
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -73,7 +73,7 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
- private static final long PUBLISH_TIMEOUT = 5000;
+ private static final long PUBLISH_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(5);
private static final List<InputRow> ROWS = ImmutableList.of(
new MapBasedInputRow(
@@ -153,7 +153,7 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
@Test
@@ -191,7 +191,7 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
@Test
@@ -229,7 +229,7 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
driver.registerHandoff(published).get();
}
@@ -314,7 +314,7 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
StreamAppenderatorDriverTest.makeFailingPublisher(failWithException),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
throw e;
diff --git
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index b9c5e22..32f04a1 100644
---
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -72,8 +72,8 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
private static final int MAX_ROWS_IN_MEMORY = 100;
private static final int MAX_ROWS_PER_SEGMENT = 3;
- private static final long PUBLISH_TIMEOUT = 10000;
- private static final long HANDOFF_CONDITION_TIMEOUT = 1000;
+ private static final long PUBLISH_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(10);
+ private static final long HANDOFF_CONDITION_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(1);
private static final List<InputRow> ROWS = Arrays.asList(
new MapBasedInputRow(
@@ -144,14 +144,14 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
final SegmentsAndMetadata segmentsAndMetadata =
driver.registerHandoff(published)
-
.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+
.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -194,14 +194,14 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
final SegmentsAndMetadata segmentsAndMetadata =
driver.registerHandoff(published)
-
.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+
.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT,
segmentsAndMetadata.getCommitMetadata());
}
@@ -223,13 +223,13 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
- driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT,
TimeUnit.MILLISECONDS);
+ driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS);
}
@Test
@@ -248,7 +248,7 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -269,7 +269,7 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -290,7 +290,7 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(),
@@ -328,11 +328,11 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
);
final SegmentsAndMetadata handedoffFromSequence0 = futureForSequence0.get(
- HANDOFF_CONDITION_TIMEOUT,
+ HANDOFF_CONDITION_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS
);
final SegmentsAndMetadata handedoffFromSequence1 = futureForSequence1.get(
- HANDOFF_CONDITION_TIMEOUT,
+ HANDOFF_CONDITION_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS
);
diff --git
a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 1556b2f..bbf92b1 100644
---
a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++
b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -134,7 +134,7 @@ public class IngestSegmentFirehoseTest
.build()
)
.setMaxRowCount(5000)
- .buildOnheap();
+ .buildOnheap()
) {
final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa,
sa.getInterval());
@@ -224,7 +224,7 @@ public class IngestSegmentFirehoseTest
.build()
)
.setMaxRowCount(5000)
- .buildOnheap();
+ .buildOnheap()
) {
for (String line : rows) {
index.add(parser.parse(line));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]