AmatyaAvadhanula commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r897967009
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
);
}
+ public void alterEntryTable(final String tableName)
+ {
+ try {
+ retryWithHandle(
+ new HandleCallback<Void>()
+ {
+ @Override
+ public Void withHandle(Handle handle)
+ {
+ if (!tableContainsColumn(handle, tableName, "type")) {
+ log.info("Adding column: type to table[%s]", tableName);
+ handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN
type VARCHAR(255)", tableName));
+ }
+ if (!tableContainsColumn(handle, tableName, "group_id")) {
+ log.info("Adding column: group_id to table[%s]", tableName);
+ handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN
group_id VARCHAR(255)", tableName));
+ }
+ return null;
+ }
+ }
+ );
+ }
+ catch (Exception e) {
+ log.warn(e, "Exception altering table");
+ }
+ }
+
+ @Override
+ public boolean migrateTaskTable()
+ {
+ final MetadataStorageTablesConfig tablesConfig =
tablesConfigSupplier.get();
+ final String entryType = tablesConfig.getTaskEntryType();
+ final String tableName = tablesConfig.getEntryTable(entryType);
+ return migrateTaskTable(tableName);
+ }
+
+ public boolean migrateTaskTable(String tableName)
+ {
+ log.info("Populate fields task and group_id of task entry table [%s] from
payload", tableName);
+ try {
+ retryWithHandle(
+ new HandleCallback<Void>()
+ {
+ @Override
+ public Void withHandle(Handle handle) throws SQLException,
IOException
+ {
+ ObjectMapper objectMapper = new ObjectMapper();
+ Connection connection = handle.getConnection();
+ Statement statement =
connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_UPDATABLE);
+ boolean flag = true;
+ while (flag) {
+ // Should ideally use a cursor and sort by id for efficiency,
but updates with ordering aren't allowed
+ String sql = StringUtils.format(
+ "SELECT * FROM %1$s WHERE active = false AND type IS null
%2$s",
+ tableName,
+ limitClause(100)
+ );
+ ResultSet resultSet = statement.executeQuery(sql);
+ flag = false;
+ while (resultSet.next()) {
+ ObjectNode payload =
objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
+ resultSet.updateString("type", payload.get("type").asText());
+ resultSet.updateString("group_id",
payload.get("groupId").asText());
+ resultSet.updateRow();
Review Comment:
Added logs and a sleep after each iteration
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]