This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fffb2e4fe7 Speed up SQLMetadataStorageActionHandlerTest (#14856)
fffb2e4fe7 is described below
commit fffb2e4fe72c9c8a0e59d4a191a1de8f6763df88
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Aug 17 18:02:43 2023 +0530
Speed up SQLMetadataStorageActionHandlerTest (#14856)
Changes:
- Reduce test time of `SQLMetadataStorageActionHandlerTest.testMigration` by inserting fewer tasks (224 instead of 3579)
- Slightly modify log messages to adhere to Druid style
---
.../metadata/SQLMetadataStorageActionHandler.java | 49 ++++++++++++----------
.../SQLMetadataStorageActionHandlerTest.java | 44 +++++++++----------
2 files changed, 46 insertions(+), 47 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index 8d84432ff1..9fbc79f273 100644
---
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -62,6 +62,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.stream.IntStream;
public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType,
LogType, LockType>
implements MetadataStorageActionHandler<EntryType, StatusType, LogType,
LockType>
@@ -1032,16 +1033,14 @@ public abstract class
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
.orElse(null);
}
- private List<TaskIdentifier> fetchTaskMetadatas(String tableName, String id,
int limit)
+ private List<TaskIdentifier>
fetchTasksWithTypeColumnNullAndIdGreaterThan(String id, int limit)
{
List<TaskIdentifier> taskIdentifiers = new ArrayList<>();
connector.retryWithHandle(
handle -> {
String sql = StringUtils.format(
"SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY
id %3$s",
- tableName,
- id,
- connector.limitClause(limit)
+ entryTable, id, connector.limitClause(limit)
);
Query<Map<String, Object>> query = handle.createQuery(sql);
taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
@@ -1051,19 +1050,21 @@ public abstract class
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
return taskIdentifiers;
}
- private void updateTaskMetadatas(String tasksTable, List<TaskIdentifier>
taskIdentifiers)
+ private int updateColumnsTypeAndGroupIdForTasks(List<TaskIdentifier>
taskIdentifiers)
{
- connector.retryWithHandle(
+ return connector.retryWithHandle(
handle -> {
- Batch batch = handle.createBatch();
- String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE
id = '%4$s'";
- for (TaskIdentifier metadata : taskIdentifiers) {
+ final Batch batch = handle.createBatch();
+ for (TaskIdentifier task : taskIdentifiers) {
batch.add(
- StringUtils.format(sql, tasksTable, metadata.getType(),
metadata.getGroupId(), metadata.getId())
+ StringUtils.format(
+ "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id
= '%4$s'",
+ entryTable, task.getType(), task.getGroupId(), task.getId()
+ )
);
}
- batch.execute();
- return null;
+ int[] result = batch.execute();
+ return IntStream.of(result).sum();
}
);
}
@@ -1083,14 +1084,14 @@ public abstract class
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
@VisibleForTesting
boolean populateTaskTypeAndGroupId()
{
- log.info("Populate fields task and group_id of task entry table [%s] from
payload", entryTable);
- String id = "";
- int limit = 100;
- int count = 0;
+ log.debug("Populating columns [task] and [group_id] in task table[%s] from
payload.", entryTable);
+ String lastUpdatedTaskId = "";
+ final int limit = 100;
+ int numUpdatedTasks = 0;
while (true) {
List<TaskIdentifier> taskIdentifiers;
try {
- taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit);
+ taskIdentifiers =
fetchTasksWithTypeColumnNullAndIdGreaterThan(lastUpdatedTaskId, limit);
}
catch (Exception e) {
log.warn(e, "Task migration failed while reading entries from task
table");
@@ -1100,15 +1101,17 @@ public abstract class
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
break;
}
try {
- updateTaskMetadatas(entryTable, taskIdentifiers);
- count += taskIdentifiers.size();
- log.info("Successfully updated type and groupId for [%d] tasks",
count);
+ final int updatedCount =
updateColumnsTypeAndGroupIdForTasks(taskIdentifiers);
+ if (updatedCount > 0) {
+ numUpdatedTasks += updatedCount;
+ log.info("Successfully updated columns [type] and [group_id] for
[%d] tasks.", numUpdatedTasks);
+ }
}
catch (Exception e) {
log.warn(e, "Task migration failed while updating entries in task
table");
return false;
}
- id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId();
+ lastUpdatedTaskId = taskIdentifiers.get(taskIdentifiers.size() -
1).getId();
try {
Thread.sleep(1000);
@@ -1118,7 +1121,9 @@ public abstract class
SQLMetadataStorageActionHandler<EntryType, StatusType, Log
Thread.currentThread().interrupt();
}
}
- log.info("Task migration for table [%s] successful", entryTable);
+ if (numUpdatedTasks > 0) {
+ log.info("Task migration for table[%s] successful.", entryTable);
+ }
return true;
}
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
index 2ade4f9601..cee24d314e 100644
---
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
@@ -43,7 +43,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.sql.ResultSet;
import java.util.HashMap;
@@ -59,12 +58,7 @@ public class SQLMetadataStorageActionHandlerTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new
TestDerbyConnector.DerbyConnectorRule();
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
-
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
-
-
private static final Random RANDOM = new Random(1);
private SQLMetadataStorageActionHandler<Map<String, Object>, Map<String,
Object>, Map<String, String>, Map<String, Object>> handler;
@@ -256,16 +250,17 @@ public class SQLMetadataStorageActionHandlerTest
}
@Test(timeout = 60_000L)
- public void testRepeatInsert()
+ public void testDuplicateInsertThrowsEntryExistsException()
{
final String entryId = "abcd";
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true,
status, "type", "group");
-
- thrown.expect(EntryExistsException.class);
- handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true,
status, "type", "group");
+ Assert.assertThrows(
+ EntryExistsException.class,
+ () -> handler.insert(entryId, DateTimes.of("2014-01-01"), "test",
entry, true, status, "type", "group")
+ );
}
@Test
@@ -469,17 +464,17 @@ public class SQLMetadataStorageActionHandlerTest
@Test
public void testMigration()
{
- int active = 1234;
- for (int i = 0; i < active; i++) {
- insertTaskInfo(createRandomTaskInfo(true), false);
+ int numActiveTasks = 123;
+ for (int i = 0; i < numActiveTasks; i++) {
+ insertTaskInfo(createRandomTaskInfo(TaskState.RUNNING), false);
}
- int completed = 2345;
- for (int i = 0; i < completed; i++) {
- insertTaskInfo(createRandomTaskInfo(false), false);
+ int numCompletedTasks = 101;
+ for (int i = 0; i < numCompletedTasks; i++) {
+ insertTaskInfo(createRandomTaskInfo(TaskState.SUCCESS), false);
}
- Assert.assertEquals(active + completed,
getUnmigratedTaskCount().intValue());
+ Assert.assertEquals(numActiveTasks + numCompletedTasks,
getUnmigratedTaskCount().intValue());
handler.populateTaskTypeAndGroupId();
@@ -490,16 +485,16 @@ public class SQLMetadataStorageActionHandlerTest
public void testGetTaskStatusPlusListInternal()
{
// SETUP
- TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered =
createRandomTaskInfo(true);
+ TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered =
createRandomTaskInfo(TaskState.RUNNING);
insertTaskInfo(activeUnaltered, false);
- TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered =
createRandomTaskInfo(false);
+ TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered =
createRandomTaskInfo(TaskState.SUCCESS);
insertTaskInfo(completedUnaltered, false);
- TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered =
createRandomTaskInfo(true);
+ TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered =
createRandomTaskInfo(TaskState.RUNNING);
insertTaskInfo(activeAltered, true);
- TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered =
createRandomTaskInfo(false);
+ TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered =
createRandomTaskInfo(TaskState.SUCCESS);
insertTaskInfo(completedAltered, true);
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups = new HashMap<>();
@@ -561,7 +556,7 @@ public class SQLMetadataStorageActionHandlerTest
);
}
- private TaskInfo<Map<String, Object>, Map<String, Object>>
createRandomTaskInfo(boolean active)
+ private TaskInfo<Map<String, Object>, Map<String, Object>>
createRandomTaskInfo(TaskState taskState)
{
String id = UUID.randomUUID().toString();
DateTime createdTime = DateTime.now(DateTimeZone.UTC);
@@ -576,7 +571,7 @@ public class SQLMetadataStorageActionHandlerTest
Map<String, Object> status = new HashMap<>();
status.put("id", id);
- status.put("status", active ? TaskState.RUNNING : TaskState.SUCCESS);
+ status.put("status", taskState);
status.put("duration", RANDOM.nextLong());
status.put("location", TaskLocation.create(UUID.randomUUID().toString(),
8080, 995));
status.put("errorMsg", UUID.randomUUID().toString());
@@ -590,8 +585,7 @@ public class SQLMetadataStorageActionHandlerTest
);
}
- private void insertTaskInfo(TaskInfo<Map<String, Object>, Map<String,
Object>> taskInfo,
- boolean altered)
+ private void insertTaskInfo(TaskInfo<Map<String, Object>, Map<String,
Object>> taskInfo, boolean altered)
{
try {
handler.insert(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]