This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3bee6ad  Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)
3bee6ad is described below
commit 3bee6adcf7338b501687f11ea866608e873c5b0a
Author: Sashidhar Thallam <[email protected]>
AuthorDate: Fri Jun 14 21:29:36 2019 +0530
Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)
* https://github.com/apache/incubator-druid/issues/7316 Use Map.putIfAbsent() instead of containsKey() + put()
* fixing indentation
* Using map.computeIfAbsent() instead of map.putIfAbsent() where appropriate
* fixing checkstyle
* Changing the recommendation text
* Reverting auto changes made by IDE
* Implementing recommendation: A ConcurrentHashMap on which computeIfAbsent() is called should be assigned into variables of ConcurrentHashMap type, not ConcurrentMap
* Removing unused import
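
For reference, the pattern this inspection targets, as a minimal self-contained sketch (map, key, and value names are illustrative, not taken from the Druid code):

    import java.util.HashMap;
    import java.util.Map;

    public class PutIfAbsentSketch
    {
      public static void main(String[] args)
      {
        Map<String, String> map = new HashMap<>();
        String key = "k";

        // Before: two lookups, and the value expression is evaluated on every call.
        if (!map.containsKey(key)) {
          map.put(key, computeValue());
        }

        // After, for cheap values: a single lookup, though the argument
        // is still evaluated eagerly on every call.
        map.putIfAbsent(key, computeValue());

        // After, for expensive or side-effecting values: the function
        // runs only when the key is actually absent.
        map.computeIfAbsent(key, k -> computeValue());
      }

      private static String computeValue()
      {
        return "v";
      }
    }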
---
.idea/inspectionProfiles/Druid.xml | 7 +-
.../druid/storage/s3/S3DataSegmentMoverTest.java | 4 +-
.../indexer/DetermineHashedPartitionsJob.java | 4 +-
.../ActionBasedUsedSegmentChecker.java | 5 +-
.../druid/indexing/common/task/IndexTask.java | 8 +-
.../firehose/IngestSegmentFirehoseFactory.java | 149 +++---
.../druid/indexing/overlord/ForkingTaskRunner.java | 548 ++++++++++-----------
.../metadata/SQLMetadataSupervisorManager.java | 4 +-
.../druid/server/http/DataSourcesResource.java | 4 +-
.../druid/client/CachingClusteredClientTest.java | 4 +-
.../appenderator/StreamAppenderatorDriverTest.java | 4 +-
.../java/org/apache/druid/sql/SqlLifecycle.java | 4 +-
.../util/SpecificSegmentsQuerySegmentWalker.java | 4 +-
13 files changed, 365 insertions(+), 384 deletions(-)
diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 9770c11..ed890c8 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -306,6 +306,11 @@
<constraint name="x" maxCount="2147483647" within="" contains="" />
      <constraint name="ImmutableMap" regexp="Immutable.*" within="" contains="" />
</searchConfiguration>
+    <searchConfiguration name="Use map.putIfAbsent(k,v) or map.computeIfAbsent(k,v) where appropriate instead of containsKey() + put(). If computing v is expensive or has side effects use map.computeIfAbsent() instead" created="1558868694225" text="if (!$m$.containsKey($k$)) { $m$.put($k$, $v$); }" recursive="false" caseInsensitive="true" type="JAVA">
+ <constraint name="m" within="" contains="" />
+ <constraint name="k" within="" contains="" />
+ <constraint name="v" within="" contains="" />
+ </searchConfiguration>
</inspection_tool>
  <inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
  <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
@@ -400,4 +405,4 @@
<option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
</inspection_tool>
</profile>
-</component>
\ No newline at end of file
+</component>
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
index ec30aa9..34496d9 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
@@ -261,9 +261,7 @@ public class S3DataSegmentMoverTest
@Override
public PutObjectResult putObject(String bucketName, String key, File file)
{
- if (!storage.containsKey(bucketName)) {
- storage.put(bucketName, new HashSet<>());
- }
+ storage.putIfAbsent(bucketName, new HashSet<>());
storage.get(bucketName).add(key);
return new PutObjectResult();
}
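
Worth noting: computeIfAbsent() returns the current mapping, so the putIfAbsent()-then-get() pair above could be collapsed even further. A sketch of that variant, which is not what this commit does (hypothetical class, mirroring the test's storage map):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class StorageSketch
    {
      private final Map<String, Set<String>> storage = new HashMap<>();

      // computeIfAbsent returns the existing or newly created set,
      // making the separate get() call unnecessary.
      void putObject(String bucketName, String key)
      {
        storage.computeIfAbsent(bucketName, k -> new HashSet<>()).add(key);
      }
    }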
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index c83bc08..17f5172 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -307,9 +307,7 @@ public class DetermineHashedPartitionsJob implements Jobby
.getSegmentGranularity()
.bucket(DateTimes.utc(inputRow.getTimestampFromEpoch()));
-        if (!hyperLogLogs.containsKey(interval)) {
-          hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
-        }
+        hyperLogLogs.computeIfAbsent(interval, intv -> HyperLogLogCollector.makeLatestCollector());
} else {
final Optional<Interval> maybeInterval = config.getGranularitySpec()
.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()));
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
index a1eb90f..96ce6ae 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
@@ -50,9 +50,8 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
// Group by dataSource
     final Map<String, Set<SegmentIdWithShardSpec>> identifiersByDataSource = new TreeMap<>();
for (SegmentIdWithShardSpec identifier : identifiers) {
-      if (!identifiersByDataSource.containsKey(identifier.getDataSource())) {
-        identifiersByDataSource.put(identifier.getDataSource(), new HashSet<>());
-      }
+      identifiersByDataSource.computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>());
+
       identifiersByDataSource.get(identifier.getDataSource()).add(identifier);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index b7e5123..458c621 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -768,9 +768,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
if (determineNumPartitions) {
-        if (!hllCollectors.containsKey(interval)) {
-          hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
-        }
+        hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
List<Object> groupKey = Rows.toGroupKey(
queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
@@ -781,9 +779,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
} else {
         // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
// for the interval and don't instantiate a HLL collector
- if (!hllCollectors.containsKey(interval)) {
- hllCollectors.put(interval, Optional.absent());
- }
+ hllCollectors.putIfAbsent(interval, Optional.absent());
}
determinePartitionsMeters.incrementProcessed();
}
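
These two IndexTask hunks show the rule from the inspection text in action: makeLatestCollector() allocates a collector on every evaluation, so that branch uses computeIfAbsent(), while Optional.absent() is a cheap shared constant, so putIfAbsent() suffices. A condensed, self-contained sketch of the distinction (hypothetical names, with Object standing in for the collector type):

    import java.util.HashMap;
    import java.util.Map;

    class CollectorSketch
    {
      private final Map<String, Object> collectors = new HashMap<>();

      void onRow(String interval, boolean determineNumPartitions)
      {
        if (determineNumPartitions) {
          // Expensive value: construction is deferred until the key is absent.
          collectors.computeIfAbsent(interval, k -> makeExpensiveCollector());
        } else {
          // Cheap shared placeholder: eager evaluation costs nothing.
          collectors.putIfAbsent(interval, "absent");
        }
      }

      private Object makeExpensiveCollector()
      {
        return new Object(); // stands in for HyperLogLogCollector.makeLatestCollector()
      }
    }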
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 4d1e0bf..2caf91d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -204,87 +204,87 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
segmentIds
);
-    try {
-      final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
-
-      // Download all segments locally.
-      // Note: this requires enough local storage space to fit all of the segments, even though
-      // IngestSegmentFirehose iterates over the segments in series. We may want to change this
-      // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
-      final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
-      Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
-      for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
-        for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
-          final DataSegment segment = chunk.getObject();
-          if (!segmentFileMap.containsKey(segment)) {
-            segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
+    final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
+
+    // Download all segments locally.
+    // Note: this requires enough local storage space to fit all of the segments, even though
+    // IngestSegmentFirehose iterates over the segments in series. We may want to change this
+    // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
+    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+    Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
+    for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
+      for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+        final DataSegment segment = chunk.getObject();
+
+        segmentFileMap.computeIfAbsent(segment, k -> {
+          try {
+            return segmentLoader.getSegmentFiles(segment);
          }
-          }
-        }
+          catch (SegmentLoadingException e) {
+            throw new RuntimeException(e);
+          }
+        });
-      final List<String> dims;
-      if (dimensions != null) {
-        dims = dimensions;
-      } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
-        dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
-      } else {
-        dims = getUniqueDimensions(
-          timeLineSegments,
-          inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
-        );
      }
+  }
-      final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
+    final List<String> dims;
+    if (dimensions != null) {
+      dims = dimensions;
+    } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
+      dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
+    } else {
+      dims = getUniqueDimensions(
+          timeLineSegments,
+          inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
+      );
+    }
-    final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
-        Iterables.concat(
-            Iterables.transform(
-                timeLineSegments,
-                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>()
-                {
+    final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
+
+    final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
+        Iterables.concat(
+            Iterables.transform(
+                timeLineSegments,
+                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>() {
+                  @Override
+                  public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
+                  {
+                    return
+                        Iterables.transform(
+                            holder.getObject(),
+                            new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>() {
                              @Override
-                  public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
+                              public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
                              {
-                    return
-                        Iterables.transform(
-                            holder.getObject(),
-                            new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>()
-                            {
-                              @Override
-                              public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
-                              {
-                                final DataSegment segment = input.getObject();
-                                try {
-                                  return new WindowedStorageAdapter(
-                                      new QueryableIndexStorageAdapter(
-                                          indexIO.loadIndex(
-                                              Preconditions.checkNotNull(
-                                                  segmentFileMap.get(segment),
-                                                  "File for segment %s", segment.getId()
-                                              )
-                                          )
-                                      ),
-                                      holder.getInterval()
-                                  );
-                                }
-                                catch (IOException e) {
-                                  throw new RuntimeException(e);
-                                }
-                              }
-                            }
-                        );
+                                final DataSegment segment = input.getObject();
+                                try {
+                                  return new WindowedStorageAdapter(
+                                      new QueryableIndexStorageAdapter(
+                                          indexIO.loadIndex(
+                                              Preconditions.checkNotNull(
+                                                  segmentFileMap.get(segment),
+                                                  "File for segment %s", segment.getId()
+                                              )
+                                          )
+                                      ),
+                                      holder.getInterval()
+                                  );
+                                }
+                                catch (IOException e) {
+                                  throw new RuntimeException(e);
+                                }
                              }
                            }
-            )
-        )
-    );
+                        );
+                  }
+                }
+            )
+        )
+    );
-      final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
-      return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
-    }
-    catch (SegmentLoadingException e) {
-      throw new RuntimeException(e);
-    }
+    final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
+    return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
}
private long jitter(long input)
@@ -508,13 +508,14 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
// segments to olders.
// timelineSegments are sorted in order of interval
- int index = 0;
+ int[] index = {0};
     for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
for (String metric : chunk.getObject().getMetrics()) {
- if (!uniqueMetrics.containsKey(metric)) {
- uniqueMetrics.put(metric, index++);
+ uniqueMetrics.computeIfAbsent(metric, k -> {
+ return index[0]++;
}
+ );
}
}
}
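
The int[] change above is the usual workaround for mutating a counter from inside a lambda: a local variable captured by a lambda must be effectively final, but the contents of a one-element array can still be mutated. A minimal sketch of the idiom (illustrative names):

    import java.util.HashMap;
    import java.util.Map;

    class MetricIndexer
    {
      static Map<String, Integer> indexMetrics(Iterable<String> metrics)
      {
        Map<String, Integer> uniqueMetrics = new HashMap<>();
        // The array reference is effectively final; its element is not.
        int[] index = {0};
        for (String metric : metrics) {
          uniqueMetrics.computeIfAbsent(metric, k -> index[0]++);
        }
        return uniqueMetrics;
      }
    }

An AtomicInteger would serve equally well here; the array form just avoids the extra import.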
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 05f2f52..116747b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -84,7 +84,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -108,7 +107,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
   /** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */
-  private final ConcurrentMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
private volatile boolean stopping = false;
@@ -214,309 +213,306 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
public ListenableFuture<TaskStatus> run(final Task task)
{
synchronized (tasks) {
- if (!tasks.containsKey(task.getId())) {
- tasks.put(
- task.getId(),
- new ForkingTaskRunnerWorkItem(
- task,
- exec.submit(
- new Callable<TaskStatus>()
- {
- @Override
- public TaskStatus call()
- {
-                  final String attemptUUID = UUID.randomUUID().toString();
-                  final File taskDir = taskConfig.getTaskDir(task.getId());
- final File attemptDir = new File(taskDir, attemptUUID);
-
- final ProcessHolder processHolder;
- final String childHost = node.getHost();
- int childPort = -1;
- int tlsChildPort = -1;
-
- if (node.isEnablePlaintextPort()) {
- childPort = portFinder.findUnusedPort();
+ tasks.computeIfAbsent(
+ task.getId(), k ->
+ new ForkingTaskRunnerWorkItem(
+ task,
+ exec.submit(
+ new Callable<TaskStatus>() {
+ @Override
+ public TaskStatus call()
+ {
+ final String attemptUUID = UUID.randomUUID().toString();
+ final File taskDir = taskConfig.getTaskDir(task.getId());
+ final File attemptDir = new File(taskDir, attemptUUID);
+
+ final ProcessHolder processHolder;
+ final String childHost = node.getHost();
+ int childPort = -1;
+ int tlsChildPort = -1;
+
+ if (node.isEnablePlaintextPort()) {
+ childPort = portFinder.findUnusedPort();
+ }
+
+ if (node.isEnableTlsPort()) {
+ tlsChildPort = portFinder.findUnusedPort();
+ }
+
+                  final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
+
+ try {
+ final Closer closer = Closer.create();
+ try {
+ if (!attemptDir.mkdirs()) {
+                        throw new IOE("Could not create directories: %s", attemptDir);
+ }
+
+                      final File taskFile = new File(taskDir, "task.json");
+                      final File statusFile = new File(attemptDir, "status.json");
+                      final File logFile = new File(taskDir, "log");
+                      final File reportsFile = new File(attemptDir, "report.json");
+
+ // time to adjust process holders
+ synchronized (tasks) {
+                        final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
+
+                        if (taskWorkItem == null) {
+                          log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
+                          throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
+                        }
-                  if (node.isEnableTlsPort()) {
-                    tlsChildPort = portFinder.findUnusedPort();
+                        if (taskWorkItem.shutdown) {
+                          throw new IllegalStateException("Task has been shut down!");
                        }
-                  final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
+                        if (taskWorkItem.processHolder != null) {
+                          log.makeAlert("WTF?! TaskInfo already has a processHolder")
+                              .addData("task", task.getId())
+                              .emit();
+                          throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
+                        }
- try {
- final Closer closer = Closer.create();
- try {
-                      if (!attemptDir.mkdirs()) {
-                        throw new IOE("Could not create directories: %s", attemptDir);
-                      }
+                        final List<String> command = new ArrayList<>();
+                        final String taskClasspath;
+                        if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
+                          taskClasspath = Joiner.on(File.pathSeparator).join(
+                              task.getClasspathPrefix(),
+                              config.getClasspath()
+                          );
+                        } else {
+                          taskClasspath = config.getClasspath();
+                        }
-                      final File taskFile = new File(taskDir, "task.json");
-                      final File statusFile = new File(attemptDir, "status.json");
-                      final File logFile = new File(taskDir, "log");
-                      final File reportsFile = new File(attemptDir, "report.json");
-
-                      // time to adjust process holders
-                      synchronized (tasks) {
-                        final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
-
-                        if (taskWorkItem.shutdown) {
-                          throw new IllegalStateException("Task has been shut down!");
-                        }
-
-                        if (taskWorkItem == null) {
-                          log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
-                          throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
-                        }
-
-                        if (taskWorkItem.processHolder != null) {
-                          log.makeAlert("WTF?! TaskInfo already has a processHolder")
-                              .addData("task", task.getId())
-                              .emit();
-                          throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
-                        }
-
-                        final List<String> command = new ArrayList<>();
-                        final String taskClasspath;
-                        if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
-                          taskClasspath = Joiner.on(File.pathSeparator).join(
-                              task.getClasspathPrefix(),
-                              config.getClasspath()
-                          );
-                        } else {
-                          taskClasspath = config.getClasspath();
-                        }
-
- command.add(config.getJavaCommand());
- command.add("-cp");
- command.add(taskClasspath);
-
-                        Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
-                        Iterables.addAll(command, config.getJavaOptsArray());
-
- // Override task specific javaOpts
- Object taskJavaOpts = task.getContextValue(
- ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
- );
- if (taskJavaOpts != null) {
- Iterables.addAll(
- command,
-                              new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
- );
- }
-
-                        for (String propName : props.stringPropertyNames()) {
-                          for (String allowedPrefix : config.getAllowedPrefixes()) {
-                            // See https://github.com/apache/incubator-druid/issues/1841
-                            if (propName.startsWith(allowedPrefix)
-                                && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
-                                && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
- ) {
- command.add(
- StringUtils.format(
- "-D%s=%s",
- propName,
- props.getProperty(propName)
- )
- );
- }
- }
- }
-
- // Override child JVM specific properties
-                        for (String propName : props.stringPropertyNames()) {
-                          if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
- command.add(
- StringUtils.format(
- "-D%s=%s",
-                                    propName.substring(CHILD_PROPERTY_PREFIX.length()),
- props.getProperty(propName)
- )
- );
- }
- }
-
- // Override task specific properties
-                        final Map<String, Object> context = task.getContext();
- if (context != null) {
- for (String propName : context.keySet()) {
-                            if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
- command.add(
- StringUtils.format(
- "-D%s=%s",
-                                      propName.substring(CHILD_PROPERTY_PREFIX.length()),
- task.getContextValue(propName)
- )
- );
- }
- }
- }
-
-                        // Add dataSource, taskId and taskType for metrics or logging
- command.add(
- StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.DATASOURCE,
- task.getDataSource()
- )
- );
+ command.add(config.getJavaCommand());
+ command.add("-cp");
+ command.add(taskClasspath);
+
+                        Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
+ Iterables.addAll(command, config.getJavaOptsArray());
+
+ // Override task specific javaOpts
+ Object taskJavaOpts = task.getContextValue(
+ ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
+ );
+ if (taskJavaOpts != null) {
+ Iterables.addAll(
+ command,
+                              new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
+ );
+ }
+
+ for (String propName : props.stringPropertyNames()) {
+                          for (String allowedPrefix : config.getAllowedPrefixes()) {
+                            // See https://github.com/apache/incubator-druid/issues/1841
+                            if (propName.startsWith(allowedPrefix)
+                                && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
+                                && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
+ ) {
command.add(
StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.TASK_ID,
- task.getId()
- )
+ "-D%s=%s",
+ propName,
+ props.getProperty(propName)
+ )
);
+ }
+ }
+ }
+
+ // Override child JVM specific properties
+ for (String propName : props.stringPropertyNames()) {
+ if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
+ command.add(
+ StringUtils.format(
+ "-D%s=%s",
+                                    propName.substring(CHILD_PROPERTY_PREFIX.length()),
+ props.getProperty(propName)
+ )
+ );
+ }
+ }
+
+ // Override task specific properties
+ final Map<String, Object> context = task.getContext();
+ if (context != null) {
+ for (String propName : context.keySet()) {
+ if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.TASK_TYPE,
- task.getType()
- )
+ "-D%s=%s",
+                                    propName.substring(CHILD_PROPERTY_PREFIX.length()),
+ task.getContextValue(propName)
+ )
);
+ }
+ }
+ }
-                        command.add(StringUtils.format("-Ddruid.host=%s", childHost));
-                        command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
-                        command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
-                        /**
-                         * These are not enabled per default to allow the user to either set or not set them
-                         * Users are highly suggested to be set in druid.indexer.runner.javaOpts
-                         * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
-                         * for more information
-                        command.add("-XX:+UseThreadPriorities");
-                        command.add("-XX:ThreadPriorityPolicy=42");
-                         */
-
- command.add("org.apache.druid.cli.Main");
- command.add("internal");
- command.add("peon");
- command.add(taskFile.toString());
- command.add(statusFile.toString());
- command.add(reportsFile.toString());
- String nodeType = task.getNodeType();
- if (nodeType != null) {
- command.add("--nodeType");
- command.add(nodeType);
- }
-
- if (!taskFile.exists()) {
- jsonMapper.writeValue(taskFile, task);
- }
-
-                      log.info("Running command: %s", Joiner.on(" ").join(command));
-                      taskWorkItem.processHolder = new ProcessHolder(
-                          new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
- logFile,
- taskLocation.getHost(),
- taskLocation.getPort(),
- taskLocation.getTlsPort()
- );
+                        // Add dataSource, taskId and taskType for metrics or logging
+ command.add(
+ StringUtils.format(
+ "-D%s%s=%s",
+ MonitorsConfig.METRIC_DIMENSION_PREFIX,
+ DruidMetrics.DATASOURCE,
+ task.getDataSource()
+ )
+ );
+ command.add(
+ StringUtils.format(
+ "-D%s%s=%s",
+ MonitorsConfig.METRIC_DIMENSION_PREFIX,
+ DruidMetrics.TASK_ID,
+ task.getId()
+ )
+ );
+ command.add(
+ StringUtils.format(
+ "-D%s%s=%s",
+ MonitorsConfig.METRIC_DIMENSION_PREFIX,
+ DruidMetrics.TASK_TYPE,
+ task.getType()
+ )
+ );
+
+                        command.add(StringUtils.format("-Ddruid.host=%s", childHost));
+                        command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
+                        command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
+                        /**
+                         * These are not enabled per default to allow the user to either set or not set them
+                         * Users are highly suggested to be set in druid.indexer.runner.javaOpts
+                         * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
+                         * for more information
+                        command.add("-XX:+UseThreadPriorities");
+                        command.add("-XX:ThreadPriorityPolicy=42");
+                         */
+
+ command.add("org.apache.druid.cli.Main");
+ command.add("internal");
+ command.add("peon");
+ command.add(taskFile.toString());
+ command.add(statusFile.toString());
+ command.add(reportsFile.toString());
+ String nodeType = task.getNodeType();
+ if (nodeType != null) {
+ command.add("--nodeType");
+ command.add(nodeType);
+ }
- processHolder = taskWorkItem.processHolder;
- processHolder.registerWithCloser(closer);
- }
+ if (!taskFile.exists()) {
+ jsonMapper.writeValue(taskFile, task);
+ }
-                      TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
- TaskRunnerUtils.notifyStatusChanged(
- listeners,
- task.getId(),
- TaskStatus.running(task.getId())
- );
+                      log.info("Running command: %s", Joiner.on(" ").join(command));
+                      taskWorkItem.processHolder = new ProcessHolder(
+                          new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
+                          logFile,
+                          taskLocation.getHost(),
+                          taskLocation.getPort(),
+                          taskLocation.getTlsPort()
+                      );
+
+ processHolder = taskWorkItem.processHolder;
+ processHolder.registerWithCloser(closer);
+ }
-                      log.info("Logging task %s output to: %s", task.getId(), logFile);
-                      boolean runFailed = true;
+                      TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
+                      TaskRunnerUtils.notifyStatusChanged(
+                          listeners,
+                          task.getId(),
+                          TaskStatus.running(task.getId())
+                      );
-                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
+                      log.info("Logging task %s output to: %s", task.getId(), logFile);
+                      boolean runFailed = true;
-                      // This will block for a while. So we append the thread information with more details
-                      final String priorThreadName = Thread.currentThread().getName();
-                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
+                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
-                      try (final OutputStream toLogfile = logSink.openStream()) {
-                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
-                        final int statusCode = processHolder.process.waitFor();
-                        log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
-                        if (statusCode == 0) {
-                          runFailed = false;
-                        }
-                      }
-                      finally {
-                        Thread.currentThread().setName(priorThreadName);
-                        // Upload task logs
-                        taskLogPusher.pushTaskLog(task.getId(), logFile);
-                        if (reportsFile.exists()) {
-                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
-                        }
-                      }
+                      // This will block for a while. So we append the thread information with more details
+                      final String priorThreadName = Thread.currentThread().getName();
+                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
-                      TaskStatus status;
-                      if (!runFailed) {
-                        // Process exited successfully
-                        status = jsonMapper.readValue(statusFile, TaskStatus.class);
-                      } else {
-                        // Process exited unsuccessfully
-                        status = TaskStatus.failure(task.getId());
-                      }
+                      try (final OutputStream toLogfile = logSink.openStream()) {
+                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
+                        final int statusCode = processHolder.process.waitFor();
+                        log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
+                        if (statusCode == 0) {
+                          runFailed = false;
+                        }
+                      }
+                      finally {
+                        Thread.currentThread().setName(priorThreadName);
+                        // Upload task logs
+                        taskLogPusher.pushTaskLog(task.getId(), logFile);
+                        if (reportsFile.exists()) {
+                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
+                        }
+                      }
-                      TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
-                      return status;
-                    }
- catch (Throwable t) {
- throw closer.rethrow(t);
- }
- finally {
- closer.close();
- }
+                      TaskStatus status;
+                      if (!runFailed) {
+                        // Process exited successfully
+                        status = jsonMapper.readValue(statusFile, TaskStatus.class);
+                      } else {
+                        // Process exited unsuccessfully
+                        status = TaskStatus.failure(task.getId());
+                      }
+
+                      TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
+                      return status;
+                    }
+ catch (Throwable t) {
+ throw closer.rethrow(t);
+ }
+ finally {
+ closer.close();
+ }
+ }
+                    catch (Throwable t) {
+                      log.info(t, "Exception caught during execution");
+                      throw new RuntimeException(t);
+                    }
+                    finally {
+                      try {
+                        synchronized (tasks) {
+                          final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
+                          if (taskWorkItem != null && taskWorkItem.processHolder != null) {
+                            taskWorkItem.processHolder.process.destroy();
                          }
- catch (Throwable t) {
- log.info(t, "Exception caught during execution");
- throw new RuntimeException(t);
+ if (!stopping) {
+ saveRunningTasks();
}
-                    finally {
-                      try {
-                        synchronized (tasks) {
-                          final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
-                          if (taskWorkItem != null && taskWorkItem.processHolder != null) {
-                            taskWorkItem.processHolder.process.destroy();
-                          }
-                          if (!stopping) {
-                            saveRunningTasks();
-                          }
-                        }
+                      }
- if (node.isEnablePlaintextPort()) {
- portFinder.markPortUnused(childPort);
- }
- if (node.isEnableTlsPort()) {
- portFinder.markPortUnused(tlsChildPort);
- }
+ if (node.isEnablePlaintextPort()) {
+ portFinder.markPortUnused(childPort);
+ }
+ if (node.isEnableTlsPort()) {
+ portFinder.markPortUnused(tlsChildPort);
+ }
-                      try {
-                        if (!stopping && taskDir.exists()) {
-                          log.info("Removing task directory: %s", taskDir);
-                          FileUtils.deleteDirectory(taskDir);
-                        }
-                      }
-                      catch (Exception e) {
-                        log.makeAlert(e, "Failed to delete task directory")
-                           .addData("taskDir", taskDir.toString())
-                           .addData("task", task.getId())
-                           .emit();
-                      }
-                    }
-                    catch (Exception e) {
-                      log.error(e, "Suppressing exception caught while cleaning up task");
-                    }
+ try {
+ if (!stopping && taskDir.exists()) {
+ log.info("Removing task directory: %s", taskDir);
+ FileUtils.deleteDirectory(taskDir);
}
}
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to delete task directory")
+ .addData("taskDir", taskDir.toString())
+ .addData("task", task.getId())
+ .emit();
+ }
+ }
+                    catch (Exception e) {
+                      log.error(e, "Suppressing exception caught while cleaning up task");
                    }
- )
+ }
+ }
+ }
)
- );
- }
+ )
+ );
saveRunningTasks();
return tasks.get(task.getId()).getResult();
}
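
The field-type change at the top of this file reflects the recommendation in the commit message: when a call site depends on the atomic, run-at-most-once semantics of ConcurrentHashMap.computeIfAbsent(), declaring the variable as ConcurrentHashMap rather than ConcurrentMap (whose default computeIfAbsent() is a non-atomic get-then-putIfAbsent sequence that other implementations may inherit) makes that dependency explicit. A sketch of the pattern (illustrative names):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    class TaskCounters
    {
      // ConcurrentHashMap, not ConcurrentMap: callers rely on CHM's
      // computeIfAbsent running the mapping function at most once per key.
      private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

      long increment(String taskId)
      {
        return counters.computeIfAbsent(taskId, k -> new AtomicLong()).incrementAndGet();
      }
    }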
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
index 83fcc05..a5354b1 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
@@ -166,9 +166,7 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
{
try {
String specId = pair.lhs;
- if (!retVal.containsKey(specId)) {
- retVal.put(specId, new ArrayList<>());
- }
+ retVal.putIfAbsent(specId, new ArrayList<>());
retVal.get(specId).add(pair.rhs);
return retVal;
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 3787cc6..9255e7c 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -549,9 +549,7 @@ public class DataSourcesResource
continue;
}
- if (!tierDistinctSegments.containsKey(tier)) {
- tierDistinctSegments.put(tier, new HashSet<>());
- }
+ tierDistinctSegments.computeIfAbsent(tier, k -> new HashSet<>());
long dataSourceSegmentSize = 0;
for (DataSegment dataSegment : druidDataSource.getSegments()) {
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index de24926..8170014 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2198,9 +2198,7 @@ public class CachingClusteredClientTest
serverExpectationList.add(serverExpectations);
for (int j = 0; j < numChunks; ++j) {
DruidServer lastServer = servers[random.nextInt(servers.length)];
-        if (!serverExpectations.containsKey(lastServer)) {
-          serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
-        }
+        serverExpectations.computeIfAbsent(lastServer, server -> new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
DataSegment mockSegment = makeMock(mocks, DataSegment.class);
ServerExpectation<Object> expectation = new ServerExpectation<>(
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 9c3e9cc..c7475d9 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -428,9 +428,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
synchronized (counters) {
         DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp());
final long timestampTruncated = dateTimeTruncated.getMillis();
- if (!counters.containsKey(timestampTruncated)) {
- counters.put(timestampTruncated, new AtomicInteger());
- }
+ counters.putIfAbsent(timestampTruncated, new AtomicInteger());
         final int partitionNum = counters.get(timestampTruncated).getAndIncrement();
return new SegmentIdWithShardSpec(
dataSource,
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index a9c78c4..80cfd04 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -122,9 +122,7 @@ public class SqlLifecycle
if (queryContext != null) {
newContext.putAll(queryContext);
}
-    if (!newContext.containsKey(PlannerContext.CTX_SQL_QUERY_ID)) {
-      newContext.put(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString());
-    }
+    newContext.computeIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, k -> UUID.randomUUID().toString());
return newContext;
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index af0e894..4d1206b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -78,9 +78,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
)
{
     final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
-    if (!timelines.containsKey(descriptor.getDataSource())) {
-      timelines.put(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural()));
-    }
+    timelines.computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural()));
     final VersionedIntervalTimeline<String, Segment> timeline = timelines.get(descriptor.getDataSource());
     timeline.add(descriptor.getInterval(), descriptor.getVersion(), descriptor.getShardSpec().createChunk(segment));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]