This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.0 by this push:
     new b46d06a  check paths used for shuffle intermediary data manager get and delete (#9630) (#9640)
b46d06a is described below

commit b46d06abc3cc07909b9a52ecb18074937dbcf449
Author: Clint Wylie <cwy...@apache.org>
AuthorDate: Tue Apr 7 14:33:46 2020 -0700

    check paths used for shuffle intermediary data manager get and delete (#9630) (#9640)
    
    * check paths used for shuffle intermediary data manager get and delete
    
    * add test
    
    * newline
    
    * meh
---
 .../java/org/apache/druid/indexer/TaskIdUtils.java |  63 ++++++++++++
 .../org/apache/druid/indexer/TaskIdUtilsTest.java  | 111 +++++++++++++++++++++
 .../druid/indexing/kafka/KafkaConsumerConfigs.java |   4 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   4 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |   4 +-
 .../druid/indexing/common/task/AbstractTask.java   |   4 +-
 .../indexing/common/task/utils/RandomIdUtils.java  |  34 -------
 .../supervisor/SeekableStreamSupervisor.java       |   6 +-
 .../indexing/worker/IntermediaryDataManager.java   |   3 +
 ...ermediaryDataManagerManualAddAndDeleteTest.java |  64 +++++++++++-
 .../tests/indexer/AbstractKafkaIndexerTest.java    |   4 +-
 server/pom.xml                                     |   6 ++
 .../apache/druid/segment/indexing/DataSchema.java  |  14 +--
 .../druid/segment/indexing/DataSchemaTest.java     |  66 ++++++++++--
 14 files changed, 316 insertions(+), 71 deletions(-)
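
For context on the patch below: it centralizes task-id validation in a new TaskIdUtils class and applies that check to the shuffle intermediary data manager's get and delete paths, so a supervisor task id can no longer act as a relative path that escapes the shuffle data location. A minimal sketch of the intended behaviour (illustrative only, not part of the patch; the id values are hypothetical, and it assumes imports of org.apache.druid.indexer.TaskIdUtils):

    // Passes: no leading '.', no '/', no whitespace other than plain spaces.
    TaskIdUtils.validateId("supervisorTaskId", "index_kafka_wikipedia_abcdefgh");

    // Throws IllegalArgumentException: a leading '.' (as in "../sibling-dir")
    // could otherwise traverse outside the intermediary data location.
    TaskIdUtils.validateId("supervisorTaskId", "../sibling-dir");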

diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
new file mode 100644
index 0000000..a88341b
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TaskIdUtils
+{
+  private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
+
+  public static void validateId(String thingToValidate, String stringToValidate)
+  {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(stringToValidate),
+        StringUtils.format("%s cannot be null or empty. Please provide a %s.", thingToValidate, thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.startsWith("."),
+        StringUtils.format("%s cannot start with the '.' character.", thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.contains("/"),
+        StringUtils.format("%s cannot contain the '/' character.", thingToValidate)
+    );
+    Matcher m = INVALIDCHARS.matcher(stringToValidate);
+    Preconditions.checkArgument(
+        !m.matches(),
+        StringUtils.format("%s cannot contain whitespace character except space.", thingToValidate)
+    );
+  }
+
+  public static String getRandomId()
+  {
+    final StringBuilder suffix = new StringBuilder(8);
+    for (int i = 0; i < Integer.BYTES * 2; ++i) {
+      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
+    }
+    return suffix.toString();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
new file mode 100644
index 0000000..5fed8fb
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TaskIdUtilsTest
+{
+  private static final String THINGO = "thingToValidate";
+  public static final String VALID_ID_CHARS = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testValidIdName()
+  {
+    TaskIdUtils.validateId(THINGO, VALID_ID_CHARS);
+  }
+
+  @Test
+  public void testInvalidNull()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, null);
+  }
+
+  @Test
+  public void testInvalidEmpty()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, "");
+  }
+
+  @Test
+  public void testInvalidSlashes()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain the '/' character.");
+    TaskIdUtils.validateId(THINGO, "/paths/are/bad/since/we/make/files/from/stuff");
+  }
+
+  @Test
+  public void testInvalidLeadingDot()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot start with the '.' character.");
+    TaskIdUtils.validateId(THINGO, "./nice/try");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexTabs()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "spaces\tare\tbetter\tthan\ttabs\twhich\tare\tillegal");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexNewline()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "new\nline");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexCarriageReturn()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "does\rexist\rby\ritself");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexLineTabulation()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "wtf\u000Bis line tabulation");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexFormFeed()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "form\u000cfeed?");
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index 39174d5..7339d26 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.indexing.kafka;
 
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.StringUtils;
 
 import java.util.HashMap;
@@ -35,7 +35,7 @@ public class KafkaConsumerConfigs
   {
     final Map<String, Object> props = new HashMap<>();
     props.put("metadata.max.age.ms", "10000");
-    props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
+    props.put("group.id", StringUtils.format("kafka-supervisor-%s", TaskIdUtils.getRandomId()));
     props.put("auto.offset.reset", "none");
     props.put("enable.auto.commit", "false");
     props.put("isolation.level", "read_committed");
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f5cec3d..ba12128 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,10 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
 import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
@@ -221,7 +221,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
 
     List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+      String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
       taskList.add(new KafkaIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 13f94a5..c789fc7 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -25,10 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
 import org.apache.druid.indexing.kinesis.KinesisIndexTask;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
@@ -169,7 +169,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
 
     List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+      String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
       taskList.add(new KinesisIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index d47f900..40745bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -92,7 +92,7 @@ public abstract class AbstractTask implements Task
     }
 
     final List<Object> objects = new ArrayList<>();
-    final String suffix = RandomIdUtils.getRandomId();
+    final String suffix = TaskIdUtils.getRandomId();
     objects.add(typeName);
     objects.add(dataSource);
     objects.add(suffix);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
deleted file mode 100644
index a782b66..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.utils;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-public class RandomIdUtils
-{
-  public static String getRandomId()
-  {
-    final StringBuilder suffix = new StringBuilder(8);
-    for (int i = 0; i < Integer.BYTES * 2; ++i) {
-      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
-    }
-    return suffix.toString();
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 948b687..682a560 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -430,13 +430,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
-  // Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class
+  // Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
   private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();
 
   // After telling a taskGroup to stop reading and begin publishing a segment, it is moved from [activelyReadingTaskGroups] to here so
   // we can monitor its status while we queue new tasks to read the next range of sequences. This is a list since we could
   // have multiple sets of tasks publishing at once if time-to-publish > taskDuration.
-  // Map<{group RandomIdUtils}, List<{pending completion task groups}>>
+  // Map<{group id}, List<{pending completion task groups}>>
   private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
 
   // We keep two separate maps for tracking the current state of partition->task group mappings [partitionGroups] and partition->offset
@@ -998,7 +998,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
     if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
       throw new ISE(
-          "trying to add taskGroup with RandomIdUtils [%s] to actively reading task groups, but group already exists.",
+          "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.",
           taskGroupId
       );
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index b35b191..78090ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskStatus;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
@@ -336,6 +337,7 @@ public class IntermediaryDataManager
   @Nullable
   public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int partitionId)
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
       if (partitionDir.exists()) {
@@ -364,6 +366,7 @@ public class IntermediaryDataManager
 
   public void deletePartitions(String supervisorTaskId) throws IOException
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
       if (supervisorTaskPath.exists()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
index 1e1eab4..15aad92 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -51,11 +52,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   public ExpectedException expectedException = ExpectedException.none();
 
   private IntermediaryDataManager intermediaryDataManager;
+  private File intermediarySegmentsLocation;
+  private File siblingLocation;
 
   @Before
   public void setup() throws IOException
   {
     final WorkerConfig workerConfig = new WorkerConfig();
+    intermediarySegmentsLocation = tempDir.newFolder();
+    siblingLocation = tempDir.newFolder();
     final TaskConfig taskConfig = new TaskConfig(
         null,
         null,
@@ -65,7 +70,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
+        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -157,6 +162,63 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
     intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
   }
 
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForDelete() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final String someFile = "sneaky-snake.txt";
+    File dataFile = new File(siblingLocation, someFile);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+    intermediaryDataManager.deletePartitions(supervisorTaskId);
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+  }
+
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForFind() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final Interval interval = Intervals.of("2018/2019");
+    final int partitionId = 0;
+    final String intervalAndPart =
+        StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId);
+
+    final String someFile = "sneaky-snake.txt";
+
+    final String someFilePath = intervalAndPart + "/" + someFile;
+
+    // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data file
+    // in a location like that
+    File dataFile = new File(siblingLocation, someFilePath);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(
+        new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists());
+
+    final File foundFile1 = intermediaryDataManager.findPartitionFile(
+        supervisorTaskId,
+        someFile,
+        interval,
+        partitionId
+    );
+    Assert.assertNull(foundFile1);
+  }
+
   private File generateSegmentDir(String fileName) throws IOException
   {
     // Each file size is 138 bytes after compression
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index 87a2cce..7ea8bd4 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -28,7 +28,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -161,7 +161,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
     properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
     if (txnEnabled) {
       properties.setProperty("enable.idempotence", "true");
-      properties.setProperty("transactional.id", RandomIdUtils.getRandomId());
+      properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
     }
 
     KafkaProducer<String, String> producer = new KafkaProducer<>(
diff --git a/server/pom.xml b/server/pom.xml
index 7bfa266..b2bab34 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -430,6 +430,12 @@
             <artifactId>equalsverifier</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-text</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 37009da..6be9a71 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -27,12 +27,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -45,7 +45,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -148,16 +147,7 @@ public class DataSchema
 
   private static void validateDatasourceName(String dataSource)
   {
-    Preconditions.checkArgument(
-        !Strings.isNullOrEmpty(dataSource),
-        "dataSource cannot be null or empty. Please provide a dataSource."
-    );
-    Matcher m = INVALIDCHARS.matcher(dataSource);
-    Preconditions.checkArgument(
-        !m.matches(),
-        "dataSource cannot contain whitespace character except space."
-    );
-    Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character.");
+    TaskIdUtils.validateId("dataSource", dataSource);
   }
 
   private static DimensionsSpec computeDimensionsSpec(
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index de674ae..8ce50d3 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -21,14 +21,17 @@ package org.apache.druid.segment.indexing;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.commons.text.StringEscapeUtils;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtilsTest;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
@@ -43,6 +46,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -54,9 +58,10 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
-public class DataSchemaTest
+public class DataSchemaTest extends InitializedNullHandlingTest
 {
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -79,7 +84,7 @@ public class DataSchemaTest
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -116,7 +121,7 @@ public class DataSchemaTest
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -153,7 +158,7 @@ public class DataSchemaTest
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parserMap,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -211,7 +216,7 @@ public class DataSchemaTest
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -244,7 +249,7 @@ public class DataSchemaTest
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -262,7 +267,7 @@ public class DataSchemaTest
   public void testSerdeWithInvalidParserMap() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"test\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                      + "\"parser\":{\"type\":\"invalid\"},"
                     + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
                      + "\"granularitySpec\":{"
@@ -365,7 +370,7 @@ public class DataSchemaTest
   public void testSerde() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"test\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                      + "\"parser\":{"
                      + "\"type\":\"string\","
                      + "\"parseSpec\":{"
@@ -389,7 +394,7 @@ public class DataSchemaTest
         DataSchema.class
     );
 
-    Assert.assertEquals(actual.getDataSource(), "test");
+    Assert.assertEquals(actual.getDataSource(), TaskIdUtilsTest.VALID_ID_CHARS);
     Assert.assertEquals(
         actual.getParser().getParseSpec(),
         new JSONParseSpec(
@@ -415,6 +420,45 @@ public class DataSchemaTest
   }
 
   @Test
+  public void testSerializeWithInvalidDataSourceName() throws Exception
+  {
+    // Escape backslashes to insert a tab character in the datasource name.
+    List<String> datasources = ImmutableList.of("", "../invalid", "\tname", "name\t invalid");
+    for (String datasource : datasources) {
+      String jsonStr = "{"
+                       + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(datasource) + "\","
+                       + "\"parser\":{"
+                       + "\"type\":\"string\","
+                       + "\"parseSpec\":{"
+                       + "\"format\":\"json\","
+                       + "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null},"
+                       + "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[]},"
+                       + "\"flattenSpec\":{\"useFieldDiscovery\":true, \"fields\":[]},"
+                       + "\"featureSpec\":{}},"
+                       + "\"encoding\":\"UTF-8\""
+                       + "},"
+                       + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
+                       + "\"granularitySpec\":{"
+                       + "\"type\":\"arbitrary\","
+                       + "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"},"
+                       + "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}";
+      try {
+        jsonMapper.readValue(
+            jsonMapper.writeValueAsString(
+                jsonMapper.readValue(jsonStr, DataSchema.class)
+            ),
+            DataSchema.class
+        );
+      }
+      catch (ValueInstantiationException e) {
+        Assert.assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+        continue;
+      }
+      Assert.fail("Serialization of datasource " + datasource + " should have failed.");
+    }
+  }
+
+  @Test
   public void testSerdeWithUpdatedDataSchemaAddedField() throws IOException
   {
     Map<String, Object> parser = jsonMapper.convertValue(
@@ -430,7 +474,7 @@ public class DataSchemaTest
     );
 
     DataSchema originalSchema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -469,7 +513,7 @@ public class DataSchemaTest
     );
 
     TestModifiedDataSchema originalSchema = new TestModifiedDataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         null,
         null,
         new AggregatorFactory[]{


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
