This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new cfa2a901b3a Redact passwords from tasks fetched from the TaskQueue (#16182)
cfa2a901b3a is described below
commit cfa2a901b3a9b1b070831ddbb5887951f4c63e1d
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Sat Mar 23 14:22:11 2024 +0530
Redact passwords from tasks fetched from the TaskQueue (#16182)
* Redact passwords from tasks fetched from the TaskQueue
---
.../apache/druid/indexing/overlord/TaskMaster.java | 7 +-
.../apache/druid/indexing/overlord/TaskQueue.java | 31 ++++-
.../concurrent/ConcurrentReplaceAndAppendTest.java | 3 +-
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 +-
.../indexing/overlord/TaskLockConfigTest.java | 4 +-
.../indexing/overlord/TaskQueueScaleTest.java | 3 +-
.../druid/indexing/overlord/TaskQueueTest.java | 138 +++++++++++++++++++--
.../druid/indexing/overlord/http/OverlordTest.java | 4 +-
8 files changed, 172 insertions(+), 20 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 4798513aafd..9829e1ffa19 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
@@ -94,7 +95,8 @@ public class TaskMaster implements TaskCountStatsProvider,
TaskSlotCountStatsPro
final SupervisorManager supervisorManager,
final OverlordDutyExecutor overlordDutyExecutor,
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
- final SegmentAllocationQueue segmentAllocationQueue
+ final SegmentAllocationQueue segmentAllocationQueue,
+ final ObjectMapper mapper
)
{
this.supervisorManager = supervisorManager;
@@ -125,7 +127,8 @@ public class TaskMaster implements TaskCountStatsProvider,
TaskSlotCountStatsPro
taskRunner,
taskActionClientFactory,
taskLockbox,
- emitter
+ emitter,
+ mapper
);
// Sensible order to start stuff:
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 6dbf6e70798..d3ca29d3abc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -19,6 +19,8 @@
package org.apache.druid.indexing.overlord;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -56,6 +58,8 @@ import
org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.PasswordProvider;
+import org.apache.druid.metadata.PasswordProviderRedactionMixIn;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.utils.CollectionUtils;
@@ -117,6 +121,7 @@ public class TaskQueue
private final TaskActionClientFactory taskActionClientFactory;
private final TaskLockbox taskLockbox;
private final ServiceEmitter emitter;
+ private final ObjectMapper passwordRedactingMapper;
private final ReentrantLock giant = new ReentrantLock(true);
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@@ -160,7 +165,8 @@ public class TaskQueue
TaskRunner taskRunner,
TaskActionClientFactory taskActionClientFactory,
TaskLockbox taskLockbox,
- ServiceEmitter emitter
+ ServiceEmitter emitter,
+ ObjectMapper mapper
)
{
this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig");
@@ -175,6 +181,8 @@ public class TaskQueue
config.getTaskCompleteHandlerNumThreads(),
"TaskQueue-OnComplete-%d"
);
+ this.passwordRedactingMapper = mapper.copy()
+ .addMixIn(PasswordProvider.class,
PasswordProviderRedactionMixIn.class);
}
@VisibleForTesting
@@ -970,15 +978,34 @@ public class TaskQueue
return stats;
}
+ /**
+ * Returns an optional containing the task payload after successfully
redacting credentials.
+ * Returns an absent optional if there is no task payload corresponding to
the taskId in memory.
+ * Throws DruidException if password could not be redacted due to
serialization / deserialization failure
+ */
public Optional<Task> getActiveTask(String id)
{
+ Task task;
giant.lock();
try {
- return Optional.fromNullable(tasks.get(id));
+ task = tasks.get(id);
}
finally {
giant.unlock();
}
+ if (task != null) {
+ try {
+ // Write and read the value using a mapper with password redaction
mixin.
+ task =
passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(task),
Task.class);
+ }
+ catch (JsonProcessingException e) {
+ log.error(e, "Failed to serialize or deserialize task with id [%s].",
task.getId());
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Failed to serialize or deserialize
task[%s].", task.getId());
+ }
+ }
+ return Optional.fromNullable(task);
}
@VisibleForTesting
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 91c7d6a7175..864d84dbfa1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -142,7 +142,8 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
taskRunner,
taskActionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
runningTasks.clear();
taskQueue.start();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 852ea0df02d..b60bc10a8de 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -696,7 +696,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
TaskQueueConfig.class
);
- return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr,
tac, taskLockbox, emitter);
+ return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr,
tac, taskLockbox, emitter, mapper);
}
@After
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
index 7d2fbcd7923..1c90802f480 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
@@ -30,6 +30,7 @@ import
org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
@@ -120,7 +121,8 @@ public class TaskLockConfigTest
taskRunner,
actionClientFactory,
lockbox,
- emitter
+ emitter,
+ new DefaultObjectMapper()
);
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index ade1314884e..1d478d9cd14 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -122,7 +122,8 @@ public class TaskQueueScaleTest
taskRunner,
unsupportedTaskActionFactory, // Not used for anything serious
new TaskLockbox(taskStorage, storageCoordinator),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ jsonMapper
);
taskQueue.start();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 8ee341c5db1..6a45cbb4c79 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -19,6 +19,9 @@
package org.apache.druid.indexing.overlord;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -26,6 +29,11 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.HttpInputSource;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
+import org.apache.druid.data.input.impl.NoopInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.error.DruidException;
@@ -38,10 +46,14 @@ import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import
org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
@@ -53,9 +65,11 @@ import
org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerTest;
import org.apache.druid.indexing.overlord.hrtr.WorkerHolder;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@@ -64,7 +78,12 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.DefaultPasswordProvider;
+import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
+import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -77,6 +96,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import javax.annotation.Nullable;
+import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -109,7 +129,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
// task1 emulates a case when there is a task that was issued before task2
and acquired locks conflicting
@@ -154,7 +175,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
@@ -194,7 +216,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
@@ -219,7 +242,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
@@ -254,7 +278,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
@@ -279,7 +304,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask(
@@ -321,7 +347,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
@@ -344,7 +371,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask(
@@ -374,7 +402,8 @@ public class TaskQueueTest extends IngestionTestBase
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
- new NoopServiceEmitter()
+ new NoopServiceEmitter(),
+ getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"))
@@ -421,7 +450,8 @@ public class TaskQueueTest extends IngestionTestBase
taskRunner,
actionClientFactory,
getLockbox(),
- metricsVerifier
+ metricsVerifier,
+ getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask(
@@ -507,7 +537,8 @@ public class TaskQueueTest extends IngestionTestBase
taskRunner,
createActionClientFactory(),
getLockbox(),
- new StubServiceEmitter("druid/overlord", "testHost")
+ new StubServiceEmitter("druid/overlord", "testHost"),
+ getObjectMapper()
);
taskQueue.setActive(true);
@@ -519,6 +550,91 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask),
taskQueue.getTaskStatus(failedTask).get());
}
+ @Test
+ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
+ {
+ final String password = "AbCd_1234";
+ final ObjectMapper mapper = getObjectMapper();
+
+ final HttpInputSourceConfig httpInputSourceConfig = new
HttpInputSourceConfig(Collections.singleton("http"));
+ mapper.setInjectableValues(new InjectableValues.Std()
+ .addValue(HttpInputSourceConfig.class,
httpInputSourceConfig)
+ .addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
+
+ final SQLMetadataConnector derbyConnector =
derbyConnectorRule.getConnector();
+ final TaskStorage taskStorage = new MetadataTaskStorage(
+ derbyConnector,
+ new TaskStorageConfig(null),
+ new DerbyMetadataStorageActionHandlerFactory(
+ derbyConnector,
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ mapper
+ )
+ );
+
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, null, null, null, null),
+ new DefaultTaskConfig(),
+ taskStorage,
+ EasyMock.createMock(HttpRemoteTaskRunner.class),
+ createActionClientFactory(),
+ new TaskLockbox(taskStorage, new
TestIndexerMetadataStorageCoordinator()),
+ new StubServiceEmitter("druid/overlord", "testHost"),
+ mapper
+ );
+
+ final DataSchema dataSchema = new DataSchema(
+ "DS",
+ new TimestampSpec(null, null, null),
+ new DimensionsSpec(null),
+ null,
+ new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY,
null),
+ null
+ );
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ null,
+ new
HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
+ "user",
+ new DefaultPasswordProvider(password),
+ null,
+ httpInputSourceConfig),
+ new NoopInputFormat(),
+ null,
+ null
+ );
+ final ParallelIndexSupervisorTask taskWithPassword = new
ParallelIndexSupervisorTask(
+ "taskWithPassword",
+ "taskWithPassword",
+ null,
+ new ParallelIndexIngestionSpec(
+ dataSchema,
+ ioConfig,
+ null
+ ),
+ null,
+ null,
+ false
+ );
+
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
+
+ taskQueue.start();
+ taskQueue.add(taskWithPassword);
+
+ final Optional<Task> taskInStorage =
taskStorage.getTask(taskWithPassword.getId());
+ Assert.assertTrue(taskInStorage.isPresent());
+ final String taskInStorageAsString =
mapper.writeValueAsString(taskInStorage.get());
+ Assert.assertFalse(taskInStorageAsString.contains(password));
+
+ final Optional<Task> taskInQueue =
taskQueue.getActiveTask(taskWithPassword.getId());
+ Assert.assertTrue(taskInQueue.isPresent());
+ final String taskInQueueAsString =
mapper.writeValueAsString(taskInQueue.get());
+ Assert.assertFalse(taskInQueueAsString.contains(password));
+
+ Assert.assertEquals(taskInStorageAsString, taskInQueueAsString);
+ }
+
private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String>
runningTasks)
{
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index b0df15ce4ef..066e4c6c58a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -65,6 +65,7 @@ import
org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -247,7 +248,8 @@ public class OverlordTest
supervisorManager,
EasyMock.createNiceMock(OverlordDutyExecutor.class),
new TestDruidLeaderSelector(),
- EasyMock.createNiceMock(SegmentAllocationQueue.class)
+ EasyMock.createNiceMock(SegmentAllocationQueue.class),
+ new DefaultObjectMapper()
);
EmittingLogger.registerEmitter(serviceEmitter);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]