kfaraz commented on code in PR #16182:
URL: https://github.com/apache/druid/pull/16182#discussion_r1535468113
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1000,11 +997,13 @@ public Optional<Task> getActiveTask(String id) throws
JsonProcessingException
if (task != null) {
try {
// Write and read the value using a mapper with password redaction
mixin.
- task = jsonMapper.readValue(jsonMapper.writeValueAsString(task),
Task.class);
+ task =
PASSWORD_REDACTING_MAPPER.readValue(PASSWORD_REDACTING_MAPPER.writeValueAsString(task),
Task.class);
}
catch (JsonProcessingException e) {
- log.error(e, "Failed to serialize or deserialize task with id [%s].",
task.getId());
- throw e;
+ log.error(e, "Failed to serialize or deserialize task with id [%s].",
task.getId());
+ throw DruidException.forPersona(DruidException.Persona.USER)
+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Failed to serialize or deserialize
task.");
Review Comment:
I think this exception should target either the developer or the operator
persona, not the user persona.
```suggestion
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Failed to serialize or deserialize task[%s].", task.getId());
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask),
taskQueue.getTaskStatus(failedTask).get());
}
+ @Test
+ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+ {
+ final String password = "AbCd_1234";
+ final ObjectMapper mapper = getObjectMapper();
+
+ final HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(Collections.singleton("http"));
+ mapper.setInjectableValues(new InjectableValues.Std()
+ .addValue(HttpInputSourceConfig.class,
httpInputSourceConfig)
+ .addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
+
+ final SQLMetadataConnector derbyConnector =
derbyConnectorRule.getConnector();
+ final TaskStorage taskStorage = new MetadataTaskStorage(
+ derbyConnector,
+ new TaskStorageConfig(null),
+ new DerbyMetadataStorageActionHandlerFactory(
+ derbyConnector,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ mapper
+ )
+ );
+
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, null, null, null, null),
+ new DefaultTaskConfig(),
+ taskStorage,
+ EasyMock.createMock(HttpRemoteTaskRunner.class),
+ createActionClientFactory(),
+ new TaskLockbox(taskStorage, new
TestIndexerMetadataStorageCoordinator()),
+ new StubServiceEmitter("druid/overlord", "testHost"),
+ mapper
+ );
+
+ final DataSchema dataSchema = new DataSchema(
+ "DS",
+ new TimestampSpec(null, null, null),
+ new DimensionsSpec(null),
+ null,
+ new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY,
null),
+ null
+ );
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ null,
+ new
HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
+ "user",
+ new DefaultPasswordProvider(password),
+ null,
+ httpInputSourceConfig),
+ new NoopInputFormat(),
+ null,
+ null
+ );
+ final ParallelIndexSupervisorTask taskWithPassword = new
ParallelIndexSupervisorTask(
+ "taskWithPassword",
+ "taskWithPassword",
+ null,
+ new ParallelIndexIngestionSpec(
+ dataSchema,
+ ioConfig,
+ null
+ ),
+ null,
+ null,
+ false
+ );
+
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
+
+ taskQueue.start();
+ taskQueue.add(taskWithPassword);
+
+ final Optional<Task> taskFromTaskStorage =
taskStorage.getTask(taskWithPassword.getId());
Review Comment:
```suggestion
final Optional<Task> taskInStorage =
taskStorage.getTask(taskWithPassword.getId());
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask),
taskQueue.getTaskStatus(failedTask).get());
}
+ @Test
+ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+ {
+ final String password = "AbCd_1234";
+ final ObjectMapper mapper = getObjectMapper();
+
+ final HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(Collections.singleton("http"));
Review Comment:
Nit: This need not be assigned to a variable; just construct it where needed.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask),
taskQueue.getTaskStatus(failedTask).get());
}
+ @Test
+ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+ {
+ final String password = "AbCd_1234";
+ final ObjectMapper mapper = getObjectMapper();
+
+ final HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(Collections.singleton("http"));
+ mapper.setInjectableValues(new InjectableValues.Std()
+ .addValue(HttpInputSourceConfig.class,
httpInputSourceConfig)
+ .addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
+
+ final SQLMetadataConnector derbyConnector =
derbyConnectorRule.getConnector();
+ final TaskStorage taskStorage = new MetadataTaskStorage(
+ derbyConnector,
+ new TaskStorageConfig(null),
+ new DerbyMetadataStorageActionHandlerFactory(
+ derbyConnector,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ mapper
+ )
+ );
+
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, null, null, null, null),
+ new DefaultTaskConfig(),
+ taskStorage,
+ EasyMock.createMock(HttpRemoteTaskRunner.class),
+ createActionClientFactory(),
+ new TaskLockbox(taskStorage, new
TestIndexerMetadataStorageCoordinator()),
+ new StubServiceEmitter("druid/overlord", "testHost"),
+ mapper
+ );
+
+ final DataSchema dataSchema = new DataSchema(
+ "DS",
+ new TimestampSpec(null, null, null),
+ new DimensionsSpec(null),
+ null,
+ new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY,
null),
+ null
+ );
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ null,
+ new
HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
+ "user",
+ new DefaultPasswordProvider(password),
+ null,
+ httpInputSourceConfig),
+ new NoopInputFormat(),
+ null,
+ null
+ );
+ final ParallelIndexSupervisorTask taskWithPassword = new
ParallelIndexSupervisorTask(
+ "taskWithPassword",
+ "taskWithPassword",
+ null,
+ new ParallelIndexIngestionSpec(
+ dataSchema,
+ ioConfig,
+ null
+ ),
+ null,
+ null,
+ false
+ );
+
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
+
+ taskQueue.start();
+ taskQueue.add(taskWithPassword);
+
+ final Optional<Task> taskFromTaskStorage =
taskStorage.getTask(taskWithPassword.getId());
+ Assert.assertTrue(taskFromTaskStorage.isPresent());
+ Assert.assertNotNull(taskFromTaskStorage.get());
+
Assert.assertFalse(mapper.writeValueAsString(taskFromTaskStorage.get()).contains(password));
+
+
+ final Optional<Task> taskFromTaskQueue =
taskQueue.getActiveTask(taskWithPassword.getId());
+ Assert.assertTrue(taskFromTaskQueue.isPresent());
+ Assert.assertNotNull(taskFromTaskQueue.get());
Review Comment:
Assign the result of `optional.get()` to a new variable and then do the
assertions.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -122,6 +121,7 @@ public class TaskQueue
private final TaskActionClientFactory taskActionClientFactory;
private final TaskLockbox taskLockbox;
private final ServiceEmitter emitter;
+ private final ObjectMapper PASSWORD_REDACTING_MAPPER;
Review Comment:
Rename this field, since it is no longer a constant.
```suggestion
private final ObjectMapper passwordRedactingMapper;
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java:
##########
@@ -114,7 +114,7 @@ public Optional<Task> getTask(final String taskid)
return activeTask;
}
}
- catch (JsonProcessingException e) {
+ catch (DruidException e) {
Review Comment:
Why are we swallowing this exception?
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -519,6 +550,95 @@ public void testGetTaskStatus()
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask),
taskQueue.getTaskStatus(failedTask).get());
}
+ @Test
+ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+ {
+ final String password = "AbCd_1234";
+ final ObjectMapper mapper = getObjectMapper();
+
+ final HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(Collections.singleton("http"));
+ mapper.setInjectableValues(new InjectableValues.Std()
+ .addValue(HttpInputSourceConfig.class,
httpInputSourceConfig)
+ .addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
+
+ final SQLMetadataConnector derbyConnector =
derbyConnectorRule.getConnector();
+ final TaskStorage taskStorage = new MetadataTaskStorage(
+ derbyConnector,
+ new TaskStorageConfig(null),
+ new DerbyMetadataStorageActionHandlerFactory(
+ derbyConnector,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ mapper
+ )
+ );
+
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, null, null, null, null),
+ new DefaultTaskConfig(),
+ taskStorage,
+ EasyMock.createMock(HttpRemoteTaskRunner.class),
+ createActionClientFactory(),
+ new TaskLockbox(taskStorage, new
TestIndexerMetadataStorageCoordinator()),
+ new StubServiceEmitter("druid/overlord", "testHost"),
+ mapper
+ );
+
+ final DataSchema dataSchema = new DataSchema(
+ "DS",
+ new TimestampSpec(null, null, null),
+ new DimensionsSpec(null),
+ null,
+ new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY,
null),
+ null
+ );
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ null,
+ new
HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
+ "user",
+ new DefaultPasswordProvider(password),
+ null,
+ httpInputSourceConfig),
+ new NoopInputFormat(),
+ null,
+ null
+ );
+ final ParallelIndexSupervisorTask taskWithPassword = new
ParallelIndexSupervisorTask(
+ "taskWithPassword",
+ "taskWithPassword",
+ null,
+ new ParallelIndexIngestionSpec(
+ dataSchema,
+ ioConfig,
+ null
+ ),
+ null,
+ null,
+ false
+ );
+
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
+
+ taskQueue.start();
+ taskQueue.add(taskWithPassword);
+
+ final Optional<Task> taskFromTaskStorage =
taskStorage.getTask(taskWithPassword.getId());
+ Assert.assertTrue(taskFromTaskStorage.isPresent());
+ Assert.assertNotNull(taskFromTaskStorage.get());
+
Assert.assertFalse(mapper.writeValueAsString(taskFromTaskStorage.get()).contains(password));
+
+
+ final Optional<Task> taskFromTaskQueue =
taskQueue.getActiveTask(taskWithPassword.getId());
Review Comment:
```suggestion
final Optional<Task> taskInQueue =
taskQueue.getActiveTask(taskWithPassword.getId());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]