kfaraz commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r859426425
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java:
##########
@@ -115,6 +122,10 @@ public MetadataTaskStorage(
public void start()
{
metadataStorageConnector.createTaskTables();
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ taskMigrationCompleteFuture = executorService.submit(() -> {
+ return metadataStorageConnector.migrateTaskTable();
+ });
Review Comment:
Nit: the lambda can be simplified.
##########
extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java:
##########
@@ -64,4 +64,27 @@ public void testIsTransientException()
Assert.assertFalse(connector.isTransientException(new Throwable("Throwable
with reason only")));
}
+ @Test
+ public void testLimitClause()
+ {
+ SQLServerConnector connector = new SQLServerConnector(
+ Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
+ Suppliers.ofInstance(
+ new MetadataStorageTablesConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
Review Comment:
this could fit in a single line.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java:
##########
@@ -225,6 +238,35 @@ public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
+ {
+ Map<TaskLookupType, TaskLookup> theTaskLookups =
processTaskLookups(taskLookups);
+ return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups,
datasource));
+ }
+
+ @Override
+ public List<TaskStatusPlus> getTaskStatusPlusList(
+ Map<TaskLookupType, TaskLookup> taskLookups,
+ @Nullable String datasource
+ )
+ {
+ Map<TaskLookupType, TaskLookup> theTaskLookups =
processTaskLookups(taskLookups);
+ boolean fetchPayload = true;
+ if (taskMigrationCompleteFuture.isDone()) {
+ try {
+ fetchPayload = !taskMigrationCompleteFuture.get();
+ }
+ catch (Exception e) {
Review Comment:
Nit: Is this catch needed? In which case can there be an exception here?
##########
server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java:
##########
@@ -450,4 +465,149 @@ public void testRemoveTasksOlderThan() throws Exception
Assert.assertEquals(1, handler.getLogs(entryId2).size());
Assert.assertEquals(1, handler.getLogs(entryId3).size());
}
+
+ @Test
+ public void testGetTaskStatusPlusList()
+ {
+ // SETUP
+ TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered =
getRandomTaskInfo(true);
+ insertTaskInfo(activeUnaltered, false);
+
+ TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered =
getRandomTaskInfo(false);
+ insertTaskInfo(completedUnaltered, false);
+
+ TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered =
getRandomTaskInfo(true);
+ insertTaskInfo(activeAltered, true);
+
+ TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered =
getRandomTaskInfo(false);
+ insertTaskInfo(completedAltered, true);
+
+ Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups = new HashMap<>();
+ taskLookups.put(TaskLookup.TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance());
+ taskLookups.put(TaskLookup.TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, Duration.millis(86400000)));
+
+ List<TaskStatusPlus> taskStatusPlusList;
+
+ // BEFORE MIGRATION
+
+ // Payload based fetch. task type and groupid will be populated
+ taskStatusPlusList = handler.getTaskStatusPlusList(taskLookups, null,
true);
+ Assert.assertEquals(4, taskStatusPlusList.size());
+ verify(completedUnaltered, taskStatusPlusList, false, false, true);
+ verify(completedAltered, taskStatusPlusList, false, true, false);
+ verify(activeUnaltered, taskStatusPlusList, true, false, false);
+ verify(activeAltered, taskStatusPlusList, true, true, false);
+
+ // New columns based fetch before migration is complete. type and payload
are null when altered = false
+ taskStatusPlusList = handler.getTaskStatusPlusList(taskLookups, null,
false);
+ Assert.assertEquals(4, taskStatusPlusList.size());
+ verify(completedUnaltered, taskStatusPlusList, false, false, true);
+ verify(completedAltered, taskStatusPlusList, false, true, true);
+ verify(activeUnaltered, taskStatusPlusList, true, false, true);
+ verify(activeAltered, taskStatusPlusList, true, true, true);
+
+ // MIGRATION
+ derbyConnectorRule.getConnector().migrateTaskTable(entryTable);
+
+ // Payload based fetch. task type and groupid will still be populated
+ taskStatusPlusList = handler.getTaskStatusPlusList(taskLookups, null,
true);
+ Assert.assertEquals(4, taskStatusPlusList.size());
+ verify(completedUnaltered, taskStatusPlusList, false, false, false);
+ verify(completedAltered, taskStatusPlusList, false, true, false);
+ verify(activeUnaltered, taskStatusPlusList, true, false, false);
+ verify(activeAltered, taskStatusPlusList, true, true, false);
+
+ // New columns based fetch before migration is complete.
+ // type and payload are not null for completed task but are still null for
active ones since they aren't migrated
+ // An active task will be eventually updated on its own due to insertion
Review Comment:
A task that was ACTIVE before the migration continues to have the columns
`type` and `groupId` as null even after the migration (at least until the task
completes and is finally updated in the db).
How is this handled?
##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java:
##########
@@ -99,6 +103,30 @@ List<TaskInfo<EntryType, StatusType>> getTaskInfos(
@Nullable String datasource
);
+ /**
+ * Returns a list of TaskStatusPlus for the tasks corresponding to the given
filters
+ *
+ * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns
all active tasks in the metadata store.
+ * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it
returns all complete tasks in the metadata
+ * store. For complete tasks, additional filters in {@code
CompleteTaskLookup} can be applied.
+ * All lookups should be processed atomically if there are more than one
lookup is given.
Review Comment:
```suggestion
* All lookups should be processed atomically if more than one lookup is
given.
```
##########
extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java:
##########
@@ -62,4 +62,30 @@ public void testIsTransientException()
Assert.assertFalse(connector.isTransientException(new Exception("I'm not
happy")));
Assert.assertFalse(connector.isTransientException(new Throwable("I give
up")));
}
+
+ @Test
+ public void testLimitClause()
+ {
+ PostgreSQLConnector connector = new PostgreSQLConnector(
+ Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
+ Suppliers.ofInstance(
+ new MetadataStorageTablesConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
Review Comment:
Nit: can fit in a single line.
##########
server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java:
##########
@@ -53,39 +63,44 @@
public final ExpectedException thrown = ExpectedException.none();
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
- private SQLMetadataStorageActionHandler<Map<String, Integer>, Map<String,
Integer>, Map<String, String>, Map<String, Integer>> handler;
+
+
+ private static final Random RANDOM = new Random(1);
+
+ private SQLMetadataStorageActionHandler<Map<String, Object>, Map<String,
Object>, Map<String, String>, Map<String, Object>> handler;
+
+ final String entryTable = "entries";
Review Comment:
Nit: can be private.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java:
##########
@@ -265,7 +266,7 @@ public void testSecuredGetWaitingTask()
"deny",
getTaskWithIdAndDatasource("id_4", "deny")
)
- )
+ ).stream().map(this::toTaskStatusPlus).collect(Collectors.toList())
Review Comment:
this can probably be simplified by just creating a list of TaskStatusPlus
objects that the mocked method should return.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java:
##########
@@ -85,21 +84,12 @@ public List<TaskInfo<Task, TaskStatus>>
getActiveTaskInfo(@Nullable String dataS
);
}
- public List<TaskInfo<Task, TaskStatus>>
getCompletedTaskInfoByCreatedTimeDuration(
Review Comment:
Is this not needed anymore?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]