This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b7641f6  Two fixes related to encoding of % symbols. (#10645)
b7641f6 is described below

commit b7641f644cc19a80b33b3607ef6ef23d977236c6
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Dec 6 22:35:11 2020 -0800

    Two fixes related to encoding of % symbols. (#10645)
    
    * Two fixes related to encoding of % symbols.
    
    1) TaskResourceFilter: Don't double-decode task ids. 
request.getPathSegments()
       returns already-decoded strings. Applying StringUtils.urlDecode on
       top of that causes erroneous behavior with '%' characters.
    
    2) Update various ThreadFactoryBuilder name formats to escape '%'
       characters. This fixes situations where substrings starting with '%'
       are erroneously treated as format specifiers.
    
    ITs are updated to include a '%' in extra.datasource.name.suffix.
    
    * Avoid String.replace.
    
    * Work around surefire bug.
    
    * Fix xml encoding.
    
    * Another try at the proper encoding.
    
    * Give up on the emojis.
    
    * Less ambitious testing.
    
    * Fix an additional problem.
    
    * Adjust encodeForFormat to return null if the input is null.
---
 .../apache/druid/java/util/common/StringUtils.java | 15 ++++++++
 .../druid/java/util/common/StringUtilsTest.java    |  8 +++++
 .../MaterializedViewSupervisor.java                |  2 +-
 .../druid/security/basic/CommonCacheNotifier.java  |  4 ++-
 .../query/lookup/KafkaLookupExtractorFactory.java  |  2 +-
 .../apache/druid/server/lookup/PollingLookup.java  |  3 +-
 .../druid/indexing/common/IndexTaskClient.java     |  2 +-
 .../common/task/batch/parallel/TaskMonitor.java    |  2 +-
 .../overlord/http/security/TaskResourceFilter.java | 13 ++-----
 .../supervisor/SeekableStreamSupervisor.java       | 42 +++++++++++++++-------
 integration-tests/pom.xml                          |  4 ++-
 .../discovery/CuratorDruidLeaderSelector.java      |  7 +++-
 .../CuratorDruidNodeDiscoveryProvider.java         |  4 ++-
 .../druid/metadata/SqlSegmentsMetadataManager.java |  2 +-
 .../realtime/appenderator/AppenderatorImpl.java    |  9 +++--
 .../appenderator/BaseAppenderatorDriver.java       |  5 ++-
 .../apache/druid/sql/avatica/DruidStatement.java   | 10 ++++--
 17 files changed, 93 insertions(+), 41 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java 
b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index bf95400..97fb433 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -188,6 +188,21 @@ public class StringUtils
     }
   }
 
+  /**
+   * Encodes a string "s" for insertion into a format string.
+   *
+   * Returns null if the input is null.
+   */
+  @Nullable
+  public static String encodeForFormat(@Nullable final String s)
+  {
+    if (s == null) {
+      return null;
+    } else {
+      return StringUtils.replaceChar(s, '%', "%%");
+    }
+  }
+
   public static String toLowerCase(String s)
   {
     return s.toLowerCase(Locale.ENGLISH);
diff --git 
a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java 
b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
index bd5c0b2..4f80740 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java
@@ -174,6 +174,14 @@ public class StringUtilsTest
   }
 
   @Test
+  public void testEncodeForFormat()
+  {
+    Assert.assertEquals("x %% a %%s", StringUtils.encodeForFormat("x % a %s"));
+    Assert.assertEquals("", StringUtils.encodeForFormat(""));
+    Assert.assertNull(StringUtils.encodeForFormat(null));
+  }
+
+  @Test
   public void testURLEncodeSpace()
   {
     String s1 = StringUtils.urlEncode("aaa bbb");
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index dd3db50..41647d5 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -137,7 +137,7 @@ public class MaterializedViewSupervisor implements 
Supervisor
             new DerivativeDataSourceMetadata(spec.getBaseDataSource(), 
spec.getDimensions(), spec.getMetrics())
         );
       }
-      exec = 
MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId));
+      exec = 
MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId)));
       final Duration delay = 
config.getTaskCheckDuration().toStandardDuration();
       future = exec.scheduleWithFixedDelay(
           MaterializedViewSupervisor.this::run,
diff --git 
a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
 
b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
index 608beb1..17b046e 100644
--- 
a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
+++ 
b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
@@ -88,7 +88,9 @@ public class CommonCacheNotifier
       String callerName
   )
   {
-    this.exec = Execs.singleThreaded(StringUtils.format("%s-notifierThread-", 
callerName) + "%d");
+    this.exec = Execs.singleThreaded(
+        StringUtils.format("%s-notifierThread-", 
StringUtils.encodeForFormat(callerName)) + "%d"
+    );
     this.callerName = callerName;
     this.updateQueue = new LinkedBlockingQueue<>();
     this.itemConfigMap = itemConfigMap;
diff --git 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index 28cbad2..a50402a 100644
--- 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -100,7 +100,7 @@ public class KafkaLookupExtractorFactory implements 
LookupExtractorFactory
     this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic 
required");
     this.kafkaProperties = Preconditions.checkNotNull(kafkaProperties, 
"kafkaProperties required");
     executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded(
-        "kafka-factory-" + kafkaTopic + "-%s",
+        "kafka-factory-" + StringUtils.encodeForFormat(kafkaTopic) + "-%s",
         Thread.MIN_PRIORITY
     ));
     this.cacheManager = cacheManager;
diff --git 
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
 
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
index 0b1b14c..375f3d0 100644
--- 
a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
+++ 
b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java
@@ -25,6 +25,7 @@ import 
com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.lookup.LookupExtractor;
@@ -75,7 +76,7 @@ public class PollingLookup extends LookupExtractor
     refOfCacheKeeper.set(new 
CacheRefKeeper(this.cacheFactory.makeOf(dataFetcher.fetchAll())));
     if (pollPeriodMs > 0) {
       scheduledExecutorService = 
MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(
-          Execs.makeThreadFactory("PollingLookup-" + id, Thread.MIN_PRIORITY)
+          Execs.makeThreadFactory("PollingLookup-" + 
StringUtils.encodeForFormat(id), Thread.MIN_PRIORITY)
       ));
       pollFuture = scheduledExecutorService.scheduleWithFixedDelay(
           pollAndSwap(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index bb073f4..d358462 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -117,7 +117,7 @@ public abstract class IndexTaskClient implements 
AutoCloseable
             numThreads,
             StringUtils.format(
                 "IndexTaskClient-%s-%%d",
-                callerId
+                StringUtils.encodeForFormat(callerId)
             )
         )
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
index 302d23a..2b75782 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
@@ -54,7 +54,7 @@ public class TaskMonitor<T extends Task>
 {
   private static final Logger log = new Logger(TaskMonitor.class);
 
-  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded(("task-monitor-%d"));
+  private final ScheduledExecutorService taskStatusChecker = 
Execs.scheduledSingleThreaded("task-monitor-%d");
 
   /**
    * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state 
of running {@link SubTaskSpec}s. This is
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
index 51b17f8..da1e5c5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.http.security;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ContainerRequest;
@@ -39,7 +38,6 @@ import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
 
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.PathSegment;
 import javax.ws.rs.core.Response;
 
 /**
@@ -70,18 +68,11 @@ public class TaskResourceFilter extends 
AbstractResourceFilter
                .get(
                    Iterables.indexOf(
                        request.getPathSegments(),
-                       new Predicate<PathSegment>()
-                       {
-                         @Override
-                         public boolean apply(PathSegment input)
-                         {
-                           return "task".equals(input.getPath());
-                         }
-                       }
+                       input -> "task".equals(input.getPath())
                    ) + 1
                ).getPath()
     );
-    taskId = StringUtils.urlDecode(taskId);
+
     IdUtils.validateId("taskId", taskId);
 
     Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index da6dc8b..1a4f35f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -521,9 +521,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     this.tuningConfig = spec.getTuningConfig();
     this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
     this.supervisorId = supervisorId;
-    this.exec = Execs.singleThreaded(supervisorId);
-    this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Scheduler-%d");
-    this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Reporting-%d");
+    this.exec = 
Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
+    this.scheduledExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Scheduler-%d");
+    this.reportingExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Reporting-%d");
     this.stateManager = new SeekableStreamSupervisorStateManager(
         spec.getSupervisorStateManagerConfig(),
         spec.isSuspended()
@@ -533,7 +533,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                          ? this.tuningConfig.getWorkerThreads()
                          : Math.min(10, this.ioConfig.getTaskCount()));
 
-    this.workerExec = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads, 
supervisorId + "-Worker-%d"));
+    this.workerExec = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(
+            workerThreads,
+            StringUtils.encodeForFormat(supervisorId) + "-Worker-%d"
+        )
+    );
     log.info("Created worker pool with [%d] threads for dataSource [%s]", 
workerThreads, this.dataSource);
 
     this.taskInfoProvider = new TaskInfoProvider()
@@ -921,6 +926,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Collect row ingestion stats from all tasks managed by this supervisor.
    *
    * @return A map of groupId->taskId->task row stats
+   *
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
@@ -1597,7 +1603,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       throw new RuntimeException(e);
     }
 
-    final DataSourceMetadata rawDataSourceMetadata = 
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+    final DataSourceMetadata rawDataSourceMetadata = 
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
+        dataSource);
 
     if (rawDataSourceMetadata != null && 
!checkSourceMetadataMatch(rawDataSourceMetadata)) {
       throw new IAE(
@@ -2088,13 +2095,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * It will mark the expired partitions in metadata and recompute the 
partition->task group mappings, updating
    * the metadata, the partitionIds list, and the partitionGroups mappings.
    *
-   * @param storedPartitions Set of partitions previously tracked, from the 
metadata store
-   * @param newlyClosedPartitions Set of partitions that are closed in the 
metadata store but still present in the
-   *                              current {@link #partitionIds}
+   * @param storedPartitions                Set of partitions previously 
tracked, from the metadata store
+   * @param newlyClosedPartitions           Set of partitions that are closed 
in the metadata store but still present in the
+   *                                        current {@link #partitionIds}
    * @param activePartitionsIdsFromSupplier Set of partitions currently 
returned by the record supplier, but with
    *                                        any partitions that are 
closed/expired in the metadata store removed
-   * @param previouslyExpiredPartitions Set of partitions that are recorded as 
expired in the metadata store
-   * @param partitionIdsFromSupplier Set of partitions currently returned by 
the record supplier.
+   * @param previouslyExpiredPartitions     Set of partitions that are 
recorded as expired in the metadata store
+   * @param partitionIdsFromSupplier        Set of partitions currently 
returned by the record supplier.
    */
   private void cleanupClosedAndExpiredPartitions(
       Set<PartitionIdType> storedPartitions,
@@ -2176,6 +2183,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * by this method.
    *
    * @param availablePartitions
+   *
    * @return a remapped copy of partitionGroups, containing only the 
partitions in availablePartitions
    */
   protected Map<Integer, Set<PartitionIdType>> 
recomputePartitionGroupsForExpiration(
@@ -2192,6 +2200,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    *
    * @param currentMetadata     The current DataSourceMetadata from metadata 
storage
    * @param expiredPartitionIds The set of expired partition IDs.
+   *
    * @return currentMetadata but with any expired partitions removed.
    */
   protected SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(
@@ -2804,6 +2813,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * should be removed from the starting offsets sent to the tasks.
    *
    * @param startingOffsets
+   *
    * @return startingOffsets with entries for expired partitions removed
    */
   protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> 
filterExpiredPartitionsFromStartingOffsets(
@@ -2832,8 +2842,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           minimumMessageTime = 
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
         } else {
           minimumMessageTime = 
(ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
-                                
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
-                               ) : Optional.absent());
+              
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
+          ) : Optional.absent());
         }
 
         Optional<DateTime> maximumMessageTime = 
(ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of(
@@ -3039,7 +3049,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   private Map<PartitionIdType, SequenceOffsetType> 
getOffsetsFromMetadataStorage()
   {
-    final DataSourceMetadata dataSourceMetadata = 
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+    final DataSourceMetadata dataSourceMetadata = 
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
+        dataSource);
     if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
         && checkSourceMetadataMatch(dataSourceMetadata)) {
       @SuppressWarnings("unchecked")
@@ -3338,6 +3349,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * the given replicas count
    *
    * @return list of specific kafka/kinesis index taksks
+   *
    * @throws JsonProcessingException
    */
   protected abstract List<SeekableStreamIndexTask<PartitionIdType, 
SequenceOffsetType>> createIndexTasks(
@@ -3355,6 +3367,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * different between Kafka/Kinesis since Kinesis uses String as partition id
    *
    * @param partition partition id
+   *
    * @return taskgroup id
    */
   protected abstract int getTaskGroupIdForPartition(PartitionIdType partition);
@@ -3364,6 +3377,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * of [kafka/kinesis]DataSourceMetadata
    *
    * @param metadata datasource metadata
+   *
    * @return true if isInstance else false
    */
   protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata 
metadata);
@@ -3373,6 +3387,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * [Kafka/Kinesis]IndexTask
    *
    * @param task task
+   *
    * @return true if isInstance else false
    */
   protected abstract boolean doesTaskTypeMatchSupervisor(Task task);
@@ -3382,6 +3397,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    *
    * @param stream stream name
    * @param map    partitionId -> sequence
+   *
    * @return specific instance of datasource metadata
    */
   protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> createDataSourceMetaDataForReset(
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 4f0fbe6..cdc3849 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -372,7 +372,9 @@
                 <docker.build.skip>false</docker.build.skip>
                 <override.config.path />
                 <resource.file.dir.path />
-                <extra.datasource.name.suffix>\ Россия\ 한국\ 
中国!?</extra.datasource.name.suffix>
+
+                <!-- Would like to put emojis in here too, but they throw 
"Input buffer too short" errors due to 
https://issues.apache.org/jira/browse/SUREFIRE-1865 -->
+                <extra.datasource.name.suffix>\ %Россия\ 한국\ 
中国!?</extra.datasource.name.suffix>
             </properties>
             <build>
                 <plugins>
diff --git 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
index 10b2a6c..f3c0afe 100644
--- 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
+++ 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java
@@ -181,7 +181,12 @@ public class CuratorDruidLeaderSelector implements 
DruidLeaderSelector
     }
     try {
       this.listener = listener;
-      this.listenerExecutor = 
Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath));
+      this.listenerExecutor = Execs.singleThreaded(
+          StringUtils.format(
+              "LeaderSelector[%s]",
+              StringUtils.encodeForFormat(latchPath)
+          )
+      );
 
       createNewLeaderLatchWithListener();
       leaderLatch.get().start();
diff --git 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index 383ec20..e9c8a36 100644
--- 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++ 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -204,7 +204,9 @@ public class CuratorDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvide
       this.jsonMapper = jsonMapper;
 
       // This is required to be single threaded from docs in PathChildrenCache.
-      this.cacheExecutor = 
Execs.singleThreaded(StringUtils.format("NodeRoleWatcher[%s]", nodeRole));
+      this.cacheExecutor = Execs.singleThreaded(
+          StringUtils.format("NodeRoleWatcher[%s]", 
StringUtils.encodeForFormat(nodeRole.toString()))
+      );
       cache = new PathChildrenCacheFactory.Builder()
           //NOTE: cacheData is temporarily set to false and we get data 
directly from ZK on each event.
           //this is a workaround to solve curator's out-of-order events problem
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 60f7f4b..68c333a 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -257,7 +257,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
       if (exec != null) {
         return; // Already started
       }
-      exec = Execs.scheduledSingleThreaded(getClass().getName() + "-Exec--%d");
+      exec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(getClass().getName()) 
+ "-Exec--%d");
     }
     finally {
       lock.unlock();
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c288837..ec9200c 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -968,21 +968,24 @@ public class AppenderatorImpl implements Appenderator
     if (persistExecutor == null) {
       // use a blocking single threaded executor to throttle the firehose when 
write to disk is slow
       persistExecutor = MoreExecutors.listeningDecorator(
-          Execs.newBlockingSingleThreaded("[" + myId + 
"]-appenderator-persist", maxPendingPersists)
+          Execs.newBlockingSingleThreaded(
+              "[" + StringUtils.encodeForFormat(myId) + 
"]-appenderator-persist",
+              maxPendingPersists
+          )
       );
     }
 
     if (pushExecutor == null) {
       // use a blocking single threaded executor to throttle the firehose when 
write to disk is slow
       pushExecutor = MoreExecutors.listeningDecorator(
-          Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-merge", 
1)
+          Execs.newBlockingSingleThreaded("[" + 
StringUtils.encodeForFormat(myId) + "]-appenderator-merge", 1)
       );
     }
 
     if (intermediateTempExecutor == null) {
       // use single threaded executor with SynchronousQueue so that all 
abandon operations occur sequentially
       intermediateTempExecutor = MoreExecutors.listeningDecorator(
-          Execs.newBlockingSingleThreaded("[" + myId + 
"]-appenderator-abandon", 0)
+          Execs.newBlockingSingleThreaded("[" + 
StringUtils.encodeForFormat(myId) + "]-appenderator-abandon", 0)
       );
     }
   }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index dc16d59..92b9f3c 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -38,6 +38,7 @@ import org.apache.druid.data.input.Committer;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.DataSegmentKiller;
@@ -254,7 +255,9 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
     this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, 
"segmentAllocator");
     this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, 
"usedSegmentChecker");
     this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, 
"dataSegmentKiller");
-    this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("[" 
+ appenderator.getId() + "]-publish"));
+    this.executor = MoreExecutors.listeningDecorator(
+        Execs.singleThreaded("[" + 
StringUtils.encodeForFormat(appenderator.getId()) + "]-publish")
+    );
   }
 
   @VisibleForTesting
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
index 2b64c30..b618cf6 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
@@ -99,7 +99,11 @@ public class DruidStatement implements Closeable
     this.sqlLifecycle = Preconditions.checkNotNull(sqlLifecycle, 
"sqlLifecycle");
     this.onClose = Preconditions.checkNotNull(onClose, "onClose");
     this.yielderOpenCloseExecutor = Execs.singleThreaded(
-        
StringUtils.format("JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", 
connectionId, statementId)
+        StringUtils.format(
+            "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
+            StringUtils.encodeForFormat(connectionId),
+            statementId
+        )
     );
   }
 
@@ -360,11 +364,11 @@ public class DruidStatement implements Closeable
         type.getSqlTypeName().getJdbcOrdinal(),
         type.getSqlTypeName().getName(),
         Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(),
-        field.getName());
+        field.getName()
+    );
   }
 
 
-
   private DruidStatement closeAndPropagateThrowable(Throwable t)
   {
     this.throwable = t;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to