This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fffb2e4fe7 Speed up SQLMetadataStorageActionHandlerTest (#14856)
fffb2e4fe7 is described below

commit fffb2e4fe72c9c8a0e59d4a191a1de8f6763df88
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Aug 17 18:02:43 2023 +0530

    Speed up SQLMetadataStorageActionHandlerTest (#14856)
    
    Changes
    - Reduce test time of `SQLMetadataStorageActionHandlerTest.testMigration`
      by inserting fewer tasks (224 instead of 3579)
    - Slightly modify log messages to adhere to Druid style
---
 .../metadata/SQLMetadataStorageActionHandler.java  | 49 ++++++++++++----------
 .../SQLMetadataStorageActionHandlerTest.java       | 44 +++++++++----------
 2 files changed, 46 insertions(+), 47 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index 8d84432ff1..9fbc79f273 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -62,6 +62,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.IntStream;
 
 public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, 
LogType, LockType>
     implements MetadataStorageActionHandler<EntryType, StatusType, LogType, 
LockType>
@@ -1032,16 +1033,14 @@ public abstract class 
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
                             .orElse(null);
   }
 
-  private List<TaskIdentifier> fetchTaskMetadatas(String tableName, String id, 
int limit)
+  private List<TaskIdentifier> 
fetchTasksWithTypeColumnNullAndIdGreaterThan(String id, int limit)
   {
     List<TaskIdentifier> taskIdentifiers = new ArrayList<>();
     connector.retryWithHandle(
         handle -> {
           String sql = StringUtils.format(
               "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY 
id %3$s",
-              tableName,
-              id,
-              connector.limitClause(limit)
+              entryTable, id, connector.limitClause(limit)
           );
           Query<Map<String, Object>> query = handle.createQuery(sql);
           taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
@@ -1051,19 +1050,21 @@ public abstract class 
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
     return taskIdentifiers;
   }
 
-  private void updateTaskMetadatas(String tasksTable, List<TaskIdentifier> 
taskIdentifiers)
+  private int updateColumnsTypeAndGroupIdForTasks(List<TaskIdentifier> 
taskIdentifiers)
   {
-    connector.retryWithHandle(
+    return connector.retryWithHandle(
         handle -> {
-          Batch batch = handle.createBatch();
-          String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE 
id = '%4$s'";
-          for (TaskIdentifier metadata : taskIdentifiers) {
+          final Batch batch = handle.createBatch();
+          for (TaskIdentifier task : taskIdentifiers) {
             batch.add(
-                StringUtils.format(sql, tasksTable, metadata.getType(), 
metadata.getGroupId(), metadata.getId())
+                StringUtils.format(
+                    "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id 
= '%4$s'",
+                    entryTable, task.getType(), task.getGroupId(), task.getId()
+                )
             );
           }
-          batch.execute();
-          return null;
+          int[] result = batch.execute();
+          return IntStream.of(result).sum();
         }
     );
   }
@@ -1083,14 +1084,14 @@ public abstract class 
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
   @VisibleForTesting
   boolean populateTaskTypeAndGroupId()
   {
-    log.info("Populate fields task and group_id of task entry table [%s] from 
payload", entryTable);
-    String id = "";
-    int limit = 100;
-    int count = 0;
+    log.debug("Populating columns [task] and [group_id] in task table[%s] from 
payload.", entryTable);
+    String lastUpdatedTaskId = "";
+    final int limit = 100;
+    int numUpdatedTasks = 0;
     while (true) {
       List<TaskIdentifier> taskIdentifiers;
       try {
-        taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit);
+        taskIdentifiers = 
fetchTasksWithTypeColumnNullAndIdGreaterThan(lastUpdatedTaskId, limit);
       }
       catch (Exception e) {
         log.warn(e, "Task migration failed while reading entries from task 
table");
@@ -1100,15 +1101,17 @@ public abstract class 
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
         break;
       }
       try {
-        updateTaskMetadatas(entryTable, taskIdentifiers);
-        count += taskIdentifiers.size();
-        log.info("Successfully updated type and groupId for [%d] tasks", 
count);
+        final int updatedCount = 
updateColumnsTypeAndGroupIdForTasks(taskIdentifiers);
+        if (updatedCount > 0) {
+          numUpdatedTasks += updatedCount;
+          log.info("Successfully updated columns [type] and [group_id] for 
[%d] tasks.", numUpdatedTasks);
+        }
       }
       catch (Exception e) {
         log.warn(e, "Task migration failed while updating entries in task 
table");
         return false;
       }
-      id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId();
+      lastUpdatedTaskId = taskIdentifiers.get(taskIdentifiers.size() - 
1).getId();
 
       try {
         Thread.sleep(1000);
@@ -1118,7 +1121,9 @@ public abstract class 
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
         Thread.currentThread().interrupt();
       }
     }
-    log.info("Task migration for table [%s] successful", entryTable);
+    if (numUpdatedTasks > 0) {
+      log.info("Task migration for table[%s] successful.", entryTable);
+    }
     return true;
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
index 2ade4f9601..cee24d314e 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
@@ -43,7 +43,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.sql.ResultSet;
 import java.util.HashMap;
@@ -59,12 +58,7 @@ public class SQLMetadataStorageActionHandlerTest
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new 
TestDerbyConnector.DerbyConnectorRule();
 
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
-
-
   private static final Random RANDOM = new Random(1);
 
   private SQLMetadataStorageActionHandler<Map<String, Object>, Map<String, 
Object>, Map<String, String>, Map<String, Object>> handler;
@@ -256,16 +250,17 @@ public class SQLMetadataStorageActionHandlerTest
   }
 
   @Test(timeout = 60_000L)
-  public void testRepeatInsert()
+  public void testDuplicateInsertThrowsEntryExistsException()
   {
     final String entryId = "abcd";
     Map<String, Object> entry = ImmutableMap.of("a", 1);
     Map<String, Object> status = ImmutableMap.of("count", 42);
 
     handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, 
status, "type", "group");
-
-    thrown.expect(EntryExistsException.class);
-    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, 
status, "type", "group");
+    Assert.assertThrows(
+        EntryExistsException.class,
+        () -> handler.insert(entryId, DateTimes.of("2014-01-01"), "test", 
entry, true, status, "type", "group")
+    );
   }
 
   @Test
@@ -469,17 +464,17 @@ public class SQLMetadataStorageActionHandlerTest
   @Test
   public void testMigration()
   {
-    int active = 1234;
-    for (int i = 0; i < active; i++) {
-      insertTaskInfo(createRandomTaskInfo(true), false);
+    int numActiveTasks = 123;
+    for (int i = 0; i < numActiveTasks; i++) {
+      insertTaskInfo(createRandomTaskInfo(TaskState.RUNNING), false);
     }
 
-    int completed = 2345;
-    for (int i = 0; i < completed; i++) {
-      insertTaskInfo(createRandomTaskInfo(false), false);
+    int numCompletedTasks = 101;
+    for (int i = 0; i < numCompletedTasks; i++) {
+      insertTaskInfo(createRandomTaskInfo(TaskState.SUCCESS), false);
     }
 
-    Assert.assertEquals(active + completed, 
getUnmigratedTaskCount().intValue());
+    Assert.assertEquals(numActiveTasks + numCompletedTasks, 
getUnmigratedTaskCount().intValue());
 
     handler.populateTaskTypeAndGroupId();
 
@@ -490,16 +485,16 @@ public class SQLMetadataStorageActionHandlerTest
   public void testGetTaskStatusPlusListInternal()
   {
     // SETUP
-    TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered = 
createRandomTaskInfo(true);
+    TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered = 
createRandomTaskInfo(TaskState.RUNNING);
     insertTaskInfo(activeUnaltered, false);
 
-    TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered = 
createRandomTaskInfo(false);
+    TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered = 
createRandomTaskInfo(TaskState.SUCCESS);
     insertTaskInfo(completedUnaltered, false);
 
-    TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered = 
createRandomTaskInfo(true);
+    TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered = 
createRandomTaskInfo(TaskState.RUNNING);
     insertTaskInfo(activeAltered, true);
 
-    TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered = 
createRandomTaskInfo(false);
+    TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered = 
createRandomTaskInfo(TaskState.SUCCESS);
     insertTaskInfo(completedAltered, true);
 
     Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups = new HashMap<>();
@@ -561,7 +556,7 @@ public class SQLMetadataStorageActionHandlerTest
     );
   }
 
-  private TaskInfo<Map<String, Object>, Map<String, Object>> 
createRandomTaskInfo(boolean active)
+  private TaskInfo<Map<String, Object>, Map<String, Object>> 
createRandomTaskInfo(TaskState taskState)
   {
     String id = UUID.randomUUID().toString();
     DateTime createdTime = DateTime.now(DateTimeZone.UTC);
@@ -576,7 +571,7 @@ public class SQLMetadataStorageActionHandlerTest
 
     Map<String, Object> status = new HashMap<>();
     status.put("id", id);
-    status.put("status", active ? TaskState.RUNNING : TaskState.SUCCESS);
+    status.put("status", taskState);
     status.put("duration", RANDOM.nextLong());
     status.put("location", TaskLocation.create(UUID.randomUUID().toString(), 
8080, 995));
     status.put("errorMsg", UUID.randomUUID().toString());
@@ -590,8 +585,7 @@ public class SQLMetadataStorageActionHandlerTest
     );
   }
 
-  private void insertTaskInfo(TaskInfo<Map<String, Object>, Map<String, 
Object>> taskInfo,
-                              boolean altered)
+  private void insertTaskInfo(TaskInfo<Map<String, Object>, Map<String, 
Object>> taskInfo, boolean altered)
   {
     try {
       handler.insert(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to