kfaraz commented on code in PR #16182:
URL: https://github.com/apache/druid/pull/16182#discussion_r1535468113


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1000,11 +997,13 @@ public Optional<Task> getActiveTask(String id) throws 
JsonProcessingException
     if (task != null) {
       try {
         // Write and read the value using a mapper with password redaction 
mixin.
-        task = jsonMapper.readValue(jsonMapper.writeValueAsString(task), 
Task.class);
+        task = 
PASSWORD_REDACTING_MAPPER.readValue(PASSWORD_REDACTING_MAPPER.writeValueAsString(task),
 Task.class);
       }
       catch (JsonProcessingException e) {
-        log.error(e, "Failed to  serialize or deserialize task with id [%s].", 
task.getId());
-        throw e;
+        log.error(e, "Failed to serialize or deserialize task with id [%s].", 
task.getId());
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                            .build(e, "Failed to serialize or deserialize 
task.");

Review Comment:
   I think this exception should be targeted either at developer or operator, 
not the user persona.
   
   ```suggestion
           throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                               
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
                               .build(e, "Failed to serialize or deserialize 
task[%s].", task.getId());
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
     Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), 
taskQueue.getTaskStatus(failedTask).get());
   }
 
+  @Test
+  public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+  {
+    final String password = "AbCd_1234";
+    final ObjectMapper mapper = getObjectMapper();
+
+    final HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(Collections.singleton("http"));
+    mapper.setInjectableValues(new InjectableValues.Std()
+                                   .addValue(HttpInputSourceConfig.class, 
httpInputSourceConfig)
+                                   .addValue(ObjectMapper.class, new 
DefaultObjectMapper())
+    );
+
+    final SQLMetadataConnector derbyConnector = 
derbyConnectorRule.getConnector();
+    final TaskStorage taskStorage = new MetadataTaskStorage(
+        derbyConnector,
+        new TaskStorageConfig(null),
+        new DerbyMetadataStorageActionHandlerFactory(
+            derbyConnector,
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            mapper
+        )
+    );
+
+    final TaskQueue taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, null, null, null, null),
+        new DefaultTaskConfig(),
+        taskStorage,
+        EasyMock.createMock(HttpRemoteTaskRunner.class),
+        createActionClientFactory(),
+        new TaskLockbox(taskStorage, new 
TestIndexerMetadataStorageCoordinator()),
+        new StubServiceEmitter("druid/overlord", "testHost"),
+        mapper
+    );
+
+    final DataSchema dataSchema = new DataSchema(
+        "DS",
+        new TimestampSpec(null, null, null),
+        new DimensionsSpec(null),
+        null,
+        new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY, 
null),
+        null
+    );
+    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        null,
+        new 
HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
+                            "user",
+                            new DefaultPasswordProvider(password),
+                            null,
+                            httpInputSourceConfig),
+        new NoopInputFormat(),
+        null,
+        null
+    );
+    final ParallelIndexSupervisorTask taskWithPassword = new 
ParallelIndexSupervisorTask(
+        "taskWithPassword",
+        "taskWithPassword",
+        null,
+        new ParallelIndexIngestionSpec(
+            dataSchema,
+            ioConfig,
+            null
+        ),
+        null,
+        null,
+        false
+    );
+    
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
+
+    taskQueue.start();
+    taskQueue.add(taskWithPassword);
+
+    final Optional<Task> taskFromTaskStorage = 
taskStorage.getTask(taskWithPassword.getId());

Review Comment:
   ```suggestion
       final Optional<Task> taskInStorage = 
taskStorage.getTask(taskWithPassword.getId());
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
     Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), 
taskQueue.getTaskStatus(failedTask).get());
   }
 
+  @Test
+  public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+  {
+    final String password = "AbCd_1234";
+    final ObjectMapper mapper = getObjectMapper();
+
+    final HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(Collections.singleton("http"));

Review Comment:
   Nit: This need not be assigned to a variable; just construct it where needed.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
     Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), 
taskQueue.getTaskStatus(failedTask).get());
   }
 
+  @Test
+  public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+  {
+    final String password = "AbCd_1234";
+    final ObjectMapper mapper = getObjectMapper();
+
+    final HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(Collections.singleton("http"));
+    mapper.setInjectableValues(new InjectableValues.Std()
+                                   .addValue(HttpInputSourceConfig.class, 
httpInputSourceConfig)
+                                   .addValue(ObjectMapper.class, new 
DefaultObjectMapper())
+    );
+
+    final SQLMetadataConnector derbyConnector = 
derbyConnectorRule.getConnector();
+    final TaskStorage taskStorage = new MetadataTaskStorage(
+        derbyConnector,
+        new TaskStorageConfig(null),
+        new DerbyMetadataStorageActionHandlerFactory(
+            derbyConnector,
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            mapper
+        )
+    );
+
+    final TaskQueue taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, null, null, null, null),
+        new DefaultTaskConfig(),
+        taskStorage,
+        EasyMock.createMock(HttpRemoteTaskRunner.class),
+        createActionClientFactory(),
+        new TaskLockbox(taskStorage, new 
TestIndexerMetadataStorageCoordinator()),
+        new StubServiceEmitter("druid/overlord", "testHost"),
+        mapper
+    );
+
+    final DataSchema dataSchema = new DataSchema(
+        "DS",
+        new TimestampSpec(null, null, null),
+        new DimensionsSpec(null),
+        null,
+        new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY, 
null),
+        null
+    );
+    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        null,
+        new 
HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
+                            "user",
+                            new DefaultPasswordProvider(password),
+                            null,
+                            httpInputSourceConfig),
+        new NoopInputFormat(),
+        null,
+        null
+    );
+    final ParallelIndexSupervisorTask taskWithPassword = new 
ParallelIndexSupervisorTask(
+        "taskWithPassword",
+        "taskWithPassword",
+        null,
+        new ParallelIndexIngestionSpec(
+            dataSchema,
+            ioConfig,
+            null
+        ),
+        null,
+        null,
+        false
+    );
+    
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
+
+    taskQueue.start();
+    taskQueue.add(taskWithPassword);
+
+    final Optional<Task> taskFromTaskStorage = 
taskStorage.getTask(taskWithPassword.getId());
+    Assert.assertTrue(taskFromTaskStorage.isPresent());
+    Assert.assertNotNull(taskFromTaskStorage.get());
+    
Assert.assertFalse(mapper.writeValueAsString(taskFromTaskStorage.get()).contains(password));
+
+
+    final Optional<Task> taskFromTaskQueue = 
taskQueue.getActiveTask(taskWithPassword.getId());
+    Assert.assertTrue(taskFromTaskQueue.isPresent());
+    Assert.assertNotNull(taskFromTaskQueue.get());

Review Comment:
   Assign the result of `optional.get()` to a new variable and then run the
assertions against that variable.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -122,6 +121,7 @@ public class TaskQueue
   private final TaskActionClientFactory taskActionClientFactory;
   private final TaskLockbox taskLockbox;
   private final ServiceEmitter emitter;
+  private final ObjectMapper PASSWORD_REDACTING_MAPPER;

Review Comment:
   Rename since it is not a constant anymore.
   ```suggestion
     private final ObjectMapper passwordRedactingMapper;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java:
##########
@@ -114,7 +114,7 @@ public Optional<Task> getTask(final String taskid)
           return activeTask;
         }
       }
-      catch (JsonProcessingException e) {
+      catch (DruidException e) {

Review Comment:
   Why are we swallowing this exception?



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
     Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), 
taskQueue.getTaskStatus(failedTask).get());
   }
 
+  @Test
+  public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+  {
+    final String password = "AbCd_1234";
+    final ObjectMapper mapper = getObjectMapper();
+
+    final HttpInputSourceConfig httpInputSourceConfig = new 
HttpInputSourceConfig(Collections.singleton("http"));
+    mapper.setInjectableValues(new InjectableValues.Std()
+                                   .addValue(HttpInputSourceConfig.class, 
httpInputSourceConfig)
+                                   .addValue(ObjectMapper.class, new 
DefaultObjectMapper())
+    );
+
+    final SQLMetadataConnector derbyConnector = 
derbyConnectorRule.getConnector();
+    final TaskStorage taskStorage = new MetadataTaskStorage(
+        derbyConnector,
+        new TaskStorageConfig(null),
+        new DerbyMetadataStorageActionHandlerFactory(
+            derbyConnector,
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            mapper
+        )
+    );
+
+    final TaskQueue taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, null, null, null, null),
+        new DefaultTaskConfig(),
+        taskStorage,
+        EasyMock.createMock(HttpRemoteTaskRunner.class),
+        createActionClientFactory(),
+        new TaskLockbox(taskStorage, new 
TestIndexerMetadataStorageCoordinator()),
+        new StubServiceEmitter("druid/overlord", "testHost"),
+        mapper
+    );
+
+    final DataSchema dataSchema = new DataSchema(
+        "DS",
+        new TimestampSpec(null, null, null),
+        new DimensionsSpec(null),
+        null,
+        new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY, 
null),
+        null
+    );
+    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        null,
+        new 
HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
+                            "user",
+                            new DefaultPasswordProvider(password),
+                            null,
+                            httpInputSourceConfig),
+        new NoopInputFormat(),
+        null,
+        null
+    );
+    final ParallelIndexSupervisorTask taskWithPassword = new 
ParallelIndexSupervisorTask(
+        "taskWithPassword",
+        "taskWithPassword",
+        null,
+        new ParallelIndexIngestionSpec(
+            dataSchema,
+            ioConfig,
+            null
+        ),
+        null,
+        null,
+        false
+    );
+    
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
+
+    taskQueue.start();
+    taskQueue.add(taskWithPassword);
+
+    final Optional<Task> taskFromTaskStorage = 
taskStorage.getTask(taskWithPassword.getId());
+    Assert.assertTrue(taskFromTaskStorage.isPresent());
+    Assert.assertNotNull(taskFromTaskStorage.get());
+    
Assert.assertFalse(mapper.writeValueAsString(taskFromTaskStorage.get()).contains(password));
+
+
+    final Optional<Task> taskFromTaskQueue = 
taskQueue.getActiveTask(taskWithPassword.getId());

Review Comment:
   ```suggestion
       final Optional<Task> taskInQueue = 
taskQueue.getActiveTask(taskWithPassword.getId());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to