kfaraz commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1566699055
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -710,7 +732,7 @@ void allocateSegmentIds(
}
}
- private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment
request, String version)
+ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment
request, String version, String taskGroup)
Review Comment:
Rename argument to `taskAllocatorId`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -213,6 +220,16 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task,
TaskLock> right)
activeTasks.remove(task.getId());
}
}
+ activePendingTaskGroupToTaskIds.clear();
+ for (Task task : storedActiveTasks) {
Review Comment:
Merge this with the previous `for` loop; there is no need to iterate over the
same list twice.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1176,13 +1204,35 @@ public void remove(final Task task)
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
- if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() ==
TaskLockType.REPLACE)) {
- final int upgradeSegmentsDeleted =
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
- log.info(
- "Deleted [%d] entries from upgradeSegments table for task[%s]
with REPLACE locks.",
- upgradeSegmentsDeleted,
- task.getId()
- );
+ try {
+ if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType()
== TaskLockType.REPLACE)) {
+ final int upgradeSegmentsDeleted =
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+ log.info(
+ "Deleted [%d] entries from upgradeSegments table for task[%s]
with REPLACE locks.",
+ upgradeSegmentsDeleted,
+ task.getId()
+ );
+ }
+ if (task instanceof PendingSegmentAllocatingTask) {
Review Comment:
Leave a line empty before this block, and also add a comment.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1159,6 +1182,11 @@ public void add(Task task)
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
+ if (task instanceof PendingSegmentAllocatingTask) {
+ final String pendingSegmentGroupId = ((PendingSegmentAllocatingTask)
task).getPendingSegmentGroupId();
+ activePendingTaskGroupToTaskIds.computeIfAbsent(pendingSegmentGroupId,
s -> new HashSet<>())
+ .add(task.getId());
+ }
Review Comment:
This whole logic can go into a private method as it is being used in
multiple places. Annotate the method with `@GuardedBy("giant")`.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -281,99 +279,74 @@ public int markSegmentsAsUnusedWithinInterval(String
dataSource, Interval interv
}
/**
- * Fetches all the pending segments, whose interval overlaps with the given
- * search interval and has a sequence_name that begins with one of the
prefixes in sequenceNamePrefixFilter
- * from the metadata store. Returns a Map from the pending segment ID to the
sequence name.
+ * Fetches all the pending segments, whose interval overlaps with the given
search interval, from the metadata store.
*/
@VisibleForTesting
- Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
- final Interval interval,
- final Set<String> sequenceNamePrefixFilter
- ) throws IOException
+ final Interval interval
+ )
{
- if (sequenceNamePrefixFilter.isEmpty()) {
- return Collections.emptyMap();
- }
-
- final List<String> sequenceNamePrefixes = new
ArrayList<>(sequenceNamePrefixFilter);
- final List<String> sequenceNamePrefixConditions = new ArrayList<>();
- for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
- sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE
:prefix%d)", i));
- }
+ boolean compareIntervalEndpointsAsStrings =
Intervals.canCompareEndpointsAsStrings(interval);
Review Comment:
```suggestion
final boolean compareIntervalEndpointsAsStrings =
Intervals.canCompareEndpointsAsStrings(interval);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -213,6 +220,16 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task,
TaskLock> right)
activeTasks.remove(task.getId());
}
}
+ activePendingTaskGroupToTaskIds.clear();
+ for (Task task : storedActiveTasks) {
+ if (task instanceof PendingSegmentAllocatingTask) {
+ final String pendingSegmentAllocatingTask =
((PendingSegmentAllocatingTask) task).getPendingSegmentGroupId();
Review Comment:
```suggestion
final String allocatorId = ((PendingSegmentAllocatingTask)
task).getPendingSegmentGroupId();
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -411,7 +428,12 @@ public LockResult tryLock(final Task task, final
LockRequest request)
newSegmentId
);
}
- newSegmentId = allocateSegmentId(lockRequestForNewSegment,
posseToUse.getTaskLock().getVersion());
+ final String pendingSegmentGroupId =
((PendingSegmentAllocatingTask) task).getPendingSegmentGroupId();
Review Comment:
`taskAllocatorId`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+/**
+ * An interface to be implemented by every appending task that allocates
pending segments.
+ */
+public interface PendingSegmentAllocatingTask
+{
+ /**
+ * Unique string used by an appending task (or its sub-tasks and replicas)
to allocate pending segments
+ * and identify pending segments allocated to it.
+ */
+ String getPendingSegmentGroupId();
Review Comment:
`getTaskAllocatorId`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1159,6 +1182,11 @@ public void add(Task task)
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
+ if (task instanceof PendingSegmentAllocatingTask) {
+ final String pendingSegmentGroupId = ((PendingSegmentAllocatingTask)
task).getPendingSegmentGroupId();
Review Comment:
Rename to `taskAllocatorId`.
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -340,7 +344,8 @@ SegmentPublishResult commitAppendSegmentsAndMetadata(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
- DataSourceMetadata endMetadata
+ DataSourceMetadata endMetadata,
+ String taskGroup
Review Comment:
Rename to `taskAllocatorId`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1771,7 +1821,9 @@ SegmentCreateRequest getSegmentRequest()
action.getSequenceName(),
action.getPreviousSegmentId(),
acquiredLock == null ? lockRequest.getVersion() :
acquiredLock.getVersion(),
- action.getPartialShardSpec()
+ action.getPartialShardSpec(),
+ null,
+ ((PendingSegmentAllocatingTask) task).getPendingSegmentGroupId()
Review Comment:
Maybe assign this to a field `taskAllocatorId` in the constructor of
`SegmentAllocationHolder` itself, after performing an `instanceof` check.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1176,13 +1204,35 @@ public void remove(final Task task)
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
- if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() ==
TaskLockType.REPLACE)) {
- final int upgradeSegmentsDeleted =
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
- log.info(
- "Deleted [%d] entries from upgradeSegments table for task[%s]
with REPLACE locks.",
- upgradeSegmentsDeleted,
- task.getId()
- );
+ try {
+ if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType()
== TaskLockType.REPLACE)) {
Review Comment:
Add a comment before this block.
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -476,4 +478,19 @@ SegmentPublishResult commitMetadataOnly(
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
+
+ /**
+ * Delete pending segment for a give task group after all the tasks
belonging to it have completed.
+ * @param taskGroup task group
+ * @return number of pending segments deleted from the metadata store
+ */
+ int deletePendingSegmentsForTaskGroup(String taskGroup);
+
+ /**
+ * Fetches all the pending segments present in the metadata store for a
given datasource
+ * @param datasource datasource to be queried
+ * @param interval interval with which segments overlap
+ * @return List of pending segment records
Review Comment:
Not really needed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1176,13 +1204,35 @@ public void remove(final Task task)
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
- if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() ==
TaskLockType.REPLACE)) {
- final int upgradeSegmentsDeleted =
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
- log.info(
- "Deleted [%d] entries from upgradeSegments table for task[%s]
with REPLACE locks.",
- upgradeSegmentsDeleted,
- task.getId()
- );
+ try {
+ if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType()
== TaskLockType.REPLACE)) {
+ final int upgradeSegmentsDeleted =
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+ log.info(
+ "Deleted [%d] entries from upgradeSegments table for task[%s]
with REPLACE locks.",
+ upgradeSegmentsDeleted,
+ task.getId()
Review Comment:
```suggestion
"Deleted [%d] entries from upgradeSegments table for
task[%s] with REPLACE locks.",
upgradeSegmentsDeleted, task.getId()
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -476,4 +478,19 @@ SegmentPublishResult commitMetadataOnly(
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
+
+ /**
+ * Delete pending segment for a give task group after all the tasks
belonging to it have completed.
+ * @param taskGroup task group
+ * @return number of pending segments deleted from the metadata store
+ */
+ int deletePendingSegmentsForTaskGroup(String taskGroup);
+
+ /**
+ * Fetches all the pending segments present in the metadata store for a
given datasource
Review Comment:
```suggestion
* Fetches all pending segments of the datasource that overlap with the
given interval.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -103,6 +103,11 @@ public class TaskLockbox
// this set should be accessed under the giant lock.
private final Set<String> activeTasks = new HashSet<>();
+ // Stores map of pending task group of tasks to the set of their ids.
Review Comment:
Please address this and also rename this map to
`activeAllocatorIdToTaskIds`. You need not declare it as a `HashMap`, just a
`Map` would suffice.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -281,99 +279,74 @@ public int markSegmentsAsUnusedWithinInterval(String
dataSource, Interval interv
}
/**
- * Fetches all the pending segments, whose interval overlaps with the given
- * search interval and has a sequence_name that begins with one of the
prefixes in sequenceNamePrefixFilter
- * from the metadata store. Returns a Map from the pending segment ID to the
sequence name.
+ * Fetches all the pending segments, whose interval overlaps with the given
search interval, from the metadata store.
*/
@VisibleForTesting
- Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
- final Interval interval,
- final Set<String> sequenceNamePrefixFilter
- ) throws IOException
+ final Interval interval
+ )
{
- if (sequenceNamePrefixFilter.isEmpty()) {
- return Collections.emptyMap();
- }
-
- final List<String> sequenceNamePrefixes = new
ArrayList<>(sequenceNamePrefixFilter);
- final List<String> sequenceNamePrefixConditions = new ArrayList<>();
- for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
- sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE
:prefix%d)", i));
- }
+ boolean compareIntervalEndpointsAsStrings =
Intervals.canCompareEndpointsAsStrings(interval);
- String sql = "SELECT sequence_name, payload"
+ String sql = "SELECT payload, sequence_name, sequence_prev_id,
task_allocator_id, upgraded_from_segment_id"
+ " FROM " + dbTables.getPendingSegmentsTable()
- + " WHERE dataSource = :dataSource"
- + " AND start < :end"
- + StringUtils.format(" AND %1$send%1$s > :start",
connector.getQuoteString())
- + " AND ( " + String.join(" OR ",
sequenceNamePrefixConditions) + " )";
+ + " WHERE dataSource = :dataSource";
+ if (compareIntervalEndpointsAsStrings) {
+ sql = sql
+ + " AND start < :end"
+ + StringUtils.format(" AND %1$send%1$s > :start",
connector.getQuoteString());
+ }
Query<Map<String, Object>> query = handle.createQuery(sql)
- .bind("dataSource", dataSource)
- .bind("start",
interval.getStart().toString())
- .bind("end",
interval.getEnd().toString());
-
- for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
- query.bind(StringUtils.format("prefix%d", i),
sequenceNamePrefixes.get(i) + "%");
+ .bind("dataSource", dataSource);
+ if (compareIntervalEndpointsAsStrings) {
+ query = query.bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
}
- final ResultIterator<PendingSegmentsRecord> dbSegments =
- query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
- .iterator();
- final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName =
new HashMap<>();
- while (dbSegments.hasNext()) {
- PendingSegmentsRecord record = dbSegments.next();
- final SegmentIdWithShardSpec identifier =
jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
-
- if (interval.overlaps(identifier.getInterval())) {
- pendingSegmentToSequenceName.put(identifier, record.sequenceName);
+ final ResultIterator<PendingSegmentRecord> pendingSegmentIterator =
+ query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r,
jsonMapper))
+ .iterator();
+ final ImmutableList.Builder<PendingSegmentRecord> pendingSegments =
ImmutableList.builder();
+ while (pendingSegmentIterator.hasNext()) {
+ final PendingSegmentRecord pendingSegment =
pendingSegmentIterator.next();
+ if (compareIntervalEndpointsAsStrings ||
pendingSegment.getId().getInterval().overlaps(interval)) {
+ pendingSegments.add(pendingSegment);
}
}
-
- dbSegments.close();
-
- return pendingSegmentToSequenceName;
+ pendingSegmentIterator.close();
+ return pendingSegments.build();
}
- private Map<SegmentIdWithShardSpec, String>
getPendingSegmentsForIntervalWithHandle(
+ List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorIdWithHandle(
Review Comment:
Should this be private?
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -476,4 +478,19 @@ SegmentPublishResult commitMetadataOnly(
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
+
+ /**
+ * Delete pending segment for a give task group after all the tasks
belonging to it have completed.
+ * @param taskGroup task group
+ * @return number of pending segments deleted from the metadata store
+ */
+ int deletePendingSegmentsForTaskGroup(String taskGroup);
Review Comment:
```suggestion
int deletePendingSegmentsForAllocatorId(String taskAllocatorId);
```
Please ensure that there are no places in the code that continue to refer to
`pendingSegmentGroup`. All occurrences should be replaced with
`taskAllocatorId`.
##########
server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.sql.ResultSet;
+
+/**
+ * Representation of a record in the pending segments table. <br/>
+ * Mapping of column in table to field:
+ * <li/> id -> id (Unique identifier for pending segment)
+ * <li/> sequence_name -> sequenceName (sequence name used for segment
allocation)
+ * <li/> sequence_prev_id -> sequencePrevId (previous segment id used for
segment allocation)
+ * <li/> upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root
segment from which this was upgraded)
+ * <li/> task_allocator_id -> taskAllocatorId (Associates a task / task group
/ replica group with the pending segment)
+ */
+public class PendingSegmentRecord
+{
+ private final SegmentIdWithShardSpec id;
+ private final String sequenceName;
+ private final String sequencePrevId;
+ private final String upgradedFromSegmentId;
+ private final String taskAllocatorId;
+
+ public PendingSegmentRecord(
+ SegmentIdWithShardSpec id,
+ String sequenceName,
+ String sequencePrevId,
+ @Nullable String upgradedFromSegmentId,
+ @Nullable String taskAllocatorId
+ )
+ {
+ this.id = id;
+ this.sequenceName = sequenceName;
+ this.sequencePrevId = sequencePrevId;
+ this.upgradedFromSegmentId = upgradedFromSegmentId;
+ this.taskAllocatorId = taskAllocatorId;
+ }
+
+ public SegmentIdWithShardSpec getId()
+ {
+ return id;
+ }
+
+ public String getSequenceName()
+ {
+ return sequenceName;
+ }
+
+ public String getSequencePrevId()
+ {
+ return sequencePrevId;
+ }
+
+ /**
+ * The original pending segment using which this upgraded segment was
created.
+ * Corresponds to the column upgraded_from_segment_id in
druid_pendingSegments.
+ * Can be null for pending segments allocated before this column was added
or for segments that have not been upgraded.
+ */
+ @Nullable
+ public String getUpgradedFromSegmentId()
+ {
+ return upgradedFromSegmentId;
+ }
+
+ /**
+ * task / taskGroup / replica group of task that allocated this segment.
+ * Corresponds to the column task_allocator_id in druid_pendingSegments.
Review Comment:
This line is not needed. It is self-explanatory.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]