kfaraz commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1566658230
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -115,6 +127,13 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
+ if (!(task instanceof PendingSegmentAllocatingTask)) {
+ throw new IAE(
Review Comment:
Use `DruidException` instead.
##########
server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java:
##########
@@ -32,6 +30,15 @@
import javax.annotation.Nullable;
import java.sql.ResultSet;
+/**
+ * Representation of a record in the pending segments table. <br/>
+ * Mapping of column in table to field:
+ * <li/> id -> id (Unique identifier for pending segment)
+ * <li/> sequence_name -> sequenceName (sequence name used for segment allocation)
+ * <li/> sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation)
+ * <li/> upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded)
+ * <li/> task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment)
Review Comment:
Does this render correctly as Javadoc?
The `li` tags should ideally open and close like standard HTML tags:
`<li>content</li>`
The mapping of column name to field is not needed, as it is fairly self-explanatory.
Also, the field descriptions should go in the respective getters and/or the constructor rather than at the class level.
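A minimal sketch of what this suggestion could look like, assuming the field names from the diff above. The class name `PendingSegmentRecordSketch` is hypothetical, IDs are plain strings for simplicity, and the real class's constructor and result-set mapping are left out. If a list is ever needed in the class Javadoc, it would be wrapped as `<ul><li>...</li></ul>`.

```java
/**
 * Representation of a record in the pending segments table.
 * (Hypothetical, simplified sketch: IDs are plain strings.)
 */
final class PendingSegmentRecordSketch
{
  private final String id;
  private final String sequenceName;
  private final String sequencePrevId;
  private final String upgradedFromSegmentId;
  private final String taskAllocatorId;

  PendingSegmentRecordSketch(
      String id,
      String sequenceName,
      String sequencePrevId,
      String upgradedFromSegmentId,
      String taskAllocatorId
  )
  {
    this.id = id;
    this.sequenceName = sequenceName;
    this.sequencePrevId = sequencePrevId;
    this.upgradedFromSegmentId = upgradedFromSegmentId;
    this.taskAllocatorId = taskAllocatorId;
  }

  /**
   * Unique identifier of the pending segment.
   */
  public String getId()
  {
    return id;
  }

  /**
   * Sequence name used for segment allocation.
   */
  public String getSequenceName()
  {
    return sequenceName;
  }

  /**
   * Previous segment ID used for segment allocation.
   */
  public String getSequencePrevId()
  {
    return sequencePrevId;
  }

  /**
   * ID of the root segment from which this pending segment was upgraded.
   * May be null in this sketch.
   */
  public String getUpgradedFromSegmentId()
  {
    return upgradedFromSegmentId;
  }

  /**
   * Associates a task, task group, or replica group with this pending segment.
   */
  public String getTaskAllocatorId()
  {
    return taskAllocatorId;
  }
}
```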
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java:
##########
@@ -19,6 +19,9 @@
package org.apache.druid.indexing.common.task;
+/**
+ * An interface to be implemented by every appending task that allocates pending segments.
+ */
Review Comment:
```suggestion
 * An append task that can allocate pending segments. All concrete {@link Task} implementations that need to allocate pending segments must implement this interface.
*/
```
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1473,183 +1427,6 @@ private void insertPendingSegmentIntoMetastore(
.execute();
}
- /**
- * Creates new IDs for the given append segments if a REPLACE task started and
- * finished after these append segments had already been allocated. The newly
- * created IDs belong to the same interval and version as the segments committed
- * by the REPLACE task.
- */
- private Set<DataSegment> createNewIdsForAppendSegments(
- Handle handle,
- String dataSource,
- Set<DataSegment> segmentsToAppend
- )
- {
- if (segmentsToAppend.isEmpty()) {
- return Collections.emptySet();
- }
-
- final Set<Interval> appendIntervals = new HashSet<>();
- final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
- for (DataSegment segment : segmentsToAppend) {
- appendIntervals.add(segment.getInterval());
- appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
- .add(segment);
- }
-
- // Fetch all used segments that overlap with any of the append intervals
- final Collection<DataSegment> overlappingSegments = retrieveUsedSegmentsForIntervals(
- dataSource,
- new ArrayList<>(appendIntervals),
- Segments.INCLUDING_OVERSHADOWED
- );
-
- final Map<String, Set<Interval>> overlappingVersionToIntervals = new HashMap<>();
- final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new HashMap<>();
- for (DataSegment segment : overlappingSegments) {
- overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
- .add(segment.getInterval());
- overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
- .add(segment);
- }
-
- final Set<DataSegment> upgradedSegments = new HashSet<>();
- for (Map.Entry<String, Set<Interval>> entry : overlappingVersionToIntervals.entrySet()) {
- final String upgradeVersion = entry.getKey();
- Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
- upgradeVersion,
- entry.getValue(),
- appendVersionToSegments
- );
- for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
- final Interval upgradeInterval = upgradeEntry.getKey();
- final Set<DataSegment> segmentsAlreadyOnVersion
- = overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet())
- .stream()
- .filter(s -> s.getVersion().equals(upgradeVersion))
- .collect(Collectors.toSet());
- Set<DataSegment> segmentsUpgradedToVersion = createNewIdsForAppendSegmentsWithVersion(
- handle,
- upgradeVersion,
- upgradeInterval,
- upgradeEntry.getValue(),
- segmentsAlreadyOnVersion
- );
- log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion);
- upgradedSegments.addAll(segmentsUpgradedToVersion);
- }
- }
-
- return upgradedSegments;
- }
-
- /**
- * Creates a Map from eligible interval to Set of segments that are fully
- * contained in that interval and have a version strictly lower than {@code #cutoffVersion}.
- */
- private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(
Cool cleanup! I assume none of these methods are needed anymore?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java:
##########
@@ -210,6 +211,13 @@ public SegmentIdWithShardSpec perform(
final TaskActionToolbox toolbox
)
{
+ if (!(task instanceof PendingSegmentAllocatingTask)) {
+ throw new IAE(
+ "Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
+ task.getId(),
+ task.getType()
Review Comment:
```suggestion
task.getId(), task.getType()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java:
##########
@@ -210,6 +211,13 @@ public SegmentIdWithShardSpec perform(
final TaskActionToolbox toolbox
)
{
+ if (!(task instanceof PendingSegmentAllocatingTask)) {
+ throw new IAE(
Review Comment:
Use a `DruidException` instead.
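A self-contained sketch of the guard both actions perform, for illustration only. `Task`, `PendingSegmentAllocatingTask`, `DruidExceptionStandIn`, and the `AllocationGuard` helper are simplified, hypothetical stand-ins so the example compiles on its own (in this sketch the marker interface extends `Task` just to keep it short). In the actual PR the thrown type would be `org.apache.druid.error.DruidException`, built with whichever persona and category the author chooses.

```java
/** Simplified stand-in for Druid's Task interface (assumption, not the real API). */
interface Task
{
  String getId();

  String getType();
}

/**
 * An append task that can allocate pending segments. All concrete {@link Task} implementations
 * that need to allocate pending segments must implement this interface.
 * (Extends Task here only to keep the sketch short.)
 */
interface PendingSegmentAllocatingTask extends Task
{
}

/** Hypothetical stand-in for DruidException, used only so the sketch is self-contained. */
class DruidExceptionStandIn extends RuntimeException
{
  DruidExceptionStandIn(String message)
  {
    super(message);
  }
}

/** Hypothetical helper holding the check that both task actions run before doing any work. */
final class AllocationGuard
{
  private AllocationGuard()
  {
  }

  static void ensureCanAllocate(Task task)
  {
    if (!(task instanceof PendingSegmentAllocatingTask)) {
      // Real code would throw a DruidException here instead of IAE, per the review.
      throw new DruidExceptionStandIn(
          String.format(
              "Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
              task.getId(), task.getType()
          )
      );
    }
  }
}
```

Tasks that implement the marker interface pass through silently; any other task fails fast with a message naming its ID and type.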
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -37,14 +37,28 @@
import org.apache.druid.timeline.DataSegment;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
+ *
+ * <pre>
+ * Pseudo code (for a single interval)
Review Comment:
Same comment regarding pseudo-code.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -44,6 +45,17 @@
/**
* Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
+ *
+ * <pre>
+ * Pseudo code (for a single interval):
+ * For an append lock held over an interval:
+ * transaction {
+ * commit input segments contained within interval
+ * if there is an active replace lock over the interval:
+ * add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
+ * fetch pending segments with parent contained within the input segments, and commit them
+ * }
Review Comment:
It doesn't seem appropriate to describe the implementation as pseudo-code; someone might as well read the code. It is better to briefly describe the key points of the implementation as a list. (This is not a blocker for the PR.)
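Following that suggestion, the pseudo-code quoted above for `SegmentTransactionalAppendAction` could be condensed into a short Javadoc list. This is a sketch only: the class name is a hypothetical placeholder, and the bullets simply restate the quoted pseudo-code rather than the final implementation.

```java
/**
 * Append segments to metadata storage. The segment versions must all be less than or equal to
 * a lock held by your task for the segment intervals.
 * <p>
 * Key points, for each interval over which the task holds an append lock (all in one transaction):
 * <ul>
 *   <li>Commit the input segments contained within the interval.</li>
 *   <li>If there is an active replace lock over the interval, add an entry for each input segment,
 *   corresponding to the replace lock's task, to the {@code upgradeSegments} table.</li>
 *   <li>Fetch the pending segments whose parent is among the input segments, and commit them.</li>
 * </ul>
 */
final class SegmentTransactionalAppendActionDocSketch
{
  // Placeholder body: only the class-level Javadoc above is relevant to this sketch.
}
```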