This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b7641f6 Two fixes related to encoding of % symbols. (#10645)
b7641f6 is described below
commit b7641f644cc19a80b33b3607ef6ef23d977236c6
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Dec 6 22:35:11 2020 -0800
Two fixes related to encoding of % symbols. (#10645)
* Two fixes related to encoding of % symbols.
1) TaskResourceFilter: Don't double-decode task ids.
request.getPathSegments()
returns already-decoded strings. Applying StringUtils.urlDecode on
top of that causes erroneous behavior with '%' characters.
2) Update various ThreadFactoryBuilder name formats to escape '%'
characters. This fixes situations where substrings starting with '%'
are erroneously treated as format specifiers.
ITs are updated to include a '%' in extra.datasource.name.suffix.
* Avoid String.replace.
* Work around surefire bug.
* Fix xml encoding.
* Another try at the proper encoding.
* Give up on the emojis.
* Less ambitious testing.
* Fix an additional problem.
* Adjust encodeForFormat to return null if the input is null.
---
.../apache/druid/java/util/common/StringUtils.java | 15 ++++++++
.../druid/java/util/common/StringUtilsTest.java | 8 +++++
.../MaterializedViewSupervisor.java | 2 +-
.../druid/security/basic/CommonCacheNotifier.java | 4 ++-
.../query/lookup/KafkaLookupExtractorFactory.java | 2 +-
.../apache/druid/server/lookup/PollingLookup.java | 3 +-
.../druid/indexing/common/IndexTaskClient.java | 2 +-
.../common/task/batch/parallel/TaskMonitor.java | 2 +-
.../overlord/http/security/TaskResourceFilter.java | 13 ++-----
.../supervisor/SeekableStreamSupervisor.java | 42 +++++++++++++++-------
integration-tests/pom.xml | 4 ++-
.../discovery/CuratorDruidLeaderSelector.java | 7 +++-
.../CuratorDruidNodeDiscoveryProvider.java | 4 ++-
.../druid/metadata/SqlSegmentsMetadataManager.java | 2 +-
.../realtime/appenderator/AppenderatorImpl.java | 9 +++--
.../appenderator/BaseAppenderatorDriver.java | 5 ++-
.../apache/druid/sql/avatica/DruidStatement.java | 10 ++++--
17 files changed, 93 insertions(+), 41 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index bf95400..97fb433 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -188,6 +188,21 @@ public class StringUtils
}
}
+ /**
+ * Encodes a string "s" for insertion into a format string.
+ *
+ * Returns null if the input is null.
+ */
+ @Nullable
+ public static String encodeForFormat(@Nullable final String s)
+ {
+ if (s == null) {
+ return null;
+ } else {
+ return StringUtils.replaceChar(s, '%', "%%");
+ }
+ }
+
public static String toLowerCase(String s)
{
return s.toLowerCase(Locale.ENGLISH);
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
index bd5c0b2..4f80740 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
@@ -174,6 +174,14 @@ public class StringUtilsTest
}
@Test
+ public void testEncodeForFormat()
+ {
+ Assert.assertEquals("x %% a %%s", StringUtils.encodeForFormat("x % a %s"));
+ Assert.assertEquals("", StringUtils.encodeForFormat(""));
+ Assert.assertNull(StringUtils.encodeForFormat(null));
+ }
+
+ @Test
public void testURLEncodeSpace()
{
String s1 = StringUtils.urlEncode("aaa bbb");
diff --git
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index dd3db50..41647d5 100644
---
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -137,7 +137,7 @@ public class MaterializedViewSupervisor implements
Supervisor
new DerivativeDataSourceMetadata(spec.getBaseDataSource(),
spec.getDimensions(), spec.getMetrics())
);
}
- exec =
MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId));
+ exec =
MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId)));
final Duration delay =
config.getTaskCheckDuration().toStandardDuration();
future = exec.scheduleWithFixedDelay(
MaterializedViewSupervisor.this::run,
diff --git
a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
index 608beb1..17b046e 100644
---
a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
+++
b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
@@ -88,7 +88,9 @@ public class CommonCacheNotifier
String callerName
)
{
- this.exec = Execs.singleThreaded(StringUtils.format("%s-notifierThread-",
callerName) + "%d");
+ this.exec = Execs.singleThreaded(
+ StringUtils.format("%s-notifierThread-",
StringUtils.encodeForFormat(callerName)) + "%d"
+ );
this.callerName = callerName;
this.updateQueue = new LinkedBlockingQueue<>();
this.itemConfigMap = itemConfigMap;
diff --git
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index 28cbad2..a50402a 100644
---
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -100,7 +100,7 @@ public class KafkaLookupExtractorFactory implements
LookupExtractorFactory
this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic
required");
this.kafkaProperties = Preconditions.checkNotNull(kafkaProperties,
"kafkaProperties required");
executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded(
- "kafka-factory-" + kafkaTopic + "-%s",
+ "kafka-factory-" + StringUtils.encodeForFormat(kafkaTopic) + "-%s",
Thread.MIN_PRIORITY
));
this.cacheManager = cacheManager;
diff --git
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
index 0b1b14c..375f3d0 100644
---
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
+++
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
@@ -25,6 +25,7 @@ import
com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.LookupExtractor;
@@ -75,7 +76,7 @@ public class PollingLookup extends LookupExtractor
refOfCacheKeeper.set(new
CacheRefKeeper(this.cacheFactory.makeOf(dataFetcher.fetchAll())));
if (pollPeriodMs > 0) {
scheduledExecutorService =
MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(
- Execs.makeThreadFactory("PollingLookup-" + id, Thread.MIN_PRIORITY)
+ Execs.makeThreadFactory("PollingLookup-" +
StringUtils.encodeForFormat(id), Thread.MIN_PRIORITY)
));
pollFuture = scheduledExecutorService.scheduleWithFixedDelay(
pollAndSwap(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index bb073f4..d358462 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -117,7 +117,7 @@ public abstract class IndexTaskClient implements
AutoCloseable
numThreads,
StringUtils.format(
"IndexTaskClient-%s-%%d",
- callerId
+ StringUtils.encodeForFormat(callerId)
)
)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
index 302d23a..2b75782 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
@@ -54,7 +54,7 @@ public class TaskMonitor<T extends Task>
{
private static final Logger log = new Logger(TaskMonitor.class);
- private final ScheduledExecutorService taskStatusChecker =
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+ private final ScheduledExecutorService taskStatusChecker =
Execs.scheduledSingleThreaded("task-monitor-%d");
/**
* A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state
of running {@link SubTaskSpec}s. This is
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
index 51b17f8..da1e5c5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.http.security;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ContainerRequest;
@@ -39,7 +38,6 @@ import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.Response;
/**
@@ -70,18 +68,11 @@ public class TaskResourceFilter extends
AbstractResourceFilter
.get(
Iterables.indexOf(
request.getPathSegments(),
- new Predicate<PathSegment>()
- {
- @Override
- public boolean apply(PathSegment input)
- {
- return "task".equals(input.getPath());
- }
- }
+ input -> "task".equals(input.getPath())
) + 1
).getPath()
);
- taskId = StringUtils.urlDecode(taskId);
+
IdUtils.validateId("taskId", taskId);
Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index da6dc8b..1a4f35f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -521,9 +521,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
- this.exec = Execs.singleThreaded(supervisorId);
- this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId +
"-Scheduler-%d");
- this.reportingExec = Execs.scheduledSingleThreaded(supervisorId +
"-Reporting-%d");
+ this.exec =
Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
+ this.scheduledExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Scheduler-%d");
+ this.reportingExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Reporting-%d");
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
spec.isSuspended()
@@ -533,7 +533,12 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
? this.tuningConfig.getWorkerThreads()
: Math.min(10, this.ioConfig.getTaskCount()));
- this.workerExec =
MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads,
supervisorId + "-Worker-%d"));
+ this.workerExec = MoreExecutors.listeningDecorator(
+ Execs.multiThreaded(
+ workerThreads,
+ StringUtils.encodeForFormat(supervisorId) + "-Worker-%d"
+ )
+ );
log.info("Created worker pool with [%d] threads for dataSource [%s]",
workerThreads, this.dataSource);
this.taskInfoProvider = new TaskInfoProvider()
@@ -921,6 +926,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* Collect row ingestion stats from all tasks managed by this supervisor.
*
* @return A map of groupId->taskId->task row stats
+ *
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
@@ -1597,7 +1603,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
throw new RuntimeException(e);
}
- final DataSourceMetadata rawDataSourceMetadata =
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+ final DataSourceMetadata rawDataSourceMetadata =
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
+ dataSource);
if (rawDataSourceMetadata != null &&
!checkSourceMetadataMatch(rawDataSourceMetadata)) {
throw new IAE(
@@ -2088,13 +2095,13 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* It will mark the expired partitions in metadata and recompute the
partition->task group mappings, updating
* the metadata, the partitionIds list, and the partitionGroups mappings.
*
- * @param storedPartitions Set of partitions previously tracked, from the
metadata store
- * @param newlyClosedPartitions Set of partitions that are closed in the
metadata store but still present in the
- * current {@link #partitionIds}
+ * @param storedPartitions Set of partitions previously
tracked, from the metadata store
+ * @param newlyClosedPartitions Set of partitions that are closed
in the metadata store but still present in the
+ * current {@link #partitionIds}
* @param activePartitionsIdsFromSupplier Set of partitions currently
returned by the record supplier, but with
* any partitions that are
closed/expired in the metadata store removed
- * @param previouslyExpiredPartitions Set of partitions that are recorded as
expired in the metadata store
- * @param partitionIdsFromSupplier Set of partitions currently returned by
the record supplier.
+ * @param previouslyExpiredPartitions Set of partitions that are
recorded as expired in the metadata store
+ * @param partitionIdsFromSupplier Set of partitions currently
returned by the record supplier.
*/
private void cleanupClosedAndExpiredPartitions(
Set<PartitionIdType> storedPartitions,
@@ -2176,6 +2183,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* by this method.
*
* @param availablePartitions
+ *
* @return a remapped copy of partitionGroups, containing only the
partitions in availablePartitions
*/
protected Map<Integer, Set<PartitionIdType>>
recomputePartitionGroupsForExpiration(
@@ -2192,6 +2200,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
*
* @param currentMetadata The current DataSourceMetadata from metadata
storage
* @param expiredPartitionIds The set of expired partition IDs.
+ *
* @return currentMetadata but with any expired partitions removed.
*/
protected SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(
@@ -2804,6 +2813,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* should be removed from the starting offsets sent to the tasks.
*
* @param startingOffsets
+ *
* @return startingOffsets with entries for expired partitions removed
*/
protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>>
filterExpiredPartitionsFromStartingOffsets(
@@ -2832,8 +2842,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
minimumMessageTime =
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
} else {
minimumMessageTime =
(ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
-
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
- ) : Optional.absent());
+
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
+ ) : Optional.absent());
}
Optional<DateTime> maximumMessageTime =
(ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of(
@@ -3039,7 +3049,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private Map<PartitionIdType, SequenceOffsetType>
getOffsetsFromMetadataStorage()
{
- final DataSourceMetadata dataSourceMetadata =
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+ final DataSourceMetadata dataSourceMetadata =
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
+ dataSource);
if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
&& checkSourceMetadataMatch(dataSourceMetadata)) {
@SuppressWarnings("unchecked")
@@ -3338,6 +3349,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* the given replicas count
*
* @return list of specific kafka/kinesis index taksks
+ *
* @throws JsonProcessingException
*/
protected abstract List<SeekableStreamIndexTask<PartitionIdType,
SequenceOffsetType>> createIndexTasks(
@@ -3355,6 +3367,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* different between Kafka/Kinesis since Kinesis uses String as partition id
*
* @param partition partition id
+ *
* @return taskgroup id
*/
protected abstract int getTaskGroupIdForPartition(PartitionIdType partition);
@@ -3364,6 +3377,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* of [kafka/kinesis]DataSourceMetadata
*
* @param metadata datasource metadata
+ *
* @return true if isInstance else false
*/
protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata
metadata);
@@ -3373,6 +3387,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* [Kafka/Kinesis]IndexTask
*
* @param task task
+ *
* @return true if isInstance else false
*/
protected abstract boolean doesTaskTypeMatchSupervisor(Task task);
@@ -3382,6 +3397,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
*
* @param stream stream name
* @param map partitionId -> sequence
+ *
* @return specific instance of datasource metadata
*/
protected abstract SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType> createDataSourceMetaDataForReset(
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 4f0fbe6..cdc3849 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -372,7 +372,9 @@
<docker.build.skip>false</docker.build.skip>
<override.config.path />
<resource.file.dir.path />
- <extra.datasource.name.suffix>\ Россия\ 한국\
中国!?</extra.datasource.name.suffix>
+
+ <!-- Would like to put emojis in here too, but they throw
"Input buffer too short" errors due to
https://issues.apache.org/jira/browse/SUREFIRE-1865 -->
+ <extra.datasource.name.suffix>\ %Россия\ 한국\
中国!?</extra.datasource.name.suffix>
</properties>
<build>
<plugins>
diff --git
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
index 10b2a6c..f3c0afe 100644
---
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
+++
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
@@ -181,7 +181,12 @@ public class CuratorDruidLeaderSelector implements
DruidLeaderSelector
}
try {
this.listener = listener;
- this.listenerExecutor =
Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath));
+ this.listenerExecutor = Execs.singleThreaded(
+ StringUtils.format(
+ "LeaderSelector[%s]",
+ StringUtils.encodeForFormat(latchPath)
+ )
+ );
createNewLeaderLatchWithListener();
leaderLatch.get().start();
diff --git
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index 383ec20..e9c8a36 100644
---
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -204,7 +204,9 @@ public class CuratorDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvide
this.jsonMapper = jsonMapper;
// This is required to be single threaded from docs in PathChildrenCache.
- this.cacheExecutor =
Execs.singleThreaded(StringUtils.format("NodeRoleWatcher[%s]", nodeRole));
+ this.cacheExecutor = Execs.singleThreaded(
+ StringUtils.format("NodeRoleWatcher[%s]",
StringUtils.encodeForFormat(nodeRole.toString()))
+ );
cache = new PathChildrenCacheFactory.Builder()
//NOTE: cacheData is temporarily set to false and we get data
directly from ZK on each event.
//this is a workaround to solve curator's out-of-order events problem
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 60f7f4b..68c333a 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -257,7 +257,7 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
if (exec != null) {
return; // Already started
}
- exec = Execs.scheduledSingleThreaded(getClass().getName() + "-Exec--%d");
+ exec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(getClass().getName())
+ "-Exec--%d");
}
finally {
lock.unlock();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c288837..ec9200c 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -968,21 +968,24 @@ public class AppenderatorImpl implements Appenderator
if (persistExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when
write to disk is slow
persistExecutor = MoreExecutors.listeningDecorator(
- Execs.newBlockingSingleThreaded("[" + myId +
"]-appenderator-persist", maxPendingPersists)
+ Execs.newBlockingSingleThreaded(
+ "[" + StringUtils.encodeForFormat(myId) +
"]-appenderator-persist",
+ maxPendingPersists
+ )
);
}
if (pushExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when
write to disk is slow
pushExecutor = MoreExecutors.listeningDecorator(
- Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-merge",
1)
+ Execs.newBlockingSingleThreaded("[" +
StringUtils.encodeForFormat(myId) + "]-appenderator-merge", 1)
);
}
if (intermediateTempExecutor == null) {
// use single threaded executor with SynchronousQueue so that all
abandon operations occur sequentially
intermediateTempExecutor = MoreExecutors.listeningDecorator(
- Execs.newBlockingSingleThreaded("[" + myId +
"]-appenderator-abandon", 0)
+ Execs.newBlockingSingleThreaded("[" +
StringUtils.encodeForFormat(myId) + "]-appenderator-abandon", 0)
);
}
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index dc16d59..92b9f3c 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -38,6 +38,7 @@ import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
@@ -254,7 +255,9 @@ public abstract class BaseAppenderatorDriver implements
Closeable
this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator,
"segmentAllocator");
this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker,
"usedSegmentChecker");
this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller,
"dataSegmentKiller");
- this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("["
+ appenderator.getId() + "]-publish"));
+ this.executor = MoreExecutors.listeningDecorator(
+ Execs.singleThreaded("[" +
StringUtils.encodeForFormat(appenderator.getId()) + "]-publish")
+ );
}
@VisibleForTesting
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
index 2b64c30..b618cf6 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
@@ -99,7 +99,11 @@ public class DruidStatement implements Closeable
this.sqlLifecycle = Preconditions.checkNotNull(sqlLifecycle,
"sqlLifecycle");
this.onClose = Preconditions.checkNotNull(onClose, "onClose");
this.yielderOpenCloseExecutor = Execs.singleThreaded(
-
StringUtils.format("JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
connectionId, statementId)
+ StringUtils.format(
+ "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
+ StringUtils.encodeForFormat(connectionId),
+ statementId
+ )
);
}
@@ -360,11 +364,11 @@ public class DruidStatement implements Closeable
type.getSqlTypeName().getJdbcOrdinal(),
type.getSqlTypeName().getName(),
Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(),
- field.getName());
+ field.getName()
+ );
}
-
private DruidStatement closeAndPropagateThrowable(Throwable t)
{
this.throwable = t;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]