This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d48414  Optimize supervisor history retrieval for specific id (#11807)
7d48414 is described below

commit 7d4841471fd271ac24edde376f234abd00010233
Author: David Bar <[email protected]>
AuthorDate: Tue Oct 19 11:38:25 2021 +0300

    Optimize supervisor history retrieval for specific id (#11807)
    
    Optimization: fetch from the metadata store only the history items for the
    requested supervisor id, instead of loading the history of every supervisor
    and then selecting the requested id in memory.
---
 .../overlord/supervisor/SupervisorManager.java     |  5 ++
 .../overlord/supervisor/SupervisorResource.java    |  5 +-
 .../overlord/supervisor/SupervisorManagerTest.java | 16 +++++
 .../supervisor/SupervisorResourceTest.java         | 16 ++---
 .../druid/metadata/MetadataSupervisorManager.java  |  2 +
 .../metadata/SQLMetadataSupervisorManager.java     | 74 ++++++++++++++++------
 .../metadata/SQLMetadataSupervisorManagerTest.java | 34 ++++++++++
 7 files changed, 122 insertions(+), 30 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index b638fcf..04e76df 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -166,6 +166,11 @@ public class SupervisorManager
     log.info("SupervisorManager stopped.");
   }
 
+  public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id)
+  {
+    return metadataSupervisorManager.getAllForId(id);
+  }
+
   public Map<String, List<VersionedSupervisorSpec>> getSupervisorHistory()
   {
     return metadataSupervisorManager.getAll();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 9104666..26baed3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -393,9 +393,8 @@ public class SupervisorResource
   {
     return asLeaderWithSupervisorManager(
         manager -> {
-          Map<String, List<VersionedSupervisorSpec>> supervisorHistory = 
manager.getSupervisorHistory();
-          Iterable<VersionedSupervisorSpec> historyForId = 
supervisorHistory.get(id);
-          if (historyForId != null) {
+          List<VersionedSupervisorSpec> historyForId = 
manager.getSupervisorHistoryForId(id);
+          if (!historyForId.isEmpty()) {
             final List<VersionedSupervisorSpec> authorizedHistoryForId =
                 Lists.newArrayList(
                     AuthorizationUtils.filterAuthorizedResources(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 6b93f84..498b6cf 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord.supervisor;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.java.util.common.DateTimes;
@@ -191,6 +192,21 @@ public class SupervisorManagerTest extends EasyMockSupport
   }
 
   @Test
+  public void testGetSupervisorHistoryForId()
+  {
+    String id = "test-supervisor-1";
+    List<VersionedSupervisorSpec> supervisorHistory = ImmutableList.of();
+
+    
EasyMock.expect(metadataSupervisorManager.getAllForId(id)).andReturn(supervisorHistory);
+    replayAll();
+
+    List<VersionedSupervisorSpec> history = 
manager.getSupervisorHistoryForId(id);
+    verifyAll();
+
+    Assert.assertEquals(supervisorHistory, history);
+  }
+
+  @Test
   public void testGetSupervisorStatus()
   {
     SupervisorReport<Void> report = new SupervisorReport<>("id1", 
DateTimes.nowUtc(), null);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 8c77ea0..f3d794a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -914,12 +914,11 @@ public class SupervisorResourceTest extends 
EasyMockSupport
             "v2"
         )
     );
-    Map<String, List<VersionedSupervisorSpec>> history = new HashMap<>();
-    history.put("id1", versions1);
-    history.put("id2", versions2);
 
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3);
-    
EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(3);
+    
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1);
+    
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1);
+    
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(Collections.emptyList()).times(1);
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@@ -1011,13 +1010,12 @@ public class SupervisorResourceTest extends 
EasyMockSupport
             "tombstone"
         )
     );
-    Map<String, List<VersionedSupervisorSpec>> history = new HashMap<>();
-    history.put("id1", versions1);
-    history.put("id2", versions2);
-    history.put("id3", versions3);
 
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(4);
-    
EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(4);
+    
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1);
+    
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1);
+    
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(versions3).times(1);
+    
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4")).andReturn(Collections.emptyList()).times(1);
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
     
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
diff --git 
a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java 
b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
index 9eb254f..70b25d9 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
@@ -33,6 +33,8 @@ public interface MetadataSupervisorManager
 
   Map<String, List<VersionedSupervisorSpec>> getAll();
 
+  List<VersionedSupervisorSpec> getAllForId(String id);
+
   /**
    * Return latest supervisors (both active and terminated)
    *
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
index 0480c45..0c69047 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.druid.guice.ManageLifecycle;
@@ -133,26 +134,9 @@ public class SQLMetadataSupervisorManager implements 
MetadataSupervisorManager
                       public Pair<String, VersionedSupervisorSpec> map(int 
index, ResultSet r, StatementContext ctx)
                           throws SQLException
                       {
-                        SupervisorSpec payload;
-                        try {
-                          payload = jsonMapper.readValue(
-                              r.getBytes("payload"),
-                              new TypeReference<SupervisorSpec>()
-                              {
-                              }
-                          );
-                        }
-                        catch (JsonParseException | JsonMappingException e) {
-                          log.warn("Failed to deserialize payload for 
spec_id[%s]", r.getString("spec_id"));
-                          payload = null;
-                        }
-                        catch (IOException e) {
-                          throw new RuntimeException(e);
-                        }
-
                         return Pair.of(
                             r.getString("spec_id"),
-                            new VersionedSupervisorSpec(payload, 
r.getString("created_date"))
+                            createVersionSupervisorSpecFromResponse(r)
                         );
                       }
                     }
@@ -186,6 +170,60 @@ public class SQLMetadataSupervisorManager implements 
MetadataSupervisorManager
   }
 
   @Override
+  public List<VersionedSupervisorSpec> getAllForId(String id)
+  {
+    return ImmutableList.copyOf(
+        dbi.withHandle(
+            new HandleCallback<List<VersionedSupervisorSpec>>()
+            {
+              @Override
+              public List<VersionedSupervisorSpec> withHandle(Handle handle)
+              {
+                return handle.createQuery(
+                    StringUtils.format(
+                        "SELECT id, spec_id, created_date, payload FROM %1$s 
WHERE spec_id = :spec_id ORDER BY id DESC",
+                        getSupervisorsTable()
+                    )
+                ).bind("spec_id", id
+                ).map(
+                    new ResultSetMapper<VersionedSupervisorSpec>()
+                    {
+                      @Override
+                      public VersionedSupervisorSpec map(int index, ResultSet 
r, StatementContext ctx)
+                          throws SQLException
+                      {
+                        return createVersionSupervisorSpecFromResponse(r);
+                      }
+                    }
+                ).list();
+              }
+            }
+        )
+    );
+  }
+
+  private VersionedSupervisorSpec 
createVersionSupervisorSpecFromResponse(ResultSet r) throws SQLException
+  {
+    SupervisorSpec payload;
+    try {
+      payload = jsonMapper.readValue(
+          r.getBytes("payload"),
+          new TypeReference<SupervisorSpec>()
+          {
+          }
+      );
+    }
+    catch (JsonParseException | JsonMappingException e) {
+      log.warn("Failed to deserialize payload for spec_id[%s]", 
r.getString("spec_id"));
+      payload = null;
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return new VersionedSupervisorSpec(payload, r.getString("created_date"));
+  }
+
+  @Override
   public Map<String, SupervisorSpec> getLatest()
   {
     return ImmutableMap.copyOf(
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
index 9547387..7ea2696 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
@@ -219,6 +219,40 @@ public class SQLMetadataSupervisorManagerTest
   }
 
   @Test
+  public void testInsertAndGetForId()
+  {
+    final String supervisor1 = "test-supervisor-1";
+    final String supervisor2 = "test-supervisor-2";
+    final Map<String, String> data1rev1 = ImmutableMap.of("key1-1", 
"value1-1-1", "key1-2", "value1-2-1");
+    final Map<String, String> data1rev2 = ImmutableMap.of("key1-1", 
"value1-1-2", "key1-2", "value1-2-2");
+    final Map<String, String> data1rev3 = ImmutableMap.of("key1-1", 
"value1-1-3", "key1-2", "value1-2-3");
+    final Map<String, String> data2rev1 = ImmutableMap.of("key2-1", 
"value2-1-1", "key2-2", "value2-2-1");
+    final Map<String, String> data2rev2 = ImmutableMap.of("key2-3", 
"value2-3-2", "key2-4", "value2-4-2");
+
+    Assert.assertTrue(supervisorManager.getAllForId(supervisor1).isEmpty());
+    Assert.assertTrue(supervisorManager.getAllForId(supervisor2).isEmpty());
+
+    // add 2 supervisors, with revisions
+    supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, 
data1rev1));
+    supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, 
data1rev2));
+    supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, 
data1rev3));
+    supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, 
data2rev1));
+    supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, 
data2rev2));
+
+    List<VersionedSupervisorSpec> supervisor1Specs = 
supervisorManager.getAllForId(supervisor1);
+    List<VersionedSupervisorSpec> supervisor2Specs = 
supervisorManager.getAllForId(supervisor2);
+
+    Assert.assertEquals(3, supervisor1Specs.size());
+    Assert.assertEquals(2, supervisor2Specs.size());
+    // make sure getAllForId() returns each spec in descending order
+    Assert.assertEquals(data1rev3, ((TestSupervisorSpec) 
supervisor1Specs.get(0).getSpec()).getData());
+    Assert.assertEquals(data1rev2, ((TestSupervisorSpec) 
supervisor1Specs.get(1).getSpec()).getData());
+    Assert.assertEquals(data1rev1, ((TestSupervisorSpec) 
supervisor1Specs.get(2).getSpec()).getData());
+    Assert.assertEquals(data2rev2, ((TestSupervisorSpec) 
supervisor2Specs.get(0).getSpec()).getData());
+    Assert.assertEquals(data2rev1, ((TestSupervisorSpec) 
supervisor2Specs.get(1).getSpec()).getData());
+  }
+
+  @Test
   public void testSkipDeserializingBadSpecs()
   {
     final String supervisor1 = "test-supervisor-1";

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to