This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7d48414 Optimize supervisor history retrieval for specific id (#11807)
7d48414 is described below
commit 7d4841471fd271ac24edde376f234abd00010233
Author: David Bar <[email protected]>
AuthorDate: Tue Oct 19 11:38:25 2021 +0300
Optimize supervisor history retrieval for specific id (#11807)
Optimization. Fetch from the metadata store only the relevant history items
for the requested supervisor id.
---
.../overlord/supervisor/SupervisorManager.java | 5 ++
.../overlord/supervisor/SupervisorResource.java | 5 +-
.../overlord/supervisor/SupervisorManagerTest.java | 16 +++++
.../supervisor/SupervisorResourceTest.java | 16 ++---
.../druid/metadata/MetadataSupervisorManager.java | 2 +
.../metadata/SQLMetadataSupervisorManager.java | 74 ++++++++++++++++------
.../metadata/SQLMetadataSupervisorManagerTest.java | 34 ++++++++++
7 files changed, 122 insertions(+), 30 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index b638fcf..04e76df 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -166,6 +166,11 @@ public class SupervisorManager
log.info("SupervisorManager stopped.");
}
+ public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id)
+ {
+ return metadataSupervisorManager.getAllForId(id);
+ }
+
public Map<String, List<VersionedSupervisorSpec>> getSupervisorHistory()
{
return metadataSupervisorManager.getAll();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 9104666..26baed3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -393,9 +393,8 @@ public class SupervisorResource
{
return asLeaderWithSupervisorManager(
manager -> {
- Map<String, List<VersionedSupervisorSpec>> supervisorHistory = manager.getSupervisorHistory();
- Iterable<VersionedSupervisorSpec> historyForId = supervisorHistory.get(id);
- if (historyForId != null) {
+ List<VersionedSupervisorSpec> historyForId = manager.getSupervisorHistoryForId(id);
+ if (!historyForId.isEmpty()) {
final List<VersionedSupervisorSpec> authorizedHistoryForId =
Lists.newArrayList(
AuthorizationUtils.filterAuthorizedResources(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 6b93f84..498b6cf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes;
@@ -191,6 +192,21 @@ public class SupervisorManagerTest extends EasyMockSupport
}
@Test
+ public void testGetSupervisorHistoryForId()
+ {
+ String id = "test-supervisor-1";
+ List<VersionedSupervisorSpec> supervisorHistory = ImmutableList.of();
+
+ EasyMock.expect(metadataSupervisorManager.getAllForId(id)).andReturn(supervisorHistory);
+ replayAll();
+
+ List<VersionedSupervisorSpec> history = manager.getSupervisorHistoryForId(id);
+ verifyAll();
+
+ Assert.assertEquals(supervisorHistory, history);
+ }
+
+ @Test
public void testGetSupervisorStatus()
{
SupervisorReport<Void> report = new SupervisorReport<>("id1", DateTimes.nowUtc(), null);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 8c77ea0..f3d794a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -914,12 +914,11 @@ public class SupervisorResourceTest extends EasyMockSupport
"v2"
)
);
- Map<String, List<VersionedSupervisorSpec>> history = new HashMap<>();
- history.put("id1", versions1);
- history.put("id2", versions2);
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3);
- EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(3);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(Collections.emptyList()).times(1);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@@ -1011,13 +1010,12 @@ public class SupervisorResourceTest extends EasyMockSupport
"tombstone"
)
);
- Map<String, List<VersionedSupervisorSpec>> history = new HashMap<>();
- history.put("id1", versions1);
- history.put("id2", versions2);
- history.put("id3", versions3);
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(4);
- EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(4);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(versions3).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4")).andReturn(Collections.emptyList()).times(1);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
index 9eb254f..70b25d9 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
@@ -33,6 +33,8 @@ public interface MetadataSupervisorManager
Map<String, List<VersionedSupervisorSpec>> getAll();
+ List<VersionedSupervisorSpec> getAllForId(String id);
+
/**
* Return latest supervisors (both active and terminated)
*
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
index 0480c45..0c69047 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
@@ -133,26 +134,9 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
public Pair<String, VersionedSupervisorSpec> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
- SupervisorSpec payload;
- try {
- payload = jsonMapper.readValue(
- r.getBytes("payload"),
- new TypeReference<SupervisorSpec>()
- {
- }
- );
- }
- catch (JsonParseException | JsonMappingException e) {
- log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id"));
- payload = null;
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
-
return Pair.of(
r.getString("spec_id"),
- new VersionedSupervisorSpec(payload, r.getString("created_date"))
+ createVersionSupervisorSpecFromResponse(r)
);
}
}
@@ -186,6 +170,60 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
}
@Override
+ public List<VersionedSupervisorSpec> getAllForId(String id)
+ {
+ return ImmutableList.copyOf(
+ dbi.withHandle(
+ new HandleCallback<List<VersionedSupervisorSpec>>()
+ {
+ @Override
+ public List<VersionedSupervisorSpec> withHandle(Handle handle)
+ {
+ return handle.createQuery(
+ StringUtils.format(
+ "SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC",
+ getSupervisorsTable()
+ )
+ ).bind("spec_id", id
+ ).map(
+ new ResultSetMapper<VersionedSupervisorSpec>()
+ {
+ @Override
+ public VersionedSupervisorSpec map(int index, ResultSet r, StatementContext ctx)
+ throws SQLException
+ {
+ return createVersionSupervisorSpecFromResponse(r);
+ }
+ }
+ ).list();
+ }
+ }
+ )
+ );
+ }
+
+ private VersionedSupervisorSpec createVersionSupervisorSpecFromResponse(ResultSet r) throws SQLException
+ {
+ SupervisorSpec payload;
+ try {
+ payload = jsonMapper.readValue(
+ r.getBytes("payload"),
+ new TypeReference<SupervisorSpec>()
+ {
+ }
+ );
+ }
+ catch (JsonParseException | JsonMappingException e) {
+ log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id"));
+ payload = null;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new VersionedSupervisorSpec(payload, r.getString("created_date"));
+ }
+
+ @Override
public Map<String, SupervisorSpec> getLatest()
{
return ImmutableMap.copyOf(
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
index 9547387..7ea2696 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
@@ -219,6 +219,40 @@ public class SQLMetadataSupervisorManagerTest
}
@Test
+ public void testInsertAndGetForId()
+ {
+ final String supervisor1 = "test-supervisor-1";
+ final String supervisor2 = "test-supervisor-2";
+ final Map<String, String> data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1");
+ final Map<String, String> data1rev2 = ImmutableMap.of("key1-1", "value1-1-2", "key1-2", "value1-2-2");
+ final Map<String, String> data1rev3 = ImmutableMap.of("key1-1", "value1-1-3", "key1-2", "value1-2-3");
+ final Map<String, String> data2rev1 = ImmutableMap.of("key2-1", "value2-1-1", "key2-2", "value2-2-1");
+ final Map<String, String> data2rev2 = ImmutableMap.of("key2-3", "value2-3-2", "key2-4", "value2-4-2");
+
+ Assert.assertTrue(supervisorManager.getAllForId(supervisor1).isEmpty());
+ Assert.assertTrue(supervisorManager.getAllForId(supervisor2).isEmpty());
+
+ // add 2 supervisors, with revisions
+ supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1));
+ supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev2));
+ supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev3));
+ supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data2rev1));
+ supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data2rev2));
+
+ List<VersionedSupervisorSpec> supervisor1Specs = supervisorManager.getAllForId(supervisor1);
+ List<VersionedSupervisorSpec> supervisor2Specs = supervisorManager.getAllForId(supervisor2);
+
+ Assert.assertEquals(3, supervisor1Specs.size());
+ Assert.assertEquals(2, supervisor2Specs.size());
+ // make sure getAll() returns each spec in descending order
+ Assert.assertEquals(data1rev3, ((TestSupervisorSpec) supervisor1Specs.get(0).getSpec()).getData());
+ Assert.assertEquals(data1rev2, ((TestSupervisorSpec) supervisor1Specs.get(1).getSpec()).getData());
+ Assert.assertEquals(data1rev1, ((TestSupervisorSpec) supervisor1Specs.get(2).getSpec()).getData());
+ Assert.assertEquals(data2rev2, ((TestSupervisorSpec) supervisor2Specs.get(0).getSpec()).getData());
+ Assert.assertEquals(data2rev1, ((TestSupervisorSpec) supervisor2Specs.get(1).getSpec()).getData());
+ }
+
+ @Test
public void testSkipDeserializingBadSpecs()
{
final String supervisor1 = "test-supervisor-1";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]