kfaraz commented on code in PR #16162:
URL: https://github.com/apache/druid/pull/16162#discussion_r1578807492
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java:
##########
@@ -377,7 +388,8 @@ public void onSuccess(Object result)
new SegmentsAndCommitMetadata(
segments,
((AppenderatorDriverMetadata)
metadata).getCallerMetadata(),
-
segmentsAndCommitMetadata.getSegmentSchemaMapping()
+
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
+ upgradedSegments
Review Comment:
```suggestion
segmentsAndCommitMetadata.getUpgradedSegments()
```
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1334,6 +1372,18 @@ private SegmentPublishResult
commitAppendSegmentsAndMetadataInTransaction(
}
insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
+
+ // Delete the pending segments to be committed in this transaction
in batches of at most 100
+ final List<List<String>> pendingSegmentIdBatches = Lists.partition(
+ allSegmentsToInsert.stream()
+ .map(pendingSegment ->
pendingSegment.getId().toString())
+ .collect(Collectors.toList()),
+ 100
+ );
+ for (List<String> pendingSegmentIdBatch : pendingSegmentIdBatches)
{
+ deletePendingSegmentsById(handle, dataSource,
pendingSegmentIdBatch);
Review Comment:
Maybe add an info log for the total count of pending segment records deleted.
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java:
##########
@@ -221,10 +221,8 @@ public void testFailDuringDrop() throws IOException,
InterruptedException, Timeo
Assert.assertNull(driver.startJob(null));
- for (int i = 0; i < ROWS.size(); i++) {
- committerSupplier.setMetadata(i + 1);
- Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier,
false, true).isOk());
- }
+ committerSupplier.setMetadata(1);
Review Comment:
Can't we control the order in the test?
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java:
##########
@@ -20,28 +20,50 @@
package org.apache.druid.segment.realtime.appenderator;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
public class SegmentsAndCommitMetadata
{
private static final SegmentsAndCommitMetadata NIL = new
SegmentsAndCommitMetadata(Collections.emptyList(), null);
private final Object commitMetadata;
private final ImmutableList<DataSegment> segments;
+ private final ImmutableSet<DataSegment> upgradedSegments;
Review Comment:
Better to have this as a javadoc on `getUpgradedSegments()`.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1368,6 +1399,10 @@ private Object bootstrapSinksFromDisk()
);
rowsSoFar += currSink.getNumRows();
sinks.put(identifier, currSink);
+ idToPendingSegment.put(identifier.asSegmentId().toString(),
identifier);
Review Comment:
Need a comment here.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -166,8 +168,13 @@ public class StreamAppenderator implements Appenderator
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
- baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+ private final ConcurrentMap<SegmentIdWithShardSpec,
Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedSegments
+ = new ConcurrentHashMap<>();
+ private final ConcurrentMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradedSegmentToBaseSegment
+ = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap.KeySetView<SegmentIdWithShardSpec, Boolean>
abandonedSegments
Review Comment:
This needs a javadoc.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1098,16 +1104,16 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer()
/**
* Unannounces the given base segment and all its upgraded versions.
*/
- private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws
IOException
+ private void unannounceAllVersionsOfSegment(DataSegment baseSegment)
{
- segmentAnnouncer.unannounceSegment(baseSegment);
-
- final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
- = baseSegmentToUpgradedVersions.remove(baseSegment.getId());
- if (upgradedVersionsOfSegment == null ||
upgradedVersionsOfSegment.isEmpty()) {
+ final SegmentIdWithShardSpec baseId =
SegmentIdWithShardSpec.fromDataSegment(baseSegment);
+ if (!baseSegmentToUpgradedSegments.containsKey(baseId)) {
return;
}
+ final List<SegmentIdWithShardSpec> upgradedVersionsOfSegment
+ = ImmutableList.copyOf(baseSegmentToUpgradedSegments.get(baseId));
Review Comment:
Why do we need to make a copy? Why can't we use the original set itself?
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -528,6 +535,10 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec
identifier)
}
sinks.put(identifier, retVal);
+ idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);
+ baseSegmentToUpgradedSegments.put(identifier, new HashSet<>());
+ baseSegmentToUpgradedSegments.get(identifier).add(identifier);
Review Comment:
Do we want to clear the set even if this base segment was already mapped to
some other segments?
Does the set of upgraded segments need to contain self too?
```suggestion
baseSegmentToUpgradedSegments.computeIfAbsent(identifier, i -> new
HashSet<>()).add(identifier);
```
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java:
##########
@@ -366,7 +373,11 @@ public void onSuccess(Object result)
{
if (numRemainingHandoffSegments.decrementAndGet() == 0) {
List<DataSegment> segments =
segmentsAndCommitMetadata.getSegments();
+ Set<DataSegment> upgradedSegments =
segmentsAndCommitMetadata.getUpgradedSegments();
log.info("Successfully handed off [%d] segments.",
segments.size());
+ if (upgradedSegments != null) {
+ log.info("Successfully handed off [%d] upgraded
segments.", upgradedSegments.size());
+ }
Review Comment:
Maybe just use `segmentsToBeHandedOff.size()` and remove the unnecessary
references to `segmentsAndCommitMetadata`.
```suggestion
log.info("Successfully handed off [%d] segments.",
segmentsToBeHandedOff.size());
```
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1098,16 +1104,16 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer()
/**
* Unannounces the given base segment and all its upgraded versions.
*/
- private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws
IOException
+ private void unannounceAllVersionsOfSegment(DataSegment baseSegment)
{
- segmentAnnouncer.unannounceSegment(baseSegment);
-
- final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
- = baseSegmentToUpgradedVersions.remove(baseSegment.getId());
Review Comment:
Why don't we need to `remove` from the map anymore?
##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java:
##########
@@ -201,7 +201,7 @@ public void testFailDuringDrop() throws IOException,
InterruptedException, Timeo
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage(
- "Fail test while dropping
segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
Review Comment:
Why don't we have the segment ID in the message anymore?
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1402,6 +1437,17 @@ private ListenableFuture<?> abandonSegment(
final boolean removeOnDiskData
)
{
+ abandonedSegments.add(identifier);
+ final SegmentIdWithShardSpec baseIdentifier =
upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
+ synchronized (sink) {
+ if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {
Review Comment:
Need a comment here.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1138,21 +1164,26 @@ public void registerNewVersionOfPendingSegment(
// Announce segments
final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
+ final DataSegment newSegment = getUpgradedSegment(baseSegment,
newSegmentVersion);
- final DataSegment newSegment = new DataSegment(
- newSegmentVersion.getDataSource(),
- newSegmentVersion.getInterval(),
- newSegmentVersion.getVersion(),
+ segmentAnnouncer.announceSegment(newSegment);
+
baseSegmentToUpgradedSegments.get(basePendingSegment).add(newSegmentVersion);
+ upgradedSegmentToBaseSegment.put(newSegmentVersion, basePendingSegment);
Review Comment:
These two operations can be put inside a method. Also, I guess it is better
to use `computeIfAbsent` for `baseSegmentToUpgradedSegments`.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -528,6 +535,10 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec
identifier)
}
sinks.put(identifier, retVal);
+ idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);
Review Comment:
Need some comments here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]