kfaraz commented on code in PR #16667: URL: https://github.com/apache/druid/pull/16667#discussion_r1671741488
########## server/src/main/java/org/apache/druid/metadata/UpgradedFromSegmentsResponse.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +/** + * Response for the RetrieveUpgradedFromSegmentIds task action + */ +public class UpgradedFromSegmentsResponse +{ + // Map from a segment ID to the segment ID from which it was upgraded + // There should be no entry in the map for an original non-upgraded segment + private final Map<String, String> upgradedFromSegmentIds; + + @JsonCreator + public UpgradedFromSegmentsResponse( + @JsonProperty("upgradedFromSegmentIds") Map<String, String> upgradedFromSegmentIds + ) + { + this.upgradedFromSegmentIds = upgradedFromSegmentIds; + } + + @JsonProperty + public Map<String, String> getUpgradedFromSegmentId() Review Comment: ```suggestion public Map<String, String> getUpgradedFromSegmentIds() ``` ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -587,6 +587,12 @@ protected void alterSegmentTable() Map<String, String> 
columnNameTypes = new HashMap<>(); columnNameTypes.put("used_status_last_updated", "VARCHAR(255)"); + // upgraded_from_segment_id is the first segment to which the same load spec originally belonged + // Load specs can be shared as a result of segment version upgrade Review Comment: Not really needed, there is no definition for any of the other columns. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java: ########## @@ -203,11 +208,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception ); // Fetch the load specs of all segments overlapping with the unused segment intervals Review Comment: These javadoc changes are pending. ########## server/src/main/java/org/apache/druid/metadata/UpgradedFromSegmentsResponse.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; Review Comment: Why are these classes in this package? They should be in the same package as the task action itself. 
########## server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java: ########## @@ -473,4 +475,22 @@ SegmentPublishResult commitMetadataOnly( * @return List of pending segment records */ List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval); + + /** + * Returns a mapping from the segments ids to their parent segments ids + * + * @param dataSource data source + * @param segmentIds ids of segments + * @return UpgradedFromSegmentsResponse + */ + UpgradedFromSegmentsResponse retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds); Review Comment: This method should return only the map, not the response object. ########## server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java: ########## @@ -473,4 +475,22 @@ SegmentPublishResult commitMetadataOnly( * @return List of pending segment records */ List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval); + + /** + * Returns a mapping from the segments ids to their parent segments ids + * + * @param dataSource data source + * @param segmentIds ids of segments + * @return UpgradedFromSegmentsResponse + */ + UpgradedFromSegmentsResponse retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds); + + /** + * Returns a mapping from segment ids to their child segment ids + * + * @param dataSource data source + * @param upgradedFromSegmentIds ids of the first segments which had the corresponding load spec + * @return UpgradedToSegmentsResponse + */ + UpgradedToSegmentsResponse retrieveUpgradedToSegmentIds(String dataSource, Set<String> upgradedFromSegmentIds); Review Comment: ```suggestion UpgradedToSegmentsResponse retrieveUpgradedToSegmentIds(String dataSource, Set<String> segmentIds); ``` ########## server/src/main/java/org/apache/druid/metadata/UpgradedToSegmentsResponse.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; +import java.util.Set; + +/** + * Response for the RetrieveUpgradedToSegmentIds task action + */ +public class UpgradedToSegmentsResponse +{ + + // Map from a segment ID to a set containing + // all segment IDs that were upgraded from it AND are still present in the metadata store Review Comment: This info should be a javadoc of the task action itself. ########## server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java: ########## @@ -473,4 +475,22 @@ SegmentPublishResult commitMetadataOnly( * @return List of pending segment records */ List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval); + + /** + * Returns a mapping from the segments ids to their parent segments ids + * + * @param dataSource data source + * @param segmentIds ids of segments + * @return UpgradedFromSegmentsResponse Review Comment: Not needed as they don't add new information. 
########## server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java: ########## @@ -473,4 +475,22 @@ SegmentPublishResult commitMetadataOnly( * @return List of pending segment records */ List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval); + + /** + * Returns a mapping from the segments ids to their parent segments ids + * + * @param dataSource data source + * @param segmentIds ids of segments + * @return UpgradedFromSegmentsResponse + */ + UpgradedFromSegmentsResponse retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds); + + /** + * Returns a mapping from segment ids to their child segment ids + * + * @param dataSource data source + * @param upgradedFromSegmentIds ids of the first segments which had the corresponding load spec + * @return UpgradedToSegmentsResponse + */ + UpgradedToSegmentsResponse retrieveUpgradedToSegmentIds(String dataSource, Set<String> upgradedFromSegmentIds); Review Comment: Return the map, not the response object as response object is meant only to be sent over the wire. ########## server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java: ########## @@ -473,4 +475,22 @@ SegmentPublishResult commitMetadataOnly( * @return List of pending segment records */ List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval); + + /** + * Returns a mapping from the segments ids to their parent segments ids + * + * @param dataSource data source + * @param segmentIds ids of segments + * @return UpgradedFromSegmentsResponse + */ + UpgradedFromSegmentsResponse retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds); + + /** + * Returns a mapping from segment ids to their child segment ids + * + * @param dataSource data source + * @param upgradedFromSegmentIds ids of the first segments which had the corresponding load spec + * @return UpgradedToSegmentsResponse Review Comment: Not needed. 
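A rough sketch of the split suggested in the comments above, where the coordinator method returns the plain map and the response object is built only at the task action boundary. `SegmentLineageStore`, `InMemoryLineageStore`, and `RetrieveUpgradedFromSegmentIdsSketch` are hypothetical stand-ins and not classes from the PR; the Jackson annotations are left out so that the snippet has no dependencies, and the in-memory store ignores the data source for brevity.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the coordinator interface.
// It returns the plain map rather than a response object.
interface SegmentLineageStore
{
  Map<String, String> retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds);
}

// Wire-level wrapper, created only where the result is sent to the caller.
// The @JsonCreator / @JsonProperty annotations from the PR are omitted here.
final class UpgradedFromSegmentsResponse
{
  private final Map<String, String> upgradedFromSegmentIds;

  UpgradedFromSegmentsResponse(Map<String, String> upgradedFromSegmentIds)
  {
    this.upgradedFromSegmentIds = Collections.unmodifiableMap(new HashMap<>(upgradedFromSegmentIds));
  }

  // Getter name matches the property name, as suggested in the review.
  Map<String, String> getUpgradedFromSegmentIds()
  {
    return upgradedFromSegmentIds;
  }
}

// Hypothetical in-memory store, used only to make the sketch executable.
final class InMemoryLineageStore implements SegmentLineageStore
{
  private final Map<String, String> upgradedFrom = new HashMap<>();

  void recordUpgrade(String segmentId, String upgradedFromSegmentId)
  {
    upgradedFrom.put(segmentId, upgradedFromSegmentId);
  }

  @Override
  public Map<String, String> retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds)
  {
    // Original, non-upgraded segments get no entry in the result.
    final Map<String, String> result = new HashMap<>();
    for (String segmentId : segmentIds) {
      final String parent = upgradedFrom.get(segmentId);
      if (parent != null) {
        result.put(segmentId, parent);
      }
    }
    return result;
  }
}

// Hypothetical stand-in for the task action: the only place that wraps the map.
final class RetrieveUpgradedFromSegmentIdsSketch
{
  static UpgradedFromSegmentsResponse perform(
      SegmentLineageStore store,
      String dataSource,
      Set<String> segmentIds
  )
  {
    return new UpgradedFromSegmentsResponse(store.retrieveUpgradedFromSegmentIds(dataSource, segmentIds));
  }
}
```

With this shape, callers inside the Overlord work with a `Map<String, String>` directly, and only the task action deals with the serializable wrapper.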
########## server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java: ########## @@ -473,4 +475,22 @@ SegmentPublishResult commitMetadataOnly( * @return List of pending segment records */ List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval); + + /** + * Returns a mapping from the segments ids to their parent segments ids + * + * @param dataSource data source + * @param segmentIds ids of segments + * @return UpgradedFromSegmentsResponse + */ + UpgradedFromSegmentsResponse retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds); + + /** + * Returns a mapping from segment ids to their child segment ids Review Comment: Javadoc should contain the details mentioned in the response class, i.e. ``` Returns a map from a segment ID to the set containing all segment IDs that were upgraded from it AND are still present in the metadata store. ``` ########## server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java: ########## @@ -322,6 +323,7 @@ protected DataSegment createSegment(Interval interval, String version, ShardSpec .version(version) .shardSpec(shardSpec) .size(100) + .loadSpec(ImmutableMap.of("hash", Objects.hash(interval, version, shardSpec))) Review Comment: Why even use a hash then? Why not just use the segment ID itself? ########## server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java: ########## @@ -138,16 +138,19 @@ public void testCommitAppendSegments() final String v1 = "2023-01-01"; final String v2 = "2023-01-02"; final String v3 = "2023-01-03"; + final String alreadyUpgradedVersion = "2023-02-01"; final String lockVersion = "2024-01-01"; - final String replaceTaskId = "replaceTask1"; + final String taskAllocatorId = "appendTask"; + final String replaceTaskId = "replaceTask"; Review Comment: Task ID seems to have changed. Just curious about why this was needed? 
########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -2266,7 +2328,8 @@ private Set<DataSegment> insertSegments( Set<DataSegment> segments, @Nullable SegmentSchemaMapping segmentSchemaMapping, Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata, - Map<SegmentId, SegmentId> newVersionForAppendToParent + Map<SegmentId, SegmentId> newVersionForAppendToParent, + Map<String, String> upgradedFromSegmentIdMap Review Comment: Please deduplicate these two maps. They serve the same purpose. ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -2217,14 +2271,22 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace( .shardSpec(shardSpec) .build(); + // When the segment already has an upgraded_from_segment_id, reuse it for its children + // This ensures that the row has a single level of lineage Review Comment: Sounds confusing, let's remove this. ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1540,6 +1551,48 @@ int insertPendingSegmentsIntoMetastore( return Arrays.stream(updated).sum(); } + @VisibleForTesting + public void insertUsedSegments(Set<DataSegment> dataSegments, Map<SegmentId, String> upgradedFromSegmentIdMap) Review Comment: This method should not be in this class as it is invoked only from tests and does not update any state of this class. ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -587,6 +587,11 @@ protected void alterSegmentTable() Map<String, String> columnNameTypes = new HashMap<>(); columnNameTypes.put("used_status_last_updated", "VARCHAR(255)"); + // upgraded_from_segment_id is the first segment to which the same load spec originally belonged + // Load specs can be shared as a result of segment version upgrade + // This column is null for segments that haven't been upgraded. 
+ columnNameTypes.put("upgraded_from_segment_id", "VARCHAR(255)"); Review Comment: Keeping the conversation as unresolved so that we know that is to be addressed later.
