imply-cheddar commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r858145172


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java:
##########
@@ -170,6 +172,25 @@ public List<Task> getActiveTasksByDatasource(String datasource)
     return listBuilder.build();
   }
 
+  private TaskStatusPlus toTaskStatusPlus(TaskInfo<Task, TaskStatus> taskInfo)

Review Comment:
   Why not make this a static method on `TaskStatusPlus`?  Is it really 
important for it to be private to this class?
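
A minimal sketch of the static-factory idea, assuming simplified stand-in classes: the `TaskInfo` and `TaskStatusPlus` below are hypothetical, trimmed-down versions of the Druid types, and `fromTaskInfo` is an illustrative name, not an existing method. The point is only that one shared conversion replaces the private copies in each class.

```java
// Simplified stand-ins for Druid's TaskInfo and TaskStatusPlus. The real
// classes carry more fields (group id, type, location, error message, ...).
final class StaticFactorySketch
{
  static final class TaskInfo
  {
    final String id;
    final String dataSource;
    final String statusCode;

    TaskInfo(String id, String dataSource, String statusCode)
    {
      this.id = id;
      this.dataSource = dataSource;
      this.statusCode = statusCode;
    }
  }

  static final class TaskStatusPlus
  {
    final String id;
    final String dataSource;
    final String statusCode;

    TaskStatusPlus(String id, String dataSource, String statusCode)
    {
      this.id = id;
      this.dataSource = dataSource;
      this.statusCode = statusCode;
    }

    // Hypothetical static factory: a single conversion shared by every caller,
    // so storage classes and tests do not each keep a private copy.
    static TaskStatusPlus fromTaskInfo(TaskInfo info)
    {
      return new TaskStatusPlus(info.id, info.dataSource, info.statusCode);
    }
  }
}
```

Callers would then write `TaskStatusPlus.fromTaskInfo(taskInfo)` wherever the private helper is used today.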



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -745,87 +745,57 @@ private List<TaskStatusPlus> getTasks(
 
     if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
       // We are interested in only those tasks which are in taskRunner.
-      taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage
-          .filter(info -> runnerWorkItems.containsKey(info.getId()));
+      taskStatusPlusStream = taskStatusPlusStream
+          .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId()));
     }
-    final List<TaskInfo<Task, TaskStatus>> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage
-        .collect(Collectors.toList());
+    final List<TaskStatusPlus> taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList());
 
     // Separate complete and active tasks from taskStorage.
     // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType.
-    final List<TaskInfo<Task, TaskStatus>> completeTaskInfoFromTaskStorage = new ArrayList<>();
-    final List<TaskInfo<Task, TaskStatus>> activeTaskInfoFromTaskStorage = new ArrayList<>();
-    for (TaskInfo<Task, TaskStatus> info : taskInfoFromTaskStorage) {
-      if (info.getStatus().isComplete()) {
-        completeTaskInfoFromTaskStorage.add(info);
+    final List<TaskStatusPlus> completeTaskStatusPlusList = new ArrayList<>();
+    final List<TaskStatusPlus> activeTaskStatusPlusList = new ArrayList<>();
+    for (TaskStatusPlus statusPlus : taskStatusPlusList) {
+      if (statusPlus.getStatusCode().isComplete()) {
+        completeTaskStatusPlusList.add(statusPlus);
       } else {
-        activeTaskInfoFromTaskStorage.add(info);
+        activeTaskStatusPlusList.add(statusPlus);
       }
     }
 
-    final List<TaskStatusPlus> statuses = new ArrayList<>();
-    completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add(
-        new TaskStatusPlus(
-            taskInfo.getId(),
-            taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
-            taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
-            taskInfo.getCreatedTime(),
-            DateTimes.EPOCH,
-            taskInfo.getStatus().getStatusCode(),
-            RunnerTaskState.NONE,
-            taskInfo.getStatus().getDuration(),
-            taskInfo.getStatus().getLocation(),
-            taskInfo.getDataSource(),
-            taskInfo.getStatus().getErrorMsg()
-        )
-    ));
+    final List<TaskStatusPlus> taskStatuses = new ArrayList<>(completeTaskStatusPlusList);
 
-    activeTaskInfoFromTaskStorage.forEach(taskInfo -> {
-      final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId());
+    activeTaskStatusPlusList.forEach(statusPlus -> {
+      final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId());
       if (runnerWorkItem == null) {
         // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner.
         if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
-          statuses.add(
-              new TaskStatusPlus(
-                  taskInfo.getId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
-                  taskInfo.getCreatedTime(),
-                  DateTimes.EPOCH,
-                  taskInfo.getStatus().getStatusCode(),
-                  RunnerTaskState.WAITING,
-                  taskInfo.getStatus().getDuration(),
-                  taskInfo.getStatus().getLocation(),
-                  taskInfo.getDataSource(),
-                  taskInfo.getStatus().getErrorMsg()
-              )
-          );
+          taskStatuses.add(statusPlus);
         }
       } else {
         if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
-          statuses.add(
+          taskStatuses.add(
               new TaskStatusPlus(

Review Comment:
   If it's already a TaskStatusPlus, why do you need to do all of this?  It 
looks like this is setting createdTime and insertion time, are those not 
already set?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {

Review Comment:
   Why not add both of the columns at the same time?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()
+  {
+    final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
+    final String entryType = tablesConfig.getTaskEntryType();
+    final String tableName = tablesConfig.getEntryTable(entryType);
+    return migrateTaskTable(tableName);
+  }
+
+  public boolean migrateTaskTable(String tableName)
+  {
+    log.info("Populate fields task and group_id of task entry table [%s] from payload", tableName);
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws SQLException, IOException
+            {
+              ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   `new ObjectMapper()` should never exist in the Druid code.  It's almost 
always incorrect and if you find yourself needing to do it because you have no 
other way of getting an ObjectMapper, the code structure is likely to blame.
   
   In this case, the fact that the `MetadataConnector` is dealing with these 
things is to blame.  `SQLMetadataStorageActionHandler` already has an object 
mapper that it uses.  The logic for this migration should actually exist inside 
of that class or `MetadataTaskStorage` and *not* in here.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -311,7 +322,104 @@ public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
     );
   }
 
-  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
+  @Override
+  public List<TaskStatusPlus> getTaskStatusPlusList(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource,
+      boolean fetchPayload

Review Comment:
   This boolean about fetching the payload is weird, especially on a public 
method.  This class should already know whether it should be using the payload 
or not.  Once you move the migration logic into this class, you won't need this 
anymore.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java:
##########
@@ -1782,6 +1783,25 @@ private TaskInfo<Task, TaskStatus> createTaskInfo(
     );
   }
 
+  private TaskStatusPlus toTaskStatusPlus(TaskInfo<Task, TaskStatus> taskInfo)

Review Comment:
   This method shows up again?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java:
##########
@@ -233,6 +254,35 @@ public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
     return tasks;
   }
 
+  @Override
+  public List<TaskStatusPlus> getTaskStatusPlusList(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String datasource
+  )
+  {
+    final List<TaskStatusPlus> tasks = new ArrayList<>();
+    taskLookups.forEach((type, lookup) -> {
+      if (type == TaskLookupType.COMPLETE) {
+        CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) lookup;
+        tasks.addAll(
+            getRecentlyCreatedAlreadyFinishedTaskInfo(
+                completeTaskLookup.hasTaskCreatedTimeFilter()
+                ? completeTaskLookup
+                : completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()),
+                datasource
+            ).stream()
+             .map(taskInfo -> toTaskStatusPlus(taskInfo))

Review Comment:
   nit: might as well just pass `this::toTaskStatusPlus` as a method reference; 
it can serve as the lambda on its own.
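
A small sketch of the nit, assuming a stand-in conversion: `toTaskStatusPlus` here is a hypothetical static method on strings so the example is self-contained. In the PR the method is an instance method, so the reference would be `this::toTaskStatusPlus` instead.

```java
import java.util.List;
import java.util.stream.Collectors;

final class MethodReferenceSketch
{
  // Stand-in for the PR's toTaskStatusPlus(TaskInfo); it only decorates a string.
  static String toTaskStatusPlus(String taskId)
  {
    return "status:" + taskId;
  }

  static List<String> convert(List<String> taskIds)
  {
    // Behaves the same as .map(taskId -> toTaskStatusPlus(taskId)),
    // but passes the method itself as the mapping function.
    return taskIds.stream()
                  .map(MethodReferenceSketch::toTaskStatusPlus)
                  .collect(Collectors.toList());
  }
}
```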



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));

Review Comment:
   We likely want to explicitly default the column to null.  This tends to 
avoid table locks during DDL: if the column has a non-null default value (or if 
the default that the ALTER TABLE uses is non-null), then the ALTER TABLE can end 
up locking the table, and that can end up causing other sadness.
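
As a rough illustration, the statement text with an explicit null default could be built as below. `addNullableVarcharColumn` is a hypothetical helper that only formats the string; whether every supported metadata store accepts this exact `DEFAULT NULL` syntax is an assumption that would need checking per database.

```java
import java.util.Locale;

final class AddColumnSketch
{
  // Hypothetical helper: builds the DDL text only and does not execute it.
  // The explicit DEFAULT NULL states that existing rows need no backfill.
  static String addNullableVarcharColumn(String tableName, String columnName)
  {
    return String.format(
        Locale.ENGLISH,
        "ALTER TABLE %s ADD COLUMN %s VARCHAR(255) DEFAULT NULL",
        tableName,
        columnName
    );
  }
}
```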



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(

Review Comment:
   Why retry?  Why do we expect it to be normal for this to fail once and then 
succeed subsequently?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()

Review Comment:
   I'm not sure this really belongs in the `MetadataConnector`...  
`MetadataTaskStorage` would likely be a more natural place to actually have the 
migration logic as that is also the place that actually understands what to do 
with the tables.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()
+  {
+    final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
+    final String entryType = tablesConfig.getTaskEntryType();
+    final String tableName = tablesConfig.getEntryTable(entryType);
+    return migrateTaskTable(tableName);
+  }
+
+  public boolean migrateTaskTable(String tableName)
+  {
+    log.info("Populate fields task and group_id of task entry table [%s] from payload", tableName);
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws SQLException, IOException
+            {
+              ObjectMapper objectMapper = new ObjectMapper();
+              Connection connection = handle.getConnection();
+              Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
+              boolean flag = true;
+              while (flag) {
+                // Should ideally use a cursor and sort by id for efficiency, but updates with ordering aren't allowed
+                String sql = StringUtils.format(
+                    "SELECT * FROM %1$s WHERE active = false AND type IS null %2$s",
+                    tableName,
+                    limitClause(100)
+                );
+                ResultSet resultSet = statement.executeQuery(sql);
+                flag = false;
+                while (resultSet.next()) {
+                  ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
+                  resultSet.updateString("type", payload.get("type").asText());
+                  resultSet.updateString("group_id", payload.get("groupId").asText());
+                  resultSet.updateRow();

Review Comment:
   Fwiw, you probably don't even need to set a "limit" clause on the query 
anymore if you follow what I suggested above.  Basically, if you just tell JDBI 
to only give you the first 100 things from the ResultSet, it should be able to 
just return them and then when you close the cursor, the DB will close things 
and be happy-ish.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public boolean migrateTaskTable()
+  {
+    final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
+    final String entryType = tablesConfig.getTaskEntryType();
+    final String tableName = tablesConfig.getEntryTable(entryType);
+    return migrateTaskTable(tableName);
+  }
+
+  public boolean migrateTaskTable(String tableName)
+  {
+    log.info("Populate fields task and group_id of task entry table [%s] from payload", tableName);
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws SQLException, IOException
+            {
+              ObjectMapper objectMapper = new ObjectMapper();
+              Connection connection = handle.getConnection();
+              Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
+              boolean flag = true;
+              while (flag) {
+                // Should ideally use a cursor and sort by id for efficiency, but updates with ordering aren't allowed
+                String sql = StringUtils.format(
+                    "SELECT * FROM %1$s WHERE active = false AND type IS null %2$s",
+                    tableName,
+                    limitClause(100)
+                );
+                ResultSet resultSet = statement.executeQuery(sql);
+                flag = false;
+                while (resultSet.next()) {
+                  ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
+                  resultSet.updateString("type", payload.get("type").asText());
+                  resultSet.updateString("group_id", payload.get("groupId").asText());
+                  resultSet.updateRow();

Review Comment:
   Doing updates like this in the same transaction is overkill and might get 
into weird locking behaviors on the table.  It's best not to do it.  Each 
update should effectively be 2 queries: one to get the task payloads, which 
finishes and releases all DB resources, and then a second one that issues an 
UPDATE command to set the extra fields based on the taskId.
   
   When the first query stops returning results, that's when you know the 
migration is complete.
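
The loop described above could look roughly like this. It is a sketch under assumptions: `TaskTable`, `Row`, and `InMemoryTaskTable` are hypothetical stand-ins so the example runs without a database; in the real handler each interface method would open its own handle, run one statement, and release it before returning.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TwoStepMigrationSketch
{
  // Hypothetical row holding only the fields the migration touches.
  static final class Row
  {
    final String id;
    final String payloadType; // stands in for "type" parsed from the payload
    String type;              // the new column; null until migrated

    Row(String id, String payloadType)
    {
      this.id = id;
      this.payloadType = payloadType;
    }
  }

  // Hypothetical storage abstraction: each call is its own short-lived query.
  interface TaskTable
  {
    List<Row> fetchUnmigrated(int limit);    // query 1: read a batch, then release resources
    void updateType(String id, String type); // query 2: UPDATE ... WHERE id = ?
  }

  // Repeats read-then-update until the read returns nothing. Assumes every
  // update sets a non-null type, otherwise the same rows would be re-read.
  static int migrate(TaskTable table, int batchSize)
  {
    int migrated = 0;
    while (true) {
      List<Row> batch = table.fetchUnmigrated(batchSize);
      if (batch.isEmpty()) {
        return migrated; // the first query stopped returning results: done
      }
      for (Row row : batch) {
        table.updateType(row.id, row.payloadType);
        migrated++;
      }
    }
  }

  // In-memory implementation used only to exercise the loop.
  static final class InMemoryTaskTable implements TaskTable
  {
    private final Map<String, Row> rows = new LinkedHashMap<>();

    void add(String id, String payloadType)
    {
      rows.put(id, new Row(id, payloadType));
    }

    @Override
    public List<Row> fetchUnmigrated(int limit)
    {
      List<Row> result = new ArrayList<>();
      for (Row row : rows.values()) {
        if (row.type == null && result.size() < limit) {
          result.add(row);
        }
      }
      return result;
    }

    @Override
    public void updateType(String id, String type)
    {
      rows.get(id).type = type;
    }
  }
}
```

Because the read and the write are separate calls, no cursor or transaction stays open across the updates, which is the locking concern raised above.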



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

